DAG概述
什么是DAG?
DAG = Directed Acyclic Graph(有向无环图)
在Apollo Cyber RT中,DAG文件是模块配置文件,定义了:
- 要加载哪些动态库(.so文件)
 - 要创建哪些Component实例
 - 每个Component订阅哪些消息通道
 - Component的配置参数
 
DAG的作用
DAG配置文件
    ↓
描述模块的组成结构
    ↓
Mainboard根据DAG加载模块
    ↓
动态创建Component实例
    ↓
自动建立消息订阅关系
为什么叫DAG?
虽然配置文件不直接表示图结构,但Apollo的模块之间通过消息传递形成的依赖关系构成了一个有向无环图:
Perception → Prediction → Planning → Control
     ↓           ↓           ↓          ↓
  感知结果    预测轨迹    规划路径   控制指令
DAG确保了模块之间不会形成循环依赖,保证系统的稳定性。
DAG文件的位置
Apollo中所有模块的DAG文件都遵循统一的目录结构:
/apollo/modules/<模块名>/dag/<模块名>.dag
示例:
/apollo/modules/planning/dag/planning.dag
/apollo/modules/control/dag/control.dag
/apollo/modules/perception/production/dag/dag_streaming_perception.dag
DAG的Protobuf结构
DAG文件使用Protocol Buffers格式定义,具有严格的类型检查。
核心Proto定义
1. DagConfig (顶层结构)
文件位置: <code>/apollo/cyber/proto/dag_conf.proto</code>
message DagConfig {
  repeated ModuleConfig module_config = 1;
}
一个DAG文件包含一个或多个ModuleConfig。
2. ModuleConfig (模块配置)
message ModuleConfig {
  optional string module_library = 1;              // 动态库路径
  repeated ComponentInfo components = 2;           // 普通组件列表
  repeated TimerComponentInfo timer_components = 3; // 定时组件列表
}
字段说明:
- <code>module_library</code>: .so动态库的绝对路径或相对路径
 - <code>components</code>: 消息驱动的组件(收到消息时触发)
 - <code>timer_components</code>: 定时触发的组件(按固定频率触发)
 
3. ComponentInfo (组件信息)
message ComponentInfo {
  optional string class_name = 1;        // 组件类名
  optional ComponentConfig config = 2;   // 组件配置
}
4. ComponentConfig (组件配置)
文件位置: <code>/apollo/cyber/proto/component_conf.proto</code>
message ComponentConfig {
  optional string name = 1;                // 组件名称(唯一标识)
  optional string config_file_path = 2;    // Protobuf配置文件路径
  optional string flag_file_path = 3;      // GFlags配置文件路径
  repeated ReaderOption readers = 4;       // 订阅的消息通道列表
}
5. ReaderOption (消息订阅配置)
message ReaderOption {
  optional string channel = 1;               // 消息通道名
  optional QosProfile qos_profile = 2;       // 服务质量配置
  optional uint32 pending_queue_size = 3     // 未处理消息队列大小
      [default = 1];
}
QosProfile (服务质量):
message QosProfile {
  optional uint32 depth = 1 [default = 1];  // 消息缓存深度
}
6. TimerComponentInfo (定时组件信息)
message TimerComponentInfo {
  optional string class_name = 1;
  optional TimerComponentConfig config = 2;
}
7. TimerComponentConfig (定时组件配置)
message TimerComponentConfig {
  optional string name = 1;                // 组件名称
  optional string config_file_path = 2;    // 配置文件路径
  optional string flag_file_path = 3;      // Flag文件路径
  optional uint32 interval = 4;            // 触发间隔(毫秒)
}
完整结构层次
DagConfig
  └─ ModuleConfig (可多个)
      ├─ module_library: string
      ├─ ComponentInfo (可多个)
      │   ├─ class_name: string
      │   └─ ComponentConfig
      │       ├─ name: string
      │       ├─ config_file_path: string
      │       ├─ flag_file_path: string
      │       └─ ReaderOption (可多个)
      │           ├─ channel: string
      │           ├─ qos_profile
      │           │   └─ depth: uint32
      │           └─ pending_queue_size: uint32
      └─ TimerComponentInfo (可多个)
          ├─ class_name: string
          └─ TimerComponentConfig
              ├─ name: string
              ├─ config_file_path: string
              ├─ flag_file_path: string
              └─ interval: uint32
Component类型详解
Apollo Cyber RT提供了两种类型的Component:
1. Component (消息驱动组件)
特点:
- 订阅一个或多个消息通道
 - 当消息到达时触发处理
 - 支持1-4个输入通道
 
类模板:
template <typename M0 = NullType,
          typename M1 = NullType,
          typename M2 = NullType,
          typename M3 = NullType>
class Component : public ComponentBase;
使用场景:
- Planning: 订阅Prediction、Chassis、Localization
 - Control: 订阅Planning的轨迹输出
 - Prediction: 订阅Perception的障碍物信息
 
