【Apollo】Apollo DAG 配置文件详解与模块启动机制

DAG概述

什么是DAG?

DAG = Directed Acyclic Graph(有向无环图)

在Apollo Cyber RT中,DAG文件是模块配置文件,定义了:

  • 要加载哪些动态库(.so文件)
  • 要创建哪些Component实例
  • 每个Component订阅哪些消息通道
  • Component的配置参数

DAG的作用

DAG配置文件
    ↓
描述模块的组成结构
    ↓
Mainboard根据DAG加载模块
    ↓
动态创建Component实例
    ↓
自动建立消息订阅关系

为什么叫DAG?

虽然配置文件不直接表示图结构,但Apollo的模块之间通过消息传递形成的依赖关系构成了一个有向无环图:

Perception → Prediction → Planning → Control
     ↓           ↓           ↓          ↓
  感知结果    预测轨迹    规划路径   控制指令

DAG确保了模块之间不会形成循环依赖,保证系统的稳定性。

DAG文件的位置

Apollo中所有模块的DAG文件都遵循统一的目录结构:

/apollo/modules/<模块名>/dag/<模块名>.dag

示例:
/apollo/modules/planning/dag/planning.dag
/apollo/modules/control/dag/control.dag
/apollo/modules/perception/production/dag/dag_streaming_perception.dag

DAG的Protobuf结构

DAG文件使用Protocol Buffers格式定义,具有严格的类型检查。

核心Proto定义

1. DagConfig (顶层结构)

文件位置: <code>/apollo/cyber/proto/dag_conf.proto</code>

message DagConfig {
  repeated ModuleConfig module_config = 1;
}

一个DAG文件包含一个或多个ModuleConfig

2. ModuleConfig (模块配置)

message ModuleConfig {
  optional string module_library = 1;              // 动态库路径
  repeated ComponentInfo components = 2;           // 普通组件列表
  repeated TimerComponentInfo timer_components = 3; // 定时组件列表
}

字段说明:

  • <code>module_library</code>: .so动态库的绝对路径或相对路径
  • <code>components</code>: 消息驱动的组件(收到消息时触发)
  • <code>timer_components</code>: 定时触发的组件(按固定频率触发)

3. ComponentInfo (组件信息)

message ComponentInfo {
  optional string class_name = 1;        // 组件类名
  optional ComponentConfig config = 2;   // 组件配置
}

4. ComponentConfig (组件配置)

文件位置: <code>/apollo/cyber/proto/component_conf.proto</code>

message ComponentConfig {
  optional string name = 1;                // 组件名称(唯一标识)
  optional string config_file_path = 2;    // Protobuf配置文件路径
  optional string flag_file_path = 3;      // GFlags配置文件路径
  repeated ReaderOption readers = 4;       // 订阅的消息通道列表
}

5. ReaderOption (消息订阅配置)

message ReaderOption {
  optional string channel = 1;               // 消息通道名
  optional QosProfile qos_profile = 2;       // 服务质量配置
  optional uint32 pending_queue_size = 3     // 未处理消息队列大小
      [default = 1];
}

QosProfile (服务质量):

message QosProfile {
  optional uint32 depth = 1 [default = 1];  // 消息缓存深度
}

6. TimerComponentInfo (定时组件信息)

message TimerComponentInfo {
  optional string class_name = 1;
  optional TimerComponentConfig config = 2;
}

7. TimerComponentConfig (定时组件配置)

message TimerComponentConfig {
  optional string name = 1;                // 组件名称
  optional string config_file_path = 2;    // 配置文件路径
  optional string flag_file_path = 3;      // Flag文件路径
  optional uint32 interval = 4;            // 触发间隔(毫秒)
}

完整结构层次

DagConfig
  └─ ModuleConfig (可多个)
      ├─ module_library: string
      ├─ ComponentInfo (可多个)
      │   ├─ class_name: string
      │   └─ ComponentConfig
      │       ├─ name: string
      │       ├─ config_file_path: string
      │       ├─ flag_file_path: string
      │       └─ ReaderOption (可多个)
      │           ├─ channel: string
      │           ├─ qos_profile
      │           │   └─ depth: uint32
      │           └─ pending_queue_size: uint32
      └─ TimerComponentInfo (可多个)
          ├─ class_name: string
          └─ TimerComponentConfig
              ├─ name: string
              ├─ config_file_path: string
              ├─ flag_file_path: string
              └─ interval: uint32

Component类型详解

Apollo Cyber RT提供了两种类型的Component:

1. Component (消息驱动组件)

特点:

  • 订阅一个或多个消息通道
  • 当消息到达时触发处理
  • 支持1-4个输入通道

类模板:

template &lt;typename M0 = NullType,
          typename M1 = NullType,
          typename M2 = NullType,
          typename M3 = NullType&gt;
class Component : public ComponentBase;

使用场景:

  • Planning: 订阅Prediction、Chassis、Localization
  • Control: 订阅Planning的轨迹输出
  • Prediction: 订阅Perception的障碍物信息

示例:

class PlanningComponent : public Component&lt;
    prediction::PredictionObstacles,  // M0
    canbus::Chassis,                  // M1
    localization::LocalizationEstimate // M2
