Roboflow 是一个分布式数据转换流水线,用于将机器人 bag/MCAP 文件转换为可训练的数据集(LeRobot 格式)。
- 水平扩展:基于 TiKV 的分布式协调处理
- 模式驱动转换:支持 CDR(ROS1/ROS2)、Protobuf、JSON 消息格式
- 零拷贝分配:基于 Arena 的内存高效设计(减少约 22% 开销)
- 云存储支持:原生支持 S3 和阿里云 OSS,用于分布式工作负载
- 高吞吐量:并行分块处理,最高可达 ~1800 MB/s
- LeRobot 导出:转换为 LeRobot 数据集格式,用于机器人学习
Roboflow 采用受 Kubernetes 启发的分布式控制平面,实现容错的批处理。
┌─────────────────────────────────────────────────────────────────────┐
│ 控制平面 (Control Plane) │
├─────────────────────────────────────────────────────────────────────┤
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Scanner │ │ Reaper │ │ Finalizer │ │
│ │ 控制器 │ │ 控制器 │ │ 控制器 │ │
│ │ │ │ │ │ │ │
│ │ • 发现文件 │ │ • 检测失活 │ │ • 监控批处理 │ │
│ │ • 创建作业 │ │ Pod │ │ • 触发合并 │ │
│ │ │ │ • 回收孤儿 │ │ │ │
│ │ │ │ 作业 │ │ │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ └─────────────────┼─────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────┐ │
│ │ TiKV │ │
│ │ (类似 etcd │ │
│ │ 状态存储) │ │
│ └─────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ 数据平面 (Data Plane) │
├─────────────────────────────────────────────────────────────────────┤
│ Worker (pod-abc) Worker (pod-def) Worker (pod-xyz) │
│ • 认领作业 • 认领作业 • 认领作业 │
│ • 发送心跳 • 发送心跳 • 发送心跳 │
│ • 处理数据 • 处理数据 • 处理数据 │
│ • 保存检查点 • 保存检查点 • 保存检查点 │
└─────────────────────────────────────────────────────────────────────┘
| Kubernetes 概念 | Roboflow 等价实现 |
|---|---|
| Pod | 带 pod_id 的 Worker |
| etcd | TiKV 分布式存储 |
| kubelet 心跳 | HeartbeatManager |
| node-controller | ZombieReaper |
| Finalizers | Finalizer 控制器 |
| Job/CronJob | JobRecord, BatchSpec |
| 状态机 | BatchPhase (Pending → Discovering → Running → Merging → Complete/Failed) |
| Crate | 用途 |
|---|---|
roboflow-core |
错误类型、注册表、值类型 |
roboflow-storage |
S3、OSS、本地存储(始终可用) |
roboflow-executor |
分布式流水线的阶段任务执行器 |
roboflow-media |
机器人数据集的图像和视频编解码 |
roboflow-dataset |
KPS、LeRobot、流式转换器、数据源 |
roboflow-pipeline |
数据集处理的流水线执行和阶段 |
roboflow-distributed |
TiKV 客户端、目录、熔断器、Worker 协调 |
# 设置环境变量
export TIKV_PD_ENDPOINTS="127.0.0.1:2379"
export AWS_ACCESS_KEY_ID="your-key"
export AWS_SECRET_ACCESS_KEY="your-secret"
# 运行统一服务 (scanner + worker + finalizer + reaper)
roboflow run# 仅 Worker - 处理工作单元
roboflow run --role worker
# 仅 Finalizer - 合并已完成的批次
roboflow run --role finalizer
# 使用自定义 Pod ID
roboflow run --pod-id my-pod-1roboflow submit s3://bucket/input.bag --output s3://bucket/output/roboflow jobs list
roboflow jobs get <job-id>
roboflow batch list
roboflow batch get <batch-id>git clone https://github.com/archebase/roboflow.git
cd roboflow
cargo build --release- Rust 1.80+
- TiKV 4.0+(用于分布式协调)
- ffmpeg(用于 LeRobot 数据集中的视频编码)
[dataset]
name = "my_dataset"
fps = 30
robot_type = "stretch"
[[mapping]]
topic = "/camera/image_raw"
name = "observation.images.camera_0"
encoding = "ros1msg"
[[mapping]]
topic = "/joint_states"
name = "observation.joint_state"
encoding = "cdr"| 变量 | 说明 | 默认值 |
|---|---|---|
TIKV_PD_ENDPOINTS |
TiKV PD 端点 | 127.0.0.1:2379 |
AWS_ACCESS_KEY_ID |
AWS 访问密钥 | - |
AWS_SECRET_ACCESS_KEY |
AWS 密钥 | - |
AWS_REGION |
AWS 区域 | - |
OSS_ACCESS_KEY_ID |
阿里云 OSS 密钥 | - |
OSS_ACCESS_KEY_SECRET |
阿里云 OSS 密钥 | - |
OSS_ENDPOINT |
阿里云 OSS 端点 | - |
WORKER_POLL_INTERVAL_SECS |
任务轮询间隔 | 5 |
WORKER_MAX_CONCURRENT_JOBS |
最大并发任务数 | 1 |
SCANNER_SCAN_INTERVAL_SECS |
扫描间隔 | 60 |
FINALIZER_POLL_INTERVAL_SECS |
Finalizer 轮询间隔 | 30 |
cargo buildcargo testcargo fmt
cargo clippy --all-targets -- -D warnings使用 Docker Compose 启动所需服务:
docker compose up -d # 启动所有服务 (MinIO, TiKV, PD)
docker compose down # 停止所有服务服务:
| 服务 | 用途 | 端口 |
|---|---|---|
| MinIO | S3 兼容对象存储 | 9000 (API), 9001 (控制台) |
| TiKV | 分布式 KV 存储 | 20160 |
| PD | TiKV 调度器 | 2379, 2380 |
详见 CONTRIBUTING.md。
本项目采用 MulanPSL v2 许可证 - 详见 LICENSE 文件。