示例:
class PlanningComponent : public Component<
    prediction::PredictionObstacles,  // M0
    canbus::Chassis,                  // M1
    localization::LocalizationEstimate // M2
> {
 public:
  bool Init() override;
  // 三个消息到齐后触发
  bool Proc(const std::shared_ptr<prediction::PredictionObstacles>& pred,
            const std::shared_ptr<canbus::Chassis>& chassis,
            const std::shared_ptr<localization::LocalizationEstimate>& loc) override;
};
触发机制:
- 单输入: 收到消息立即触发
 - 多输入: 等待所有输入通道至少有一条消息后触发
 
2. TimerComponent (定时触发组件)
特点:
- 不订阅消息通道
 - 按固定时间间隔触发
 - 主动从其他模块拉取数据
 
基类:
class TimerComponent : public ComponentBase {
 public:
  bool Initialize(const TimerComponentConfig& config) override;
  virtual bool Proc() = 0;  // 定时调用
 private:
  uint64_t interval_ = 0;    // 触发间隔(毫秒)
  std::unique_ptr<Timer> timer_;
};
使用场景:
- Control: 以固定频率(10ms)输出控制指令
 - Monitor: 定期检查系统状态
 - 周期性任务(日志记录、健康检查等)
 
示例:
class ControlComponent : public TimerComponent {
 public:
  bool Init() override;
  // 每10ms调用一次
  bool Proc() override;
};
Component vs TimerComponent对比
| 特性 | Component | TimerComponent | 
|---|---|---|
| 触发方式 | 消息驱动 | 定时触发 | 
| 输入 | 1-4个通道 | 无(主动拉取) | 
| 配置字段 | readers | interval | 
| 适用场景 | 响应式处理 | 周期性任务 | 
| 延迟 | 取决于消息到达 | 固定周期 | 
| CPU占用 | 有消息时执行 | 持续执行 | 
DAG配置文件详解
示例1: Planning模块(消息驱动)
文件: <code>/apollo/modules/planning/dag/planning.dag</code>
# Define all components in DAG streaming.
module_config {
  # 1. 指定动态库路径
  module_library : "/apollo/bazel-bin/modules/planning/libplanning_component.so"
  # 2. 定义组件
  components {
    # 2.1 组件类名(必须在库中注册)
    class_name : "PlanningComponent"
    # 2.2 组件配置
    config {
      # 组件名称(唯一标识)
      name: "planning"
      # Protobuf配置文件
      config_file_path: "/apollo/modules/planning/conf/planning_config.pb.txt"
      # GFlags配置文件
      flag_file_path: "/apollo/modules/planning/conf/planning.conf"
      # 订阅的消息通道
      readers: [
        {
          channel: "/apollo/prediction"
        },
        {
          channel: "/apollo/canbus/chassis"
          qos_profile: {
            depth : 15  # 缓存最近15条消息
          }
          pending_queue_size: 50  # 未处理队列容量50
        },
        {
          channel: "/apollo/localization/pose"
          qos_profile: {
            depth : 15
          }
          pending_queue_size: 50
        }
      ]
    }
  }
}
详细解释:
- 
module_library:
- 编译后的动态库路径
 - 通常以<code>lib*.so</code>形式
 - 可以是绝对路径或相对于<code>/apollo</code>的路径
 
 - 
class_name:
- 必须与代码中<code>CYBER_REGISTER_COMPONENT</code>注册的类名一致
 - 区分大小写
 
 - 
name:
- Component的唯一标识符
 - 用于创建Cyber Node
 - 日志中会显示此名称
 
 - 
config_file_path:
- Protobuf格式的业务配置文件
 - 通过<code>GetProtoConfig()</code>方法读取
 - 示例:规划算法参数、场景配置等
 
 - 
flag_file_path:
- GFlags格式的配置文件
 - 包含命令行标志参数
 - 可以覆盖代码中的默认值
 
 - 
readers:
- 定义订阅的消息通道列表
 - 顺序对应Component模板参数顺序(M0, M1, M2…)
 - 每个Reader可以配置QoS和队列大小
 
 - 
QoS配置:
- <code>depth</code>: 消息历史记录深度(用于晚加入的订阅者)
 - <code>pending_queue_size</code>: 未处理消息的队列容量(避免消息堆积)
 
 
示例2: Control模块(定时驱动)
文件: <code>/apollo/modules/control/dag/control.dag</code>
module_config {
  module_library : "/apollo/bazel-bin/modules/control/libcontrol_component.so"
  # 使用timer_components而不是components
  timer_components {
    class_name : "ControlComponent"
    config {
      name: "control"
      flag_file_path: "/apollo/modules/control/conf/control.conf"
      # 定时间隔:10毫秒(100Hz)
      interval: 10
    }
  }
}
关键区别:
- 使用<code>timer_components</code>字段
 - 配置中有<code>interval</code>参数(毫秒)
 - 没有<code>readers</code>字段(不订阅消息)
 