> {
 public:
  bool Init() override;

  // 三个消息到齐后触发
  bool Proc(const std::shared_ptr&lt;prediction::PredictionObstacles&gt;&amp; pred,
            const std::shared_ptr&lt;canbus::Chassis&gt;&amp; chassis,
            const std::shared_ptr&lt;localization::LocalizationEstimate&gt;&amp; loc) override;
};

触发机制:

  • 单输入: 收到消息立即触发
  • 多输入: 等待所有输入通道至少有一条消息后触发

2. TimerComponent (定时触发组件)

特点:

  • 不订阅消息通道
  • 按固定时间间隔触发
  • 主动从其他模块拉取数据

基类:

class TimerComponent : public ComponentBase {
 public:
  bool Initialize(const TimerComponentConfig&amp; config) override;
  virtual bool Proc() = 0;  // 定时调用

 private:
  uint64_t interval_ = 0;    // 触发间隔(毫秒)
  std::unique_ptr&lt;Timer&gt; timer_;
};

使用场景:

  • Control: 以固定频率(10ms)输出控制指令
  • Monitor: 定期检查系统状态
  • 周期性任务(日志记录、健康检查等)

示例:

class ControlComponent : public TimerComponent {
 public:
  bool Init() override;

  // 每10ms调用一次
  bool Proc() override;
};

Component vs TimerComponent对比

特性 Component TimerComponent
触发方式 消息驱动 定时触发
输入 1-4个通道 无(主动拉取)
配置字段 readers interval
适用场景 响应式处理 周期性任务
延迟 取决于消息到达 固定周期
CPU占用 有消息时执行 持续执行

DAG配置文件详解

示例1: Planning模块(消息驱动)

文件: <code>/apollo/modules/planning/dag/planning.dag</code>

# Define all components in DAG streaming.
module_config {
  # 1. 指定动态库路径
  module_library : &quot;/apollo/bazel-bin/modules/planning/libplanning_component.so&quot;

  # 2. 定义组件
  components {
    # 2.1 组件类名(必须在库中注册)
    class_name : &quot;PlanningComponent&quot;

    # 2.2 组件配置
    config {
      # 组件名称(唯一标识)
      name: &quot;planning&quot;

      # Protobuf配置文件
      config_file_path: &quot;/apollo/modules/planning/conf/planning_config.pb.txt&quot;

      # GFlags配置文件
      flag_file_path: &quot;/apollo/modules/planning/conf/planning.conf&quot;

      # 订阅的消息通道
      readers: [
        {
          channel: &quot;/apollo/prediction&quot;
        },
        {
          channel: &quot;/apollo/canbus/chassis&quot;
          qos_profile: {
            depth : 15  # 缓存最近15条消息
          }
          pending_queue_size: 50  # 未处理队列容量50
        },
        {
          channel: &quot;/apollo/localization/pose&quot;
          qos_profile: {
            depth : 15
          }
          pending_queue_size: 50
        }
      ]
    }
  }
}

详细解释:

  1. module_library:

    • 编译后的动态库路径
    • 通常以<code>lib*.so</code>形式
    • 可以是绝对路径或相对于<code>/apollo</code>的路径
  2. class_name:

    • 必须与代码中<code>CYBER_REGISTER_COMPONENT</code>注册的类名一致
    • 区分大小写
  3. name:

    • Component的唯一标识符
    • 用于创建Cyber Node
    • 日志中会显示此名称
  4. config_file_path:

    • Protobuf格式的业务配置文件
    • 通过<code>GetProtoConfig()</code>方法读取
    • 示例:规划算法参数、场景配置等
  5. flag_file_path:

    • GFlags格式的配置文件
    • 包含命令行标志参数
    • 可以覆盖代码中的默认值
  6. readers:

    • 定义订阅的消息通道列表
    • 顺序对应Component模板参数顺序(M0, M1, M2…)
    • 每个Reader可以配置QoS和队列大小
  7. QoS配置:

    • <code>depth</code>: 消息历史记录深度(用于晚加入的订阅者)
    • <code>pending_queue_size</code>: 未处理消息的队列容量(避免消息堆积)

示例2: Control模块(定时驱动)

文件: <code>/apollo/modules/control/dag/control.dag</code>

module_config {
  module_library : &quot;/apollo/bazel-bin/modules/control/libcontrol_component.so&quot;

  # 使用timer_components而不是components
  timer_components {
    class_name : &quot;ControlComponent&quot;

    config {
      name: &quot;control&quot;
      flag_file_path: &quot;/apollo/modules/control/conf/control.conf&quot;

      # 定时间隔:10毫秒(100Hz)
      interval: 10
    }
  }
}

关键区别:

  • 使用<code>timer_components</code>字段
  • 配置中有<code>interval</code>参数(毫秒)
  • 没有<code>readers</code>字段(不订阅消息)

示例3: 多组件模块

一个DAG文件可以包含多个组件:

module_config {
  module_library : &quot;/apollo/bazel-bin/modules/perception/libperception_component.so&quot;

  # 第一个组件:相机感知
  components {
    class_name : &quot;CameraPerceptionComponent&quot;
    config {
      name: &quot;camera_perception&quot;
      readers: [
        { channel: &quot;/apollo/sensor/camera/front&quot; }
      ]
    }
  }

  # 第二个组件:激光雷达感知
  components {
    class_name : &quot;LidarPerceptionComponent&quot;
    config {
      name: &quot;lidar_perception&quot;
      readers: [
        { channel: &quot;/apollo/sensor/lidar/compensator/PointCloud2&quot; }
      ]
    }
  }

  # 第三个组件:融合组件
  components {
    class_name : &quot;FusionComponent&quot;
    config {
      name: &quot;fusion&quot;
      readers: [
        { channel: &quot;/apollo/perception/obstacles/camera&quot; },
        { channel: &quot;/apollo/perception/obstacles/lidar&quot; }
      ]
    }
  }
}

