# Change notes (v4.0):
# - Each agent becomes an independent graph node (weather/math/knowledge/mcp/general)
# - Agents hand tasks to each other via Command(goto=<target node>)
# - The supervisor returns a plain dict; routing is done by conditional_edges
# - Parallel mode uses a dedicated parallel_worker node (SubTaskState) to avoid concurrent write conflicts
# - New state fields: handoff_from / handoff_context / handoff_history
# - Handoff decision: the agent answers first, then an LLM judges whether to hand off to another agent
# - Send-API parallel dispatch + aggregator merging are retained
# Verified: single agent, direct reply, multi-agent parallel, handoff chain tracing
"""
|
||
黄庄三号 Agent v4.0 - Agent间Command Handoff交互版
|
||
====================================================
|
||
架构: Supervisor + 独立Agent节点 + Command Handoff
|
||
- Supervisor: 分析任务,用Command分发到目标Agent
|
||
- 每个Agent是独立图节点,拥有自己的LLM + 工具 + handoff工具
|
||
- Agent间通过 transfer_to_xxx 工具实现Command handoff,直接跳转
|
||
- 支持: 并行分发(Send API)、串行分发(Command goto)、Agent间互相交接
|
||
|
||
运行方式:
|
||
python3 agent.py --test 自动测试
|
||
python3 agent.py --mcp --test 带MCP测试
|
||
python3 agent.py --mcp 交互模式(带MCP)
|
||
python3 agent.py 交互模式(不带MCP)
|
||
"""
|
||
import os
|
||
import re
|
||
import json
|
||
import asyncio
|
||
import argparse
|
||
import importlib.util
|
||
from typing import Annotated, Literal
|
||
from typing_extensions import TypedDict
|
||
from pydantic import BaseModel, Field
|
||
from contextlib import AsyncExitStack
|
||
from pathlib import Path
|
||
from operator import add
|
||
|
||
import yaml
|
||
from langchain_openai import ChatOpenAI
|
||
from langchain_core.tools import tool
|
||
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage
|
||
from langgraph.graph import StateGraph, START, END
|
||
from langgraph.graph.message import add_messages
|
||
from langgraph.prebuilt import ToolNode
|
||
from langgraph.types import Send, Command
|
||
|
||
|
||
# ════════════════════════════════════════════
# Base paths (resolved relative to this file)
# ════════════════════════════════════════════
BASE_DIR = Path(__file__).parent.resolve()  # project root directory
CONFIG_PATH = BASE_DIR / "config.yaml"      # main YAML configuration file
TOOLS_DIR = BASE_DIR / "tools"              # auto-scanned business tool modules
SKILLS_DIR = BASE_DIR / "skills"            # auto-scanned skills (YAML files + OpenClaw dirs)
AGENTS_DIR = BASE_DIR / "agents"            # NOTE(review): not referenced anywhere in this file — confirm still needed
|
||
|
||
|
||
# ════════════════════════════════════════════
# Configuration loading
# ════════════════════════════════════════════
def load_config() -> dict:
    """Read and parse the YAML configuration file at CONFIG_PATH."""
    raw_text = CONFIG_PATH.read_text(encoding="utf-8")
    return yaml.safe_load(raw_text)
|
||
|
||
|
||
# ════════════════════════════════════════════
# Tool auto-discovery and registration
# ════════════════════════════════════════════
def scan_tools() -> list:
    """Import every non-underscore module in tools/ and collect its TOOLS list.

    Each tools/*.py module may expose a module-level TOOLS list; those tool
    objects are gathered into one flat list. Broken modules are logged and
    skipped so one bad file cannot break tool discovery.
    """
    collected = []
    if not TOOLS_DIR.exists():
        return collected
    candidates = [p for p in sorted(TOOLS_DIR.glob("*.py")) if not p.name.startswith("_")]
    for py_file in candidates:
        try:
            spec = importlib.util.spec_from_file_location(f"tools.{py_file.stem}", str(py_file))
            mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
            if hasattr(mod, "TOOLS"):
                tool_list = mod.TOOLS
                collected.extend(tool_list)
                print(f" [工具] {py_file.name}: {[t.name for t in tool_list]}")
        except Exception as e:
            # Best-effort loading: report the failure and keep scanning.
            print(f" [工具] {py_file.name}: 加载失败 -> {e}")
    return collected
|
||
|
||
|
||
# ════════════════════════════════════════════
# Skill auto-discovery (compatible with OpenClaw layouts)
# ════════════════════════════════════════════
class SkillDef(BaseModel):
    """A skill loaded from skills/*.yaml or from an OpenClaw SKILL.md directory."""
    name: str                  # unique skill name (registry key)
    description: str           # short human-readable summary
    prompt: str                # prompt template; may contain an {input} placeholder
    tools: list[str] = []      # names of tools/scripts this skill relies on
    skill_dir: str = ""        # directory path (OpenClaw skills only)
    scripts: list[str] = []    # absolute paths of runnable scripts (OpenClaw only)
    is_openclaw: bool = False  # True when loaded from a SKILL.md directory
|
||
|
||
class SkillRegistry:
    """In-memory registry of skills, keyed by skill name."""

    def __init__(self):
        self._skills: dict[str, SkillDef] = {}

    def register(self, skill: SkillDef):
        """Add a skill; a later skill with the same name replaces the earlier one."""
        self._skills[skill.name] = skill

    def get(self, name: str) -> SkillDef | None:
        """Return the skill with the given name, or None when absent."""
        return self._skills.get(name)

    def list_skills(self) -> list[SkillDef]:
        """Return all registered skills in registration order."""
        return [*self._skills.values()]

    def format_list(self) -> str:
        """Render the registry as a bullet list suitable for prompts."""
        entries = []
        for skill in self._skills.values():
            suffix = " [OpenClaw]" if skill.is_openclaw else ""
            entries.append(f" - {skill.name}: {skill.description}{suffix}")
        return "\n".join(entries)