示例3: 多组件模块
一个DAG文件可以包含多个组件:
module_config {
  module_library : "/apollo/bazel-bin/modules/perception/libperception_component.so"
  # 第一个组件:相机感知
  components {
    class_name : "CameraPerceptionComponent"
    config {
      name: "camera_perception"
      readers: [
        { channel: "/apollo/sensor/camera/front" }
      ]
    }
  }
  # 第二个组件:激光雷达感知
  components {
    class_name : "LidarPerceptionComponent"
    config {
      name: "lidar_perception"
      readers: [
        { channel: "/apollo/sensor/lidar/compensator/PointCloud2" }
      ]
    }
  }
  # 第三个组件:融合组件
  components {
    class_name : "FusionComponent"
    config {
      name: "fusion"
      readers: [
        { channel: "/apollo/perception/obstacles/camera" },
        { channel: "/apollo/perception/obstacles/lidar" }
      ]
    }
  }
}
示例4: 多库多组件
一个DAG文件也可以加载多个动态库:
# 第一个模块
module_config {
  module_library : "/apollo/bazel-bin/modules/drivers/camera/libcamera_component.so"
  components {
    class_name : "CameraComponent"
    config {
      name: "camera_front"
    }
  }
}
# 第二个模块
module_config {
  module_library : "/apollo/bazel-bin/modules/drivers/lidar/liblidar_component.so"
  components {
    class_name : "LidarComponent"
    config {
      name: "lidar_top"
    }
  }
}
Mainboard加载DAG流程
完整加载流程
1. Mainboard启动
   └─ 解析命令行参数: -d dag_file.dag
2. ModuleController::LoadAll()
   ├─ 解析DAG文件路径(支持相对/绝对路径)
   ├─ 遍历每个DAG文件
   └─ 调用LoadModule(path)
3. ModuleController::LoadModule(path)
   ├─ 读取DAG文件内容
   ├─ 解析为Protobuf对象: DagConfig
   └─ 调用LoadModule(dag_config)
4. ModuleController::LoadModule(dag_config)
   ├─ 遍历每个module_config
   │   ├─ 获取module_library路径
   │   └─ ClassLoaderManager::LoadLibrary(path)
   │       └─ dlopen加载.so文件
   │
   ├─ 遍历components
   │   ├─ 获取class_name
   │   ├─ ClassLoaderManager::CreateClassObj<ComponentBase>(class_name)
   │   │   └─ 查找全局注册表,调用工厂函数创建实例
   │   ├─ component->Initialize(config)
   │   │   ├─ 创建Node(config.name)
   │   │   ├─ LoadConfigFiles(config)
   │   │   │   ├─ 加载config_file_path
   │   │   │   └─ 加载flag_file_path
   │   │   ├─ 调用Init() [用户实现]
   │   │   ├─ 创建Reader订阅消息
   │   │   │   └─ node_->CreateReader<M>(channel, callback)
   │   │   └─ Scheduler::CreateTask()
   │   │       └─ 注册到调度器
   │   └─ 保存到component_list_
   │
   └─ 遍历timer_components
       └─ 类似处理(创建Timer而非Reader)
5. 所有组件加载完成
   └─ 等待消息到达或定时器触发
关键步骤详解
步骤1: DAG文件路径解析
// 支持三种路径格式
// 格式1: 纯文件名 → 在 /apollo/dag 下查找
mainboard -d planning.dag
// 实际路径: /apollo/dag/planning.dag
// 格式2: 绝对路径
mainboard -d /apollo/modules/planning/dag/planning.dag
// 格式3: 相对路径(相对于当前目录或/apollo)
mainboard -d modules/planning/dag/planning.dag
实现代码:
for (auto& dag_conf : args_.GetDAGConfList()) {
  std::string module_path = "";
  if (dag_conf == common::GetFileName(dag_conf)) {
    // 情况1: 纯文件名
    module_path = common::GetAbsolutePath(dag_root_path, dag_conf);
  } else if (dag_conf[0] == '/') {
    // 情况2: 绝对路径
    module_path = dag_conf;
  } else {
    // 情况3: 相对路径
    module_path = common::GetAbsolutePath(current_path, dag_conf);
    if (!common::PathExists(module_path)) {
      module_path = common::GetAbsolutePath(work_root, dag_conf);
    }
  }
  paths.emplace_back(std::move(module_path));
}
步骤2: Protobuf解析
DagConfig dag_config;
if (!common::GetProtoFromFile(path, &dag_config)) {
  AERROR << "Get proto failed, file: " << path;
  return false;
}
DAG文件虽然看起来像文本格式,但实际上是Protobuf的Text Format,可以自动转换为对象。
步骤3: 动态库加载
for (auto module_config : dag_config.module_config()) {
  std::string load_path = module_config.module_library();
  // 加载动态库
  class_loader_manager_.LoadLibrary(load_path);
}
ClassLoaderManager的作用:
- 避免重复加载同一个库
 - 管理库的生命周期
 - 提供统一的类创建接口
 