示例4: 多库多组件

一个DAG文件也可以加载多个动态库:

# 第一个模块
module_config {
  module_library : &quot;/apollo/bazel-bin/modules/drivers/camera/libcamera_component.so&quot;
  components {
    class_name : &quot;CameraComponent&quot;
    config {
      name: &quot;camera_front&quot;
    }
  }
}

# 第二个模块
module_config {
  module_library : &quot;/apollo/bazel-bin/modules/drivers/lidar/liblidar_component.so&quot;
  components {
    class_name : &quot;LidarComponent&quot;
    config {
      name: &quot;lidar_top&quot;
    }
  }
}

Mainboard加载DAG流程

完整加载流程

1. Mainboard启动
   └─ 解析命令行参数: -d dag_file.dag

2. ModuleController::LoadAll()
   ├─ 解析DAG文件路径(支持相对/绝对路径)
   ├─ 遍历每个DAG文件
   └─ 调用LoadModule(path)

3. ModuleController::LoadModule(path)
   ├─ 读取DAG文件内容
   ├─ 解析为Protobuf对象: DagConfig
   └─ 调用LoadModule(dag_config)

4. ModuleController::LoadModule(dag_config)
   ├─ 遍历每个module_config
   │   ├─ 获取module_library路径
   │   └─ ClassLoaderManager::LoadLibrary(path)
   │       └─ dlopen加载.so文件
   │
   ├─ 遍历components
   │   ├─ 获取class_name
   │   ├─ ClassLoaderManager::CreateClassObj&lt;ComponentBase&gt;(class_name)
   │   │   └─ 查找全局注册表,调用工厂函数创建实例
   │   ├─ component-&gt;Initialize(config)
   │   │   ├─ 创建Node(config.name)
   │   │   ├─ LoadConfigFiles(config)
   │   │   │   ├─ 加载config_file_path
   │   │   │   └─ 加载flag_file_path
   │   │   ├─ 调用Init() [用户实现]
   │   │   ├─ 创建Reader订阅消息
   │   │   │   └─ node_-&gt;CreateReader&lt;M&gt;(channel, callback)
   │   │   └─ Scheduler::CreateTask()
   │   │       └─ 注册到调度器
   │   └─ 保存到component_list_
   │
   └─ 遍历timer_components
       └─ 类似处理(创建Timer而非Reader)

5. 所有组件加载完成
   └─ 等待消息到达或定时器触发

关键步骤详解

步骤1: DAG文件路径解析

// 支持三种路径格式

// 格式1: 纯文件名 → 在 /apollo/dag 下查找
mainboard -d planning.dag
// 实际路径: /apollo/dag/planning.dag

// 格式2: 绝对路径
mainboard -d /apollo/modules/planning/dag/planning.dag

// 格式3: 相对路径(相对于当前目录或/apollo)
mainboard -d modules/planning/dag/planning.dag

实现代码:

for (auto&amp; dag_conf : args_.GetDAGConfList()) {
  std::string module_path = &quot;&quot;;
  if (dag_conf == common::GetFileName(dag_conf)) {
    // 情况1: 纯文件名
    module_path = common::GetAbsolutePath(dag_root_path, dag_conf);
  } else if (dag_conf[0] == &#039;/&#039;) {
    // 情况2: 绝对路径
    module_path = dag_conf;
  } else {
    // 情况3: 相对路径
    module_path = common::GetAbsolutePath(current_path, dag_conf);
    if (!common::PathExists(module_path)) {
      module_path = common::GetAbsolutePath(work_root, dag_conf);
    }
  }
  paths.emplace_back(std::move(module_path));
}

步骤2: Protobuf解析

DagConfig dag_config;
if (!common::GetProtoFromFile(path, &amp;dag_config)) {
  AERROR &lt;&lt; &quot;Get proto failed, file: &quot; &lt;&lt; path;
  return false;
}

DAG文件虽然看起来像文本格式,但实际上是Protobuf的Text Format,可以自动转换为对象。

步骤3: 动态库加载

for (auto module_config : dag_config.module_config()) {
  std::string load_path = module_config.module_library();

  // 加载动态库
  class_loader_manager_.LoadLibrary(load_path);
}

ClassLoaderManager的作用:

  • 避免重复加载同一个库
  • 管理库的生命周期
  • 提供统一的类创建接口

步骤4: 组件创建与初始化

for (auto&amp; component : module_config.components()) {
  const std::string&amp; class_name = component.class_name();

  // 1. 创建实例(通过注册表)
  std::shared_ptr&lt;ComponentBase&gt; base =
      class_loader_manager_.CreateClassObj&lt;ComponentBase&gt;(class_name);

  if (base == nullptr) {
    AERROR &lt;&lt; &quot;Failed to create component: &quot; &lt;&lt; class_name;
    return false;
  }

  // 2. 初始化组件
  if (!base-&gt;Initialize(component.config())) {
    AERROR &lt;&lt; &quot;Failed to initialize component: &quot; &lt;&lt; class_name;
    return false;
  }

  // 3. 保存到列表(保持生命周期)
  component_list_.emplace_back(std::move(base));
}

