Tools/terrain-tools/terrain_rangefinder_postprocess.ipynb
Edit the below cell's input_log variable to the path to your BIN (dataflash) log.
The BIN log parser currently assumes the first rangefinder is downward facing.
import math
from collections import namedtuple
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import pandas as pd
from pymavlink import mavutil
from pyproj import Geod
from scipy.spatial.transform import Rotation as R
input_log = Path("00000125.BIN")
stem = input_log.stem # '00000125' from '00000125.BIN'
output_csv = Path(f"projected_{stem}.csv")
# Open .bin log
log = mavutil.mavlink_connection(str(input_log))
# Create a geodesic calculator
geod = Geod(ellps='WGS84')
# Store messages as lists of tuples
attitudes = []
ahrs_positions = []
rangefinder = []
terrain_data = []
Ahrs = namedtuple("Ahrs", ["timestamp", "lat", "lon", "alt", "roll", "pitch", "yaw"])
Rangefinder = namedtuple("Rangefinder", ["timestamp", "distance"])
Terrain = namedtuple("Terrain", ["timestamp", "terr_height"])
@dataclass
class GroundPoint:
timestamp: float
rng_lat: float
rng_lon: float
rng_elev: float
ahrs_lat: float
ahrs_lon: float
ahrs_alt: float
distance: float
terr_height: float
while True:
msg = log.recv_match(blocking=False)
if msg is None:
break
t = datetime.fromtimestamp(msg._timestamp)
mtype = msg.get_type()
if msg.get_type() == "AHR2":
# Lat and Lon in degrees, alt in m
ahrs_positions.append(Ahrs(
timestamp=t,
lat=msg.Lat,
lon=msg.Lng,
alt=msg.Alt,
roll=math.radians(msg.Roll),
pitch=math.radians(msg.Pitch),
yaw=math.radians(msg.Yaw),
))
elif msg.get_type() == "RFND" and msg.Stat == 4: # Only use healthy readings, see AP_Rangefinder::Status::Good
rangefinder.append(Rangefinder(
timestamp=t,
distance=msg.Dist
))
elif msg.get_type() == "TERR":
terrain_data.append(Terrain(
timestamp=t,
terr_height=msg.TerrH
))
ahrs_df = pd.DataFrame(ahrs_positions).set_index("timestamp").sort_index()
rfnd_df = pd.DataFrame(rangefinder).set_index("timestamp").sort_index()
terr_df = pd.DataFrame(terrain_data).set_index("timestamp").sort_index()
# Compute estimated ground points
ground_points: list[GroundPoint] = []
assert ahrs_positions, "No AHR2 messages found"
assert rangefinder, "No valid RFND messages found"
assert terrain_data, "No TERR messages found"
for t, row in rfnd_df.iterrows():
dist = row.distance
# Efficient nearest neighbor lookup with .asof() for datetime index
try:
ahrs_row = ahrs_df.loc[ahrs_df.index.asof(t)]
terr_row = terr_df.loc[terr_df.index.asof(t)]
except KeyError:
continue # skip if no data available at or before timestamp
# Unpack required fields
lat, lon, alt, roll, pitch, yaw = ahrs_row.lat, ahrs_row.lon, ahrs_row.alt, ahrs_row.roll, ahrs_row.pitch, ahrs_row.yaw
terr_height = terr_row.terr_height
# Build the rotation using Euler angles (in radians)
rotation = R.from_euler('xyz', [roll, pitch, yaw])
# Rotate the down vector (in body frame)
ned_vector = rotation.apply([0, 0, dist])
# Calculate ground point in NED frame
north_offset = ned_vector[0]
east_offset = ned_vector[1]
down_offset = ned_vector[2]
ground_alt = alt - down_offset # down_offset is positive, so subtract it to get estimated ground height
# Project horizontal lat/lon offset
azimuth = math.degrees(math.atan2(east_offset, north_offset))
horizontal_distance = math.hypot(north_offset, east_offset)
g_lon, g_lat, _ = geod.fwd(lon, lat, azimuth, horizontal_distance)
ground_points.append(GroundPoint(
timestamp=t,
rng_lat=g_lat,
rng_lon=g_lon,
rng_elev=ground_alt,
ahrs_lat=lat,
ahrs_lon=lon,
ahrs_alt=alt,
distance=dist,
terr_height=terr_height
))
if not ground_points:
print("Error: No projected ground point data found.")
exit(1)
# Convert to DataFrame
import pandas as pd
df = pd.DataFrame([gp.__dict__ for gp in ground_points])
# Get the latitude/longitude extent for input to OpenTopography
min_lat = df["rng_lat"].min()
max_lat = df["rng_lat"].max()
min_lon = df["rng_lon"].min()
max_lon = df["rng_lon"].max()
assert min_lat < max_lat
assert min_lon < max_lon
print(f"Latitude range: ({min_lat},{max_lat})")
print(f"Longitude range: ({min_lon},{max_lon})")
To compare to other elevation data sources, these must be manually downloaded from OpenTopography. Once extracted into this directory, rename them and update paths in the below block. Add as many datasets for comparison as you like. GDAL supports many different file formats.
# Load elevation from GeoTIFF using GDAL
from osgeo import gdal
dataset_paths = [Path("cop30.tif"), Path("AW3D30.tif")]
for dataset_path in dataset_paths:
dataset = gdal.Open(str(dataset_path))
assert dataset is not None, f"Dataset '{dataset_path}' failed to load"
band = dataset.GetRasterBand(1)
transform = dataset.GetGeoTransform()
# Convert lat/lon to row/col
def latlon_to_rowcol(lat, lon):
inv_transform = gdal.InvGeoTransform(transform)
px, py = gdal.ApplyGeoTransform(inv_transform, lon, lat)
return int(py), int(px)
elevations = []
for _, row in df.iterrows():
r, c = latlon_to_rowcol(row.rng_lat, row.rng_lon)
grid = band.ReadAsArray(c, r, 1, 1)
if grid is not None:
elevation = grid[0][0]
else:
# Out of range of the dataset, common with LOG_REPLAY before the EKF initializes
elevation = math.nan
elevations.append(elevation)
df[dataset_path.stem + "_elevation"] = elevations
# plot using plotly, good for smaller logs (<30 minutes)
import plotly.graph_objs as go
fig = go.Figure()
for col in df.columns:
stems = [f"{p.stem}_elevation" for p in dataset_paths]
cols = ("ahrs_alt", "rng_elev", "terr_height", *stems)
if col in cols:
print(f"Adding {col}")
y = pd.to_numeric(df[col], errors='coerce')
fig.add_trace(go.Scattergl(x=df["timestamp"], y=y, mode='lines+markers', name=col))
fig.update_layout(title=f"Terrain Comparison Over Time for {stem}", xaxis_title="Time", yaxis_title="Altitude (m)", height=600)
fig.show()
# Save plot
import plotly.io as pio
pio.write_html(fig, f'terrain_pyplot_{stem}.html')