Skip to content
On this page

Agent-to-Agent 协议层(A2A)

文件:service/ai/a2a/orchestrator.pyschemas.py、agents/、run_orchestrator_api.py)


第一部分:技术背景与演进

问题背景

单个 Agent 能力有限,复杂任务往往需要多个专业化 Agent 分工协作——就像一个公司里有策划、写手、审核各司其职。但多 Agent 协作面临一个工程难题:Agent 之间如何通信?如果每对 Agent 都自定义接口,系统会变成一张互不兼容的蜘蛛网。A2A 协议(Agent-to-Agent Protocol,Google 2025 年提出)的目标是标准化 Agent 间通信,让任何实现了该协议的 Agent 都能相互协作。

核心概念

  • Agent Card(智能体名片):每个 Agent 在 GET /.well-known/agent.json 暴露自己的能力描述(能做什么、接受什么输入、输出什么格式),类比于微服务的 OpenAPI 文档。
  • Task(任务):A2A 协议的核心单元,有明确的生命周期:submitted → working → completed/failed
  • Part(零件):消息和工件的最小内容单元,分 TextPartFilePartDataPart 三种,支持跨 Agent 传递结构化数据。
  • Artifact(工件):Agent 完成任务后输出的结果,通过 Task 的 artifacts 字段携带,是链式调用中传递的"产品"。

演进脉络

阶段方案问题
自定义 API每对 Agent 定义私有接口系统耦合,新增 Agent 改动大
LangGraph 多节点同进程内多步骤无法跨服务部署,每步 LLM 串行
OpenAI Function Calling工具调用 → 子 Agent缺乏状态追踪和异步能力
A2A Protocol(2025)标准化 HTTP + JSON 协议Agent 可独立部署、独立扩展、跨语言互操作

本模块的定位

本项目实现了 A2A 协议的一个三节点内容创作流水线:OutlineAgent → DocAgent → SummaryAgent,分别负责生成大纲、正文、摘要,由 orchestrator.py 编排。每个 Agent 是独立运行的 HTTP 服务,可单独部署和扩展,是"微服务化 AI 工作流"的示范实现。


第二部分:架构剖析

内容创作流水线

用户输入:主题文本

      ▼ Orchestrator(orchestrator.py)

      ├─► Step 1: OutlineAgent(:8001)
      │     GET /.well-known/agent.json   能力发现
      │     POST /tasks/send              {TextPart: "主题"}
      │     ← Task.artifacts[0]           {DataPart: outline_json}

      ├─► Step 2: DocAgent(:8002)
      │     POST /tasks/send              {DataPart: outline_json}
      │     ← Task.artifacts[0]           {DataPart: doc_content}

      └─► Step 3: SummaryAgent(:8003)
            POST /tasks/send              {DataPart: doc_content}
            ← Task.artifacts[0]           {TextPart: summary}

输出:OrchestrationResult {chain: [步骤信息], tasks: [每步Task], final_task}

A2A 数据结构(schemas.py,Pydantic)

Message ─── parts: [TextPart | FilePart | DataPart]
               ├── messageId (UUID)
               └── taskId / contextId

Task ──────── id, contextId
               ├── status: TaskStatus {state, timestamp}
               ├── artifacts: [A2AArtifact]
               │     └── parts: [...]
               └── history: [Message]

AgentCard ─── name, url, version
               ├── skills: [AgentSkill]
               └── capabilities: {streaming, pushNotifications, stateTransitionHistory}

流式 SSE 支持

python
# 编排器提供生成器接口,逐步产出每一步的 ChainStep
def run_chain_stream(topic: str) -> Generator[ChainStep, None, None]:
    for step in [OutlineAgent, DocAgent, SummaryAgent]:
        yield ChainStep(status="running", agent_name=step)
        task = _send_message(step, message)
        yield ChainStep(status="completed", output_summary=...)

关键设计原则

  • Pydantic 强类型:所有 A2A 数据结构用 Pydantic BaseModel 定义,model_dump() 直接序列化为 JSON 请求体,反序列化也有类型校验,协议合规性由类型系统保证。
  • 能力发现先于调用:每次调用前先 _discover(agent_name) 获取 Agent Card,验证能力后再发送任务。虽然当前实现中能力发现结果未被缓存(每次都请求),但这保证了对 Agent 动态更新能力的支持。
  • 工件驱动的链式传递:前一个 Agent 的 Task.artifacts[0].parts[0].data 直接作为下一个 Agent 的 DataPart 输入,数据格式由 Agent 自己定义,Orchestrator 只负责透明传递。

与行业标准方案对比

维度A2A Protocol(本项目)LangGraph 多节点OpenAI Assistants API
部署方式独立 HTTP 服务,可跨机器同进程,单机云端托管
状态管理Task 生命周期标准化StateGraph 内置Thread/Message 托管
跨语言互操作是(HTTP + JSON)否(Python only)
协议标准化Google A2A Spec无(自定义)OpenAI 私有
运维复杂度高(多个独立服务)低(单进程)低(托管)
选型建议多团队协作、跨语言、独立部署同团队单服务快速原型、不介意托管

第三部分:代码实现深度解析

核心函数清单

函数文件作用
_discover(agent_name)orchestrator.pyGET /.well-known/agent.json,返回 Agent Card
_send_message(agent_name, message)orchestrator.pyPOST /tasks/send,返回 Task 对象
_poll_task(agent_name, task_id)orchestrator.pyGET /tasks/{task_id},异步轮询
run_chain(topic)orchestrator.py同步执行三 Agent 链,返回 OrchestrationResult
run_chain_stream(topic)orchestrator.py流式生成器版本,逐步 yield ChainStep

ChainStep 执行追踪

python
class ChainStep(BaseModel):
    step_index: int
    agent_name: str
    status: TaskState | Literal["running"]
    input_summary: str       # 传入内容摘要(供前端展示)
    output_summary: str      # 输出内容摘要
    started_at: str
    ended_at: str
    error_message: str | None

每一步执行时 status="running",完成后更新为 "completed""failed",所有步骤汇总为 OrchestrationResult.chain

设计决策与取舍

决策 1:固定三 Agent URL 配置

python
AGENT_URLS = {
    "OutlineAgent": "http://127.0.0.1:8001",
    "DocAgent": "http://127.0.0.1:8002",
    "SummaryAgent": "http://127.0.0.1:8003",
}

当前 URL 硬编码,适合本地开发演示。生产环境应改为从配置文件或服务发现(如 Consul)读取,支持水平扩展和动态路由。

决策 2:工件零件类型的约定而非强制
DataPart.datadict[str, Any],各 Agent 之间通过隐式约定(如 OutlineAgent 输出 {"outline": [...], "sections": [...]} )传递数据,而非 Schema 强制。好处是灵活,代价是接口文档需要额外维护,Agent 间耦合通过约定而非类型系统保证。

决策 3:同步 + 流式双接口
run_chain 等待所有步骤完成后返回完整结果(适合 API 调用);run_chain_stream 是生成器,每完成一步 yield 一次(适合 SSE 推送,前端可实时看到进度)。两套接口共享相同的业务逻辑,通过生成器 vs 阻塞的方式区分。


第四部分:应用场景与实战

使用场景

  • 多模块内容自动创作:主题 → 大纲 → 正文 → 摘要,三个专业化 Agent 流水线
  • 演示 A2A 协议标准:本模块是 Google A2A Protocol 的参考实现,可作为团队了解 A2A 规范的学习代码
  • 微服务化 AI 工作流:每个 Agent 可用不同语言实现,只要遵守 A2A HTTP 协议即可接入流水线

环境依赖

bash
pip install pydantic requests flask
# 启动三个 Agent 服务(每个独立进程)
python service/ai/a2a/agents/outline_agent_server.py   # :8001
python service/ai/a2a/agents/doc_agent_server.py        # :8002
python service/ai/a2a/agents/summary_agent_server.py    # :8003

代码示例

python
from service.ai.a2a.orchestrator import run_chain

result = run_chain("人工智能在医疗领域的应用")
print(result.final_task.artifacts[0].parts[0].text)  # 最终摘要

# 流式版本
for step in run_chain_stream("人工智能在医疗领域的应用"):
    print(f"[{step.agent_name}] {step.status}: {step.output_summary[:50]}")

常见问题

  • _discover 超时:Agent 服务未启动时会立即失败(超时 5s)。检查对应端口是否在监听:lsof -i :8001
  • Task 状态卡在 working:Agent 服务内部 LLM 调用超时。_send_message 超时设置为 600s,长时间不响应可 kill 对应进程重启。
  • DataPart 数据格式不匹配:各 Agent 的输入/输出约定是隐式的,若修改了 OutlineAgent 的输出结构,需同步修改 DocAgent 的解析逻辑。

第五部分:优缺点评估与未来展望

优势

  • 每个 Agent 独立部署,可单独扩展、单独更新,微服务化程度高
  • 严格遵循 A2A Protocol,具备跨团队/跨语言互操作能力
  • Pydantic 强类型保证协议合规,序列化/反序列化安全
  • 流式 + 同步双接口满足不同场景需求

已知局限

  • 三个 Agent 需要分别启动独立服务,本地开发运维成本高
  • Agent URL 硬编码,不支持动态服务发现
  • 工件数据格式为隐式约定,Agent 间耦合难以从代码层面发现
  • 无重试机制:任一 Agent 失败则整个链路失败,缺乏容错设计

演进建议

  • 短期:增加每一步的重试逻辑(最多 3 次指数退避);Agent URL 改为从环境变量读取
  • 中期:为 DataPart 的 data 字段增加 Schema 注册(如 JSON Schema),让 Agent 间数据格式有显式契约
  • 长期:引入服务发现(Consul/etcd),支持 Agent 动态注册和负载均衡;增加监控和链路追踪(OpenTelemetry)

行业前沿

  • Google A2A Protocol v1.0:2025 年正式发布,微软、Salesforce、SAP 等主要厂商宣布支持,有望成为行业事实标准
  • MCP(Model Context Protocol):Anthropic 提出的工具调用协议,与 A2A 互补——MCP 解决 Agent 如何调用工具,A2A 解决 Agent 如何与 Agent 通信
  • Agent 编排平台:Temporal、Apache Airflow 等工作流引擎开始支持 AI Agent 编排,提供重试、持久化、监控等生产级能力

技术文档集合