tools/car_porting/examples/hkg_canfd_gear_message.ipynb
from opendbc.car import structs
from opendbc.car.hyundai.values import CAR, HyundaiFlags
from opendbc.car.hyundai.fingerprints import FW_VERSIONS
# Platform selection: keep only platforms that carry both the CANFD flag and
# the EV flag. Swap in the commented-out line below to analyze the platforms
# that carry the CANFD flag but not the EV flag instead.
canfd_platforms = set(CAR.with_flags(HyundaiFlags.CANFD))
ev_platforms = set(CAR.with_flags(HyundaiFlags.EV))
TEST_PLATFORMS = canfd_platforms & ev_platforms  # CAN-FD electric vehicles only
#TEST_PLATFORMS = set(CAR.with_flags(HyundaiFlags.CANFD)) - set(CAR.with_flags(HyundaiFlags.EV)) # CAN-FD hybrid and ICE vehicles only

# List the selection so it can be checked before any logs are downloaded.
print(f"Found {len(TEST_PLATFORMS)} qualifying vehicles:")
for platform in TEST_PLATFORMS:
    print(f" {platform}")
import random
from openpilot.tools.lib.logreader import LogReader
from openpilot.tools.lib.comma_car_segments import get_comma_car_segments_database
from opendbc.car.hyundai.values import CAR
# Segment collection: for every selected platform, draw a random sample of at
# most MAX_SEGS_PER_PLATFORM segments from the commaCarSegments database.
database = get_comma_car_segments_database()

TEST_SEGMENTS = []
MAX_SEGS_PER_PLATFORM = 5  # TODO: Increase this to search more segments

print("Collecting segments from commaCarSegments dataset:")
for platform in TEST_PLATFORMS:
    # Stop immediately if a selected platform has no entry in the database.
    assert platform in database
    # To skip such platforms instead of stopping, replace the assert with:
    #if platform not in database:
    #    print(f"Skipping platform: {platform}, no data available")
    #    continue

    all_segments = database[platform]
    # Never ask random.sample for more items than the platform provides.
    NUM_SEGMENTS = min(MAX_SEGS_PER_PLATFORM, len(all_segments))
    print(f"Got {len(all_segments)} segments for platform {platform}, sampling {NUM_SEGMENTS} segments")
    TEST_SEGMENTS += random.sample(all_segments, NUM_SEGMENTS)

print(f"Collected {len(TEST_SEGMENTS)} segments for analysis")
import copy
import matplotlib.pyplot as plt
import numpy as np
from opendbc.can.parser import CANParser
from opendbc.car.hyundai.values import DBC
from opendbc.car.hyundai.hyundaicanfd import CanBus
from openpilot.selfdrive.pandad import can_capnp_to_list
from openpilot.tools.lib.logreader import LogReader, comma_car_segments_source
# Gear signal analysis: replay the CAN traffic of every sampled segment through
# a CANParser and report, for each candidate message, its initial GEAR value
# and every point at which that value changes.

# Candidate messages; each one is read for its CHECKSUM and GEAR signals.
message_names = ["GEAR_SHIFTER", "ACCELERATOR", "GEAR", "GEAR_ALT", "GEAR_ALT_2"]

# Map every segment back to the platform it is listed under in the database, so
# the DBC lookup below uses the segment's own platform instead of whatever value
# the `platform` loop variable was left holding by an earlier loop.
# NOTE(review): CP.carFingerprint is deliberately not used as the DBC key, since
# a log may carry a fingerprint name that is not a key of DBC -- confirm.
segment_platforms = {
    seg: plat
    for plat in TEST_PLATFORMS
    if plat in database
    for seg in database[plat]
}

for segment in TEST_SEGMENTS:
    lr = LogReader(segment, sources=[comma_car_segments_source])
    CP = lr.first("carParams")
    if CP is None:
        continue

    can_msgs = [msg for msg in lr if msg.which() == "can"]
    if not can_msgs:
        # Without CAN traffic there is no history to index into below.
        print(f"Skipping segment {segment}, no CAN messages found")
        continue

    parser_messages = [(name, 0) for name in message_names]
    cp = CANParser(DBC[segment_platforms[segment]]["pt"], parser_messages, CanBus(CP).ECAN)

    parsed_message_history = []
    examples = []  # history indices at which a GEAR transition was seen
    for msg in can_msgs:
        cp.update_strings(can_capnp_to_list([msg.as_builder().to_bytes()]))
        # Copy each message's signal dict individually. A shallow copy of
        # cp.vl would only duplicate the outer mapping, leaving every history
        # entry pointing at the same inner dicts (presumably updated in place
        # by the parser), which would hide all transitions.
        parsed_message_history.append({name: dict(cp.vl[name]) for name in message_names})

    print(f"Analyzing segment {segment:<44} for {CP.carFingerprint}")
    for name in message_names:
        # A non-zero CHECKSUM is taken as "message is present", as before, but
        # the message may first show up some steps into the segment, so find
        # the first snapshot where it is present rather than assuming index 0.
        first_seen = next(
            (i for i, snapshot in enumerate(parsed_message_history) if snapshot[name]["CHECKSUM"] != 0),
            None,
        )
        if first_seen is None:
            continue  # message never observed in this segment

        gear_prev = parsed_message_history[first_seen][name]["GEAR"]
        print(f" {name:<15} gear={gear_prev}")
        later_snapshots = parsed_message_history[first_seen + 1:]
        for i, parsed_messages in enumerate(later_snapshots, start=first_seen + 1):
            gear = parsed_messages[name]["GEAR"]
            if gear != gear_prev:
                print(" *** Signal transition found! ***")
                examples.append(i)
                gear_prev = gear

print("Analysis finished")