Initialize做了什么:

bool Component&lt;M0&gt;::Initialize(const ComponentConfig&amp; config) {
  // 1. 创建Cyber Node(通信节点)
  node_.reset(new Node(config.name()));

  // 2. 加载配置文件
  LoadConfigFiles(config);
  // ├─ 加载config_file_path(业务配置)
  // └─ 加载flag_file_path(GFlags)

  // 3. 调用用户的Init()方法
  if (!Init()) {
    return false;
  }

  // 4. 创建Reader订阅消息
  ReaderConfig reader_cfg;
  reader_cfg.channel_name = config.readers(0).channel();
  reader_cfg.qos_profile.CopyFrom(config.readers(0).qos_profile());

  auto callback = [self](const std::shared_ptr&lt;M0&gt;&amp; msg) {
    self-&gt;Process(msg);  // 收到消息时调用
  };

  auto reader = node_-&gt;CreateReader&lt;M0&gt;(reader_cfg, callback);
  readers_.emplace_back(std::move(reader));

  // 5. 注册到Scheduler
  return scheduler::Instance()-&gt;CreateTask(factory, node_-&gt;Name());
}

组件注册机制

CYBER_REGISTER_COMPONENT宏

每个Component类必须在源文件中注册:

// planning_component.cc
#include &quot;modules/planning/planning_component.h&quot;

CYBER_REGISTER_COMPONENT(PlanningComponent)

宏展开

#define CYBER_REGISTER_COMPONENT(name) \
  CLASS_LOADER_REGISTER_CLASS(name, apollo::cyber::ComponentBase)

进一步展开:

CLASS_LOADER_REGISTER_CLASS(PlanningComponent, apollo::cyber::ComponentBase)

// 展开为:
namespace {
  struct PlanningComponent_Registrar {
    PlanningComponent_Registrar() {
      // 注册类名到工厂函数的映射
      apollo::cyber::class_loader::utility::RegisterClass(
          &quot;PlanningComponent&quot;,
          []() -&gt; apollo::cyber::ComponentBase* {
            return new PlanningComponent();
          }
      );
    }
  };

  // 全局对象,在库加载时自动执行构造函数
  static PlanningComponent_Registrar g_PlanningComponent_registrar;
}

注册流程

1. 动态库被dlopen加载
   ↓
2. 执行全局对象的构造函数
   ↓
3. 调用RegisterClass()
   ↓
4. 将类名和工厂函数存入全局注册表
   ↓
5. ClassLoader可以通过类名查找工厂函数
   ↓
6. 调用工厂函数创建实例

全局注册表

// 简化的注册表实现
std::map&lt;std::string, std::function&lt;ComponentBase*()&gt;&gt; g_class_registry;

void RegisterClass(const std::string&amp; class_name,
                   std::function&lt;ComponentBase*()&gt; factory) {
  g_class_registry[class_name] = factory;
}

ComponentBase* CreateClassObj(const std::string&amp; class_name) {
  auto it = g_class_registry.find(class_name);
  if (it != g_class_registry.end()) {
    return it-&gt;second();  // 调用工厂函数
  }
  return nullptr;
}

为什么需要注册?

  1. 解耦: DAG文件不需要include头文件
  2. 动态性: 运行时才决定创建哪个类
  3. 扩展性: 新增Component无需修改mainboard代码

Reader与Writer配置

Reader配置详解

基本配置

readers: [
  {
    channel: &quot;/apollo/prediction&quot;
  }
]

最简单的配置,使用默认QoS和队列大小。

完整配置

readers: [
  {
    # 消息通道名称
    channel: &quot;/apollo/canbus/chassis&quot;

    # 服务质量配置
    qos_profile: {
      depth : 15  # 历史消息深度
    }

    # 待处理队列大小
    pending_queue_size: 50
  }
]

字段说明:

  1. channel: 消息通道名称

    • 必须与发布者的channel一致
    • 通常以<code>/apollo/</code>开头
  2. qos_profile.depth: 消息历史深度

    • 保存最近N条消息
    • 用于晚加入的订阅者获取历史数据
    • 默认值: 1
  3. pending_queue_size: 未处理消息队列容量

    • 当处理速度慢于接收速度时的缓冲
    • 超出容量时会丢弃旧消息
    • 默认值: 1

QoS配置的影响

depth = 1 (默认):

发布者: [M1] → [M2] → [M3]
订阅者:                   ↓ 只能收到M3

depth = 3:

发布者: [M1] → [M2] → [M3]
订阅者: ↓      ↓      ↓     可以收到M1, M2, M3

使用场景:

  • <code>depth=1</code>: 只关心最新数据(如传感器读数)
  • <code>depth>1</code>: 需要历史数据(如轨迹预测)

pending_queue_size的作用

消息到达速度: 100 msg/s
处理速度:      50 msg/s

pending_queue_size = 10:
队列: [M1][M2][M3]...[M10] → [M11到达,M1被丢弃]

设置建议:

  • 处理速度快: 较小值(1-10)
  • 处理速度慢: 较大值(30-100)
  • 不允许丢消息: 设置足够大的值

Writer配置

Writer不在DAG中配置,而是在Component代码中创建:

// planning_component.cc
bool PlanningComponent::Init() {
  // 创建Writer发布规划结果
  planning_writer_ = node_-&gt;CreateWriter&lt;ADCTrajectory&gt;(
      &quot;/apollo/planning&quot;  // channel名称
  );

  return true;
}

bool PlanningComponent::Proc(...) {
  // 创建消息
  auto trajectory = std::make_shared&lt;ADCTrajectory&gt;();
  // ... 填充数据

  // 发布消息
  planning_writer_-&gt;Write(trajectory);

  return true;
}

Reader与Writer的通信

Planning Component                Control Component
     ↓                                   ↑
Writer(&quot;/apollo/planning&quot;)    Reader(&quot;/apollo/planning&quot;)
     ↓                                   ↑
     └────── CyberRT Transport ──────────┘

通信流程:

  1. Planning的Writer发布消息到<code>/apollo/planning</code>
  2. CyberRT的Transport层接收消息
  3. 查找订阅了此channel的Reader
  4. 将消息分发给所有Reader
  5. Reader触发Component的Process/Proc方法

实战示例

示例1: 创建一个简单的Component

目标

创建一个接收Driver消息并打印的Component。

步骤1: 定义Proto消息

// examples.proto
syntax = &quot;proto2&quot;;
package apollo.cyber.examples.proto;

message Driver {
  optional string content = 1;
  optional uint64 msg_id = 2;
  optional uint64 timestamp = 3;
}

步骤2: 实现Component

// simple_component.h
#pragma once

#include &quot;cyber/component/component.h&quot;
#include &quot;cyber/examples/proto/examples.pb.h&quot;

using apollo::cyber::Component;
using apollo::cyber::examples::proto::Driver;

class SimpleComponent : public Component&lt;Driver&gt; {
 public:
  bool Init() override;
  bool Proc(const std::shared_ptr&lt;Driver&gt;&amp; msg) override;
};

CYBER_REGISTER_COMPONENT(SimpleComponent)
// simple_component.cc
#include &quot;simple_component.h&quot;

bool SimpleComponent::Init() {
  AINFO &lt;&lt; &quot;SimpleComponent Init&quot;;
  return true;
}

bool SimpleComponent::Proc(const std::shared_ptr&lt;Driver&gt;&amp; msg) {
  AINFO &lt;&lt; &quot;Received message &quot; &lt;&lt; msg-&gt;msg_id()
        &lt;&lt; &quot; with content: &quot; &lt;&lt; msg-&gt;content();
  return true;
}

步骤3: 编写BUILD文件

cc_binary(
    name = &quot;libsimple_component.so&quot;,
    linkshared = True,
    linkstatic = False,
    srcs = [
        &quot;simple_component.h&quot;,
        &quot;simple_component.cc&quot;,
    ],
    deps = [
        &quot;//cyber&quot;,
        &quot;//cyber/examples/proto:examples_cc_proto&quot;,
    ],
)

步骤4: 创建DAG配置

# simple.dag
module_config {
  module_library : &quot;/apollo/bazel-bin/examples/libsimple_component.so&quot;

  components {
    class_name : &quot;SimpleComponent&quot;
    config {
      name: &quot;simple&quot;
      readers: [
        {
          channel: &quot;/apollo/test/driver&quot;
        }
      ]
    }
  }
}

步骤5: 编译和运行

# 编译
bazel build //examples:libsimple_component.so

# 启动Component
mainboard -d simple.dag

# 在另一个终端发布测试消息
cyber_channel pub /apollo/test/driver \
  examples.proto.Driver \
  &#039;{&quot;content&quot;: &quot;Hello&quot;, &quot;msg_id&quot;: 1}&#039;

示例2: 多输入Component

目标

创建一个融合两路传感器数据的Component。

实现

// fusion_component.h
#pragma once

#include &quot;cyber/component/component.h&quot;
#include &quot;sensor_msgs/image.pb.h&quot;
#include &quot;sensor_msgs/pointcloud.pb.h&quot;

class FusionComponent : public Component&lt;
    sensor_msgs::Image,       // M0: 相机数据
    sensor_msgs::PointCloud   // M1: 激光雷达数据
> {
 public:
  bool Init() override;

  bool Proc(const std::shared_ptr&lt;sensor_msgs::Image&gt;&amp; image,
            const std::shared_ptr&lt;sensor_msgs::PointCloud&gt;&amp; cloud) override;
};

CYBER_REGISTER_COMPONENT(FusionComponent)
// fusion_component.cc
bool FusionComponent::Init() {
  AINFO &lt;&lt; &quot;FusionComponent Init&quot;;
  return true;
}

bool FusionComponent::Proc(
    const std::shared_ptr&lt;sensor_msgs::Image&gt;&amp; image,
    const std::shared_ptr&lt;sensor_msgs::PointCloud&gt;&amp; cloud) {

  AINFO &lt;&lt; &quot;Fusing image (timestamp: &quot; &lt;&lt; image-&gt;header().timestamp_sec()
        &lt;&lt; &quot;) with pointcloud (timestamp: &quot; &lt;&lt; cloud-&gt;header().timestamp_sec()
        &lt;&lt; &quot;)&quot;;

  // 执行融合算法...

  return true;
}

DAG配置

# fusion.dag
module_config {
  module_library : &quot;/apollo/bazel-bin/perception/libfusion_component.so&quot;

  components {
    class_name : &quot;FusionComponent&quot;
    config {
      name: &quot;sensor_fusion&quot;

      # 注意:readers的顺序必须与Component模板参数顺序一致
      readers: [
        {
          channel: &quot;/apollo/sensor/camera/front/image&quot;  # M0
        },
        {
          channel: &quot;/apollo/sensor/lidar/pointcloud&quot;    # M1
        }
      ]
    }
  }
}

重要: readers的顺序必须与Component模板参数一致!

示例3: TimerComponent

目标

创建一个每秒打印一次心跳的Component。

实现

// heartbeat_component.h
#pragma once

#include &quot;cyber/component/timer_component.h&quot;

class HeartbeatComponent : public apollo::cyber::TimerComponent {
 public:
  bool Init() override;
  bool Proc() override;

 private:
  uint64_t count_ = 0;
};

CYBER_REGISTER_COMPONENT(HeartbeatComponent)
// heartbeat_component.cc
#include &quot;heartbeat_component.h&quot;

bool HeartbeatComponent::Init() {
  AINFO &lt;&lt; &quot;HeartbeatComponent Init&quot;;
  return true;
}

bool HeartbeatComponent::Proc() {
  count_++;
  AINFO &lt;&lt; &quot;Heartbeat #&quot; &lt;&lt; count_ &lt;&lt; &quot; at &quot; &lt;&lt; apollo::cyber::Time::Now();
  return true;
}

DAG配置

# heartbeat.dag
module_config {
  module_library : &quot;/apollo/bazel-bin/monitor/libheartbeat_component.so&quot;

  timer_components {
    class_name : &quot;HeartbeatComponent&quot;
    config {
      name: &quot;heartbeat&quot;
      interval: 1000  # 1秒 = 1000毫秒
    }
  }
}

运行

mainboard -d heartbeat.dag

# 输出:
# [INFO] HeartbeatComponent Init
# [INFO] Heartbeat #1 at 1634567890.123
# [INFO] Heartbeat #2 at 1634567891.123
# [INFO] Heartbeat #3 at 1634567892.123
# ...

示例4: 混合使用Component和TimerComponent

场景

一个模块包含数据采集组件(消息驱动)和数据处理组件(定时触发)。

DAG配置

module_config {
  module_library : &quot;/apollo/bazel-bin/mymodule/libmymodule.so&quot;

  # 组件1: 接收传感器数据(消息驱动)
  components {
    class_name : &quot;SensorComponent&quot;
    config {
      name: &quot;sensor_receiver&quot;
      readers: [
        { channel: &quot;/apollo/sensor/data&quot; }
      ]
    }
  }

  # 组件2: 定期处理数据(定时驱动)
  timer_components {
    class_name : &quot;ProcessorComponent&quot;
    config {
      name: &quot;data_processor&quot;
      interval: 100  # 100ms处理一次
    }
  }
}

最佳实践

1. DAG文件组织

建议结构

/apollo/modules/&lt;模块名&gt;/
  ├── dag/
  │   ├── &lt;模块名&gt;.dag              # 主DAG文件
  │   ├── &lt;模块名&gt;_test.dag         # 测试DAG
  │   └── &lt;模块名&gt;_&lt;变体&gt;.dag       # 变体配置
  ├── conf/
  │   ├── &lt;模块名&gt;_config.pb.txt    # Protobuf配置
  │   └── &lt;模块名&gt;.conf              # GFlags配置
  └── proto/
      └── &lt;模块名&gt;_config.proto      # 配置Proto定义

命名规范

  • DAG文件: 小写,下划线分隔
  • Component类名: 大驼峰,以Component结尾
  • 组件名称: 小写,下划线分隔,简洁明了

2. Reader配置优化

根据场景选择QoS

# 场景1: 实时性要求高(控制指令)
readers: [
  {
    channel: &quot;/apollo/control/command&quot;
    qos_profile: { depth: 1 }         # 只要最新的
    pending_queue_size: 1              # 不积压
  }
]

# 场景2: 需要历史数据(障碍物追踪)
readers: [
  {
    channel: &quot;/apollo/perception/obstacles&quot;
    qos_profile: { depth: 10 }        # 保留最近10帧
    pending_queue_size: 20             # 允许一定积压
  }
]

# 场景3: 不允许丢消息(日志记录)
readers: [
  {
    channel: &quot;/apollo/monitor/events&quot;
    qos_profile: { depth: 100 }       # 保留更多历史
    pending_queue_size: 1000           # 大缓冲区
  }
]

3. Component设计原则

单一职责

// 好的设计:一个Component做一件事
class LaneDetectionComponent : public Component&lt;Image&gt; {
  bool Proc(const std::shared_ptr&lt;Image&gt;&amp; img) {
    // 只做车道线检测
  }
};

class ObstacleDetectionComponent : public Component&lt;Image&gt; {
  bool Proc(const std::shared_ptr&lt;Image&gt;&amp; img) {
    // 只做障碍物检测
  }
};

