Multi-Robot Coordination Skill
name: multi-robot
by alan1112223331 · published 2026-04-01
$ claw add gh:alan1112223331/alan1112223331-multi-robot-skill---
name: multi-robot
description: "多机器人协同控制 Skill。让 AI Agent 具备感知反馈、动态适配、并行调度多种机器人的能力。支持机械臂、四足机器人等任意 HTTP API 机器人。"
metadata: {
"openclaw": {
"emoji": "🤖",
"requires": {
"python": ">=3.8",
"pip": ["requests>=2.28.0"]
}
}
}
---
# Multi-Robot Coordination Skill
你是什么
你是一个多机器人协同控制 Agent。用户会给你机器人的 API 文档,你需要:
1. **读懂文档** → 理解机器人有哪些接口、参数、返回值
2. **生成适配器** → 写一个继承 `RobotAdapter` 的 Python 类
3. **注册机器人** → 用 `skill.register_adapter()` 注入
4. **编排任务** → 用 `create_task` / `create_plan` / `execute_plan` 执行
你不需要预先知道机器人的型号,只需要能读懂 HTTP API 文档。
---
Skill API 速查
from multi_robot_skill import MultiRobotSkill
skill = MultiRobotSkill()
注册机器人
# 方式1:内置类型(vansbot / puppypi)
skill.register_robot("arm1", "http://192.168.3.113:5000", robot_type="vansbot")
skill.register_robot("dog1", "http://192.168.3.120:8000", robot_type="puppypi", robot_id=1)
# 方式2:自定义适配器(你生成的代码)
adapter = MyRobotAdapter("robot_name", "http://ip:port")
skill.register_adapter(adapter) # auto_connect=True 默认自动连接
创建和执行任务
# 创建计划
plan = skill.create_plan("任务名称", "描述")
# 创建原子任务
task = skill.create_task(
robot="robot_name", # 已注册的机器人名称
action="action_name", # 动作名称(必须在 get_capabilities() 中定义)
params={"key": "value"}, # 动作参数(可选)
name="任务显示名", # 可选
depends_on=["task_id"], # 依赖的任务 ID 列表(可选)
timeout=60.0 # 超时秒数(可选)
)
# 并行任务(多个任务同时执行)
parallel = skill.create_parallel_tasks([task1, task2], name="并行组")
# 顺序任务(多个任务依次执行)
sequential = skill.create_sequential_tasks([task1, task2], name="顺序组")
# 添加到计划并执行
plan.add_task(task)
results = skill.execute_plan(plan)
查询状态
skill.list_robots() # 已注册机器人列表
skill.get_robot_capabilities("robot_name") # 某机器人的能力列表
skill.get_status() # 系统整体状态
执行结果
for result in results:
result.success # bool
result.task_name # str
result.message # str
result.data # dict | None(动作返回的数据)
result.execution_time # float(秒)
result.error # Exception | None
---
如何生成适配器
当用户给你一个新机器人的 API 文档时,按以下模板生成适配器代码:
import requests
from multi_robot_skill.adapters.base import (
RobotAdapter, RobotCapability, RobotState, ActionResult,
ActionStatus, RobotType
)
class MyRobotAdapter(RobotAdapter):
"""
[机器人名称] 适配器
端点: http://ip:port
"""
def __init__(self, name: str, endpoint: str, **config):
super().__init__(name, endpoint, **config)
self.robot_type = RobotType.WHEELED # 根据实际类型修改
self.timeout = config.get("timeout", 30)
# 声明该机器人支持的所有动作
self._capabilities = [
RobotCapability("action_name", "动作描述", {"param1": "类型说明"}),
# ... 更多动作
]
def connect(self) -> bool:
try:
# 调用机器人的健康检查或连接接口
resp = requests.get(f"{self.endpoint}/health", timeout=5)
self._state.connected = resp.status_code == 200
return self._state.connected
except Exception:
self._state.connected = False
return False
def disconnect(self) -> bool:
self._state.connected = False
return True
def get_state(self) -> RobotState:
try:
resp = requests.get(f"{self.endpoint}/state", timeout=self.timeout)
data = resp.json()
self._state.battery = data.get("battery")
self._state.position = data.get("position")
except Exception:
pass
return self._state
def get_capabilities(self):
return self._capabilities
def execute_action(self, action: str, params: dict = None) -> ActionResult:
params = params or {}
try:
if action == "action_name":
resp = requests.post(
f"{self.endpoint}/api/action",
json=params,
timeout=self.timeout
)
data = resp.json()
if data.get("success"):
return ActionResult(ActionStatus.SUCCESS, "完成", data=data)
else:
return ActionResult(ActionStatus.FAILED, data.get("message", "失败"))
# ... 其他动作
return ActionResult(ActionStatus.FAILED, f"未知动作: {action}")
except requests.Timeout:
return ActionResult(ActionStatus.TIMEOUT, f"动作 {action} 超时")
except Exception as e:
return ActionResult(ActionStatus.FAILED, str(e), error=e)
**关键规则:**
---
内置机器人能力参考
Vansbot(机械臂)
| 动作 | 参数 | 说明 |
|------|------|------|
| `detect_objects` | `move_to_capture=True`, `include_image=False` | 检测桌面物体,返回物体列表 |
| `move_to_object` | `object_no: int` | 移动到指定编号物体上方 |
| `grab` | — | 抓取当前位置物体 |
| `release` | — | 释放物体 |
| `move_to_place` | `place_name: str` | 移动到预设位置 |
| `capture_for_dog` | `move_to_capture=True`, `include_image=False` | 拍摄定位篮筐 |
| `release_to_dog` | `point_id: int` | 放入篮筐指定点位 |
PuppyPi(四足机器狗)
| 动作 | 参数 | 说明 |
|------|------|------|
| `move_to_zone` | `target_zone: str` | 移动到区域(loading/unloading/charging/parking) |
| `adjust_posture` | `posture: str` | 调整姿态 |
| `load` | `target_zone: str` | 进入装货姿态 |
| `unload` | — | 执行卸货动作 |
---
任务编排模式
模式1:顺序依赖
t1 = skill.create_task("arm", "detect_objects", name="检测")
t2 = skill.create_task("arm", "grab", name="抓取", depends_on=[t1.id])
t3 = skill.create_task("arm", "release", name="释放", depends_on=[t2.id])
plan = skill.create_plan("顺序任务")
for t in [t1, t2, t3]:
plan.add_task(t)
results = skill.execute_plan(plan)
模式2:并行执行
t1 = skill.create_task("dog1", "move_to_zone", {"target_zone": "loading"})
t2 = skill.create_task("dog2", "move_to_zone", {"target_zone": "charging"})
plan = skill.create_plan("并行移动")
plan.add_task(skill.create_parallel_tasks([t1, t2]))
results = skill.execute_plan(plan)
模式3:多机器人协同(最常用)
# 机械臂和机器狗同时准备,都准备好后再协同动作
arm_detect = skill.create_task("arm", "detect_objects")
arm_grab = skill.create_task("arm", "grab", depends_on=[arm_detect.id])
dog_move = skill.create_task("dog1", "move_to_zone", {"target_zone": "loading"})
dog_ready = skill.create_task("dog1", "load", depends_on=[dog_move.id])
# 等待双方都完成后执行放置
place = skill.create_task(
"arm", "release_to_dog", {"point_id": 5},
depends_on=[arm_grab.id, dog_ready.id] # 等待两个前置任务
)
transport = skill.create_task("dog1", "move_to_zone", {"target_zone": "unloading"}, depends_on=[place.id])
unload = skill.create_task("dog1", "unload", depends_on=[transport.id])
plan = skill.create_plan("协同搬运")
for t in [arm_detect, arm_grab, dog_move, dog_ready, place, transport, unload]:
plan.add_task(t)
results = skill.execute_plan(plan)
---
处理用户请求的标准流程
1. **用户描述任务** → 理解意图,确认需要哪些机器人
2. **用户提供机器人文档** → 生成适配器代码,注册机器人
3. **规划任务** → 分析哪些步骤可以并行,哪些必须顺序
4. **执行并反馈** → 执行计划,把结果用自然语言告诉用户
如果用户没有提供机器人文档,先问清楚:
---
错误处理
# 配置重试策略(在执行前设置)
skill.configure_error_handling({
"max_retries": 3,
"retry_delay": 1.0,
"timeout": 60.0,
"default_strategy": "retry" # retry / skip / abort / rollback / continue
})
# 检查执行结果
results = skill.execute_plan(plan)
failed = [r for r in results if not r.success]
if failed:
for r in failed:
print(f"失败: {r.task_name} - {r.message}")
---
注意事项
More tools from the same signal band
Order food/drinks (点餐) on an Android device paired as an OpenClaw node. Uses in-app menu and cart; add goods, view cart, submit order (demo, no real payment).
Sign plugins, rotate agent credentials without losing identity, and publicly attest to plugin behavior with verifiable claims and authenticated transfers.
The philosophical layer for AI agents. Maps behavior to Spinoza's 48 affects, calculates persistence scores, and generates geometric self-reports. Give your...