Back to Openpilot

Import all cars from opendbc

tools/car_porting/examples/find_segments_with_message.ipynb

0.11.02.3 KB
Original Source
python
# Import all cars from opendbc

from opendbc.car import structs
from opendbc.car.values import PLATFORMS as TEST_PLATFORMS

# Example: add additional platforms/segments to test outside of commaCarSegments

EXTRA_SEGMENTS = {
  # "81dd9e9fe256c397/0000001f--97c42cf98d",  # Volkswagen ID.4 test route, new car port, not in public dataset
}
python
import random

from openpilot.tools.lib.logreader import LogReader
from openpilot.tools.lib.comma_car_segments import get_comma_car_segments_database


MAX_SEGS_PER_PLATFORM = 3  # Increase this to search more segments

database = get_comma_car_segments_database()
TEST_SEGMENTS = []

print(f"Searching {len(TEST_PLATFORMS)} platforms")

for platform in TEST_PLATFORMS:
  if platform not in database:
    print(f"No segments available for {platform}")
    continue

  all_segments = database[platform]
  NUM_SEGMENTS = min(len(all_segments), MAX_SEGS_PER_PLATFORM)
  TEST_SEGMENTS.extend(random.sample(all_segments, NUM_SEGMENTS))

TEST_SEGMENTS.extend(EXTRA_SEGMENTS)

print(f"Searching {len(TEST_SEGMENTS)} segments")
python
from openpilot.tools.lib.logreader import LogReader, comma_car_segments_source
from tqdm.notebook import tqdm, tnrange

# Example search for CAN ignition messages
# Be careful when filtering by bus, account for odd harness arrangements on Honda/HKG

BUSES_TO_SEARCH = [0, 1, 2]

# Support for external Red Panda
EXTERNAL_PANDA_BUSES = [bus + 4 for bus in BUSES_TO_SEARCH]

MESSAGES_TO_FIND = {
  0x1F1: "GM CAN Ign",
  0x152: "Rivian CAN Ign",
  0x221: "Tesla 3/Y CAN Ign",
  0x9E:  "Mazda CAN Ign",
  0x3C0: "VW CAN Ign",
}

progress_bar = tnrange(len(TEST_SEGMENTS), desc="segments searched")

for segment in TEST_SEGMENTS:
  lr = LogReader(segment, sources=[comma_car_segments_source])
  CP = lr.first("carParams")
  if CP is None:
    progress_bar.update()
    continue

  can_packets = [msg for msg in lr if msg.which() == "can"]
  matched_messages = set()

  for packet in can_packets:
    for msg in packet.can:
      if msg.address in MESSAGES_TO_FIND and msg.src in (BUSES_TO_SEARCH + EXTERNAL_PANDA_BUSES):
        # print(msg)
        matched_messages.add(msg.address)

  if len(matched_messages) > 0:
    message_names = [MESSAGES_TO_FIND[message] for message in matched_messages]
    print(f"Match found: {segment:<45} {CP.carFingerprint:<38} {message_names}")

  progress_bar.update()