步骤4: 组件创建与初始化
for (auto& component : module_config.components()) {
  const std::string& class_name = component.class_name();
  // 1. 创建实例(通过注册表)
  std::shared_ptr<ComponentBase> base =
      class_loader_manager_.CreateClassObj<ComponentBase>(class_name);
  if (base == nullptr) {
    AERROR << "Failed to create component: " << class_name;
    return false;
  }
  // 2. 初始化组件
  if (!base->Initialize(component.config())) {
    AERROR << "Failed to initialize component: " << class_name;
    return false;
  }
  // 3. 保存到列表(保持生命周期)
  component_list_.emplace_back(std::move(base));
}
Initialize做了什么:
bool Component<M0>::Initialize(const ComponentConfig& config) {
  // 1. 创建Cyber Node(通信节点)
  node_.reset(new Node(config.name()));
  // 2. 加载配置文件
  LoadConfigFiles(config);
  // ├─ 加载config_file_path(业务配置)
  // └─ 加载flag_file_path(GFlags)
  // 3. 调用用户的Init()方法
  if (!Init()) {
    return false;
  }
  // 4. 创建Reader订阅消息
  ReaderConfig reader_cfg;
  reader_cfg.channel_name = config.readers(0).channel();
  reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());
  auto callback = [self](const std::shared_ptr<M0>& msg) {
    self->Process(msg);  // 收到消息时调用
  };
  auto reader = node_->CreateReader<M0>(reader_cfg, callback);
  readers_.emplace_back(std::move(reader));
  // 5. 注册到Scheduler
  return scheduler::Instance()->CreateTask(factory, node_->Name());
}
组件注册机制
CYBER_REGISTER_COMPONENT宏
每个Component类必须在源文件中注册:
// planning_component.cc
#include "modules/planning/planning_component.h"
CYBER_REGISTER_COMPONENT(PlanningComponent)
宏展开
#define CYBER_REGISTER_COMPONENT(name) \
  CLASS_LOADER_REGISTER_CLASS(name, apollo::cyber::ComponentBase)
进一步展开:
CLASS_LOADER_REGISTER_CLASS(PlanningComponent, apollo::cyber::ComponentBase)
// 展开为:
namespace {
  struct PlanningComponent_Registrar {
    PlanningComponent_Registrar() {
      // 注册类名到工厂函数的映射
      apollo::cyber::class_loader::utility::RegisterClass(
          "PlanningComponent",
          []() -> apollo::cyber::ComponentBase* {
            return new PlanningComponent();
          }
      );
    }
  };
  // 全局对象,在库加载时自动执行构造函数
  static PlanningComponent_Registrar g_PlanningComponent_registrar;
}
注册流程
1. 动态库被dlopen加载
   ↓
2. 执行全局对象的构造函数
   ↓
3. 调用RegisterClass()
   ↓
4. 将类名和工厂函数存入全局注册表
   ↓
5. ClassLoader可以通过类名查找工厂函数
   ↓
6. 调用工厂函数创建实例
全局注册表
// 简化的注册表实现
std::map<std::string, std::function<ComponentBase*()>> g_class_registry;
void RegisterClass(const std::string& class_name,
                   std::function<ComponentBase*()> factory) {
  g_class_registry[class_name] = factory;
}
ComponentBase* CreateClassObj(const std::string& class_name) {
  auto it = g_class_registry.find(class_name);
  if (it != g_class_registry.end()) {
    return it->second();  // 调用工厂函数
  }
  return nullptr;
}
为什么需要注册?
- 解耦: DAG文件不需要include头文件
 - 动态性: 运行时才决定创建哪个类
 - 扩展性: 新增Component无需修改mainboard代码
 
Reader与Writer配置
Reader配置详解
基本配置
readers: [
  {
    channel: "/apollo/prediction"
  }
]
最简单的配置,使用默认QoS和队列大小。
完整配置
readers: [
  {
    # 消息通道名称
    channel: "/apollo/canbus/chassis"
    # 服务质量配置
    qos_profile: {
      depth : 15  # 历史消息深度
    }
    # 待处理队列大小
    pending_queue_size: 50
  }
]
字段说明:
- 
channel: 消息通道名称
- 必须与发布者的channel一致
 - 通常以<code>/apollo/</code>开头
 
 - 
qos_profile.depth: 消息历史深度
- 保存最近N条消息
 - 用于晚加入的订阅者获取历史数据
 - 默认值: 1
 
 - 