// 不好的设计:一个Component做太多事
class PerceptionComponent : public Component&lt;Image&gt; {
  bool Proc(const std::shared_ptr&lt;Image&gt;&amp; img) {
    // 车道线检测
    // 障碍物检测
    // 红绿灯识别
    // 可行驶区域分割
    // ...
  }
};

配置外部化

// 好的设计:参数从配置文件读取
bool PlanningComponent::Init() {
  PlanningConfig config;
  if (!GetProtoConfig(&amp;config)) {
    return false;
  }

  max_speed_ = config.max_speed();
  safe_distance_ = config.safe_distance();
  return true;
}

// 不好的设计:硬编码参数
bool PlanningComponent::Init() {
  max_speed_ = 30.0;  // 写死在代码里
  safe_distance_ = 5.0;
  return true;
}

4. 错误处理

Init()中的验证

bool MyComponent::Init() {
  // 1. 读取配置
  MyConfig config;
  if (!GetProtoConfig(&amp;config)) {
    AERROR &lt;&lt; &quot;Failed to load config&quot;;
    return false;  // 初始化失败会阻止模块启动
  }

  // 2. 验证参数
  if (config.max_speed() &lt;= 0) {
    AERROR &lt;&lt; &quot;Invalid max_speed: &quot; &lt;&lt; config.max_speed();
    return false;
  }

  // 3. 初始化资源
  writer_ = node_-&gt;CreateWriter&lt;MyMsg&gt;(&quot;/apollo/my/channel&quot;);
  if (writer_ == nullptr) {
    AERROR &lt;&lt; &quot;Failed to create writer&quot;;
    return false;
  }

  return true;
}

Proc()中的异常处理

bool MyComponent::Proc(const std::shared_ptr&lt;InputMsg&gt;&amp; msg) {
  // 1. 输入验证
  if (msg == nullptr) {
    AERROR &lt;&lt; &quot;Received null message&quot;;
    return false;  // 返回false会记录错误日志
  }

  // 2. 业务逻辑
  try {
    auto result = ProcessData(*msg);

    // 3. 发布结果
    writer_-&gt;Write(result);

    return true;
  } catch (const std::exception&amp; e) {
    AERROR &lt;&lt; &quot;Processing failed: &quot; &lt;&lt; e.what();
    return false;
  }
}

5. 性能优化

避免频繁内存分配

// 好的设计:复用对象
class MyComponent : public Component&lt;Input&gt; {
 private:
  OutputMsg output_;  // 成员变量,复用

 public:
  bool Proc(const std::shared_ptr&lt;Input&gt;&amp; input) {
    output_.Clear();  // 清空而非重新分配
    // 填充output_
    writer_-&gt;Write(output_);
    return true;
  }
};

// 不好的设计:每次都new
class MyComponent : public Component&lt;Input&gt; {
 public:
  bool Proc(const std::shared_ptr&lt;Input&gt;&amp; input) {
    auto output = std::make_shared&lt;OutputMsg&gt;();  // 频繁分配
    // 填充output
    writer_-&gt;Write(output);
    return true;
  }
};

控制日志频率

bool MyComponent::Proc(const std::shared_ptr&lt;Input&gt;&amp; input) {
  static uint64_t count = 0;
  count++;

  // 每100次打印一次,而非每次都打印
  if (count % 100 == 0) {
    AINFO &lt;&lt; &quot;Processed &quot; &lt;&lt; count &lt;&lt; &quot; messages&quot;;
  }

  // 业务逻辑...
  return true;
}

6. 调试技巧

使用cyber_monitor监控

# 启动模块
mainboard -d planning.dag

# 在另一个终端监控消息
cyber_monitor

# 或查看特定channel
cyber_channel echo /apollo/planning

使用cyber_recorder录制和回放

# 录制消息
cyber_recorder record -a  # 录制所有channel

# 回放消息
cyber_recorder play -f &lt;record_file&gt;

# 这样可以离线调试Component,无需实际硬件

添加详细日志

bool MyComponent::Proc(const std::shared_ptr&lt;Input&gt;&amp; input) {
  ADEBUG &lt;&lt; &quot;========== Proc Start ==========&quot;;
  ADEBUG &lt;&lt; &quot;Input timestamp: &quot; &lt;&lt; input-&gt;timestamp();
  ADEBUG &lt;&lt; &quot;Input size: &quot; &lt;&lt; input-&gt;data_size();

  // 业务逻辑...

  ADEBUG &lt;&lt; &quot;Output generated: &quot; &lt;&lt; output.DebugString();
  ADEBUG &lt;&lt; &quot;========== Proc End ==========&quot;;
  return true;
}

7. 配置版本管理

使用Git管理DAG和配置

# 提交前检查
git diff modules/planning/dag/planning.dag
git diff modules/planning/conf/planning_config.pb.txt

# 添加有意义的commit message
git commit -m &quot;feat(planning): add emergency stop scenario
- Add emergency_stop.dag
- Update planning_config with new parameters
- Increase QoS depth for obstacle channel&quot;

配置文件注释

