Back to Ardupilot

Open .bin log

Tools/terrain-tools/terrain_rangefinder_postprocess.ipynb

3.5.66.5 KB
Original Source

Edit the below cell's input_log variable to the path to your BIN (dataflash) log. The BIN log parser currently assumes the first rangefinder is downward facing.

python

import math

from collections import namedtuple
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

import pandas as pd

from pymavlink import mavutil
from pyproj import Geod
from scipy.spatial.transform import Rotation as R

input_log = Path("00000125.BIN")
stem = input_log.stem  # '00000125' from '00000125.BIN'
output_csv = Path(f"projected_{stem}.csv")

# Open .bin log
log = mavutil.mavlink_connection(str(input_log))

# Create a geodesic calculator
geod = Geod(ellps='WGS84')

# Store messages as lists of tuples
attitudes = []
ahrs_positions = []
rangefinder = []
terrain_data = []


Ahrs = namedtuple("Ahrs", ["timestamp", "lat", "lon", "alt", "roll", "pitch", "yaw"])
Rangefinder = namedtuple("Rangefinder", ["timestamp", "distance"])
Terrain = namedtuple("Terrain", ["timestamp", "terr_height"])

@dataclass
class GroundPoint:
    timestamp: float
    rng_lat: float
    rng_lon: float
    rng_elev: float
    ahrs_lat: float
    ahrs_lon: float
    ahrs_alt: float
    distance: float
    terr_height: float
    

while True:
    msg = log.recv_match(blocking=False)
    if msg is None:
        break

    t = datetime.fromtimestamp(msg._timestamp)
    mtype = msg.get_type()

    if msg.get_type() == "AHR2":
        # Lat and Lon in degrees, alt in m
        ahrs_positions.append(Ahrs(
            timestamp=t,
            lat=msg.Lat,
            lon=msg.Lng,
            alt=msg.Alt,
            roll=math.radians(msg.Roll),
            pitch=math.radians(msg.Pitch),
            yaw=math.radians(msg.Yaw),
        ))
    elif msg.get_type() == "RFND" and msg.Stat == 4: # Only use healthy readings, see AP_Rangefinder::Status::Good
        rangefinder.append(Rangefinder(
            timestamp=t,
            distance=msg.Dist
        ))
    elif msg.get_type() == "TERR":
        terrain_data.append(Terrain(
            timestamp=t,
            terr_height=msg.TerrH
        ))

ahrs_df = pd.DataFrame(ahrs_positions).set_index("timestamp").sort_index()
rfnd_df = pd.DataFrame(rangefinder).set_index("timestamp").sort_index()
terr_df = pd.DataFrame(terrain_data).set_index("timestamp").sort_index()
python
# Compute estimated ground points
ground_points: list[GroundPoint] = []

assert ahrs_positions, "No AHR2 messages found"
assert rangefinder, "No valid RFND messages found"
assert terrain_data, "No TERR messages found"

for t, row in rfnd_df.iterrows():
    dist = row.distance

    # Efficient nearest neighbor lookup with .asof() for datetime index
    try:
        ahrs_row = ahrs_df.loc[ahrs_df.index.asof(t)]
        terr_row = terr_df.loc[terr_df.index.asof(t)]
    except KeyError:
        continue  # skip if no data available at or before timestamp

    # Unpack required fields
    lat, lon, alt, roll, pitch, yaw  = ahrs_row.lat, ahrs_row.lon, ahrs_row.alt, ahrs_row.roll, ahrs_row.pitch, ahrs_row.yaw
    terr_height = terr_row.terr_height

    # Build the rotation using Euler angles (in radians)
    rotation = R.from_euler('xyz', [roll, pitch, yaw])

    # Rotate the down vector (in body frame)
    ned_vector = rotation.apply([0, 0, dist])

    # Calculate ground point in NED frame
    north_offset = ned_vector[0]
    east_offset = ned_vector[1]
    down_offset = ned_vector[2]

    ground_alt = alt - down_offset  # down_offset is positive, so subtract it to get estimated ground height

    # Project horizontal lat/lon offset
    azimuth = math.degrees(math.atan2(east_offset, north_offset))
    horizontal_distance = math.hypot(north_offset, east_offset)
    g_lon, g_lat, _ = geod.fwd(lon, lat, azimuth, horizontal_distance)

    ground_points.append(GroundPoint(
        timestamp=t,
        rng_lat=g_lat,
        rng_lon=g_lon,
        rng_elev=ground_alt,
        ahrs_lat=lat,
        ahrs_lon=lon,
        ahrs_alt=alt,
        distance=dist,
        terr_height=terr_height
    ))

if not ground_points:
    print("Error: No projected ground point data found.")
    exit(1)
python
# Convert to DataFrame

import pandas as pd

df = pd.DataFrame([gp.__dict__ for gp in ground_points])
python
# Get the latitude/longitude extent for input to OpenTopography

min_lat = df["rng_lat"].min()
max_lat = df["rng_lat"].max()
min_lon = df["rng_lon"].min()
max_lon = df["rng_lon"].max()

assert min_lat < max_lat
assert min_lon < max_lon

print(f"Latitude range: ({min_lat},{max_lat})")
print(f"Longitude range: ({min_lon},{max_lon})")

To compare to other elevation data sources, these must be manually downloaded from OpenTopography. Once extracted into this directory, rename them and update paths in the below block. Add as many datasets for comparison as you like. GDAL supports many different file formats.

python

# Load elevation from GeoTIFF using GDAL

from osgeo import gdal

dataset_paths = [Path("cop30.tif"), Path("AW3D30.tif")]
for dataset_path in dataset_paths:
    dataset = gdal.Open(str(dataset_path))
    assert dataset is not None, f"Dataset '{dataset_path}' failed to load"
    band = dataset.GetRasterBand(1)
    transform = dataset.GetGeoTransform()

    # Convert lat/lon to row/col
    def latlon_to_rowcol(lat, lon):
        inv_transform = gdal.InvGeoTransform(transform)
        px, py = gdal.ApplyGeoTransform(inv_transform, lon, lat)
        return int(py), int(px)

    elevations = []
    for _, row in df.iterrows():
        r, c = latlon_to_rowcol(row.rng_lat, row.rng_lon)
        grid = band.ReadAsArray(c, r, 1, 1)
        if grid is not None:
            elevation = grid[0][0]
        else:
            # Out of range of the dataset, common with LOG_REPLAY before the EKF initializes
            elevation = math.nan
        elevations.append(elevation)

    df[dataset_path.stem + "_elevation"] = elevations
python
# plot using plotly, good for smaller logs (<30 minutes)

import plotly.graph_objs as go

fig = go.Figure()
for col in df.columns:
    stems = [f"{p.stem}_elevation" for p in dataset_paths]
    cols = ("ahrs_alt", "rng_elev", "terr_height", *stems)
    if col in cols:
        print(f"Adding {col}")
        y = pd.to_numeric(df[col], errors='coerce')
        
        fig.add_trace(go.Scattergl(x=df["timestamp"], y=y, mode='lines+markers', name=col))

fig.update_layout(title=f"Terrain Comparison Over Time for {stem}", xaxis_title="Time", yaxis_title="Altitude (m)", height=600)
fig.show()
python
# Save plot

import plotly.io as pio

pio.write_html(fig, f'terrain_pyplot_{stem}.html')