pending_queue_size: 未处理消息队列容量
- 当处理速度慢于接收速度时的缓冲
 - 超出容量时会丢弃旧消息
 - 默认值: 1
 
 
QoS配置的影响
depth = 1 (默认):
发布者: [M1] → [M2] → [M3]
订阅者:                   ↓ 只能收到M3
depth = 3:
发布者: [M1] → [M2] → [M3]
订阅者: ↓      ↓      ↓     可以收到M1, M2, M3
使用场景:
- <code>depth=1</code>: 只关心最新数据(如传感器读数)
 - <code>depth>1</code>: 需要历史数据(如轨迹预测)
 
pending_queue_size的作用
消息到达速度: 100 msg/s
处理速度:      50 msg/s
pending_queue_size = 10:
队列: [M1][M2][M3]...[M10] → [M11到达,M1被丢弃]
设置建议:
- 处理速度快: 较小值(1-10)
 - 处理速度慢: 较大值(30-100)
 - 不允许丢消息: 设置足够大的值
 
Writer配置
Writer不在DAG中配置,而是在Component代码中创建:
// planning_component.cc
bool PlanningComponent::Init() {
  // 创建Writer发布规划结果
  planning_writer_ = node_->CreateWriter<ADCTrajectory>(
      "/apollo/planning"  // channel名称
  );
  return true;
}
bool PlanningComponent::Proc(...) {
  // 创建消息
  auto trajectory = std::make_shared<ADCTrajectory>();
  // ... 填充数据
  // 发布消息
  planning_writer_->Write(trajectory);
  return true;
}
Reader与Writer的通信
Planning Component                Control Component
     ↓                                   ↑
Writer("/apollo/planning")    Reader("/apollo/planning")
     ↓                                   ↑
     └────── CyberRT Transport ──────────┘
通信流程:
- Planning的Writer发布消息到<code>/apollo/planning</code>
 - CyberRT的Transport层接收消息
 - 查找订阅了此channel的Reader
 - 将消息分发给所有Reader
 - Reader触发Component的Process/Proc方法
 
实战示例
示例1: 创建一个简单的Component
目标
创建一个接收Driver消息并打印的Component。
步骤1: 定义Proto消息
// examples.proto
syntax = "proto2";
package apollo.cyber.examples.proto;
message Driver {
  optional string content = 1;
  optional uint64 msg_id = 2;
  optional uint64 timestamp = 3;
}
步骤2: 实现Component
// simple_component.h
#pragma once
#include "cyber/component/component.h"
#include "cyber/examples/proto/examples.pb.h"
using apollo::cyber::Component;
using apollo::cyber::examples::proto::Driver;
class SimpleComponent : public Component<Driver> {
 public:
  bool Init() override;
  bool Proc(const std::shared_ptr<Driver>& msg) override;
};
CYBER_REGISTER_COMPONENT(SimpleComponent)
// simple_component.cc
#include "simple_component.h"
bool SimpleComponent::Init() {
  AINFO << "SimpleComponent Init";
  return true;
}
bool SimpleComponent::Proc(const std::shared_ptr<Driver>& msg) {
  AINFO << "Received message " << msg->msg_id()
        << " with content: " << msg->content();
  return true;
}
步骤3: 编写BUILD文件
cc_binary(
    name = "libsimple_component.so",
    linkshared = True,
    linkstatic = False,
    srcs = [
        "simple_component.h",
        "simple_component.cc",
    ],
    deps = [
        "//cyber",
        "//cyber/examples/proto:examples_cc_proto",
    ],
)
步骤4: 创建DAG配置
# simple.dag
module_config {
  module_library : "/apollo/bazel-bin/examples/libsimple_component.so"
  components {
    class_name : "SimpleComponent"
    config {
      name: "simple"
      readers: [
        {
          channel: "/apollo/test/driver"
        }
      ]
    }
  }
}
步骤5: 编译和运行
# 编译
bazel build //examples:libsimple_component.so
# 启动Component
mainboard -d simple.dag
# 在另一个终端发布测试消息
cyber_channel pub /apollo/test/driver \
  examples.proto.Driver \
  '{"content": "Hello", "msg_id": 1}'
示例2: 多输入Component
目标
创建一个融合两路传感器数据的Component。
实现
// fusion_component.h
#pragma once
#include "cyber/component/component.h"
#include "sensor_msgs/image.pb.h"
#include "sensor_msgs/pointcloud.pb.h"
class FusionComponent : public Component<
    sensor_msgs::Image,       // M0: 相机数据
    sensor_msgs::PointCloud   // M1: 激光雷达数据
