ACP 内部
ACP(Agent Communication Protocol)是 Hermes Agent 用于跨实例通信的协议。本页详细介绍其内部实现。
概述
ACP 支持:
- 跨 Agent 消息传递
- 远程过程调用(RPC)
- 事件广播
- 文件传输
消息格式
ACP 消息是 JSON 格式:
{
"version": "1.0",
"type": "message|call|response|event",
"id": "unique-message-id",
"from": "agent-id@instance-id",
"to": "agent-id@instance-id|*",
"payload": {},
"timestamp": "2026-03-30T14:30:00Z"
}
消息类型
| 类型 | 描述 | 用途 |
|---|---|---|
message | 文本消息 | 聊天消息 |
call | RPC 调用 | 请求操作 |
response | RPC 响应 | 返回调用结果 |
event | 事件广播 | 状态变更通知 |
地址格式
Agent 地址格式:agent-id@instance-id
agent-id:Agent 类型标识符instance-id:唯一实例标识符*:广播地址
示例:
hermes@local— 本地 Hermes 实例hermes@prod-1— 生产实例 1*@*— 全局广播
路由
本地路由
本地消息直接通过内存传递:
def route_local(message):
recipient = resolve_instance(message.to)
if recipient.is_local:
deliver_direct(recipient, message)
远程路由
远程消息通过传输层发送:
def route_remote(message):
next_hop = routing_table.get_next_hop(message.to)
transport.send(next_hop, message)
会话管理
会话建立
1. HELLO → 2. HELLO_ACK → 3. 建立连接
// Client → Server
{"type": "hello", "agent_id": "hermes", "instance_id": "local"}
// Server → Client
{"type": "hello_ack", "session_id": "sess-123", "capabilities": ["message", "call", "event"]}
会话保持
定期发送心跳:
async def heartbeat():
while connected:
await send({"type": "ping"})
await asyncio.sleep(30)
RPC 调用
调用流程
Caller → CALL → ACP → Callee
↓
Callee → RESPONSE → ACP → Caller
调用示例
# 发起调用
result = await acp.call(
target="agent@instance",
method="execute_command",
args={"command": "ls -la"}
)
错误处理
{
"type": "error",
"code": "METHOD_NOT_FOUND",
"message": "Method 'unknown' does not exist",
"details": {}
}
事件系统
订阅
# 订阅事件
acp.subscribe("agent.lifecycle.*", handler)
发布
# 发布事件
acp.publish("agent.lifecycle.started", {"instance_id": "prod-1"})
事件类型
| 事件 | 描述 | 负载 |
|---|---|---|
agent.lifecycle.started | Agent 启动 | 实例信息 |
agent.lifecycle.stopped | Agent 停止 | 实例信息 |
agent.session.started | 会话开始 | 会话 ID |
agent.session.ended | 会话结束 | 会话 ID、原因 |
传输层
支持的传输
| 传输 | 用途 | 加密 |
|---|---|---|
memory | 同一进程 | 无 |
unix | 同一主机 | 无 |
tcp | 跨主机 | TLS 可选 |
websocket | Web 客户端 | TLS 必需 |
连接池
class ConnectionPool:
def __init__(self, max_size=10):
self.pool = asyncio.Queue(maxsize=max_size)
async def acquire(self):
return await self.pool.get()
def release(self, conn):
await self.pool.put(conn)
安全
认证
async def authenticate(message):
token = message.headers.get("Authorization")
if not verify_token(token):
raise AuthError("Invalid token")
授权
def authorize(sender, recipient, action):
if sender in trusted_agents:
return True
return check_acl(sender, recipient, action)