|
||
|
||
|
||
def _parse_skill_md_frontmatter(content: str) -> dict:
    """Extract the YAML frontmatter ('--- ... ---') from a SKILL.md file.

    Returns an empty dict when there is no frontmatter, the closing
    delimiter is missing, or the YAML parses to nothing.
    """
    if not content.startswith("---"):
        return {}
    closing = content.find("---", 3)
    if closing < 0:
        return {}
    frontmatter_text = content[3:closing].strip()
    return yaml.safe_load(frontmatter_text) or {}
|
||
|
||
|
||
def scan_skills() -> SkillRegistry:
    """Scan skills/ and build a SkillRegistry.

    Two layouts are supported:
      1. flat skills/*.yaml files (name/description/prompt/tools keys);
      2. OpenClaw-style skill directories containing a SKILL.md with YAML
         frontmatter and an optional scripts/ directory of runnable *.py files.
    Broken skill files are logged and skipped.
    """
    registry = SkillRegistry()
    if not SKILLS_DIR.exists():
        return registry

    # Layout 1: flat YAML skill files.
    for yaml_file in sorted(SKILLS_DIR.glob("*.yaml")):
        try:
            with open(yaml_file, "r", encoding="utf-8") as f:
                data = yaml.safe_load(f)
            registry.register(SkillDef(
                name=data["name"], description=data.get("description", ""),
                prompt=data.get("prompt", ""), tools=data.get("tools", []),
            ))
            print(f" [技能] {yaml_file.name}: {data['name']}")
        except Exception as e:
            print(f" [技能] {yaml_file.name}: 加载失败 -> {e}")

    # Layout 2: OpenClaw skill directories with a SKILL.md.
    for skill_dir in sorted(SKILLS_DIR.iterdir()):
        if not skill_dir.is_dir():
            continue
        skill_md = skill_dir / "SKILL.md"
        if not skill_md.exists():
            continue
        try:
            with open(skill_md, "r", encoding="utf-8") as f:
                content = f.read()
            fm = _parse_skill_md_frontmatter(content)
            if not fm:
                continue
            name = fm.get("name", skill_dir.name)
            description = fm.get("description", "")
            # Everything after the closing '---' delimiter is the prompt body.
            body_start = content.find("---", 3)
            prompt = content[body_start + 3:].strip() if body_start != -1 else ""
            # Collect runnable scripts; the script stem doubles as a tool name.
            scripts_dir = skill_dir / "scripts"
            scripts, tools = [], []
            if scripts_dir.exists():
                for sf in sorted(scripts_dir.glob("*.py")):
                    if not sf.name.startswith("_"):
                        scripts.append(str(sf))
                        tools.append(sf.stem)
            registry.register(SkillDef(
                name=name, description=description, prompt=prompt, tools=tools,
                skill_dir=str(skill_dir), scripts=scripts, is_openclaw=True,
            ))
            print(f" [技能] {skill_dir.name}/ (OpenClaw): {name}")
        except Exception as e:
            print(f" [技能] {skill_dir.name}/SKILL.md: 加载失败 -> {e}")

    return registry
|
||
|
||
|
||
# ════════════════════════════════════════════
# MCP manager
# ════════════════════════════════════════════
class MCPManager:
    """Owns MCP server connections, their tools, and a keyword → tool route map."""

    def __init__(self):
        self.exit_stack = AsyncExitStack()     # keeps stdio clients/sessions alive until close()
        self.sessions: dict[str, object] = {}  # server name -> ClientSession
        self.route_map: dict[str, tuple] = {}  # keyword -> (session, tool_name)
        self.mcp_tools: list = []              # all tools loaded from all servers

    async def connect_all(self, mcp_configs: list[dict]):
        """Connect to every configured stdio MCP server and load its tools.

        A server that fails to connect is logged and skipped; the others
        still come up. Also builds the keyword route map from each server's
        optional route_keywords config.
        """
        from langchain_mcp_adapters.tools import load_mcp_tools
        from mcp.client.stdio import stdio_client, StdioServerParameters
        from mcp.client.session import ClientSession

        for srv in mcp_configs:
            name = srv["name"]
            try:
                sp = StdioServerParameters(command=srv["command"], args=srv.get("args", []))
                r, w = await self.exit_stack.enter_async_context(stdio_client(sp))
                session = await self.exit_stack.enter_async_context(ClientSession(r, w))
                await session.initialize()
                mcp_tools = await load_mcp_tools(session)
                self.mcp_tools.extend(mcp_tools)
                self.sessions[name] = session
                # Map each configured keyword to (session, tool) for match_route().
                route_kw = srv.get("route_keywords", {})
                for tool_name, keywords in route_kw.items():
                    for kw in keywords:
                        self.route_map[kw] = (session, tool_name)
                print(f" [MCP] {name}: 已连接,{len(mcp_tools)} 个工具")
            except Exception as e:
                print(f" [MCP] {name}: 连接失败 -> {e}")

    def match_route(self, user_input: str) -> tuple | None:
        """Return (session, tool_name) for the first keyword found in user_input, else None."""
        for keyword, (session, tool_name) in self.route_map.items():
            if keyword in user_input:
                return (session, tool_name)
        return None

    async def call_tool(self, session, tool_name: str, user_input: str) -> str:
        """Invoke an MCP tool, deriving its arguments heuristically from user_input.

        Errors are reported as a string rather than raised, so callers can
        embed the result directly into prompt context.
        """
        try:
            args = {}
            # Locate the tool's schema so _parse_tool_args can fill in arguments.
            for t in self.mcp_tools:
                if t.name == tool_name:
                    schema = getattr(t, "args_schema", None)
                    if schema:
                        if isinstance(schema, dict):
                            sf = schema.get("properties", {})
                        elif hasattr(schema, "model_json_schema"):
                            sf = schema.model_json_schema().get("properties", {})
                        else:
                            sf = {}
                        args = _parse_tool_args(tool_name, sf, user_input)
                    break
            result = await session.call_tool(tool_name, args)
            # Flatten any text content items into a single string.
            if result.content:
                texts = [c.text for c in result.content if hasattr(c, "text")]
                return "\n".join(texts) if texts else str(result)
            return str(result)
        except Exception as e:
            return f"[MCP工具{tool_name}错误] {e}"

    async def close(self):
        """Tear down all MCP connections and forget the sessions."""
        await self.exit_stack.aclose()
        self.sessions.clear()