> {
 public:
  bool Init() override;
  bool Proc(const std::shared_ptr<sensor_msgs::Image>& image,
            const std::shared_ptr<sensor_msgs::PointCloud>& cloud) override;
};
CYBER_REGISTER_COMPONENT(FusionComponent)
// fusion_component.cc
bool FusionComponent::Init() {
  AINFO << "FusionComponent Init";
  return true;
}
bool FusionComponent::Proc(
    const std::shared_ptr<sensor_msgs::Image>& image,
    const std::shared_ptr<sensor_msgs::PointCloud>& cloud) {
  AINFO << "Fusing image (timestamp: " << image->header().timestamp_sec()
        << ") with pointcloud (timestamp: " << cloud->header().timestamp_sec()
        << ")";
  // 执行融合算法...
  return true;
}
DAG配置
# fusion.dag
module_config {
  module_library : "/apollo/bazel-bin/perception/libfusion_component.so"
  components {
    class_name : "FusionComponent"
    config {
      name: "sensor_fusion"
      # 注意:readers的顺序必须与Component模板参数顺序一致
      readers: [
        {
          channel: "/apollo/sensor/camera/front/image"  # M0
        },
        {
          channel: "/apollo/sensor/lidar/pointcloud"    # M1
        }
      ]
    }
  }
}
重要: readers的顺序必须与Component模板参数一致!
示例3: TimerComponent
目标
创建一个每秒打印一次心跳的Component。
实现
// heartbeat_component.h
#pragma once
#include "cyber/component/timer_component.h"
class HeartbeatComponent : public apollo::cyber::TimerComponent {
 public:
  bool Init() override;
  bool Proc() override;
 private:
  uint64_t count_ = 0;
};
CYBER_REGISTER_COMPONENT(HeartbeatComponent)
// heartbeat_component.cc
#include "heartbeat_component.h"
bool HeartbeatComponent::Init() {
  AINFO << "HeartbeatComponent Init";
  return true;
}
bool HeartbeatComponent::Proc() {
  count_++;
  AINFO << "Heartbeat #" << count_ << " at " << apollo::cyber::Time::Now();
  return true;
}
DAG配置
# heartbeat.dag
module_config {
  module_library : "/apollo/bazel-bin/monitor/libheartbeat_component.so"
  timer_components {
    class_name : "HeartbeatComponent"
    config {
      name: "heartbeat"
      interval: 1000  # 1秒 = 1000毫秒
    }
  }
}
运行
mainboard -d heartbeat.dag
# 输出:
# [INFO] HeartbeatComponent Init
# [INFO] Heartbeat #1 at 1634567890.123
# [INFO] Heartbeat #2 at 1634567891.123
# [INFO] Heartbeat #3 at 1634567892.123
# ...
示例4: 混合使用Component和TimerComponent
场景
一个模块包含数据采集组件(消息驱动)和数据处理组件(定时触发)。
DAG配置
module_config {
  module_library : "/apollo/bazel-bin/mymodule/libmymodule.so"
  # 组件1: 接收传感器数据(消息驱动)
  components {
    class_name : "SensorComponent"
    config {
      name: "sensor_receiver"
      readers: [
        { channel: "/apollo/sensor/data" }
      ]
    }
  }
  # 组件2: 定期处理数据(定时驱动)
  timer_components {
    class_name : "ProcessorComponent"
    config {
      name: "data_processor"
      interval: 100  # 100ms处理一次
    }
  }
}
最佳实践
1. DAG文件组织
建议结构
/apollo/modules/<模块名>/
  ├── dag/
  │   ├── <模块名>.dag              # 主DAG文件
  │   ├── <模块名>_test.dag         # 测试DAG
  │   └── <模块名>_<变体>.dag       # 变体配置
  ├── conf/
  │   ├── <模块名>_config.pb.txt    # Protobuf配置
  │   └── <模块名>.conf              # GFlags配置
  └── proto/
      └── <模块名>_config.proto      # 配置Proto定义
命名规范
- DAG文件: 小写,下划线分隔
 - Component类名: 大驼峰,以Component结尾
 - 组件名称: 小写,下划线分隔,简洁明了
 
