tools/car_porting/examples/find_segments_with_message.ipynb
# Import all cars from opendbc
from opendbc.car import structs
from opendbc.car.values import PLATFORMS as TEST_PLATFORMS
# Example: add additional platforms/segments to test outside of commaCarSegments
EXTRA_SEGMENTS = {
# "81dd9e9fe256c397/0000001f--97c42cf98d", # Volkswagen ID.4 test route, new car port, not in public dataset
}
import random
from openpilot.tools.lib.logreader import LogReader
from openpilot.tools.lib.comma_car_segments import get_comma_car_segments_database
MAX_SEGS_PER_PLATFORM = 3 # Increase this to search more segments
database = get_comma_car_segments_database()
TEST_SEGMENTS = []
print(f"Searching {len(TEST_PLATFORMS)} platforms")
for platform in TEST_PLATFORMS:
if platform not in database:
print(f"No segments available for {platform}")
continue
all_segments = database[platform]
NUM_SEGMENTS = min(len(all_segments), MAX_SEGS_PER_PLATFORM)
TEST_SEGMENTS.extend(random.sample(all_segments, NUM_SEGMENTS))
TEST_SEGMENTS.extend(EXTRA_SEGMENTS)
print(f"Searching {len(TEST_SEGMENTS)} segments")
from openpilot.tools.lib.logreader import LogReader, comma_car_segments_source
from tqdm.notebook import tqdm, tnrange
# Example search for CAN ignition messages
# Be careful when filtering by bus, account for odd harness arrangements on Honda/HKG
BUSES_TO_SEARCH = [0, 1, 2]
# Support for external Red Panda
EXTERNAL_PANDA_BUSES = [bus + 4 for bus in BUSES_TO_SEARCH]
MESSAGES_TO_FIND = {
0x1F1: "GM CAN Ign",
0x152: "Rivian CAN Ign",
0x221: "Tesla 3/Y CAN Ign",
0x9E: "Mazda CAN Ign",
0x3C0: "VW CAN Ign",
}
progress_bar = tnrange(len(TEST_SEGMENTS), desc="segments searched")
for segment in TEST_SEGMENTS:
lr = LogReader(segment, sources=[comma_car_segments_source])
CP = lr.first("carParams")
if CP is None:
progress_bar.update()
continue
can_packets = [msg for msg in lr if msg.which() == "can"]
matched_messages = set()
for packet in can_packets:
for msg in packet.can:
if msg.address in MESSAGES_TO_FIND and msg.src in (BUSES_TO_SEARCH + EXTERNAL_PANDA_BUSES):
# print(msg)
matched_messages.add(msg.address)
if len(matched_messages) > 0:
message_names = [MESSAGES_TO_FIND[message] for message in matched_messages]
print(f"Match found: {segment:<45} {CP.carFingerprint:<38} {message_names}")
progress_bar.update()