|
||
|
||
|
||
def _parse_tool_args(tool_name: str, schema_fields: dict, user_input: str) -> dict:
|
||
args = {}
|
||
for field_name, field_info in schema_fields.items():
|
||
if field_name in ("timezone",):
|
||
args[field_name] = "Asia/Shanghai"
|
||
elif field_name in ("text",):
|
||
match = re.search(r"""['"\u201c\u201d](.+?)['"\u201c\u201d]""", user_input)
|
||
args[field_name] = match.group(1) if match else user_input
|
||
elif field_name in ("city",):
|
||
args[field_name] = user_input
|
||
return args
|
||
|
||
|
||
# ════════════════════════════════════════════
# ★ Core: multi-agent architecture v4.0 — Command handoff
# ════════════════════════════════════════════

# --- Shared state ---
class SharedState(TypedDict):
    """State shared by all agents in the graph.

    Note: in parallel scenarios several nodes update this state concurrently,
    so only fields with a reducer can be written safely from parallel
    branches. Fields without a reducer (e.g. handoff_from) are not written
    in parallel mode.
    """
    messages: Annotated[list, add_messages]     # conversation messages (appended via reducer)
    subtasks: list[dict]                        # decomposed subtasks (parallel mode only)
    results: Annotated[list[str], add]          # per-agent results (list-concat reducer)
    active_agent: str                           # agent currently in charge
    handoff_from: str                           # agent the task was handed off from
    handoff_context: str                        # context attached to the handoff
    handoff_history: Annotated[list[str], add]  # handoff audit trail (list-concat reducer)
    final_answer: str                           # final reply to the user
|
||
|
||
|
||
class SubTaskState(TypedDict):
    """Per-subtask state delivered to parallel workers via the Send API."""
    task: dict                               # subtask spec: {"agent": <name>, "query": <text>}
    messages: Annotated[list, add_messages]  # recent conversation context
|
||
|
||
|
||
# --- Agent definition ---
class AgentDef(BaseModel):
    """Static definition of one agent node."""
    name: str              # node name; also the node key in the graph
    description: str       # one-line summary shown to the supervisor LLM
    system_prompt: str     # system prompt used for this agent's LLM calls
    tools: list[str] = []  # names of business tools this agent depends on
    skill: str = ""        # name of the skill this agent depends on (optional)
|
||
|
||
class AgentRegistry:
    """Registry of agent definitions, keyed by node name."""

    def __init__(self):
        self._agents: dict[str, AgentDef] = {}

    def register(self, agent: AgentDef):
        """Add an agent; a later agent with the same name replaces the earlier one."""
        self._agents[agent.name] = agent

    def get(self, name: str) -> AgentDef | None:
        """Look up an agent by exact name, or None when absent."""
        return self._agents.get(name)

    def list_agents(self) -> list[AgentDef]:
        """Return all registered agents in registration order."""
        return [*self._agents.values()]

    def format_list(self) -> str:
        """Render a bullet list of agents for inclusion in prompts."""
        entries = [f" - {agent.name}: {agent.description}" for agent in self._agents.values()]
        return "\n".join(entries)

    def match(self, task_type: str) -> AgentDef | None:
        """Resolve a task type to an agent: exact key first, then substring match either way."""
        exact = self._agents.get(task_type)
        if exact is not None:
            return exact
        for candidate in self._agents.values():
            if candidate.name in task_type or task_type in candidate.name:
                return candidate
        return None
|
||
|
||
|
||
def init_agents(skills_reg: SkillRegistry, tools_list: list) -> AgentRegistry:
    """Register the built-in agents and return the populated registry.

    skills_reg and tools_list are accepted for interface compatibility but
    are not consulted here: each AgentDef records tool/skill *names*, which
    the node factories resolve later.
    """
    builtin_defs = [
        AgentDef(
            name="weather_agent",
            description="天气专家 - 查询天气、出行建议",
            system_prompt="你是天气专家。根据天气数据给出专业的出行建议,包括穿衣、活动安排等。如果用户问的问题超出天气范围,请使用handoff工具将任务交接给合适的Agent。",
            tools=["get_weather"],
            skill="weather_analyst",
        ),
        AgentDef(
            name="math_agent",
            description="数学专家 - 计算、数学问题解答",
            system_prompt="你是数学专家。解答数学问题,给出计算过程和原理解释。如果用户问的问题超出数学范围,请使用handoff工具将任务交接给合适的Agent。",
            tools=["calculate"],
            skill="math_tutor",
        ),
        AgentDef(
            name="knowledge_agent",
            description="知识专家 - 搜索知识、深入解释概念",
            system_prompt="你是知识专家。搜索知识库,给出深入浅出的解释,结构化呈现信息。如果用户问的问题需要其他专业Agent处理,请使用handoff工具交接。",
            tools=["search_knowledge"],
            skill="knowledge_explorer",
        ),
        AgentDef(
            name="general_agent",
            description="通用助手 - 处理一般对话和简单问题",
            system_prompt="你是黄庄三号通用助手。处理一般对话、问候和简单问题。如果问题需要专业Agent处理,请使用handoff工具交接。",
            tools=[],
            skill="",
        ),
        AgentDef(
            name="mcp_agent",
            description="MCP工具调用 - 时间查询、字符统计、UUID生成等",
            system_prompt="你是MCP工具调用专家。通过MCP协议调用外部工具获取实时数据。如果用户请求超出MCP工具范围,请使用handoff工具交接给合适的Agent。",
            tools=[],
            skill="",
        ),
    ]

    registry = AgentRegistry()
    for agent_def in builtin_defs:
        registry.register(agent_def)
    return registry