2. Reader配置优化
根据场景选择QoS
# 场景1: 实时性要求高(控制指令)
readers: [
  {
    channel: "/apollo/control/command"
    qos_profile: { depth: 1 }         # 只要最新的
    pending_queue_size: 1              # 不积压
  }
]
# 场景2: 需要历史数据(障碍物追踪)
readers: [
  {
    channel: "/apollo/perception/obstacles"
    qos_profile: { depth: 10 }        # 保留最近10帧
    pending_queue_size: 20             # 允许一定积压
  }
]
# 场景3: 不允许丢消息(日志记录)
readers: [
  {
    channel: "/apollo/monitor/events"
    qos_profile: { depth: 100 }       # 保留更多历史
    pending_queue_size: 1000           # 大缓冲区
  }
]
3. Component设计原则
单一职责
// 好的设计:一个Component做一件事
class LaneDetectionComponent : public Component<Image> {
  bool Proc(const std::shared_ptr<Image>& img) {
    // 只做车道线检测
  }
};
class ObstacleDetectionComponent : public Component<Image> {
  bool Proc(const std::shared_ptr<Image>& img) {
    // 只做障碍物检测
  }
};
// 不好的设计:一个Component做太多事
class PerceptionComponent : public Component<Image> {
  bool Proc(const std::shared_ptr<Image>& img) {
    // 车道线检测
    // 障碍物检测
    // 红绿灯识别
    // 可行驶区域分割
    // ...
  }
};
配置外部化
// 好的设计:参数从配置文件读取
bool PlanningComponent::Init() {
  PlanningConfig config;
  if (!GetProtoConfig(&config)) {
    return false;
  }
  max_speed_ = config.max_speed();
  safe_distance_ = config.safe_distance();
  return true;
}
// 不好的设计:硬编码参数
bool PlanningComponent::Init() {
  max_speed_ = 30.0;  // 写死在代码里
  safe_distance_ = 5.0;
  return true;
}
4. 错误处理
Init()中的验证
bool MyComponent::Init() {
  // 1. 读取配置
  MyConfig config;
  if (!GetProtoConfig(&config)) {
    AERROR << "Failed to load config";
    return false;  // 初始化失败会阻止模块启动
  }
  // 2. 验证参数
  if (config.max_speed() <= 0) {
    AERROR << "Invalid max_speed: " << config.max_speed();
    return false;
  }
  // 3. 初始化资源
  writer_ = node_->CreateWriter<MyMsg>("/apollo/my/channel");
  if (writer_ == nullptr) {
    AERROR << "Failed to create writer";
    return false;
  }
  return true;
}
Proc()中的异常处理
bool MyComponent::Proc(const std::shared_ptr<InputMsg>& msg) {
  // 1. 输入验证
  if (msg == nullptr) {
    AERROR << "Received null message";
    return false;  // 返回false会记录错误日志
  }
  // 2. 业务逻辑
  try {
    auto result = ProcessData(*msg);
    // 3. 发布结果
    writer_->Write(result);
    return true;
  } catch (const std::exception& e) {
    AERROR << "Processing failed: " << e.what();
    return false;
  }
}
5. 性能优化
避免频繁内存分配
// 好的设计:复用对象
class MyComponent : public Component<Input> {
 private:
  OutputMsg output_;  // 成员变量,复用
 public:
  bool Proc(const std::shared_ptr<Input>& input) {
    output_.Clear();  // 清空而非重新分配
    // 填充output_
    writer_->Write(output_);
    return true;
  }
};
// 不好的设计:每次都new
class MyComponent : public Component<Input> {
 public:
  bool Proc(const std::shared_ptr<Input>& input) {
    auto output = std::make_shared<OutputMsg>();  // 频繁分配
    // 填充output
    writer_->Write(output);
    return true;
  }
};
控制日志频率
bool MyComponent::Proc(const std::shared_ptr<Input>& input) {
  static uint64_t count = 0;
  count++;
  // 每100次打印一次,而非每次都打印
  if (count % 100 == 0) {
    AINFO << "Processed " << count << " messages";
  }
  // 业务逻辑...
  return true;
}
6. 调试技巧
使用cyber_monitor监控
# 启动模块
mainboard -d planning.dag
# 在另一个终端监控消息
cyber_monitor
# 或查看特定channel
cyber_channel echo /apollo/planning
使用cyber_recorder录制和回放
# 录制消息
cyber_recorder record -a  # 录制所有channel
# 回放消息
cyber_recorder play -f <record_file>
# 这样可以离线调试Component,无需实际硬件
添加详细日志
bool MyComponent::Proc(const std::shared_ptr<Input>& input) {
  ADEBUG << "========== Proc Start ==========";
  ADEBUG << "Input timestamp: " << input->timestamp();
  ADEBUG << "Input size: " << input->data_size();
  // 业务逻辑...
  ADEBUG << "Output generated: " << output.DebugString();
  ADEBUG << "========== Proc End ==========";
  return true;
}
7. 配置版本管理
使用Git管理DAG和配置
# 提交前检查
git diff modules/planning/dag/planning.dag
git diff modules/planning/conf/planning_config.pb.txt
# 添加有意义的commit message
git commit -m "feat(planning): add emergency stop scenario
- Add emergency_stop.dag
- Update planning_config with new parameters
- Increase QoS depth for obstacle channel"
配置文件注释
# planning.dag
module_config {
  module_library : "/apollo/bazel-bin/modules/planning/libplanning_component.so"
  components {
    class_name : "PlanningComponent"
    config {
      name: "planning"
      readers: [
        {
          # 订阅预测模块的输出
          # 注意:此channel必须在prediction模块启动后才可用
          channel: "/apollo/prediction"
        },
        {
          # 底盘状态,高频更新(100Hz)
          # depth=15保证不丢失最近的状态
          channel: "/apollo/canbus/chassis"
          qos_profile: { depth : 15 }
          pending_queue_size: 50
        }
      ]
    }
  }
}
常见问题
Q1: Component创建失败怎么办?
错误信息:
[ERROR] CreateClassObj failed, ensure class has been registered.
classname: MyComponent, lib: /apollo/bazel-bin/mymodule/libmymodule.so
可能原因:
- 忘记添加<code>CYBER_REGISTER_COMPONENT(MyComponent)</code>
 - class_name拼写错误(区分大小写)
 - 动态库未正确编译或链接
 
