docs/应用实践/开发调试教程/Cyber RT实践/应用Cyber RT开发自动驾驶调试工具/Apollo工具开发之交通灯仿真控制工具制作.md
本实验旨在使用 Apollo Cyber RT 开发交通灯仿真控制工具,通过该工具控制红绿灯状态。
https://apollo.baidu.com/community/course/39
通过本实验,学习如何使用 Apollo Cyber RT 开发交通灯仿真控制工具,并利用该工具控制红绿灯状态。
在终端中,执行 DreamView+ 启动指令,执行成功后,点击菜单栏 dreamview+ 按钮,进入 dreamview+ 界面。
aem bootstrap start --plus
当出现如下,即表示 dreamview+ 启动成功了。
左侧导航栏打开 Mode Settings 面板,模式选择 PnC Mode,操作选择 Sim_Control,进入仿真模式。
选择地图。
从左侧 Modules 模块种启动 Planning 模块。
在/apollo_workspace目录创建一个名为cyber_trafficlight.py的文件。
touch cyber_trafficlight.py
导入必要的库和模块。
首先,导入所需的 Python 库和 Apollo Cyber RT 模块。这些库和模块将帮助我们初始化 Cyber 框架并使用 Cyber RT 工具。
import argparse
from cyber.python.cyber_py3 import cyber
from cyber.python.cyber_py3 import cyber_time
import modules.common_msgs.perception_msgs.traffic_light_detection_pb2 as traffic_light_detection_pb2
import threading
import time
定义相关功能函数。
在这一步中,我们将定义几个功能函数,用于实现脚本的不同功能。
1)添加交通灯信息的函数。
def add_traffic_light(color, traffic_light_pb):
light = traffic_light_pb.traffic_light.add()
light.color = color
light.id = "红绿灯ID" # 需要自行设置,这是交通灯的唯一标识
light.tracking_time = 10.0 # 设置跟踪时间,单位秒
2)添加消息头信息的函数。
seq_num = 0
def add_header(msg):
global seq_num
msg.header.sequence_num = seq_num
msg.header.timestamp_sec = cyber_time.Time.now().to_sec()
msg.header.module_name = "manual_traffic_light"
seq_num += 1 # 递增序列号
3)发布交通灯消息的函数。
def publish_traffic_light(writer, traffic_light_msg):
while not cyber.is_shutdown():
add_header(traffic_light_msg) # 添加消息头信息
writer.write(traffic_light_msg) # 发布交通灯消息
time.sleep(0.1) # 控制消息发布速率
初始化 Cyber 框架和相关对象。
在主函数中,我们需要初始化 Cyber 框架和相关对象,以便使用 Cyber RT 工具。
if __name__ == '__main__':
# 初始化交通灯检测消息对象
traffic_light_msg = traffic_light_detection_pb2.TrafficLightDetection()
# 初始化Cyber框架
cyber.init()
node = cyber.Node("traffic_light_command")
# 创建消息写入器
writer = node.create_writer(
"/apollo/perception/traffic_light", traffic_light_detection_pb2.TrafficLightDetection)
# 创建发布交通灯消息的线程
thread = threading.Thread(target=publish_traffic_light, args=(writer, traffic_light_msg))
thread.start()
进入用户输入循环。
在一个无限循环中,等待用户的输入。用户可以输入不同的命令,执行相应的操作。
while not cyber.is_shutdown():
user_input = input(
"1: 设置交通灯为红色 2: 设置交通灯为绿色 3: 退出\n")
traffic_light_msg.ClearField('traffic_light') # 清空交通灯信息
if user_input == '1':
# 用户输入1,设置交通灯为红色
add_traffic_light(
traffic_light_detection_pb2.TrafficLight.RED, traffic_light_msg)
elif user_input == "2":
# 用户输入2,设置交通灯为绿色
add_traffic_light(
traffic_light_detection_pb2.TrafficLight.GREEN, traffic_light_msg)
elif user_input == "3":
# 用户输入3,退出循环
break
# 关闭Cyber框架
cyber.shutdown()
总体:
from cyber.python.cyber_py3 import cyber
from cyber.python.cyber_py3 import cyber_time
import modules.common_msgs.perception_msgs.traffic_light_detection_pb2 as traffic_light_detection_pb2
import threading
import time
def add_forward_light(color, traffic_light_pb):
light = traffic_light_pb.traffic_light.add()
light.color = color
light.id = "红绿灯ID" #需要自行设置
light.tracking_time = 10.0
seq_num = 0
def add_header(msg):
global seq_num
msg.header.sequence_num = seq_num
msg.header.timestamp_sec = cyber_time.Time.now().to_sec()
msg.header.module_name = "manual_traffic_light"
seq_num = seq_num + 1
def pub_func(writer):
while not cyber.is_shutdown():
global traffic_light_msg
add_header(traffic_light_msg)
writer.write(traffic_light_msg)
time.sleep(0.1)
traffic_light_msg = None
if __name__ == '__main__':
traffic_light_msg = traffic_light_detection_pb2.TrafficLightDetection()
cyber.init()
node = cyber.Node("traffic_light_command")
writer = node.create_writer(
"/apollo/perception/traffic_light", traffic_light_detection_pb2.TrafficLightDetection)
thread = threading.Thread(target=pub_func, args=(writer,))
thread.start()
while not cyber.is_shutdown():
m = input(
"1: forward red 2: forward green\n")
traffic_light_msg.ClearField('traffic_light')
print(m)
if m == '1':
add_forward_light(
traffic_light_detection_pb2.TrafficLight.RED, traffic_light_msg)
elif m == "2":
add_forward_light(
traffic_light_detection_pb2.TrafficLight.GREEN, traffic_light_msg)
cyber.shutdown()
打开 Dreamview+ routing editing。
在地图上选择起点、终点并保存。
点击播放按钮,车辆启动:
查看红绿灯 ID。
回到终端,打开 cyber_monitor
cyber_monitor
找到 planning 的 channel。
通过键盘的 PgUp 键进行翻页找到车辆停止原因。
注意:TL_后面的数字就是红绿灯的ID。
修改红绿灯 ID。
在工具中添加交通灯 ID:
def add_traffic_light(color, traffic_light_pb):
light = traffic_light_pb.traffic_light.add()
light.color = color
#light.id = "红绿灯ID" #修改前
light.id = "451089196 #修改后
light.tracking_time = 10.0
运行交通灯仿真控制工具。
python cyber_trafficlight.py
注意:数字 1 是控制交通灯为红灯;数字 2 是控制交通灯为绿灯。
数字 1:
数字 2: