Appearance
Agent-to-Agent 协议层(A2A)
文件:
service/ai/a2a/(orchestrator.py、schemas.py、agents/、run_orchestrator_api.py)
第一部分:技术背景与演进
问题背景
单个 Agent 能力有限,复杂任务往往需要多个专业化 Agent 分工协作——就像一个公司里有策划、写手、审核各司其职。但多 Agent 协作面临一个工程难题:Agent 之间如何通信?如果每对 Agent 都自定义接口,系统会变成一张互不兼容的蜘蛛网。A2A 协议(Agent-to-Agent Protocol,Google 2025 年提出)的目标是标准化 Agent 间通信,让任何实现了该协议的 Agent 都能相互协作。
核心概念
- Agent Card(智能体名片):每个 Agent 在
GET /.well-known/agent.json暴露自己的能力描述(能做什么、接受什么输入、输出什么格式),类比于微服务的 OpenAPI 文档。 - Task(任务):A2A 协议的核心单元,有明确的生命周期:
submitted → working → completed/failed。 - Part(零件):消息和工件的最小内容单元,分
TextPart、FilePart、DataPart三种,支持跨 Agent 传递结构化数据。 - Artifact(工件):Agent 完成任务后输出的结果,通过 Task 的
artifacts字段携带,是链式调用中传递的"产品"。
演进脉络
| 阶段 | 方案 | 问题 |
|---|---|---|
| 自定义 API | 每对 Agent 定义私有接口 | 系统耦合,新增 Agent 改动大 |
| LangGraph 多节点 | 同进程内多步骤 | 无法跨服务部署,每步 LLM 串行 |
| OpenAI Function Calling | 工具调用 → 子 Agent | 缺乏状态追踪和异步能力 |
| A2A Protocol(2025) | 标准化 HTTP + JSON 协议 | Agent 可独立部署、独立扩展、跨语言互操作 |
本模块的定位
本项目实现了 A2A 协议的一个三节点内容创作流水线:OutlineAgent → DocAgent → SummaryAgent,分别负责生成大纲、正文、摘要,由 orchestrator.py 编排。每个 Agent 是独立运行的 HTTP 服务,可单独部署和扩展,是"微服务化 AI 工作流"的示范实现。
第二部分:架构剖析
内容创作流水线
用户输入:主题文本
│
▼ Orchestrator(orchestrator.py)
│
├─► Step 1: OutlineAgent(:8001)
│ GET /.well-known/agent.json 能力发现
│ POST /tasks/send {TextPart: "主题"}
│ ← Task.artifacts[0] {DataPart: outline_json}
│
├─► Step 2: DocAgent(:8002)
│ POST /tasks/send {DataPart: outline_json}
│ ← Task.artifacts[0] {DataPart: doc_content}
│
└─► Step 3: SummaryAgent(:8003)
POST /tasks/send {DataPart: doc_content}
← Task.artifacts[0] {TextPart: summary}
输出:OrchestrationResult {chain: [步骤信息], tasks: [每步Task], final_task}
A2A 数据结构(schemas.py,Pydantic)
Message ─── parts: [TextPart | FilePart | DataPart]
├── messageId (UUID)
└── taskId / contextId
Task ──────── id, contextId
├── status: TaskStatus {state, timestamp}
├── artifacts: [A2AArtifact]
│ └── parts: [...]
└── history: [Message]
AgentCard ─── name, url, version
├── skills: [AgentSkill]
└── capabilities: {streaming, pushNotifications, stateTransitionHistory}
流式 SSE 支持
python
# 编排器提供生成器接口,逐步产出每一步的 ChainStep
def run_chain_stream(topic: str) -> Generator[ChainStep, None, None]:
for step in [OutlineAgent, DocAgent, SummaryAgent]:
yield ChainStep(status="running", agent_name=step)
task = _send_message(step, message)
yield ChainStep(status="completed", output_summary=...)
关键设计原则
- Pydantic 强类型:所有 A2A 数据结构用 Pydantic
BaseModel定义,model_dump()直接序列化为 JSON 请求体,反序列化也有类型校验,协议合规性由类型系统保证。 - 能力发现先于调用:每次调用前先
_discover(agent_name)获取 Agent Card,验证能力后再发送任务。虽然当前实现中能力发现结果未被缓存(每次都请求),但这保证了对 Agent 动态更新能力的支持。 - 工件驱动的链式传递:前一个 Agent 的
Task.artifacts[0].parts[0].data直接作为下一个 Agent 的DataPart输入,数据格式由 Agent 自己定义,Orchestrator 只负责透明传递。
与行业标准方案对比
| 维度 | A2A Protocol(本项目) | LangGraph 多节点 | OpenAI Assistants API |
|---|---|---|---|
| 部署方式 | 独立 HTTP 服务,可跨机器 | 同进程,单机 | 云端托管 |
| 状态管理 | Task 生命周期标准化 | StateGraph 内置 | Thread/Message 托管 |
| 跨语言互操作 | 是(HTTP + JSON) | 否(Python only) | 是 |
| 协议标准化 | Google A2A Spec | 无(自定义) | OpenAI 私有 |
| 运维复杂度 | 高(多个独立服务) | 低(单进程) | 低(托管) |
| 选型建议 | 多团队协作、跨语言、独立部署 | 同团队单服务 | 快速原型、不介意托管 |
第三部分:代码实现深度解析
核心函数清单
| 函数 | 文件 | 作用 |
|---|---|---|
_discover(agent_name) | orchestrator.py | GET /.well-known/agent.json,返回 Agent Card |
_send_message(agent_name, message) | orchestrator.py | POST /tasks/send,返回 Task 对象 |
_poll_task(agent_name, task_id) | orchestrator.py | GET /tasks/{task_id},异步轮询 |
run_chain(topic) | orchestrator.py | 同步执行三 Agent 链,返回 OrchestrationResult |
run_chain_stream(topic) | orchestrator.py | 流式生成器版本,逐步 yield ChainStep |
ChainStep 执行追踪
python
class ChainStep(BaseModel):
step_index: int
agent_name: str
status: TaskState | Literal["running"]
input_summary: str # 传入内容摘要(供前端展示)
output_summary: str # 输出内容摘要
started_at: str
ended_at: str
error_message: str | None
每一步执行时 status="running",完成后更新为 "completed" 或 "failed",所有步骤汇总为 OrchestrationResult.chain。
设计决策与取舍
决策 1:固定三 Agent URL 配置
python
AGENT_URLS = {
"OutlineAgent": "http://127.0.0.1:8001",
"DocAgent": "http://127.0.0.1:8002",
"SummaryAgent": "http://127.0.0.1:8003",
}
当前 URL 硬编码,适合本地开发演示。生产环境应改为从配置文件或服务发现(如 Consul)读取,支持水平扩展和动态路由。
决策 2:工件零件类型的约定而非强制DataPart.data 是 dict[str, Any],各 Agent 之间通过隐式约定(如 OutlineAgent 输出 {"outline": [...], "sections": [...]} )传递数据,而非 Schema 强制。好处是灵活,代价是接口文档需要额外维护,Agent 间耦合通过约定而非类型系统保证。
决策 3:同步 + 流式双接口run_chain 等待所有步骤完成后返回完整结果(适合 API 调用);run_chain_stream 是生成器,每完成一步 yield 一次(适合 SSE 推送,前端可实时看到进度)。两套接口共享相同的业务逻辑,通过生成器 vs 阻塞的方式区分。
第四部分:应用场景与实战
使用场景
- 多模块内容自动创作:主题 → 大纲 → 正文 → 摘要,三个专业化 Agent 流水线
- 演示 A2A 协议标准:本模块是 Google A2A Protocol 的参考实现,可作为团队了解 A2A 规范的学习代码
- 微服务化 AI 工作流:每个 Agent 可用不同语言实现,只要遵守 A2A HTTP 协议即可接入流水线
环境依赖
bash
pip install pydantic requests flask
# 启动三个 Agent 服务(每个独立进程)
python service/ai/a2a/agents/outline_agent_server.py # :8001
python service/ai/a2a/agents/doc_agent_server.py # :8002
python service/ai/a2a/agents/summary_agent_server.py # :8003
代码示例
python
from service.ai.a2a.orchestrator import run_chain
result = run_chain("人工智能在医疗领域的应用")
print(result.final_task.artifacts[0].parts[0].text) # 最终摘要
# 流式版本
for step in run_chain_stream("人工智能在医疗领域的应用"):
print(f"[{step.agent_name}] {step.status}: {step.output_summary[:50]}")
常见问题
_discover超时:Agent 服务未启动时会立即失败(超时 5s)。检查对应端口是否在监听:lsof -i :8001。- Task 状态卡在
working:Agent 服务内部 LLM 调用超时。_send_message超时设置为 600s,长时间不响应可 kill 对应进程重启。 - DataPart 数据格式不匹配:各 Agent 的输入/输出约定是隐式的,若修改了 OutlineAgent 的输出结构,需同步修改 DocAgent 的解析逻辑。
第五部分:优缺点评估与未来展望
优势
- 每个 Agent 独立部署,可单独扩展、单独更新,微服务化程度高
- 严格遵循 A2A Protocol,具备跨团队/跨语言互操作能力
- Pydantic 强类型保证协议合规,序列化/反序列化安全
- 流式 + 同步双接口满足不同场景需求
已知局限
- 三个 Agent 需要分别启动独立服务,本地开发运维成本高
- Agent URL 硬编码,不支持动态服务发现
- 工件数据格式为隐式约定,Agent 间耦合难以从代码层面发现
- 无重试机制:任一 Agent 失败则整个链路失败,缺乏容错设计
演进建议
- 短期:增加每一步的重试逻辑(最多 3 次指数退避);Agent URL 改为从环境变量读取
- 中期:为 DataPart 的
data字段增加 Schema 注册(如 JSON Schema),让 Agent 间数据格式有显式契约 - 长期:引入服务发现(Consul/etcd),支持 Agent 动态注册和负载均衡;增加监控和链路追踪(OpenTelemetry)
行业前沿
- Google A2A Protocol v1.0:2025 年正式发布,微软、Salesforce、SAP 等主要厂商宣布支持,有望成为行业事实标准
- MCP(Model Context Protocol):Anthropic 提出的工具调用协议,与 A2A 互补——MCP 解决 Agent 如何调用工具,A2A 解决 Agent 如何与 Agent 通信
- Agent 编排平台:Temporal、Apache Airflow 等工作流引擎开始支持 AI Agent 编排,提供重试、持久化、监控等生产级能力