|
||
|
||
|
||
# ════════════════════════════════════════════
# Handoff tool factory — one transfer tool per target agent
# ════════════════════════════════════════════
def create_handoff_tools(agent_registry: AgentRegistry, source_agent: str) -> list:
    """Build a transfer_to_<agent> tool for every agent other than source_agent.

    NOTE(review): each tool returns a JSON *marker* string
    ({"__handoff__": true, ...}) — not a Command object. A custom tool
    executor would have to translate the marker into Command(goto=...).
    In this file the agent nodes decide handoffs with a separate LLM call
    instead of invoking these tools, so they are created but not relied
    upon — confirm whether they are exercised elsewhere.
    """
    from langchain_core.tools import StructuredTool

    tools = []
    for target in agent_registry.list_agents():
        if target.name == source_agent:
            continue  # no handoff tool pointing back at the source agent itself

        target_name = target.name
        target_desc = target.description

        # Factory closure so each tool captures its own target (avoids late binding).
        def _make_handoff(t_name, t_desc):
            # Pre-build the name/description strings (avoid f-string docstrings).
            _tool_desc = f"将任务交接给 {t_name}({t_desc})。当你无法处理当前问题,或问题属于该Agent的专业领域时使用。task_description必须包含完整上下文信息。"
            _tool_name = f"transfer_to_{t_name}"

            def transfer_tool_func(task_description: str) -> str:
                """Hand the task off to another agent for processing."""
                # Returns a special marker; a custom tool executor is expected
                # to convert it into a Command.
                return json.dumps({
                    "__handoff__": True,
                    "target": t_name,
                    "task_description": task_description,
                })

            # Construct via StructuredTool to sidestep docstring-derived metadata issues.
            tool_obj = StructuredTool.from_function(
                func=transfer_tool_func,
                name=_tool_name,
                description=_tool_desc,
            )
            return tool_obj

        tools.append(_make_handoff(target_name, target_desc))

    return tools
|
||
|
||
|
||
# ════════════════════════════════════════════
# Supervisor node — analyses the task and updates state
# (routing happens in conditional_edges, not via Command)
# ════════════════════════════════════════════
def make_supervisor_node(config, agent_registry, skills_reg, tools_list, mcp_mgr):
    """Build the supervisor coroutine that classifies each user request.

    The supervisor asks the LLM for a strict-JSON plan with one of three
    modes — direct / single / parallel — and returns a plain state dict;
    route_from_supervisor (in build_graph) turns that state into routing.
    skills_reg / tools_list / mcp_mgr are accepted for a uniform factory
    signature but are not used inside this node.
    """
    llm_cfg = config["llm"]

    SUPERVISOR_PROMPT = """你是黄庄三号的任务协调者(Supervisor)。你的职责是:

1. 分析用户请求,判断应该由哪个Agent处理
2. 如果需要多个Agent并行协作,输出JSON格式的子任务列表
3. 如果只需单个Agent处理,指定单个Agent
4. 如果是简单对话,直接回复

可用Agent:
{agent_list}

MCP工具(当用户请求匹配这些关键词时,指定agent为"mcp_agent"):
- 时间/几点/当前时间 → mcp_agent (get_current_time)
- 统计字符/字符数 → mcp_agent (count_chars)
- 生成UUID → mcp_agent (generate_uuid)

重要规则:
- 你的名字是"黄庄三号"
- 对于简单问候,直接回复,不要分配任务
- 输出格式必须是严格的JSON:
- 多任务: {{"mode": "parallel", "subtasks": [{{"agent": "agent名", "query": "具体任务"}}]}}
- 单任务: {{"mode": "single", "agent": "agent名", "query": "具体任务"}}
- 直接回复: {{"mode": "direct", "answer": "你的回复"}}
- 只输出JSON,不要其他内容"""

    async def supervisor_node(state: SharedState) -> dict:
        """Classify the latest user request and emit the corresponding state update."""
        # Most recent human message is the request being routed.
        user_msg = ""
        for msg in reversed(state["messages"]):
            if isinstance(msg, HumanMessage):
                user_msg = msg.content
                break

        llm = ChatOpenAI(
            base_url=llm_cfg["base_url"],
            api_key=llm_cfg["api_key"],
            model=llm_cfg["model"],
            temperature=0.1,  # low temperature: routing decisions should be stable
        )

        prompt = SUPERVISOR_PROMPT.format(agent_list=agent_registry.format_list())
        messages = [SystemMessage(content=prompt), *state["messages"][-6:]]

        resp = await llm.ainvoke(messages)

        # Strip a Markdown code fence if the model wrapped its JSON in one.
        content = resp.content.strip()
        if content.startswith("```"):
            content = re.sub(r'^```\w*\n?', '', content)
            content = re.sub(r'\n?```$', '', content)
            content = content.strip()

        try:
            plan = json.loads(content)
        except json.JSONDecodeError:
            # Unparseable output: fall back to treating the raw text as a direct answer.
            plan = {"mode": "direct", "answer": resp.content}

        mode = plan.get("mode", "direct")

        if mode == "direct":
            # Answer immediately; no agent is activated.
            answer = plan.get("answer", resp.content)
            return {
                "messages": [AIMessage(content=answer)],
                "final_answer": answer,
                "subtasks": [],
                "active_agent": "",
            }
        elif mode == "single":
            # Activate one agent; conditional_edges will jump to it.
            agent_name = plan.get("agent", "general_agent")
            query = plan.get("query", user_msg)
            return {
                "active_agent": agent_name,
                "subtasks": [],
                "handoff_from": "supervisor",
                "handoff_context": query,
                "handoff_history": [f"supervisor → {agent_name}: {query}"],
            }
        elif mode == "parallel":
            # Record subtasks; conditional_edges fans them out via Send.
            subtasks = plan.get("subtasks", [])
            return {
                "subtasks": subtasks,
                "active_agent": "",
                "handoff_history": [f"supervisor → parallel: {json.dumps(subtasks, ensure_ascii=False)}"],
            }
        else:
            # Unknown mode: answer with the raw model output.
            return {"final_answer": resp.content, "active_agent": "", "subtasks": []}

    return supervisor_node
