DAG概述
什么是DAG?
DAG = Directed Acyclic Graph(有向无环图)
在Apollo Cyber RT中,DAG文件是模块配置文件,定义了:
- 要加载哪些动态库(.so文件)
- 要创建哪些Component实例
- 每个Component订阅哪些消息通道
- Component的配置参数
DAG的作用
DAG配置文件
↓
描述模块的组成结构
↓
Mainboard根据DAG加载模块
↓
动态创建Component实例
↓
自动建立消息订阅关系
为什么叫DAG?
虽然配置文件不直接表示图结构,但Apollo的模块之间通过消息传递形成的依赖关系构成了一个有向无环图:
Perception → Prediction → Planning → Control
↓ ↓ ↓ ↓
感知结果 预测轨迹 规划路径 控制指令
DAG确保了模块之间不会形成循环依赖,保证系统的稳定性。
DAG文件的位置
Apollo中所有模块的DAG文件都遵循统一的目录结构:
/apollo/modules/<模块名>/dag/<模块名>.dag
示例:
/apollo/modules/planning/dag/planning.dag
/apollo/modules/control/dag/control.dag
/apollo/modules/perception/production/dag/dag_streaming_perception.dag
DAG的Protobuf结构
DAG文件使用Protocol Buffers格式定义,具有严格的类型检查。
核心Proto定义
1. DagConfig (顶层结构)
文件位置: <code>/apollo/cyber/proto/dag_conf.proto</code>
message DagConfig {
repeated ModuleConfig module_config = 1;
}
一个DAG文件包含一个或多个ModuleConfig。
2. ModuleConfig (模块配置)
message ModuleConfig {
optional string module_library = 1; // 动态库路径
repeated ComponentInfo components = 2; // 普通组件列表
repeated TimerComponentInfo timer_components = 3; // 定时组件列表
}
字段说明:
- <code>module_library</code>: .so动态库的绝对路径或相对路径
- <code>components</code>: 消息驱动的组件(收到消息时触发)
- <code>timer_components</code>: 定时触发的组件(按固定频率触发)
3. ComponentInfo (组件信息)
message ComponentInfo {
optional string class_name = 1; // 组件类名
optional ComponentConfig config = 2; // 组件配置
}
4. ComponentConfig (组件配置)
文件位置: <code>/apollo/cyber/proto/component_conf.proto</code>
message ComponentConfig {
optional string name = 1; // 组件名称(唯一标识)
optional string config_file_path = 2; // Protobuf配置文件路径
optional string flag_file_path = 3; // GFlags配置文件路径
repeated ReaderOption readers = 4; // 订阅的消息通道列表
}
5. ReaderOption (消息订阅配置)
message ReaderOption {
optional string channel = 1; // 消息通道名
optional QosProfile qos_profile = 2; // 服务质量配置
optional uint32 pending_queue_size = 3 // 未处理消息队列大小
[default = 1];
}
QosProfile (服务质量):
message QosProfile {
optional uint32 depth = 1 [default = 1]; // 消息缓存深度
}
6. TimerComponentInfo (定时组件信息)
message TimerComponentInfo {
optional string class_name = 1;
optional TimerComponentConfig config = 2;
}
7. TimerComponentConfig (定时组件配置)
message TimerComponentConfig {
optional string name = 1; // 组件名称
optional string config_file_path = 2; // 配置文件路径
optional string flag_file_path = 3; // Flag文件路径
optional uint32 interval = 4; // 触发间隔(毫秒)
}
完整结构层次
DagConfig
└─ ModuleConfig (可多个)
├─ module_library: string
├─ ComponentInfo (可多个)
│ ├─ class_name: string
│ └─ ComponentConfig
│ ├─ name: string
│ ├─ config_file_path: string
│ ├─ flag_file_path: string
│ └─ ReaderOption (可多个)
│ ├─ channel: string
│ ├─ qos_profile
│ │ └─ depth: uint32
│ └─ pending_queue_size: uint32
└─ TimerComponentInfo (可多个)
├─ class_name: string
└─ TimerComponentConfig
├─ name: string
├─ config_file_path: string
├─ flag_file_path: string
└─ interval: uint32
Component类型详解
Apollo Cyber RT提供了两种类型的Component:
1. Component (消息驱动组件)
特点:
- 订阅一个或多个消息通道
- 当消息到达时触发处理
- 支持1-4个输入通道
类模板:
template <typename M0 = NullType,
typename M1 = NullType,
typename M2 = NullType,
typename M3 = NullType>
class Component : public ComponentBase;
使用场景:
- Planning: 订阅Prediction、Chassis、Localization
- Control: 订阅Planning的轨迹输出
- Prediction: 订阅Perception的障碍物信息
示例:
class PlanningComponent : public Component<
prediction::PredictionObstacles, // M0
canbus::Chassis, // M1
localization::LocalizationEstimate // M2
> {
public:
bool Init() override;
// 三个消息到齐后触发
bool Proc(const std::shared_ptr<prediction::PredictionObstacles>& pred,
const std::shared_ptr<canbus::Chassis>& chassis,
const std::shared_ptr<localization::LocalizationEstimate>& loc) override;
};
触发机制:
- 单输入: 收到消息立即触发
- 多输入: 等待所有输入通道至少有一条消息后触发
2. TimerComponent (定时触发组件)
特点:
- 不订阅消息通道
- 按固定时间间隔触发
- 主动从其他模块拉取数据
基类:
class TimerComponent : public ComponentBase {
public:
bool Initialize(const TimerComponentConfig& config) override;
virtual bool Proc() = 0; // 定时调用
private:
uint64_t interval_ = 0; // 触发间隔(毫秒)
std::unique_ptr<Timer> timer_;
};
使用场景:
- Control: 以固定频率(10ms)输出控制指令
- Monitor: 定期检查系统状态
- 周期性任务(日志记录、健康检查等)
示例:
class ControlComponent : public TimerComponent {
public:
bool Init() override;
// 每10ms调用一次
bool Proc() override;
};
Component vs TimerComponent对比
特性 | Component | TimerComponent |
---|---|---|
触发方式 | 消息驱动 | 定时触发 |
输入 | 1-4个通道 | 无(主动拉取) |
配置字段 | readers | interval |
适用场景 | 响应式处理 | 周期性任务 |
延迟 | 取决于消息到达 | 固定周期 |
CPU占用 | 有消息时执行 | 持续执行 |
DAG配置文件详解
示例1: Planning模块(消息驱动)
文件: <code>/apollo/modules/planning/dag/planning.dag</code>
# Define all components in DAG streaming.
module_config {
# 1. 指定动态库路径
module_library : "/apollo/bazel-bin/modules/planning/libplanning_component.so"
# 2. 定义组件
components {
# 2.1 组件类名(必须在库中注册)
class_name : "PlanningComponent"
# 2.2 组件配置
config {
# 组件名称(唯一标识)
name: "planning"
# Protobuf配置文件
config_file_path: "/apollo/modules/planning/conf/planning_config.pb.txt"
# GFlags配置文件
flag_file_path: "/apollo/modules/planning/conf/planning.conf"
# 订阅的消息通道
readers: [
{
channel: "/apollo/prediction"
},
{
channel: "/apollo/canbus/chassis"
qos_profile: {
depth : 15 # 缓存最近15条消息
}
pending_queue_size: 50 # 未处理队列容量50
},
{
channel: "/apollo/localization/pose"
qos_profile: {
depth : 15
}
pending_queue_size: 50
}
]
}
}
}
详细解释:
-
module_library:
- 编译后的动态库路径
- 通常以<code>lib*.so</code>形式
- 可以是绝对路径或相对于<code>/apollo</code>的路径
-
class_name:
- 必须与代码中<code>CYBER_REGISTER_COMPONENT</code>注册的类名一致
- 区分大小写
-
name:
- Component的唯一标识符
- 用于创建Cyber Node
- 日志中会显示此名称
-
config_file_path:
- Protobuf格式的业务配置文件
- 通过<code>GetProtoConfig()</code>方法读取
- 示例:规划算法参数、场景配置等
-
flag_file_path:
- GFlags格式的配置文件
- 包含命令行标志参数
- 可以覆盖代码中的默认值
-
readers:
- 定义订阅的消息通道列表
- 顺序对应Component模板参数顺序(M0, M1, M2…)
- 每个Reader可以配置QoS和队列大小
-
QoS配置:
- <code>depth</code>: 消息历史记录深度(用于晚加入的订阅者)
- <code>pending_queue_size</code>: 未处理消息的队列容量(避免消息堆积)
示例2: Control模块(定时驱动)
文件: <code>/apollo/modules/control/dag/control.dag</code>
module_config {
module_library : "/apollo/bazel-bin/modules/control/libcontrol_component.so"
# 使用timer_components而不是components
timer_components {
class_name : "ControlComponent"
config {
name: "control"
flag_file_path: "/apollo/modules/control/conf/control.conf"
# 定时间隔:10毫秒(100Hz)
interval: 10
}
}
}
关键区别:
- 使用<code>timer_components</code>字段
- 配置中有<code>interval</code>参数(毫秒)
- 没有<code>readers</code>字段(不订阅消息)
示例3: 多组件模块
一个DAG文件可以包含多个组件:
module_config {
module_library : "/apollo/bazel-bin/modules/perception/libperception_component.so"
# 第一个组件:相机感知
components {
class_name : "CameraPerceptionComponent"
config {
name: "camera_perception"
readers: [
{ channel: "/apollo/sensor/camera/front" }
]
}
}
# 第二个组件:激光雷达感知
components {
class_name : "LidarPerceptionComponent"
config {
name: "lidar_perception"
readers: [
{ channel: "/apollo/sensor/lidar/compensator/PointCloud2" }
]
}
}
# 第三个组件:融合组件
components {
class_name : "FusionComponent"
config {
name: "fusion"
readers: [
{ channel: "/apollo/perception/obstacles/camera" },
{ channel: "/apollo/perception/obstacles/lidar" }
]
}
}
}
示例4: 多库多组件
一个DAG文件也可以加载多个动态库:
# 第一个模块
module_config {
module_library : "/apollo/bazel-bin/modules/drivers/camera/libcamera_component.so"
components {
class_name : "CameraComponent"
config {
name: "camera_front"
}
}
}
# 第二个模块
module_config {
module_library : "/apollo/bazel-bin/modules/drivers/lidar/liblidar_component.so"
components {
class_name : "LidarComponent"
config {
name: "lidar_top"
}
}
}
Mainboard加载DAG流程
完整加载流程
1. Mainboard启动
└─ 解析命令行参数: -d dag_file.dag
2. ModuleController::LoadAll()
├─ 解析DAG文件路径(支持相对/绝对路径)
├─ 遍历每个DAG文件
└─ 调用LoadModule(path)
3. ModuleController::LoadModule(path)
├─ 读取DAG文件内容
├─ 解析为Protobuf对象: DagConfig
└─ 调用LoadModule(dag_config)
4. ModuleController::LoadModule(dag_config)
├─ 遍历每个module_config
│ ├─ 获取module_library路径
│ └─ ClassLoaderManager::LoadLibrary(path)
│ └─ dlopen加载.so文件
│
├─ 遍历components
│ ├─ 获取class_name
│ ├─ ClassLoaderManager::CreateClassObj<ComponentBase>(class_name)
│ │ └─ 查找全局注册表,调用工厂函数创建实例
│ ├─ component->Initialize(config)
│ │ ├─ 创建Node(config.name)
│ │ ├─ LoadConfigFiles(config)
│ │ │ ├─ 加载config_file_path
│ │ │ └─ 加载flag_file_path
│ │ ├─ 调用Init() [用户实现]
│ │ ├─ 创建Reader订阅消息
│ │ │ └─ node_->CreateReader<M>(channel, callback)
│ │ └─ Scheduler::CreateTask()
│ │ └─ 注册到调度器
│ └─ 保存到component_list_
│
└─ 遍历timer_components
└─ 类似处理(创建Timer而非Reader)
5. 所有组件加载完成
└─ 等待消息到达或定时器触发
关键步骤详解
步骤1: DAG文件路径解析
// 支持三种路径格式
// 格式1: 纯文件名 → 在 /apollo/dag 下查找
mainboard -d planning.dag
// 实际路径: /apollo/dag/planning.dag
// 格式2: 绝对路径
mainboard -d /apollo/modules/planning/dag/planning.dag
// 格式3: 相对路径(相对于当前目录或/apollo)
mainboard -d modules/planning/dag/planning.dag
实现代码:
for (auto& dag_conf : args_.GetDAGConfList()) {
std::string module_path = "";
if (dag_conf == common::GetFileName(dag_conf)) {
// 情况1: 纯文件名
module_path = common::GetAbsolutePath(dag_root_path, dag_conf);
} else if (dag_conf[0] == '/') {
// 情况2: 绝对路径
module_path = dag_conf;
} else {
// 情况3: 相对路径
module_path = common::GetAbsolutePath(current_path, dag_conf);
if (!common::PathExists(module_path)) {
module_path = common::GetAbsolutePath(work_root, dag_conf);
}
}
paths.emplace_back(std::move(module_path));
}
步骤2: Protobuf解析
DagConfig dag_config;
if (!common::GetProtoFromFile(path, &dag_config)) {
AERROR << "Get proto failed, file: " << path;
return false;
}
DAG文件虽然看起来像文本格式,但实际上是Protobuf的Text Format,可以自动转换为对象。
步骤3: 动态库加载
for (auto module_config : dag_config.module_config()) {
std::string load_path = module_config.module_library();
// 加载动态库
class_loader_manager_.LoadLibrary(load_path);
}
ClassLoaderManager的作用:
- 避免重复加载同一个库
- 管理库的生命周期
- 提供统一的类创建接口
步骤4: 组件创建与初始化
for (auto& component : module_config.components()) {
const std::string& class_name = component.class_name();
// 1. 创建实例(通过注册表)
std::shared_ptr<ComponentBase> base =
class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
if (base == nullptr) {
AERROR << "Failed to create component: " << class_name;
return false;
}
// 2. 初始化组件
if (!base->Initialize(component.config())) {
AERROR << "Failed to initialize component: " << class_name;
return false;
}
// 3. 保存到列表(保持生命周期)
component_list_.emplace_back(std::move(base));
}
Initialize做了什么:
bool Component<M0>::Initialize(const ComponentConfig& config) {
// 1. 创建Cyber Node(通信节点)
node_.reset(new Node(config.name()));
// 2. 加载配置文件
LoadConfigFiles(config);
// ├─ 加载config_file_path(业务配置)
// └─ 加载flag_file_path(GFlags)
// 3. 调用用户的Init()方法
if (!Init()) {
return false;
}
// 4. 创建Reader订阅消息
ReaderConfig reader_cfg;
reader_cfg.channel_name = config.readers(0).channel();
reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
auto callback = [self](const std::shared_ptr<M0>& msg) {
self->Process(msg); // 收到消息时调用
};
auto reader = node_->CreateReader<M0>(reader_cfg, callback);
readers_.emplace_back(std::move(reader));
// 5. 注册到Scheduler
return scheduler::Instance()->CreateTask(factory, node_->Name());
}
组件注册机制
CYBER_REGISTER_COMPONENT宏
每个Component类必须在源文件中注册:
// planning_component.cc
#include "modules/planning/planning_component.h"
CYBER_REGISTER_COMPONENT(PlanningComponent)
宏展开
#define CYBER_REGISTER_COMPONENT(name) \
CLASS_LOADER_REGISTER_CLASS(name, apollo::cyber::ComponentBase)
进一步展开:
CLASS_LOADER_REGISTER_CLASS(PlanningComponent, apollo::cyber::ComponentBase)
// 展开为:
namespace {
struct PlanningComponent_Registrar {
PlanningComponent_Registrar() {
// 注册类名到工厂函数的映射
apollo::cyber::class_loader::utility::RegisterClass(
"PlanningComponent",
[]() -> apollo::cyber::ComponentBase* {
return new PlanningComponent();
}
);
}
};
// 全局对象,在库加载时自动执行构造函数
static PlanningComponent_Registrar g_PlanningComponent_registrar;
}
注册流程
1. 动态库被dlopen加载
↓
2. 执行全局对象的构造函数
↓
3. 调用RegisterClass()
↓
4. 将类名和工厂函数存入全局注册表
↓
5. ClassLoader可以通过类名查找工厂函数
↓
6. 调用工厂函数创建实例
全局注册表
// 简化的注册表实现
std::map<std::string, std::function<ComponentBase*()>> g_class_registry;
void RegisterClass(const std::string& class_name,
std::function<ComponentBase*()> factory) {
g_class_registry[class_name] = factory;
}
ComponentBase* CreateClassObj(const std::string& class_name) {
auto it = g_class_registry.find(class_name);
if (it != g_class_registry.end()) {
return it->second(); // 调用工厂函数
}
return nullptr;
}
为什么需要注册?
- 解耦: DAG文件不需要include头文件
- 动态性: 运行时才决定创建哪个类
- 扩展性: 新增Component无需修改mainboard代码
Reader与Writer配置
Reader配置详解
基本配置
readers: [
{
channel: "/apollo/prediction"
}
]
最简单的配置,使用默认QoS和队列大小。
完整配置
readers: [
{
# 消息通道名称
channel: "/apollo/canbus/chassis"
# 服务质量配置
qos_profile: {
depth : 15 # 历史消息深度
}
# 待处理队列大小
pending_queue_size: 50
}
]
字段说明:
-
channel: 消息通道名称
- 必须与发布者的channel一致
- 通常以<code>/apollo/</code>开头
-
qos_profile.depth: 消息历史深度
- 保存最近N条消息
- 用于晚加入的订阅者获取历史数据
- 默认值: 1
-
pending_queue_size: 未处理消息队列容量
- 当处理速度慢于接收速度时的缓冲
- 超出容量时会丢弃旧消息
- 默认值: 1
QoS配置的影响
depth = 1 (默认):
发布者: [M1] → [M2] → [M3]
订阅者: ↓ 只能收到M3
depth = 3:
发布者: [M1] → [M2] → [M3]
订阅者: ↓ ↓ ↓ 可以收到M1, M2, M3
使用场景:
- <code>depth=1</code>: 只关心最新数据(如传感器读数)
- <code>depth>1</code>: 需要历史数据(如轨迹预测)
pending_queue_size的作用
消息到达速度: 100 msg/s
处理速度: 50 msg/s
pending_queue_size = 10:
队列: [M1][M2][M3]...[M10] → [M11到达,M1被丢弃]
设置建议:
- 处理速度快: 较小值(1-10)
- 处理速度慢: 较大值(30-100)
- 不允许丢消息: 设置足够大的值
Writer配置
Writer不在DAG中配置,而是在Component代码中创建:
// planning_component.cc
bool PlanningComponent::Init() {
// 创建Writer发布规划结果
planning_writer_ = node_->CreateWriter<ADCTrajectory>(
"/apollo/planning" // channel名称
);
return true;
}
bool PlanningComponent::Proc(...) {
// 创建消息
auto trajectory = std::make_shared<ADCTrajectory>();
// ... 填充数据
// 发布消息
planning_writer_->Write(trajectory);
return true;
}
Reader与Writer的通信
Planning Component Control Component
↓ ↑
Writer("/apollo/planning") Reader("/apollo/planning")
↓ ↑
└────── CyberRT Transport ──────────┘
通信流程:
- Planning的Writer发布消息到<code>/apollo/planning</code>
- CyberRT的Transport层接收消息
- 查找订阅了此channel的Reader
- 将消息分发给所有Reader
- Reader触发Component的Process/Proc方法
实战示例
示例1: 创建一个简单的Component
目标
创建一个接收Driver消息并打印的Component。
步骤1: 定义Proto消息
// examples.proto
syntax = "proto2";
package apollo.cyber.examples.proto;
message Driver {
optional string content = 1;
optional uint64 msg_id = 2;
optional uint64 timestamp = 3;
}
步骤2: 实现Component
// simple_component.h
#pragma once
#include "cyber/component/component.h"
#include "cyber/examples/proto/examples.pb.h"
using apollo::cyber::Component;
using apollo::cyber::examples::proto::Driver;
class SimpleComponent : public Component<Driver> {
public:
bool Init() override;
bool Proc(const std::shared_ptr<Driver>& msg) override;
};
CYBER_REGISTER_COMPONENT(SimpleComponent)
// simple_component.cc
#include "simple_component.h"
bool SimpleComponent::Init() {
AINFO << "SimpleComponent Init";
return true;
}
bool SimpleComponent::Proc(const std::shared_ptr<Driver>& msg) {
AINFO << "Received message " << msg->msg_id()
<< " with content: " << msg->content();
return true;
}
步骤3: 编写BUILD文件
cc_binary(
name = "libsimple_component.so",
linkshared = True,
linkstatic = False,
srcs = [
"simple_component.h",
"simple_component.cc",
],
deps = [
"//cyber",
"//cyber/examples/proto:examples_cc_proto",
],
)
步骤4: 创建DAG配置
# simple.dag
module_config {
module_library : "/apollo/bazel-bin/examples/libsimple_component.so"
components {
class_name : "SimpleComponent"
config {
name: "simple"
readers: [
{
channel: "/apollo/test/driver"
}
]
}
}
}
步骤5: 编译和运行
# 编译
bazel build //examples:libsimple_component.so
# 启动Component
mainboard -d simple.dag
# 在另一个终端发布测试消息
cyber_channel pub /apollo/test/driver \
examples.proto.Driver \
'{"content": "Hello", "msg_id": 1}'
示例2: 多输入Component
目标
创建一个融合两路传感器数据的Component。
实现
// fusion_component.h
#pragma once
#include "cyber/component/component.h"
#include "sensor_msgs/image.pb.h"
#include "sensor_msgs/pointcloud.pb.h"
class FusionComponent : public Component<
sensor_msgs::Image, // M0: 相机数据
sensor_msgs::PointCloud // M1: 激光雷达数据
> {
public:
bool Init() override;
bool Proc(const std::shared_ptr<sensor_msgs::Image>& image,
const std::shared_ptr<sensor_msgs::PointCloud>& cloud) override;
};
CYBER_REGISTER_COMPONENT(FusionComponent)
// fusion_component.cc
bool FusionComponent::Init() {
AINFO << "FusionComponent Init";
return true;
}
bool FusionComponent::Proc(
const std::shared_ptr<sensor_msgs::Image>& image,
const std::shared_ptr<sensor_msgs::PointCloud>& cloud) {
AINFO << "Fusing image (timestamp: " << image->header().timestamp_sec()
<< ") with pointcloud (timestamp: " << cloud->header().timestamp_sec()
<< ")";
// 执行融合算法...
return true;
}
DAG配置
# fusion.dag
module_config {
module_library : "/apollo/bazel-bin/perception/libfusion_component.so"
components {
class_name : "FusionComponent"
config {
name: "sensor_fusion"
# 注意:readers的顺序必须与Component模板参数顺序一致
readers: [
{
channel: "/apollo/sensor/camera/front/image" # M0
},
{
channel: "/apollo/sensor/lidar/pointcloud" # M1
}
]
}
}
}
重要: readers的顺序必须与Component模板参数一致!
示例3: TimerComponent
目标
创建一个每秒打印一次心跳的Component。
实现
// heartbeat_component.h
#pragma once
#include "cyber/component/timer_component.h"
class HeartbeatComponent : public apollo::cyber::TimerComponent {
public:
bool Init() override;
bool Proc() override;
private:
uint64_t count_ = 0;
};
CYBER_REGISTER_COMPONENT(HeartbeatComponent)
// heartbeat_component.cc
#include "heartbeat_component.h"
bool HeartbeatComponent::Init() {
AINFO << "HeartbeatComponent Init";
return true;
}
bool HeartbeatComponent::Proc() {
count_++;
AINFO << "Heartbeat #" << count_ << " at " << apollo::cyber::Time::Now();
return true;
}
DAG配置
# heartbeat.dag
module_config {
module_library : "/apollo/bazel-bin/monitor/libheartbeat_component.so"
timer_components {
class_name : "HeartbeatComponent"
config {
name: "heartbeat"
interval: 1000 # 1秒 = 1000毫秒
}
}
}
运行
mainboard -d heartbeat.dag
# 输出:
# [INFO] HeartbeatComponent Init
# [INFO] Heartbeat #1 at 1634567890.123
# [INFO] Heartbeat #2 at 1634567891.123
# [INFO] Heartbeat #3 at 1634567892.123
# ...
示例4: 混合使用Component和TimerComponent
场景
一个模块包含数据采集组件(消息驱动)和数据处理组件(定时触发)。
DAG配置
module_config {
module_library : "/apollo/bazel-bin/mymodule/libmymodule.so"
# 组件1: 接收传感器数据(消息驱动)
components {
class_name : "SensorComponent"
config {
name: "sensor_receiver"
readers: [
{ channel: "/apollo/sensor/data" }
]
}
}
# 组件2: 定期处理数据(定时驱动)
timer_components {
class_name : "ProcessorComponent"
config {
name: "data_processor"
interval: 100 # 100ms处理一次
}
}
}
最佳实践
1. DAG文件组织
建议结构
/apollo/modules/<模块名>/
├── dag/
│ ├── <模块名>.dag # 主DAG文件
│ ├── <模块名>_test.dag # 测试DAG
│ └── <模块名>_<变体>.dag # 变体配置
├── conf/
│ ├── <模块名>_config.pb.txt # Protobuf配置
│ └── <模块名>.conf # GFlags配置
└── proto/
└── <模块名>_config.proto # 配置Proto定义
命名规范
- DAG文件: 小写,下划线分隔
- Component类名: 大驼峰,以Component结尾
- 组件名称: 小写,下划线分隔,简洁明了
2. Reader配置优化
根据场景选择QoS
# 场景1: 实时性要求高(控制指令)
readers: [
{
channel: "/apollo/control/command"
qos_profile: { depth: 1 } # 只要最新的
pending_queue_size: 1 # 不积压
}
]
# 场景2: 需要历史数据(障碍物追踪)
readers: [
{
channel: "/apollo/perception/obstacles"
qos_profile: { depth: 10 } # 保留最近10帧
pending_queue_size: 20 # 允许一定积压
}
]
# 场景3: 不允许丢消息(日志记录)
readers: [
{
channel: "/apollo/monitor/events"
qos_profile: { depth: 100 } # 保留更多历史
pending_queue_size: 1000 # 大缓冲区
}
]
3. Component设计原则
单一职责
// 好的设计:一个Component做一件事
class LaneDetectionComponent : public Component<Image> {
bool Proc(const std::shared_ptr<Image>& img) {
// 只做车道线检测
}
};
class ObstacleDetectionComponent : public Component<Image> {
bool Proc(const std::shared_ptr<Image>& img) {
// 只做障碍物检测
}
};
// 不好的设计:一个Component做太多事
class PerceptionComponent : public Component<Image> {
bool Proc(const std::shared_ptr<Image>& img) {
// 车道线检测
// 障碍物检测
// 红绿灯识别
// 可行驶区域分割
// ...
}
};
配置外部化
// 好的设计:参数从配置文件读取
bool PlanningComponent::Init() {
PlanningConfig config;
if (!GetProtoConfig(&config)) {
return false;
}
max_speed_ = config.max_speed();
safe_distance_ = config.safe_distance();
return true;
}
// 不好的设计:硬编码参数
bool PlanningComponent::Init() {
max_speed_ = 30.0; // 写死在代码里
safe_distance_ = 5.0;
return true;
}
4. 错误处理
Init()中的验证
bool MyComponent::Init() {
// 1. 读取配置
MyConfig config;
if (!GetProtoConfig(&config)) {
AERROR << "Failed to load config";
return false; // 初始化失败会阻止模块启动
}
// 2. 验证参数
if (config.max_speed() <= 0) {
AERROR << "Invalid max_speed: " << config.max_speed();
return false;
}
// 3. 初始化资源
writer_ = node_->CreateWriter<MyMsg>("/apollo/my/channel");
if (writer_ == nullptr) {
AERROR << "Failed to create writer";
return false;
}
return true;
}
Proc()中的异常处理
bool MyComponent::Proc(const std::shared_ptr<InputMsg>& msg) {
// 1. 输入验证
if (msg == nullptr) {
AERROR << "Received null message";
return false; // 返回false会记录错误日志
}
// 2. 业务逻辑
try {
auto result = ProcessData(*msg);
// 3. 发布结果
writer_->Write(result);
return true;
} catch (const std::exception& e) {
AERROR << "Processing failed: " << e.what();
return false;
}
}
5. 性能优化
避免频繁内存分配
// 好的设计:复用对象
class MyComponent : public Component<Input> {
private:
OutputMsg output_; // 成员变量,复用
public:
bool Proc(const std::shared_ptr<Input>& input) {
output_.Clear(); // 清空而非重新分配
// 填充output_
writer_->Write(output_);
return true;
}
};
// 不好的设计:每次都new
class MyComponent : public Component<Input> {
public:
bool Proc(const std::shared_ptr<Input>& input) {
auto output = std::make_shared<OutputMsg>(); // 频繁分配
// 填充output
writer_->Write(output);
return true;
}
};
控制日志频率
bool MyComponent::Proc(const std::shared_ptr<Input>& input) {
static uint64_t count = 0;
count++;
// 每100次打印一次,而非每次都打印
if (count % 100 == 0) {
AINFO << "Processed " << count << " messages";
}
// 业务逻辑...
return true;
}
6. 调试技巧
使用cyber_monitor监控
# 启动模块
mainboard -d planning.dag
# 在另一个终端监控消息
cyber_monitor
# 或查看特定channel
cyber_channel echo /apollo/planning
使用cyber_recorder录制和回放
# 录制消息
cyber_recorder record -a # 录制所有channel
# 回放消息
cyber_recorder play -f <record_file>
# 这样可以离线调试Component,无需实际硬件
添加详细日志
bool MyComponent::Proc(const std::shared_ptr<Input>& input) {
ADEBUG << "========== Proc Start ==========";
ADEBUG << "Input timestamp: " << input->timestamp();
ADEBUG << "Input size: " << input->data_size();
// 业务逻辑...
ADEBUG << "Output generated: " << output.DebugString();
ADEBUG << "========== Proc End ==========";
return true;
}
7. 配置版本管理
使用Git管理DAG和配置
# 提交前检查
git diff modules/planning/dag/planning.dag
git diff modules/planning/conf/planning_config.pb.txt
# 添加有意义的commit message
git commit -m "feat(planning): add emergency stop scenario
- Add emergency_stop.dag
- Update planning_config with new parameters
- Increase QoS depth for obstacle channel"
配置文件注释
# planning.dag
module_config {
module_library : "/apollo/bazel-bin/modules/planning/libplanning_component.so"
components {
class_name : "PlanningComponent"
config {
name: "planning"
readers: [
{
# 订阅预测模块的输出
# 注意:此channel必须在prediction模块启动后才可用
channel: "/apollo/prediction"
},
{
# 底盘状态,高频更新(100Hz)
# depth=15保证不丢失最近的状态
channel: "/apollo/canbus/chassis"
qos_profile: { depth : 15 }
pending_queue_size: 50
}
]
}
}
}
常见问题
Q1: Component创建失败怎么办?
错误信息:
[ERROR] CreateClassObj failed, ensure class has been registered.
classname: MyComponent, lib: /apollo/bazel-bin/mymodule/libmymodule.so
可能原因:
- 忘记添加<code>CYBER_REGISTER_COMPONENT(MyComponent)</code>
- class_name拼写错误(区分大小写)
- 动态库未正确编译或链接
解决方法:
// 1. 确保注册宏存在
CYBER_REGISTER_COMPONENT(MyComponent) // 在.cc文件中
// 2. 检查类名是否一致
class_name : "MyComponent" // DAG中
CYBER_REGISTER_COMPONENT(MyComponent) // 代码中
// 3. 重新编译
bazel build //mymodule:libmymodule.so
Q2: Reader收不到消息?
可能原因:
- channel名称不匹配
- 消息类型不匹配
- 发布者未启动
- QoS配置不当
调试步骤:
# 1. 检查channel是否存在
cyber_channel list
# 2. 检查消息是否发布
cyber_channel echo /apollo/my/channel
# 3. 检查消息类型
cyber_channel info /apollo/my/channel
# 4. 确认Component已启动
cyber_node list
Q3: 多输入Component何时触发?
规则:
- 所有输入通道至少收到1条消息后才触发
- 之后每当第一个通道收到新消息时触发
- 其他通道使用最新的消息
示例:
Component<M0, M1>
时刻1: M0收到消息 → 不触发(M1还没有消息)
时刻2: M1收到消息 → 触发Proc(M0最新, M1最新)
时刻3: M0收到消息 → 触发Proc(M0新, M1旧)
时刻4: M1收到消息 → 不触发(M1不是第一个输入)
时刻5: M0收到消息 → 触发Proc(M0新, M1新)
Q4: 如何在Component之间共享数据?
方法1: 通过消息通道(推荐)
// ComponentA
bool ComponentA::Proc(...) {
auto data = std::make_shared<MyData>();
// 填充data
writer_->Write(data); // 发布到channel
return true;
}
// ComponentB
bool ComponentB::Proc(const std::shared_ptr<MyData>& data) {
// 使用data
return true;
}
方法2: 通过全局单例(不推荐,仅用于只读配置)
class SharedData {
public:
static SharedData& Instance() {
static SharedData instance;
return instance;
}
const Config& GetConfig() const { return config_; }
private:
Config config_;
};
Q5: DAG文件可以动态修改吗?
答案: 不可以。
DAG文件在mainboard启动时加载,运行时修改不会生效。
如果需要修改:
- 停止mainboard(Ctrl+C)
- 修改DAG文件
- 重新启动mainboard
动态配置的正确方式:
- 使用Protobuf配置文件(可以在Init中重新加载)
- 使用GFlags(可以通过cyber_launch修改)
- 订阅配置更新消息
总结
DAG配置的核心概念
- 声明式配置: DAG文件描述"要加载什么",而非"如何加载"
- 模块化设计: 一个模块一个DAG,便于管理和复用
- 松耦合: 模块之间通过消息通道通信,不直接依赖
Mainboard启动机制
命令行 → 解析DAG → 加载库 → 创建实例 → 初始化 → 订阅消息 → 等待触发
Component两种模式
- 消息驱动: 响应式处理,按需执行
- 定时驱动: 周期性处理,固定频率
关键设计模式
- 工厂模式: 动态创建Component
- 注册模式: 类名到工厂函数的映射
- 发布订阅: Reader/Writer解耦通信
使用建议
- ✅ 合理划分Component粒度
- ✅ 配置参数外部化
- ✅ 完善错误处理
- ✅ 添加详细日志
- ✅ 注意性能优化
- ✅ 版本化管理配置文件
参考文件
-
Proto定义:
- <code>/apollo/cyber/proto/dag_conf.proto:1-26</code>
- <code>/apollo/cyber/proto/component_conf.proto:1-28</code>
-
源码实现:
- <code>/apollo/cyber/mainboard/module_controller.cc:37-137</code>
- <code>/apollo/cyber/component/component.h:56-557</code>
- <code>/apollo/cyber/component/timer_component.h:35-67</code>
-
配置示例:
- <code>/apollo/modules/planning/dag/planning.dag:1-31</code>
- <code>/apollo/modules/control/dag/control.dag:1-12</code>
- <code>/apollo/cyber/examples/common_component_example/common.dag:1-17</code>