解决方法:
// 1. 确保注册宏存在
CYBER_REGISTER_COMPONENT(MyComponent)  // 在.cc文件中
// 2. 检查类名是否一致
class_name : "MyComponent"  // DAG中
CYBER_REGISTER_COMPONENT(MyComponent)  // 代码中
// 3. 重新编译
bazel build //mymodule:libmymodule.so
Q2: Reader收不到消息?
可能原因:
- channel名称不匹配
 - 消息类型不匹配
 - 发布者未启动
 - QoS配置不当
 
调试步骤:
# 1. 检查channel是否存在
cyber_channel list
# 2. 检查消息是否发布
cyber_channel echo /apollo/my/channel
# 3. 检查消息类型
cyber_channel info /apollo/my/channel
# 4. 确认Component已启动
cyber_node list
Q3: 多输入Component何时触发?
规则:
- 所有输入通道至少收到1条消息后才触发
 - 之后每当第一个通道收到新消息时触发
 - 其他通道使用最新的消息
 
示例:
Component<M0, M1>
时刻1: M0收到消息 → 不触发(M1还没有消息)
时刻2: M1收到消息 → 触发Proc(M0最新, M1最新)
时刻3: M0收到消息 → 触发Proc(M0新, M1旧)
时刻4: M1收到消息 → 不触发(M1不是第一个输入)
时刻5: M0收到消息 → 触发Proc(M0新, M1新)
Q4: 如何在Component之间共享数据?
方法1: 通过消息通道(推荐)
// ComponentA
bool ComponentA::Proc(...) {
  auto data = std::make_shared<MyData>();
  // 填充data
  writer_->Write(data);  // 发布到channel
  return true;
}
// ComponentB
bool ComponentB::Proc(const std::shared_ptr<MyData>& data) {
  // 使用data
  return true;
}
方法2: 通过全局单例(不推荐,仅用于只读配置)
class SharedData {
 public:
  static SharedData& Instance() {
    static SharedData instance;
    return instance;
  }
  const Config& GetConfig() const { return config_; }
 private:
  Config config_;
};
Q5: DAG文件可以动态修改吗?
答案: 不可以。
DAG文件在mainboard启动时加载,运行时修改不会生效。
如果需要修改:
- 停止mainboard(Ctrl+C)
 - 修改DAG文件
 - 重新启动mainboard
 
动态配置的正确方式:
- 使用Protobuf配置文件(可以在Init中重新加载)
 - 使用GFlags(可以通过cyber_launch修改)
 - 订阅配置更新消息
 
总结
DAG配置的核心概念
- 声明式配置: DAG文件描述"要加载什么",而非"如何加载"
 - 模块化设计: 一个模块一个DAG,便于管理和复用
 - 松耦合: 模块之间通过消息通道通信,不直接依赖
 
Mainboard启动机制
命令行 → 解析DAG → 加载库 → 创建实例 → 初始化 → 订阅消息 → 等待触发
Component两种模式
- 消息驱动: 响应式处理,按需执行
 - 定时驱动: 周期性处理,固定频率
 
关键设计模式
- 工厂模式: 动态创建Component
 - 注册模式: 类名到工厂函数的映射
 - 发布订阅: Reader/Writer解耦通信
 
使用建议
- ✅ 合理划分Component粒度
 - ✅ 配置参数外部化
 - ✅ 完善错误处理
 - ✅ 添加详细日志
 - ✅ 注意性能优化
 - ✅ 版本化管理配置文件
 
参考文件
- 
Proto定义:
- <code>/apollo/cyber/proto/dag_conf.proto:1-26</code>
 - <code>/apollo/cyber/proto/component_conf.proto:1-28</code>
 
 - 
源码实现:
- <code>/apollo/cyber/mainboard/module_controller.cc:37-137</code>
 - <code>/apollo/cyber/component/component.h:56-557</code>
 - <code>/apollo/cyber/component/timer_component.h:35-67</code>
 
 - 
配置示例:
- <code>/apollo/modules/planning/dag/planning.dag:1-31</code>
 - <code>/apollo/modules/control/dag/control.dag:1-12</code>
 - <code>/apollo/cyber/examples/common_component_example/common.dag:1-17</code>
 
 