|
||
|
||
|
||
# ════════════════════════════════════════════
# Agent node factory — each agent is an independent graph node
# ════════════════════════════════════════════
def make_agent_node(config, agent_def: AgentDef, agent_registry: AgentRegistry,
                    skills_reg: SkillRegistry, tools_list: list, mcp_mgr):
    """Create an agent node coroutine. The agent can:
    1. run its business tools (weather, calculator, ...) up front,
    2. call keyword-routed MCP tools (when an MCPManager is supplied),
    3. hand the task off to another agent by returning Command(goto=target).
    """
    llm_cfg = config["llm"]
    agent_cfg = config.get("agent", {})
    temp = agent_cfg.get("skill_temperature", 0.7)
    max_iter = agent_cfg.get("max_iterations", 5)  # NOTE(review): read but never used below — confirm intent

    # Collect this agent's tool set: business tools + handoff tools.
    agent_tools = []
    for tname in agent_def.tools:
        for t in tools_list:
            if t.name == tname:
                agent_tools.append(t)

    # NOTE(review): agent_tools (incl. handoff tools) are assembled but never
    # bound to the LLM below — handoffs are decided by a second LLM pass instead.
    handoff_tools = create_handoff_tools(agent_registry, agent_def.name)
    agent_tools.extend(handoff_tools)

    async def agent_node(state: SharedState) -> Command:
        """Answer the current task, then decide whether to hand off or finish."""
        # Current task context from the handoff fields.
        handoff_from = state.get("handoff_from", "")
        handoff_context = state.get("handoff_context", "")
        handoff_history = state.get("handoff_history", [])

        # Build the system prompt.
        system_content = agent_def.system_prompt

        # Inject the handoff context when this task was transferred in.
        if handoff_from and handoff_context:
            system_content += f"\n\n[交接信息] 你正在处理从 {handoff_from} 交接过来的任务:{handoff_context}"

        if handoff_history:
            history_str = " → ".join(handoff_history)
            system_content += f"\n[交接链路] {history_str}"

        # Extract the most recent user message; fall back to the handoff context.
        user_msg = ""
        for msg in reversed(state["messages"]):
            if isinstance(msg, HumanMessage):
                user_msg = msg.content
                break
        if not user_msg:
            user_msg = handoff_context

        # Pre-execute the business tools and attach their results as context.
        tool_info = ""
        for tname in agent_def.tools:
            for t in tools_list:
                if t.name == tname:
                    try:
                        if tname == "get_weather":
                            # Crude city extraction: first known city mentioned, default 北京.
                            cities = ["北京", "上海", "深圳", "黄庄"]
                            city = next((c for c in cities if c in user_msg), "北京")
                            r = await t.ainvoke({"city": city})
                        elif tname == "calculate":
                            # Pull the first arithmetic-looking substring from the message.
                            expr = re.findall(r'[\d+\-*/(). ]+', user_msg)
                            r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"})
                        else:
                            r = await t.ainvoke({"query": user_msg})
                        tool_info += f"\n工具{tname}结果: {r}"
                    except Exception as e:
                        tool_info += f"\n工具{tname}错误: {e}"

        # MCP tools (keyword-routed).
        if mcp_mgr and user_msg:
            route = mcp_mgr.match_route(user_msg)
            if route:
                session, tool_name = route
                mcp_result = await mcp_mgr.call_tool(session, tool_name, user_msg)
                tool_info += f"\nMCP工具{tool_name}结果: {mcp_result}"

        # Run OpenClaw skill scripts, if this agent has any.
        if agent_def.skill:
            sk = skills_reg.get(agent_def.skill)
            if sk and sk.is_openclaw and sk.scripts:
                for script_path in sk.scripts:
                    try:
                        proc = await asyncio.create_subprocess_exec(
                            "python3", script_path, user_msg,
                            stdout=asyncio.subprocess.PIPE,
                            stderr=asyncio.subprocess.PIPE,
                        )
                        stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=30)
                        output = stdout.decode("utf-8", errors="replace").strip()
                        if output:
                            # Cap script output to keep the prompt bounded.
                            tool_info += f"\n[脚本输出]\n{output[:2000]}"
                    except Exception as e:
                        tool_info += f"\n[脚本错误] {e}"

        # Build the skill prompt (non-OpenClaw skills only).
        skill_prompt = ""
        if agent_def.skill:
            sk = skills_reg.get(agent_def.skill)
            if sk and not sk.is_openclaw:
                skill_prompt = sk.prompt.format(input=user_msg)

        # Assemble the message list.
        messages = [SystemMessage(content=system_content)]
        if skill_prompt:
            messages.append(SystemMessage(content=f"技能指导:{skill_prompt}"))
        if tool_info:
            messages.append(SystemMessage(content=f"工具/脚本提供的数据:{tool_info}"))
        # Append recent conversation history.
        messages.extend(state["messages"][-6:])
        # If the recent history carries no user message, add one explicitly.
        if not any(isinstance(m, HumanMessage) for m in state["messages"][-6:]):
            messages.append(HumanMessage(content=user_msg))

        # LLM call.
        llm = ChatOpenAI(
            base_url=llm_cfg["base_url"],
            api_key=llm_cfg["api_key"],
            model=llm_cfg["model"],
            temperature=temp,
        )

        # First pass: plain call without bound tools (in case the model does not
        # support complex tool calling) — the LLM answers directly, and handoff
        # intent is detected by a second judging call below.
        resp = await llm.ainvoke(messages)
        answer = resp.content

        # Second pass: ask the LLM whether the answer implies a handoff
        # to a different agent.
        handoff_check_prompt = f"""你是任务路由判断器。根据以下Agent的回答,判断该Agent是否需要将任务交接给其他Agent。

当前Agent: {agent_def.name}({agent_def.description})
Agent回答: {answer[:500]}

可用Agent:
{agent_registry.format_list()}

判断规则:
- 如果Agent已经完整回答了问题,不需要交接,输出: {{"handoff": false}}
- 如果Agent表示自己无法处理、问题超出其专业范围、或建议由其他Agent处理,输出: {{"handoff": true, "target": "目标agent名", "reason": "原因"}}
- 只输出JSON,不要其他内容"""

        handoff_resp = await llm.ainvoke([
            SystemMessage(content=handoff_check_prompt),
            HumanMessage(content="请判断"),
        ])

        # Strip a Markdown fence around the judge's JSON, if present.
        handoff_content = handoff_resp.content.strip()
        if handoff_content.startswith("```"):
            handoff_content = re.sub(r'^```\w*\n?', '', handoff_content)
            handoff_content = re.sub(r'\n?```$', '', handoff_content)
            handoff_content = handoff_content.strip()

        try:
            handoff_decision = json.loads(handoff_content)
        except json.JSONDecodeError:
            # Unparseable judgement: assume no handoff is needed.
            handoff_decision = {"handoff": False}

        if handoff_decision.get("handoff") and handoff_decision.get("target"):
            target = handoff_decision["target"]
            reason = handoff_decision.get("reason", "")
            task_desc = f"从{agent_def.name}交接: {reason}。原始问题: {user_msg}"

            # Hand off: jump straight to the target agent node via Command.
            return Command(
                goto=target,
                update={
                    "active_agent": target,
                    "handoff_from": agent_def.name,
                    "handoff_context": task_desc,
                    "handoff_history": [f"{agent_def.name} → {target}: {reason}"],
                    "messages": [AIMessage(content=f"[{agent_def.name}交接给{target}] {reason}")],
                },
            )

        # No handoff needed: finish the run with this agent's answer.
        return Command(
            goto=END,
            update={
                "final_answer": answer,
                "results": [f"[{agent_def.name}] {answer}"],
                "messages": [AIMessage(content=answer)],
            },
        )

    return agent_node
|
||
|
||
|
||
# ════════════════════════════════════════════
# Aggregator node (merges parallel results)
# ════════════════════════════════════════════
def make_aggregator_node(config):
    """Build the node that merges parallel worker results into one reply."""
    llm_cfg = config["llm"]

    async def aggregator_node(state: SharedState) -> dict:
        """Combine the accumulated 'results' entries into the final answer."""
        results = state.get("results", [])
        if not results:
            return {"final_answer": "所有Agent已完成,但无结果"}

        if len(results) == 1:
            # Single result: strip the leading "[agent_name] " tag and return as-is.
            content = re.sub(r'^\[.+?\]\s*', '', results[0])
            return {
                "final_answer": content,
                "messages": [AIMessage(content=content)],
            }

        # Multiple results: ask the LLM to merge them into one coherent answer.
        combined = "\n\n---\n\n".join(results)
        agg_llm = ChatOpenAI(
            base_url=llm_cfg["base_url"],
            api_key=llm_cfg["api_key"],
            model=llm_cfg["model"],
            temperature=0.5,
        )

        resp = await agg_llm.ainvoke([
            SystemMessage(content="你是黄庄三号。请将以下多个专业Agent的结果整合为一个连贯、结构化的回答。保留关键信息,去除冗余。"),
            HumanMessage(content=f"多个Agent的结果:\n\n{combined}"),
        ])

        return {
            "final_answer": resp.content,
            "messages": [AIMessage(content=resp.content)],
        }

    return aggregator_node
|
||
|
||
|
||
# ════════════════════════════════════════════
# Parallel worker node (Send API; writes only 'results', avoiding non-reducer fields)
# ════════════════════════════════════════════
def make_parallel_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr):
    """Parallel worker: consumes a SubTaskState and writes only 'results' (concurrency-safe)."""
    llm_cfg = config["llm"]
    agent_cfg = config.get("agent", {})
    temp = agent_cfg.get("skill_temperature", 0.7)

    async def parallel_worker(state: SubTaskState) -> dict:
        """Execute one subtask with the agent named in state['task']."""
        task = state["task"]
        agent_name = task.get("agent", "general_agent")
        query = task.get("query", "")

        # Resolve the agent definition; fall back to the general agent.
        agent_def = agent_registry.get(agent_name)
        if not agent_def:
            agent_def = agent_registry.get("general_agent")

        # Pre-execute this agent's business tools on the subtask query.
        tool_info = ""
        for tname in agent_def.tools:
            for t in tools_list:
                if t.name == tname:
                    try:
                        if tname == "get_weather":
                            # Crude city extraction: first known city mentioned, default 北京.
                            cities = ["北京", "上海", "深圳", "黄庄"]
                            city = next((c for c in cities if c in query), "北京")
                            r = await t.ainvoke({"city": city})
                        elif tname == "calculate":
                            # Pull the first arithmetic-looking substring from the query.
                            expr = re.findall(r'[\d+\-*/(). ]+', query)
                            r = await t.ainvoke({"expression": expr[0].strip() if expr else "1+1"})
                        else:
                            r = await t.ainvoke({"query": query})
                        tool_info += f"\n工具{tname}结果: {r}"
                    except Exception as e:
                        tool_info += f"\n工具{tname}错误: {e}"

        # MCP tools (keyword-routed).
        if mcp_mgr and query:
            route = mcp_mgr.match_route(query)
            if route:
                session, tool_name = route
                mcp_result = await mcp_mgr.call_tool(session, tool_name, query)
                tool_info += f"\nMCP工具{tool_name}结果: {mcp_result}"

        # Choose the prompt: a non-OpenClaw skill prompt replaces the system prompt.
        prompt = agent_def.system_prompt
        if agent_def.skill:
            sk = skills_reg.get(agent_def.skill)
            if sk and not sk.is_openclaw:
                prompt = sk.prompt.format(input=query)

        worker_llm = ChatOpenAI(
            base_url=llm_cfg["base_url"],
            api_key=llm_cfg["api_key"],
            model=llm_cfg["model"],
            temperature=temp,
        )

        messages = [
            SystemMessage(content=prompt),
            HumanMessage(content=query),
        ]
        if tool_info:
            messages.append(SystemMessage(content=f"工具/脚本提供的数据:{tool_info}"))

        resp = await worker_llm.ainvoke(messages)

        # Only the 'results' reducer field is written — safe under parallel fan-out.
        return {"results": [f"[{agent_def.name}] {resp.content}"]}

    return parallel_worker
|
||
|
||
|
||
# ════════════════════════════════════════════
# Multi-agent graph assembly
# ════════════════════════════════════════════
async def build_graph(config, skills_reg, mcp_mgr, tools_list):
    """Assemble and compile the supervisor + agent nodes + parallel worker graph."""
    agent_registry = init_agents(skills_reg, tools_list)
    all_agent_names = [a.name for a in agent_registry.list_agents()]

    # Supervisor node.
    supervisor = make_supervisor_node(config, agent_registry, skills_reg, tools_list, mcp_mgr)

    # One node per agent (serial/single-task path; supports Command handoff).
    agent_nodes = {}
    for agent_def in agent_registry.list_agents():
        agent_nodes[agent_def.name] = make_agent_node(
            config, agent_def, agent_registry, skills_reg, tools_list, mcp_mgr
        )

    # Parallel worker (parallel path; writes only reducer fields, safe to fan out).
    parallel_worker = make_parallel_worker_node(config, agent_registry, skills_reg, tools_list, mcp_mgr)
    aggregator = make_aggregator_node(config)

    # Build the graph.
    g = StateGraph(SharedState)

    # Register nodes.
    g.add_node("supervisor", supervisor)
    for name, node_fn in agent_nodes.items():
        g.add_node(name, node_fn)
    g.add_node("parallel_worker", parallel_worker)  # parallel worker (SubTaskState)
    g.add_node("aggregator", aggregator)

    # Entry point.
    g.add_edge(START, "supervisor")

    # ── Supervisor routing (conditional_edges) ──
    def route_from_supervisor(state: SharedState):
        """Route based on the state the supervisor produced:
        - non-empty final_answer → END
        - more than one subtask  → fan out to parallel_worker via Send
        - non-empty active_agent → jump to that agent node
        NOTE(review): a "parallel" plan with exactly one subtask sets no
        active_agent and falls through to END unanswered — confirm intended.
        """
        final_answer = state.get("final_answer", "")
        if final_answer:
            return "end"

        subtasks = state.get("subtasks", [])
        if subtasks and len(subtasks) > 1:
            # Parallel mode: dispatch with the Send API.
            return [
                Send("parallel_worker", {"task": t, "messages": state["messages"][-4:]})
                for t in subtasks
            ]

        active_agent = state.get("active_agent", "")
        if active_agent and active_agent in all_agent_names:
            return active_agent

        return "end"

    # Path map for conditional routing.
    path_map = {"end": END}
    for name in all_agent_names:
        path_map[name] = name

    g.add_conditional_edges("supervisor", route_from_supervisor, path_map)

    # Parallel workers converge on the aggregator.
    g.add_edge("parallel_worker", "aggregator")
    g.add_edge("aggregator", END)

    # Note: agent nodes hand off with Command(goto=...); LangGraph routes
    # Commands automatically, so no extra edges are needed for them.

    return g.compile()
|
||
|
||
|
||
# ════════════════════════════════════════════
# Run entry point — module-level runtime state
# ════════════════════════════════════════════
all_tools = []                     # populated by scan_tools() inside main()
skills_registry = SkillRegistry()  # replaced by scan_skills() inside main()
mcp_manager = None                 # set to an MCPManager inside main() when --mcp is given
|
||
|
||
|
||
async def run_agent(user_input: str, graph):
    """Run one user turn through the compiled graph.

    Returns a summary dict with the reply text, the scheduled subtasks,
    the number of aggregated results, the handoff trail, and the final answer.
    """
    initial_state = {
        "messages": [HumanMessage(content=user_input)],
        "subtasks": [],
        "results": [],
        "active_agent": "",
        "handoff_from": "",
        "handoff_context": "",
        "handoff_history": [],
        "final_answer": "",
    }
    final_state = await graph.ainvoke(initial_state)

    # The last message carries the reply shown to the user.
    last_message = final_state["messages"][-1]
    reply = last_message.content if hasattr(last_message, "content") else str(last_message)

    return {
        "reply": reply,
        "subtasks": final_state.get("subtasks", []),
        "results_count": len(final_state.get("results", [])),
        "handoff_history": final_state.get("handoff_history", []),
        "final_answer": final_state.get("final_answer", ""),
    }
|
||
|
||
|
||
async def interactive_mode(graph):
    """Interactive REPL: read user input, run it through the graph, print diagnostics."""
    print("=" * 60)
    print(" 黄庄三号 Agent v4.0 - Command Handoff交互版")
    print(" Supervisor + 独立Agent节点 + Agent间Handoff")
    print("=" * 60)
    print(" 输入 quit 退出")
    print("=" * 60)

    while True:
        try:
            user_input = input("\n你> ").strip()
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C ends the session cleanly.
            break
        if not user_input or user_input.lower() in ("quit", "exit", "q"):
            break

        result = await run_agent(user_input, graph)
        # Show scheduling / handoff / aggregation diagnostics before the reply.
        if result["subtasks"]:
            agents = [t.get("agent", "?") for t in result["subtasks"]]
            print(f"\n[调度] 分配给: {', '.join(agents)}")
        if result["handoff_history"]:
            print(f"[交接] {' → '.join(result['handoff_history'])}")
        if result["results_count"] > 1:
            print(f"[聚合] 合并 {result['results_count']} 个Agent结果")
        print(f"\n黄庄三号> {result['reply']}")
|
||
|
||
|
||
async def main():
    """CLI entry: load config, scan tools/skills, optionally connect MCP,
    build the multi-agent graph, then run scripted tests or the REPL."""
    global all_tools, skills_registry, mcp_manager

    parser = argparse.ArgumentParser(description="黄庄三号 Agent v4.0")
    parser.add_argument("--mcp", action="store_true", help="启用MCP")
    parser.add_argument("--test", action="store_true", help="自动测试")
    args = parser.parse_args()

    print("=" * 60)
    print(" 黄庄三号 Agent v4.0 - Command Handoff交互版")
    print("=" * 60)

    # Load configuration.
    print("\n[配置] 加载 config.yaml ...")
    config = load_config()

    # Scan business tools.
    print("\n[工具] 扫描 tools/ ...")
    all_tools = scan_tools()
    print(f" 工具总数: {len(all_tools)}")

    # Scan skills.
    print("\n[技能] 扫描 skills/ ...")
    skills_registry = scan_skills()
    print(f" 技能总数: {len(skills_registry.list_skills())}")

    # Connect MCP servers when requested and configured.
    if args.mcp and config.get("mcp_servers"):
        print("\n[MCP] 连接服务器 ...")
        mcp_manager = MCPManager()
        await mcp_manager.connect_all(config["mcp_servers"])

    # Initialize agents (listing only; build_graph creates its own registry).
    agent_registry = init_agents(skills_registry, all_tools)
    print(f"\n[Agent] 已注册 {len(agent_registry.list_agents())} 个Agent:")
    for a in agent_registry.list_agents():
        print(f" - {a.name}: {a.description}")

    # Build the multi-agent graph.
    print("\n[构建] 多Agent图(Command Handoff版) ...")
    graph = await build_graph(config, skills_registry, mcp_manager, all_tools)

    if args.test:
        # Scripted smoke tests covering each routing mode.
        tests = [
            ("单Agent:天气", "黄庄天气怎么样?"),
            ("单Agent:数学", "算一下 99*88+77"),
            ("单Agent:知识", "MCP是什么?"),
            ("直接回复", "你好你是谁?"),
            ("多Agent并行", "帮我查一下北京和上海的天气,再算一下123+456"),
            ("Handoff测试:天气→数学", "帮我查一下北京天气,顺便算算100*200"),
        ]

        if args.mcp and mcp_manager:
            tests.append(("MCP:时间", "现在几点了?"))

        for label, query in tests:
            print(f"\n{'─'*55}")
            print(f"[测试:{label}] {query}")
            try:
                r = await run_agent(query, graph)
                if r["subtasks"]:
                    agents = [t.get("agent", "?") for t in r["subtasks"]]
                    print(f" 调度: {', '.join(agents)}")
                if r["handoff_history"]:
                    print(f" 交接: {' → '.join(r['handoff_history'])}")
                if r["results_count"] > 1:
                    print(f" 聚合: {r['results_count']} 个Agent结果")
                print(f" 回复: {r['reply'][:200]}")
            except Exception as e:
                print(f" 错误: {e}")

        print(f"\n{'='*60}")
        print(" 多Agent + Handoff 测试完成!")
        print("=" * 60)
    else:
        await interactive_mode(graph)

    # Always tear down MCP connections before exiting.
    if mcp_manager:
        await mcp_manager.close()
|
||
|
||
|
||
if __name__ == "__main__":
    # Script entry point.
    asyncio.run(main())
|