# planning.dag
module_config {
  module_library : &quot;/apollo/bazel-bin/modules/planning/libplanning_component.so&quot;

  components {
    class_name : &quot;PlanningComponent&quot;
    config {
      name: &quot;planning&quot;

      readers: [
        {
          # 订阅预测模块的输出
          # 注意:此channel必须在prediction模块启动后才可用
          channel: &quot;/apollo/prediction&quot;
        },
        {
          # 底盘状态,高频更新(100Hz)
          # depth=15保证不丢失最近的状态
          channel: &quot;/apollo/canbus/chassis&quot;
          qos_profile: { depth : 15 }
          pending_queue_size: 50
        }
      ]
    }
  }
}

常见问题

Q1: Component创建失败怎么办?

错误信息:

[ERROR] CreateClassObj failed, ensure class has been registered.
classname: MyComponent, lib: /apollo/bazel-bin/mymodule/libmymodule.so

可能原因:

  1. 忘记添加<code>CYBER_REGISTER_COMPONENT(MyComponent)</code>
  2. class_name拼写错误(区分大小写)
  3. 动态库未正确编译或链接

解决方法:

// 1. 确保注册宏存在
CYBER_REGISTER_COMPONENT(MyComponent)  // 在.cc文件中

// 2. 检查类名是否一致
class_name : &quot;MyComponent&quot;  // DAG中
CYBER_REGISTER_COMPONENT(MyComponent)  // 代码中

// 3. 重新编译
bazel build //mymodule:libmymodule.so

Q2: Reader收不到消息?

可能原因:

  1. channel名称不匹配
  2. 消息类型不匹配
  3. 发布者未启动
  4. QoS配置不当

调试步骤:

# 1. 检查channel是否存在
cyber_channel list

# 2. 检查消息是否发布
cyber_channel echo /apollo/my/channel

# 3. 检查消息类型
cyber_channel info /apollo/my/channel

# 4. 确认Component已启动
cyber_node list

Q3: 多输入Component何时触发?

规则:

  • 所有输入通道至少收到1条消息后才触发
  • 之后每当第一个通道收到新消息时触发
  • 其他通道使用最新的消息

示例:

Component&lt;M0, M1&gt;

时刻1: M0收到消息 → 不触发(M1还没有消息)
时刻2: M1收到消息 → 触发Proc(M0最新, M1最新)
时刻3: M0收到消息 → 触发Proc(M0新, M1旧)
时刻4: M1收到消息 → 不触发(M1不是第一个输入)
时刻5: M0收到消息 → 触发Proc(M0新, M1新)

Q4: 如何在Component之间共享数据?

方法1: 通过消息通道(推荐)

// ComponentA
bool ComponentA::Proc(...) {
  auto data = std::make_shared&lt;MyData&gt;();
  // 填充data
  writer_-&gt;Write(data);  // 发布到channel
  return true;
}

// ComponentB
bool ComponentB::Proc(const std::shared_ptr&lt;MyData&gt;&amp; data) {
  // 使用data
  return true;
}

方法2: 通过全局单例(不推荐,仅用于只读配置)

class SharedData {
 public:
  static SharedData&amp; Instance() {
    static SharedData instance;
    return instance;
  }

  const Config&amp; GetConfig() const { return config_; }

 private:
  Config config_;
};

Q5: DAG文件可以动态修改吗?

答案: 不可以。

DAG文件在mainboard启动时加载,运行时修改不会生效。

如果需要修改:

  1. 停止mainboard(Ctrl+C)
  2. 修改DAG文件
  3. 重新启动mainboard

动态配置的正确方式:

  • 使用Protobuf配置文件(可以在Init中重新加载)
  • 使用GFlags(可以通过cyber_launch修改)
  • 订阅配置更新消息

总结

DAG配置的核心概念

  1. 声明式配置: DAG文件描述"要加载什么",而非"如何加载"
  2. 模块化设计: 一个模块一个DAG,便于管理和复用
  3. 松耦合: 模块之间通过消息通道通信,不直接依赖

Mainboard启动机制

命令行 → 解析DAG → 加载库 → 创建实例 → 初始化 → 订阅消息 → 等待触发

Component两种模式

  • 消息驱动: 响应式处理,按需执行
  • 定时驱动: 周期性处理,固定频率

关键设计模式

  • 工厂模式: 动态创建Component
  • 注册模式: 类名到工厂函数的映射
  • 发布订阅: Reader/Writer解耦通信

使用建议

  1. ✅ 合理划分Component粒度
  2. ✅ 配置参数外部化
  3. ✅ 完善错误处理
  4. ✅ 添加详细日志
  5. ✅ 注意性能优化
  6. ✅ 版本化管理配置文件

参考文件

  • Proto定义:

    • <code>/apollo/cyber/proto/dag_conf.proto:1-26</code>
    • <code>/apollo/cyber/proto/component_conf.proto:1-28</code>
  • 源码实现:

    • <code>/apollo/cyber/mainboard/module_controller.cc:37-137</code>
    • <code>/apollo/cyber/component/component.h:56-557</code>
    • <code>/apollo/cyber/component/timer_component.h:35-67</code>
  • 配置示例:

    • <code>/apollo/modules/planning/dag/planning.dag:1-31</code>
    • <code>/apollo/modules/control/dag/control.dag:1-12</code>
    • <code>/apollo/cyber/examples/common_component_example/common.dag:1-17</code>
本文作者:SakuraPuare
本文链接:https://blog.sakurapuare.com/archives/2025/10/apollo-dag/
版权声明:本文采用 CC BY-NC-SA 4.0 CN 协议进行许可
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