跳到主要内容

ACP 内部

ACP(Agent Communication Protocol)是 Hermes Agent 用于跨实例通信的协议。本页详细介绍其内部实现。

概述

ACP 支持:

  • 跨 Agent 消息传递
  • 远程过程调用(RPC)
  • 事件广播
  • 文件传输

消息格式

ACP 消息是 JSON 格式:

{
"version": "1.0",
"type": "message|call|response|event",
"id": "unique-message-id",
"from": "agent-id@instance-id",
"to": "agent-id@instance-id|*",
"payload": {},
"timestamp": "2026-03-30T14:30:00Z"
}

消息类型

类型描述用途
message文本消息聊天消息
callRPC 调用请求操作
responseRPC 响应返回调用结果
event事件广播状态变更通知

地址格式

Agent 地址格式:agent-id@instance-id

  • agent-id:Agent 类型标识符
  • instance-id:唯一实例标识符
  • *:广播地址

示例:

  • hermes@local — 本地 Hermes 实例
  • hermes@prod-1 — 生产实例 1
  • *@* — 全局广播

路由

本地路由

本地消息直接通过内存传递:

def route_local(message):
recipient = resolve_instance(message.to)
if recipient.is_local:
deliver_direct(recipient, message)

远程路由

远程消息通过传输层发送:

def route_remote(message):
next_hop = routing_table.get_next_hop(message.to)
transport.send(next_hop, message)

会话管理

会话建立

1. HELLO → 2. HELLO_ACK → 3. 建立连接
// Client → Server
{"type": "hello", "agent_id": "hermes", "instance_id": "local"}

// Server → Client
{"type": "hello_ack", "session_id": "sess-123", "capabilities": ["message", "call", "event"]}

会话保持

定期发送心跳:

async def heartbeat():
while connected:
await send({"type": "ping"})
await asyncio.sleep(30)

RPC 调用

调用流程

Caller → CALL → ACP → Callee

Callee → RESPONSE → ACP → Caller

调用示例

# 发起调用
result = await acp.call(
target="agent@instance",
method="execute_command",
args={"command": "ls -la"}
)

错误处理

{
"type": "error",
"code": "METHOD_NOT_FOUND",
"message": "Method 'unknown' does not exist",
"details": {}
}

事件系统

订阅

# 订阅事件
acp.subscribe("agent.lifecycle.*", handler)

发布

# 发布事件
acp.publish("agent.lifecycle.started", {"instance_id": "prod-1"})

事件类型

事件描述负载
agent.lifecycle.startedAgent 启动实例信息
agent.lifecycle.stoppedAgent 停止实例信息
agent.session.started会话开始会话 ID
agent.session.ended会话结束会话 ID、原因

传输层

支持的传输

传输用途加密
memory同一进程
unix同一主机
tcp跨主机TLS 可选
websocketWeb 客户端TLS 必需

连接池

class ConnectionPool:
def __init__(self, max_size=10):
self.pool = asyncio.Queue(maxsize=max_size)

async def acquire(self):
return await self.pool.get()

def release(self, conn):
await self.pool.put(conn)

安全

认证

async def authenticate(message):
token = message.headers.get("Authorization")
if not verify_token(token):
raise AuthError("Invalid token")

授权

def authorize(sender, recipient, action):
if sender in trusted_agents:
return True
return check_acl(sender, recipient, action)

相关文档