Back to Openpilot

Hkg Canfd Gear Message

tools/car_porting/examples/hkg_canfd_gear_message.ipynb

0.11.0 · 3.0 KB
Original Source
python
from opendbc.car import structs
from opendbc.car.hyundai.values import CAR, HyundaiFlags
from opendbc.car.hyundai.fingerprints import FW_VERSIONS

# Build one platform set per flag, then combine them to pick the population under test.
canfd_platforms = set(CAR.with_flags(HyundaiFlags.CANFD))
ev_platforms = set(CAR.with_flags(HyundaiFlags.EV))

TEST_PLATFORMS = canfd_platforms & ev_platforms  # CAN-FD electric vehicles only
#TEST_PLATFORMS = canfd_platforms - ev_platforms  # CAN-FD hybrid and ICE vehicles only

# Summarize the selection so the reader can sanity-check it before fetching any logs.
print(f"Found {len(TEST_PLATFORMS)} qualifying vehicles:")
for platform in TEST_PLATFORMS:
  print(f"  {platform}")
python
import random

from openpilot.tools.lib.logreader import LogReader
from openpilot.tools.lib.comma_car_segments import get_comma_car_segments_database
from opendbc.car.hyundai.values import CAR

# Mapping from platform to the list of segments available in the commaCarSegments dataset.
database = get_comma_car_segments_database()
TEST_SEGMENTS = []

MAX_SEGS_PER_PLATFORM = 5  # TODO: Increase this to search more segments

# Draw a random sample of at most MAX_SEGS_PER_PLATFORM segments for every selected platform.
print("Collecting segments from commaCarSegments dataset:")
for platform in TEST_PLATFORMS:
  # Fail loudly when a platform has no data. An explicit raise is used instead of
  # `assert` because asserts are stripped under `python -O`, which would turn this
  # into a bare KeyError on the lookup below. To skip such platforms instead of
  # aborting, replace the raise with a print and `continue`.
  if platform not in database:
    raise KeyError(f"No commaCarSegments data available for platform: {platform}")

  all_segments = database[platform]

  # random.sample raises if asked for more items than exist, so cap the sample size.
  NUM_SEGMENTS = min(len(all_segments), MAX_SEGS_PER_PLATFORM)

  print(f"Got {len(all_segments)} segments for platform {platform}, sampling {NUM_SEGMENTS} segments")

  TEST_SEGMENTS.extend(random.sample(all_segments, NUM_SEGMENTS))

print(f"Collected {len(TEST_SEGMENTS)} segments for analysis")

python
import copy
import matplotlib.pyplot as plt
import numpy as np

from opendbc.can.parser import CANParser
from opendbc.car.hyundai.values import DBC
from opendbc.car.hyundai.hyundaicanfd import CanBus

from openpilot.selfdrive.pandad import can_capnp_to_list
from openpilot.tools.lib.logreader import LogReader, comma_car_segments_source

# Candidate messages that are each read for a GEAR (and CHECKSUM) signal below.
message_names = ["GEAR_SHIFTER", "ACCELERATOR", "GEAR", "GEAR_ALT", "GEAR_ALT_2"]

# For every sampled segment, replay its CAN traffic through a parser and report, per
# candidate message, the first observed gear value and any later change of that value.
for segment in TEST_SEGMENTS:
  lr = LogReader(segment, sources=[comma_car_segments_source])
  CP = lr.first("carParams")
  if CP is None:
    continue

  can_msgs = [msg for msg in lr if msg.which() == "can"]
  parser_messages = [(name, 0) for name in message_names]
  # Pick the DBC from this segment's own fingerprint. The global `platform` is only
  # whatever value an earlier loop happened to finish on, not this segment's car.
  cp = CANParser(DBC[CP.carFingerprint]["pt"], parser_messages, CanBus(CP).ECAN)

  parsed_message_history = []
  examples = []  # indices into parsed_message_history where a gear value changed

  for msg in can_msgs:
    cp.update_strings(can_capnp_to_list([msg.as_builder().to_bytes()]))
    # Snapshot each message's signal dict. A shallow copy of cp.vl would still share
    # the inner per-message dicts with the parser, so if the parser updates them in
    # place every history entry would show the final state.
    # NOTE(review): in-place update of cp.vl[name] is assumed - confirm against CANParser.
    parsed_message_history.append({name: dict(cp.vl[name]) for name in message_names})

  print(f"Analyzing segment {segment:<44} for {CP.carFingerprint}")
  for name in message_names:
    # A non-zero CHECKSUM is taken as evidence the message was actually received.
    # Start from the first snapshot where that holds: the message may not be in the
    # very first CAN packet, and a segment may contain no CAN packets at all.
    first_seen = next(
      (i for i, snapshot in enumerate(parsed_message_history) if snapshot[name]["CHECKSUM"] != 0),
      None,
    )
    if first_seen is None:  # Message is not present for this segment
      continue

    gear_prev = parsed_message_history[first_seen][name]["GEAR"]
    print(f"  {name:<15} gear={gear_prev}")
    for i, parsed_messages in enumerate(parsed_message_history[first_seen + 1:], start=first_seen + 1):
      gear = parsed_messages[name]["GEAR"]
      if gear != gear_prev:
        print("  *** Signal transition found! ***")
        examples.append(i)
      gear_prev = gear

print("Analysis finished")