commit 05950a3c8488645d0e820afb522c1a67ffccf83f Author: hubian <908234780@qq.com> Date: Sun Apr 12 01:54:15 2026 +0800 feat: 多智能体竞标调度系统 v1.0.0 核心组件: - Orchestrator: 意图理解、任务拆分、竞标管理、结果验证 - Worker: 竞标任务、执行交付 - TaskBoard: 状态管理、信息存储 - BidEvaluator: 竞标评估算法 - ExecutionMonitor: 执行监控、超时处理 - LLMClient: 大模型接口调用 功能特性: - 竞标机制:Agent主动竞争任务 - 动态调度:串行/并行任务智能调度 - 智能容错:超时切换、验证重试 - 质量保证:结果验证、历史追踪 Web界面:首页、请求列表、任务列表、Agent管理 API接口:请求/任务/Agent管理、测试接口 端口:19015 diff --git a/README.md b/README.md new file mode 100644 index 0000000..78fcf3b --- /dev/null +++ b/README.md @@ -0,0 +1,120 @@ +# 多智能体竞标调度系统 + +基于邮件方案实现的多智能体竞标调度架构,核心包含: + +- **规划Agent (Orchestrator)**:意图理解、任务拆分、竞标管理、结果验证 +- **执行Agent (Worker)**:竞标任务、执行交付 +- **任务公告板 (Task Board)**:状态管理、信息存储 + +## 核心特性 + +### 1. 竞标机制 +Agent主动竞争任务,展示能力和方案,规划Agent综合评估选择最佳执行者。 + +评估公式: +``` +综合得分 = 0.3×能力匹配 + 0.2×自信度 + 0.2×时间效率 + 0.2×方案质量 + 0.1×历史表现 +``` + +### 2. 动态调度 +- **串行任务**:严格按依赖关系执行 +- **并行任务**:最大化并发执行 +- **任务图**:自动计算执行顺序 + +### 3. 智能容错 +- **超时切换**:超时后自动启动备选Agent +- **验证重试**:验证失败给予重试机会 +- **历史追踪**:记录失败原因,改进系统 + +### 4. 质量保证 +- **结果验证**:完整性、正确性、质量检查 +- **反馈改进**:验证失败时给予Agent反馈 +- **历史统计**:追踪Agent表现数据 + +## 技术架构 + +``` +用户请求 → 意图理解 → 任务拆分 → 任务发布 → 竞标收集 → +竞标评估 → 执行分配 → 执行监控 → 结果收集 → 结果验证 → + ├─ 验证通过 → 结果整合 → 返回用户 + ├─ 验证失败 → 重试机会 → 再次失败 → 换Agent + └─ 全部失败 → 任务终止 → 返回错误报告 +``` + +## 安装 + +```bash +pip install -r requirements.txt +``` + +## 运行 + +```bash +# 方式1:使用启动脚本 +./start.sh + +# 方式2:直接运行 +python3 -m app.app --port 19015 + +# 方式3:后台运行 +nohup python3 -m app.app --port 19015 > logs/app.log 2>&1 & +``` + +## API接口 + +### 用户请求 +- `POST /api/request` - 创建用户请求 +- `GET /api/request/` - 获取请求详情 +- `GET /api/requests` - 列出所有请求 + +### 任务管理 +- `GET /api/tasks` - 列出所有任务 +- `GET /api/task/` - 获取任务详情(含竞标和尝试记录) + +### Agent管理 +- `GET /api/agents` - 列出所有Agent +- `POST /api/agent` - 注册Agent +- `DELETE /api/agent/` - 注销Agent + +### 测试接口 +- `POST /api/test` - 快速测试LLM +- `POST /api/test_intent` - 测试意图分析 +- `POST /api/test_split` - 测试任务拆分 + +## 配置 + +LLM接口配置在 `app/llm_client.py`: +- 默认URL: `http://192.168.2.17:19007/v1` +- 默认模型: `auto` + +可通过环境变量或代码修改配置。 + +## Web界面 + +访问 `http://localhost:19015` 打开Web管理界面: + +- **首页**:发送请求、查看日志、统计信息 +- **请求列表**:查看历史请求及详情 +- **任务列表**:查看任务执行状态 +- **Agent管理**:注册、注销、查看Agent + +## 默认Agent + +系统预设4个Agent: + +| Agent | 能力 | +|-------|------| +| coder_agent | coding, debugging, optimization | +| search_agent | search, research, summarization | +| writer_agent | writing, documentation, translation | +| analyst_agent | analysis, reporting, visualization | + +可通过API或Web界面注册新Agent。 + +## 数据存储 + +任务数据存储在 `~/.multi_agent/task_board.json` + +## Git仓库 + +http://192.168.2.8:12007/coder/multi-agent-bidding \ No newline at end of file diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..1dbad76 --- /dev/null +++ b/app/__init__.py @@ -0,0 +1,11 @@ +""" +多智能体竞标调度系统 +""" + +from .models import * +from .task_board import TaskBoard +from .orchestrator import Orchestrator +from .worker import Worker, WorkerPool, create_default_workers +from .llm_client import LLMClient, default_client +from .bid_evaluator import BidEvaluator +from .execution_monitor import ExecutionMonitor, ExecutionMonitorManager \ No newline at end of file diff --git a/app/app.py b/app/app.py new file mode 100644 index 0000000..2e050f1 --- /dev/null +++ b/app/app.py @@ -0,0 +1,260 @@ +""" +多智能体竞标调度系统 - Web应用 +""" + +from flask import Flask, render_template, request, jsonify +from flask_cors import CORS +import json +import time + +from .models import AgentProfile, TaskStatus +from .task_board import TaskBoard +from .orchestrator import Orchestrator +from .worker import WorkerPool, create_default_workers +from .llm_client import LLMClient + +app = Flask(__name__) +CORS(app) + +# 初始化组件 +task_board = TaskBoard() +llm_client = LLMClient( + base_url="http://192.168.2.17:19007/v1", + api_key="xxxx", + model="auto" +) +orchestrator = Orchestrator(task_board, llm_client) +worker_pool = create_default_workers(task_board, llm_client) + + +# === Web页面 === + +@app.route('/') +def index(): + """首页""" + return render_template('index.html') + + +@app.route('/requests') +def requests_page(): + """请求列表页""" + return render_template('requests.html') + + +@app.route('/tasks') +def tasks_page(): + """任务列表页""" + return render_template('tasks.html') + + +@app.route('/agents') +def agents_page(): + """Agent列表页""" + return render_template('agents.html') + + +# === API === + +@app.route('/api/request', methods=['POST']) +def create_request(): + """ + 创建用户请求 + + Body: {"content": "用户请求内容"} + """ + data = request.get_json() + content = data.get('content', '') + + if not content: + return jsonify({'error': '请提供请求内容'}), 400 + + # 处理请求 + result = orchestrator.process_request(content) + + return jsonify(result.to_dict()) + + +@app.route('/api/request/') +def get_request(request_id): + """获取请求详情""" + req = task_board.get_request(request_id) + if req: + return jsonify(req.to_dict()) + return jsonify({'error': '请求不存在'}), 404 + + +@app.route('/api/requests') +def list_requests(): + """列出请求""" + limit = int(request.args.get('limit', 50)) + requests = task_board.list_requests(limit) + return jsonify([r.to_dict() for r in requests]) + + +@app.route('/api/tasks') +def list_tasks(): + """列出任务""" + limit = int(request.args.get('limit', 100)) + tasks = task_board.list_tasks(limit) + return jsonify([t.to_dict() for t in tasks]) + + +@app.route('/api/task/') +def get_task(task_id): + """获取任务详情""" + task = task_board.get_task(task_id) + if task: + task_dict = task.to_dict() + # 添加竞标和尝试记录 + task_dict['bids'] = [b.to_dict() for b in task_board.get_bids(task_id)] + task_dict['attempts'] = [a.to_dict() for a in task_board.get_attempts(task_id)] + return jsonify(task_dict) + return jsonify({'error': '任务不存在'}), 404 + + +@app.route('/api/agents') +def list_agents(): + """列出Agent""" + agents = task_board.list_agents() + return jsonify([a.to_dict() for a in agents]) + + +@app.route('/api/agent', methods=['POST']) +def register_agent(): + """ + 注册Agent + + Body: {"id": "...", "name": "...", "capabilities": [...]} + """ + data = request.get_json() + + agent = AgentProfile( + id=data.get('id', ''), + name=data.get('name', ''), + description=data.get('description', ''), + capabilities=data.get('capabilities', []), + max_concurrent_tasks=data.get('max_concurrent_tasks', 1), + preferred_task_types=data.get('preferred_task_types', []) + ) + + if not agent.id: + return jsonify({'error': '请提供Agent ID'}), 400 + + task_board.register_agent(agent) + + return jsonify(agent.to_dict()) + + +@app.route('/api/agent/', methods=['DELETE']) +def unregister_agent(agent_id): + """注销Agent""" + task_board.unregister_agent(agent_id) + return jsonify({'success': True}) + + +@app.route('/api/stats') +def get_stats(): + """获取统计信息""" + stats = task_board.get_stats() + stats['active_executions'] = orchestrator.execution_manager.get_active_count() + return jsonify(stats) + + +# === 快速测试 === + +@app.route('/api/test', methods=['POST']) +def test_execution(): + """ + 快速测试 + + Body: {"prompt": "测试内容"} + """ + data = request.get_json() + prompt = data.get('prompt', '你好,请介绍一下你自己') + + try: + response = llm_client.simple_chat(prompt) + return jsonify({ + 'success': True, + 'response': response + }) + except Exception as e: + return jsonify({ + 'success': False, + 'error': str(e) + }), 500 + + +@app.route('/api/test_intent', methods=['POST']) +def test_intent(): + """ + 测试意图分析 + + Body: {"prompt": "帮我写一个Python爬虫"} + """ + data = request.get_json() + prompt = data.get('prompt', '') + + try: + intent = llm_client.analyze_intent(prompt) + return jsonify({ + 'success': True, + 'intent': intent + }) + except Exception as e: + return jsonify({ + 'success': False, + 'error': str(e) + }), 500 + + +@app.route('/api/test_split', methods=['POST']) +def test_split(): + """ + 测试任务拆分 + + Body: {"prompt": "..."} + """ + data = request.get_json() + prompt = data.get('prompt', '') + + try: + intent = llm_client.analyze_intent(prompt) + tasks = llm_client.split_tasks(intent, prompt) + return jsonify({ + 'success': True, + 'intent': intent, + 'tasks': tasks + }) + except Exception as e: + return jsonify({ + 'success': False, + 'error': str(e) + }), 500 + + +def main(): + """启动应用""" + import argparse + + parser = argparse.ArgumentParser(description='多智能体竞标调度系统') + parser.add_argument('--port', type=int, default=19015, help='服务端口') + parser.add_argument('--host', type=str, default='0.0.0.0', help='服务地址') + parser.add_argument('--debug', action='store_true', help='调试模式') + + args = parser.parse_args() + + print(f"启动多智能体竞标调度系统...") + print(f"端口: {args.port}") + print(f"LLM接口: {llm_client.base_url}") + print(f"默认Agent数量: {len(worker_pool.list_workers())}") + + app.run( + host=args.host, + port=args.port, + debug=args.debug + ) + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/app/bid_evaluator.py b/app/bid_evaluator.py new file mode 100644 index 0000000..b689ef1 --- /dev/null +++ b/app/bid_evaluator.py @@ -0,0 +1,155 @@ +""" +竞标评估器 - 评估Agent竞标,选择最佳执行者 +""" + +from typing import Dict, List +from .models import Bid, Task, AgentProfile + + +class BidEvaluator: + """竞标评估器""" + + def __init__(self): + self.weights = { + 'capability': 0.3, # 能力匹配度 + 'confidence': 0.2, # 自信度 + 'time_efficiency': 0.2, # 时间效率 + 'approach_quality': 0.2, # 方案质量 + 'historical': 0.1 # 历史表现 + } + + def evaluate_bid(self, bid: Bid, agent: AgentProfile, max_time: int = 300) -> float: + """ + 评估单个竞标 + + Args: + bid: 竞标书 + agent: Agent档案 + max_time: 最大可接受时间 + + Returns: + 综合得分 0-1 + """ + scores = {} + + # 1. 能力匹配度 + scores['capability'] = bid.capability_match + + # 2. 自信度 + scores['confidence'] = bid.confidence + + # 3. 时间效率(时间越短得分越高) + scores['time_efficiency'] = max(0, 1 - bid.estimated_time / max_time) + + # 4. 方案质量 + scores['approach_quality'] = self._rate_approach(bid.approach) + + # 5. 历史表现 + scores['historical'] = agent.success_rate + + # 加权求和 + final_score = sum( + self.weights[k] * v + for k, v in scores.items() + ) + + return final_score + + def _rate_approach(self, approach: str) -> float: + """评估方案质量""" + score = 0.5 + + # 有备选方案加分 + if '备选' in approach or 'alternative' in approach.lower(): + score += 0.1 + + # 有错误处理加分 + if '错误' in approach or '异常' in approach or 'error' in approach.lower(): + score += 0.1 + + # 有详细描述加分 + if len(approach) > 100: + score += 0.1 + + # 有步骤分解加分 + if '步骤' in approach or 'step' in approach.lower(): + score += 0.1 + + return min(score, 1.0) + + def select_best_bid( + self, + bids: List[Bid], + agents: Dict[str, AgentProfile] + ) -> tuple: + """ + 选择最佳竞标 + + Args: + bids: 竞标列表 + agents: Agent档案字典 + + Returns: + (best_bid, best_agent, scores_dict) + """ + if not bids: + return (None, None, {}) + + scores = {} + + for bid in bids: + agent = agents.get(bid.agent_id) + if agent: + score = self.evaluate_bid(bid, agent) + scores[bid.id] = { + 'score': score, + 'bid': bid, + 'agent': agent, + 'details': { + 'capability': bid.capability_match, + 'confidence': bid.confidence, + 'time_efficiency': max(0, 1 - bid.estimated_time / 300), + 'approach_quality': self._rate_approach(bid.approach), + 'historical': agent.success_rate + } + } + + if not scores: + return (None, None, {}) + + # 找最高分 + best_bid_id = max(scores.keys(), key=lambda k: scores[k]['score']) + best = scores[best_bid_id] + + return (best['bid'], best['agent'], scores) + + def get_backup_agents( + self, + bids: List[Bid], + agents: Dict[str, AgentProfile], + selected_agent_id: str + ) -> List[AgentProfile]: + """ + 获取备选Agent列表(按得分排序) + + Args: + bids: 竞标列表 + agents: Agent档案字典 + selected_agent_id: 已选择的AgentID + + Returns: + 备选Agent列表 + """ + backup = [] + + for bid in bids: + if bid.agent_id != selected_agent_id: + agent = agents.get(bid.agent_id) + if agent: + score = self.evaluate_bid(bid, agent) + backup.append((agent, score)) + + # 按得分降序排序 + backup.sort(key=lambda x: x[1], reverse=True) + + return [a for a, s in backup] \ No newline at end of file diff --git a/app/execution_monitor.py b/app/execution_monitor.py new file mode 100644 index 0000000..893e964 --- /dev/null +++ b/app/execution_monitor.py @@ -0,0 +1,190 @@ +""" +执行监控器 - 监控执行状态、超时处理、结果收集 +""" + +import asyncio +import time +import threading +from typing import Dict, List, Optional, Any, Callable +from .models import Task, Execution, AgentProfile, Bid, TaskStatus + + +class ExecutionMonitor: + """执行监控器""" + + def __init__( + self, + task: Task, + agent: AgentProfile, + bid: Bid, + on_complete: Callable = None, + on_timeout: Callable = None, + on_error: Callable = None + ): + self.task = task + self.agent = agent + self.bid = bid + self.timeout = task.timeout + + self.callbacks = { + 'complete': on_complete, + 'timeout': on_timeout, + 'error': on_error + } + + self.execution: Optional[Execution] = None + self.backup_agents: List[AgentProfile] = [] + self.active_executions: Dict[str, Any] = {} + + self._start_time: float = 0 + self._is_running: bool = False + self._result: Optional[Any] = None + self._error: Optional[str] = None + + def add_backup_agent(self, agent: AgentProfile): + """添加备选Agent""" + self.backup_agents.append(agent) + + def start(self, execute_func: Callable) -> Execution: + """ + 启动执行 + + Args: + execute_func: 执行函数 + + Returns: + Execution对象 + """ + self.execution = Execution( + task_id=self.task.id, + agent_id=self.agent.id, + bid_id=self.bid.id, + status='running' + ) + self._start_time = time.time() + self._is_running = True + + # 启动执行线程 + thread = threading.Thread(target=self._run_execution, args=(execute_func,)) + thread.start() + + # 启动超时监控线程 + timeout_thread = threading.Thread(target=self._watch_timeout) + timeout_thread.start() + + return self.execution + + def _run_execution(self, execute_func: Callable): + """执行任务""" + try: + result = execute_func(self.task, self.agent, self.bid) + + if self._is_running: + self._result = result + self.execution.status = 'completed' + self.execution.end_time = time.time() + self.execution.result = result + self._is_running = False + + if self.callbacks['complete']: + self.callbacks['complete'](self.execution) + + except Exception as e: + if self._is_running: + self._error = str(e) + self.execution.status = 'failed' + self.execution.end_time = time.time() + self.execution.error = str(e) + self._is_running = False + + if self.callbacks['error']: + self.callbacks['error'](self.execution, e) + + def _watch_timeout(self): + """超时监控""" + while self._is_running: + elapsed = time.time() - self._start_time + + if elapsed >= self.timeout: + # 超时 + if self._is_running: + self.execution.status = 'timeout' + self.execution.end_time = time.time() + self.execution.error = f"执行超时 ({self.timeout}秒)" + self._is_running = False + + if self.callbacks['timeout']: + self.callbacks['timeout'](self.execution) + + # 启动备选Agent + if self.backup_agents: + self._start_backup() + break + + time.sleep(1) + + def _start_backup(self): + """启动备选Agent""" + # 这里简化处理,实际应该启动备选执行 + pass + + def get_result(self) -> Optional[Any]: + """获取结果""" + return self._result + + def is_complete(self) -> bool: + """是否完成""" + return not self._is_running + + def get_duration(self) -> float: + """获取执行时长""" + if self.execution and self.execution.end_time: + return self.execution.end_time - self.execution.start_time + return time.time() - self._start_time + + +class ExecutionMonitorManager: + """执行监控管理器""" + + def __init__(self): + self.monitors: Dict[str, ExecutionMonitor] = {} # execution_id -> monitor + + def create_monitor( + self, + task: Task, + agent: AgentProfile, + bid: Bid, + callbacks: Dict[str, Callable] = None + ) -> ExecutionMonitor: + """创建监控器""" + monitor = ExecutionMonitor( + task=task, + agent=agent, + bid=bid, + on_complete=callbacks.get('complete'), + on_timeout=callbacks.get('timeout'), + on_error=callbacks.get('error') + ) + + # 保存到管理器 + if monitor.execution: + self.monitors[monitor.execution.id] = monitor + + return monitor + + def get_monitor(self, execution_id: str) -> Optional[ExecutionMonitor]: + """获取监控器""" + return self.monitors.get(execution_id) + + def cleanup_completed(self): + """清理已完成的监控器""" + completed = [ + eid for eid, m in self.monitors.items() + if m.is_complete() + ] + for eid in completed: + self.monitors.pop(eid, None) + + def get_active_count(self) -> int: + """获取活跃执行数""" + return len([m for m in self.monitors.values() if not m.is_complete()]) \ No newline at end of file diff --git a/app/llm_client.py b/app/llm_client.py new file mode 100644 index 0000000..14e2c19 --- /dev/null +++ b/app/llm_client.py @@ -0,0 +1,400 @@ +""" +LLM客户端 - 调用大模型API +""" + +import requests +import json +import time +from typing import Dict, List, Optional + + +class LLMClient: + """大模型客户端""" + + def __init__( + self, + base_url: str = "http://192.168.2.17:19007/v1", + api_key: str = "xxxx", + model: str = "auto", + timeout: int = 120 + ): + self.base_url = base_url.rstrip('/') + self.api_key = api_key + self.model = model + self.timeout = timeout + + def chat( + self, + messages: List[Dict], + temperature: float = 0.7, + max_tokens: int = 4096, + stream: bool = False + ) -> Dict: + """ + 发送聊天请求 + + Args: + messages: [{"role": "user/assistant/system", "content": "..."}] + temperature: 温度参数 + max_tokens: 最大token数 + stream: 是否流式输出 + + Returns: + {"content": "...", "usage": {...}} + """ + url = f"{self.base_url}/chat/completions" + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.api_key}" + } + + payload = { + "model": self.model, + "messages": messages, + "temperature": temperature, + "max_tokens": max_tokens, + "stream": stream + } + + try: + response = requests.post( + url, + headers=headers, + json=payload, + timeout=self.timeout, + stream=stream + ) + + if response.status_code != 200: + return { + "content": "", + "error": f"API错误: {response.status_code} - {response.text}" + } + + if stream: + # 流式处理 + content = "" + for line in response.iter_lines(): + if line: + line = line.decode('utf-8') + if line.startswith('data: '): + data = line[6:] + if data == '[DONE]': + break + try: + chunk = json.loads(data) + if 'choices' in chunk and len(chunk['choices']) > 0: + delta = chunk['choices'][0].get('delta', {}) + if 'content' in delta: + content += delta['content'] + except json.JSONDecodeError: + continue + + return {"content": content} + + else: + data = response.json() + content = data['choices'][0]['message']['content'] + usage = data.get('usage', {}) + + return { + "content": content, + "usage": usage + } + + except requests.Timeout: + return {"content": "", "error": "请求超时"} + except requests.RequestException as e: + return {"content": "", "error": f"请求异常: {str(e)}"} + + def simple_chat(self, prompt: str, system_prompt: str = "") -> str: + """ + 简单聊天接口 + + Args: + prompt: 用户输入 + system_prompt: 系统提示 + + Returns: + 模型回复文本 + """ + messages = [] + if system_prompt: + messages.append({"role": "system", "content": system_prompt}) + messages.append({"role": "user", "content": prompt}) + + result = self.chat(messages) + + if "error" in result and result["error"]: + raise Exception(result["error"]) + + return result["content"] + + def structured_output( + self, + prompt: str, + schema: Dict, + system_prompt: str = "" + ) -> Dict: + """ + 结构化输出 + + Args: + prompt: 用户输入 + schema: 输出格式描述 + system_prompt: 系统提示 + + Returns: + 解析后的JSON对象 + """ + schema_text = json.dumps(schema, indent=2, ensure_ascii=False) + + full_system = system_prompt + "\n\n请按照以下JSON格式输出,不要输出其他内容:\n" + schema_text + + messages = [ + {"role": "system", "content": full_system}, + {"role": "user", "content": prompt} + ] + + result = self.chat(messages, temperature=0.3) + + if "error" in result and result["error"]: + raise Exception(result["error"]) + + content = result["content"].strip() + + # 尝试解析JSON + try: + # 去除可能的markdown代码块标记 + if content.startswith('```'): + lines = content.split('\n') + content = '\n'.join(lines[1:-1] if lines[-1] == '```' else lines[1:]) + + return json.loads(content) + except json.JSONDecodeError: + # 尝试提取JSON部分 + import re + json_match = re.search(r'\{.*\}', content, re.DOTALL) + if json_match: + try: + return json.loads(json_match.group()) + except json.JSONDecodeError: + pass + + return {"raw_content": content, "parse_error": "无法解析为JSON"} + + def analyze_intent(self, user_request: str) -> Dict: + """ + 分析用户意图 + + Args: + user_request: 用户原始请求 + + Returns: + {"intent": "...", "keywords": [...], "need_clarification": bool, "questions": [...]} + """ + system_prompt = """你是一个意图分析专家。分析用户的请求,判断: +1. 用户的核心意图是什么 +2. 提取关键信息 +3. 信息是否足够完整(不需要额外澄清) +4. 如果不完整,需要澄清的问题 + +请以JSON格式输出。""" + + schema = { + "intent": "用户核心意图的简洁描述", + "keywords": ["关键信息列表"], + "need_clarification": "是否需要澄清 (true/false)", + "questions": ["需要澄清的问题列表(如果need_clarification为true)"] + } + + return self.structured_output( + f"分析以下用户请求:\n\n{user_request}", + schema, + system_prompt + ) + + def split_tasks(self, intent: Dict, user_request: str) -> List[Dict]: + """ + 任务拆分 + + Args: + intent: 意图分析结果 + user_request: 用户原始请求 + + Returns: + [{"id": "...", "type": "serial/parallel", "description": "...", "dependencies": [...]}] + """ + system_prompt = """你是一个任务规划专家。根据用户意图,将请求拆分为多个子任务。 + +规则: +1. 识别任务之间的依赖关系 +2. 无依赖的任务标记为parallel(可并行) +3. 有依赖的任务标记为serial(串行) +4. 每个任务有明确的描述 +5. 每个任务有明确的输入输出要求 + +请以JSON格式输出任务列表。""" + + schema = { + "tasks": [ + { + "id": "任务唯一标识(如task_1, task_2)", + "type": "serial 或 parallel", + "description": "任务描述", + "input_schema": {"输入要求"}, + "output_schema": {"输出要求"}, + "dependencies": ["依赖的任务ID列表"] + } + ], + "execution_order": ["任务执行顺序说明"] + } + + result = self.structured_output( + f"""用户原始请求: {user_request} + +意图分析结果: +{json.dumps(intent, indent=2, ensure_ascii=False)} + +请拆分为子任务列表。""", + schema, + system_prompt + ) + + return result.get("tasks", []) + + def generate_bid( + self, + task: Dict, + agent_profile: Dict + ) -> Dict: + """ + Agent生成竞标 + + Args: + task: 任务定义 + agent_profile: Agent档案 + + Returns: + 竞标书内容 + """ + system_prompt = """你是一个执行Agent,需要为任务竞标。 + +评估你的能力和任务的匹配度,给出: +1. 能力匹配度(0-1) +2. 预估完成时间(秒) +3. 自信度(0-1) +4. 执行方案描述 +5. 前置条件(如果有) +6. 备选方案(如果有) + +请以JSON格式输出。""" + + schema = { + "capability_match": "能力匹配度 0-1", + "estimated_time": "预估完成时间(秒)", + "confidence": "自信度 0-1", + "approach": "执行方案描述", + "prerequisites": ["前置条件列表"], + "alternative_approaches": ["备选方案列表"] + } + + result = self.structured_output( + f"""任务: +{json.dumps(task, indent=2, ensure_ascii=False)} + +你的档案: +{json.dumps(agent_profile, indent=2, ensure_ascii=False)} + +请生成竞标书。""", + schema, + system_prompt + ) + + return result + + def execute_task( + self, + task: Dict, + approach: str = "" + ) -> Dict: + """ + 执行任务 + + Args: + task: 任务定义 + approach: 执行方案 + + Returns: + 执行结果 + """ + system_prompt = """你是一个任务执行者。根据任务描述和执行方案,完成任务并输出结果。 + +输出应该符合任务的output_schema要求。""" + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": f"""任务: +{json.dumps(task, indent=2, ensure_ascii=False)} + +执行方案: {approach} + +请执行任务并输出结果。"""} + ] + + result = self.chat(messages, temperature=0.5, max_tokens=8192) + + if "error" in result and result["error"]: + return {"error": result["error"]} + + return {"result": result["content"]} + + def validate_result( + self, + task: Dict, + result: Any + ) -> Dict: + """ + 验证结果 + + Args: + task: 任务定义 + result: 执行结果 + + Returns: + {"passed": bool, "issues": [...], "score": 0-1} + """ + system_prompt = """你是一个结果验证专家。评估执行结果是否符合任务要求。 + +判断: +1. 结果完整性(是否包含所有必要部分) +2. 结果正确性(是否符合预期) +3. 结果质量评分(0-1) + +请以JSON格式输出。""" + + schema = { + "passed": "是否通过验证 (true/false)", + "completeness": "完整性检查结果", + "correctness": "正确性检查结果", + "issues": ["问题列表(如果未通过)"], + "score": "质量评分 0-1", + "suggestions": ["改进建议"] + } + + return self.structured_output( + f"""任务: +{json.dumps(task, indent=2, ensure_ascii=False)} + +执行结果: +{json.dumps(result, indent=2, ensure_ascii=False) if isinstance(result, dict) else str(result)} + +请验证结果。""", + schema, + system_prompt + ) + + +# 默认客户端实例 +default_client = LLMClient() \ No newline at end of file diff --git a/app/models.py b/app/models.py new file mode 100644 index 0000000..801a4e6 --- /dev/null +++ b/app/models.py @@ -0,0 +1,281 @@ +""" +多智能体竞标调度架构 - 核心数据模型 +""" + +from dataclasses import dataclass, field +from typing import List, Dict, Optional, Any +from enum import Enum +import time +import uuid + + +class TaskStatus(Enum): + """任务状态""" + PENDING = "pending" # 待发布 + PUBLISHED = "published" # 已发布,等待竞标 + BIDDING = "bidding" # 竞标中 + ASSIGNED = "assigned" # 已分配 + EXECUTING = "executing" # 执行中 + VALIDATING = "validating" # 验证中 + RETRY = "retry" # 重试中 + COMPLETED = "completed" # 已完成 + FAILED = "failed" # 已失败 + TERMINATED = "terminated" # 已终止 + + +class TaskType(Enum): + """任务类型""" + SERIAL = "serial" # 串行任务 + PARALLEL = "parallel" # 并行任务 + INDEPENDENT = "independent" # 独立任务 + + +@dataclass +class Task: + """任务定义""" + id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) + type: TaskType = TaskType.INDEPENDENT + description: str = "" + input_data: Dict = field(default_factory=dict) + output_schema: Dict = field(default_factory=dict) + validation_criteria: Dict = field(default_factory=dict) + timeout: int = 300 # 超时时间(秒) + priority: int = 0 # 优先级 + dependencies: List[str] = field(default_factory=list) # 依赖的任务ID + status: TaskStatus = TaskStatus.PENDING + created_at: float = field(default_factory=time.time) + parent_request_id: Optional[str] = None # 原始用户请求ID + + def to_dict(self) -> Dict: + return { + 'id': self.id, + 'type': self.type.value, + 'description': self.description, + 'input_data': self.input_data, + 'output_schema': self.output_schema, + 'validation_criteria': self.validation_criteria, + 'timeout': self.timeout, + 'priority': self.priority, + 'dependencies': self.dependencies, + 'status': self.status.value, + 'created_at': self.created_at, + 'parent_request_id': self.parent_request_id + } + + +@dataclass +class Bid: + """竞标书""" + id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) + task_id: str = "" + agent_id: str = "" + capability_match: float = 0.5 # 能力匹配度 0-1 + estimated_time: int = 60 # 预估完成时间(秒) + confidence: float = 0.5 # 自信度 0-1 + approach: str = "" # 方案描述 + prerequisites: List[str] = field(default_factory=list) # 前置条件 + alternative_approaches: List[str] = field(default_factory=list) + created_at: float = field(default_factory=time.time) + + def to_dict(self) -> Dict: + return { + 'id': self.id, + 'task_id': self.task_id, + 'agent_id': self.agent_id, + 'capability_match': self.capability_match, + 'estimated_time': self.estimated_time, + 'confidence': self.confidence, + 'approach': self.approach, + 'prerequisites': self.prerequisites, + 'alternative_approaches': self.alternative_approaches, + 'created_at': self.created_at + } + + +@dataclass +class Execution: + """执行记录""" + id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) + task_id: str = "" + agent_id: str = "" + bid_id: str = "" + status: str = "running" # running | completed | failed | timeout + start_time: float = field(default_factory=time.time) + end_time: Optional[float] = None + result: Optional[Any] = None + error: Optional[str] = None + retry_count: int = 0 + + def to_dict(self) -> Dict: + return { + 'id': self.id, + 'task_id': self.task_id, + 'agent_id': self.agent_id, + 'bid_id': self.bid_id, + 'status': self.status, + 'start_time': self.start_time, + 'end_time': self.end_time, + 'result': self.result, + 'error': self.error, + 'retry_count': self.retry_count + } + + +@dataclass +class ValidationResult: + """验证结果""" + passed: bool = False + checks: List[Dict] = field(default_factory=list) + issues: List[str] = field(default_factory=list) + score: float = 0.0 # 质量评分 + + def to_dict(self) -> Dict: + return { + 'passed': self.passed, + 'checks': self.checks, + 'issues': self.issues, + 'score': self.score + } + + +@dataclass +class Attempt: + """一次完整的尝试记录""" + id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) + task_id: str = "" + agent_id: str = "" + execution: Optional[Execution] = None + validation: Optional[ValidationResult] = None + success: bool = False + failure_type: Optional[str] = None # timeout | validation | capability | error + duration: float = 0.0 + feedback: Optional[str] = None # 给Agent的反馈 + + def to_dict(self) -> Dict: + return { + 'id': self.id, + 'task_id': self.task_id, + 'agent_id': self.agent_id, + 'execution': self.execution.to_dict() if self.execution else None, + 'validation': self.validation.to_dict() if self.validation else None, + 'success': self.success, + 'failure_type': self.failure_type, + 'duration': self.duration, + 'feedback': self.feedback + } + + +@dataclass +class AgentProfile: + """Agent档案""" + id: str = "" + name: str = "" + description: str = "" + capabilities: List[str] = field(default_factory=list) # 能力标签 + max_concurrent_tasks: int = 1 + preferred_task_types: List[str] = field(default_factory=list) + + # 历史表现 + total_tasks: int = 0 + success_tasks: int = 0 + avg_completion_time: float = 0.0 + avg_quality_score: float = 0.0 + + @property + def success_rate(self) -> float: + if self.total_tasks > 0: + return self.success_tasks / self.total_tasks + return 0.0 + + def to_dict(self) -> Dict: + return { + 'id': self.id, + 'name': self.name, + 'description': self.description, + 'capabilities': self.capabilities, + 'max_concurrent_tasks': self.max_concurrent_tasks, + 'preferred_task_types': self.preferred_task_types, + 'total_tasks': self.total_tasks, + 'success_tasks': self.success_tasks, + 'avg_completion_time': self.avg_completion_time, + 'avg_quality_score': self.avg_quality_score, + 'success_rate': self.success_rate + } + + +@dataclass +class TaskGraph: + """任务依赖图""" + tasks: Dict[str, Task] = field(default_factory=dict) + dependencies: Dict[str, List[str]] = field(default_factory=dict) # task_id -> [dep_task_ids] + reverse_deps: Dict[str, List[str]] = field(default_factory=dict) # task_id -> [dependents] + + def add_task(self, task: Task): + """添加任务""" + self.tasks[task.id] = task + self.dependencies[task.id] = task.dependencies + + # 更新反向依赖 + for dep_id in task.dependencies: + if dep_id not in self.reverse_deps: + self.reverse_deps[dep_id] = [] + self.reverse_deps[dep_id].append(task.id) + + def get_ready_tasks(self, completed: set) -> List[Task]: + """获取当前可执行的任务(依赖已满足)""" + ready = [] + for task_id, deps in self.dependencies.items(): + if task_id not in completed: + if all(dep in completed for dep in deps): + ready.append(self.tasks[task_id]) + return ready + + def get_execution_order(self) -> List[List[Task]]: + """获取分层执行顺序(每层内的任务可并行)""" + order = [] + completed = set() + + while len(completed) < len(self.tasks): + ready = self.get_ready_tasks(completed) + if not ready: + # 可能存在循环依赖 + remaining = [t for t in self.tasks.values() if t.id not in completed] + if remaining: + # 把剩余任务强行加入(可能有问题) + order.append(remaining) + completed.update(t.id for t in remaining) + break + order.append(ready) + completed.update(t.id for t in ready) + + return order + + def to_dict(self) -> Dict: + return { + 'tasks': {k: v.to_dict() for k, v in self.tasks.items()}, + 'dependencies': self.dependencies, + 'reverse_deps': self.reverse_deps + } + + +@dataclass +class UserRequest: + """用户请求""" + id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) + content: str = "" + status: str = "pending" # pending | processing | completed | failed + created_at: float = field(default_factory=time.time) + task_graph: Optional[TaskGraph] = None + final_result: Optional[Any] = None + error: Optional[str] = None + + def to_dict(self) -> Dict: + return { + 'id': self.id, + 'content': self.content, + 'status': self.status, + 'created_at': self.created_at, + 'task_graph': self.task_graph.to_dict() if self.task_graph else None, + 'final_result': self.final_result, + 'error': self.error + } \ No newline at end of file diff --git a/app/orchestrator.py b/app/orchestrator.py new file mode 100644 index 0000000..593437a --- /dev/null +++ b/app/orchestrator.py @@ -0,0 +1,501 @@ +""" +规划Agent (Orchestrator) - 意图理解、任务拆分、竞标管理、结果验证 +""" + +import time +import json +from typing import Dict, List, Optional, Any +from .models import ( + Task, TaskType, TaskStatus, TaskGraph, Bid, Execution, + Attempt, ValidationResult, UserRequest, AgentProfile +) +from .task_board import TaskBoard +from .bid_evaluator import BidEvaluator +from .execution_monitor import ExecutionMonitorManager +from .llm_client import LLMClient, default_client + + +class Orchestrator: + """规划Agent - 核心调度引擎""" + + def __init__( + self, + task_board: TaskBoard, + llm_client: LLMClient = None + ): + self.task_board = task_board + self.llm = llm_client or default_client + + self.bid_evaluator = BidEvaluator() + self.execution_manager = ExecutionMonitorManager() + + # 配置 + self.bidding_window = 30 # 竞标窗口时间(秒) + self.max_retry_per_agent = 1 # 每个Agent最大重试次数 + self.max_total_attempts = 5 # 总尝试次数上限 + self.max_total_time = 1800 # 最大总执行时间(秒) + + # === 意图理解 === + + def understand_intent(self, user_request: str) -> Dict: + """ + 意图理解 + + Args: + user_request: 用户原始请求 + + Returns: + 意图分析结果 + """ + try: + intent = self.llm.analyze_intent(user_request) + return intent + except Exception as e: + # 简化处理 + return { + "intent": user_request, + "keywords": [], + "need_clarification": False, + "questions": [] + } + + # === 任务拆分 === + + def split_tasks(self, intent: Dict, user_request: str) -> TaskGraph: + """ + 任务拆分 + + Args: + intent: 意图分析结果 + user_request: 用户原始请求 + + Returns: + TaskGraph + """ + try: + task_list = self.llm.split_tasks(intent, user_request) + + graph = TaskGraph() + + for t in task_list: + task_type = TaskType.PARALLEL if t.get('type') == 'parallel' else TaskType.SERIAL + task = Task( + type=task_type, + description=t.get('description', ''), + input_data=t.get('input_schema', {}), + output_schema=t.get('output_schema', {}), + dependencies=t.get('dependencies', []) + ) + graph.add_task(task) + + return graph + + except Exception as e: + # 简化:创建单个任务 + graph = TaskGraph() + task = Task( + type=TaskType.INDEPENDENT, + description=user_request + ) + graph.add_task(task) + return graph + + # === 任务发布 === + + def publish_task(self, task: Task) -> List[AgentProfile]: + """ + 发布任务 + + Args: + task: 任务 + + Returns: + 能处理该任务的Agent列表 + """ + # 更新状态 + task.status = TaskStatus.PUBLISHED + self.task_board.add_task(task) + + # 找能处理的Agent + capable_agents = self.task_board.find_capable_agents(task) + + return capable_agents + + # === 竞标收集 === + + def collect_bids( + self, + task: Task, + agents: List[AgentProfile], + window_seconds: int = None + ) -> List[Bid]: + """ + 收集竞标 + + Args: + task: 任务 + agents: Agent列表 + window_seconds: 竞标窗口时间 + + Returns: + 竞标列表 + """ + window = window_seconds or self.bidding_window + task.status = TaskStatus.BIDDING + + bids = [] + + # 让每个Agent生成竞标 + for agent in agents: + try: + bid_data = self.llm.generate_bid(task.to_dict(), agent.to_dict()) + + bid = Bid( + task_id=task.id, + agent_id=agent.id, + capability_match=float(bid_data.get('capability_match', 0.5)), + estimated_time=int(bid_data.get('estimated_time', 60)), + confidence=float(bid_data.get('confidence', 0.5)), + approach=bid_data.get('approach', ''), + prerequisites=bid_data.get('prerequisites', []), + alternative_approaches=bid_data.get('alternative_approaches', []) + ) + + bids.append(bid) + self.task_board.add_bid(bid) + + except Exception as e: + # Agent无法竞标,跳过 + pass + + return bids + + # === 竞标评估 === + + def evaluate_bids(self, task: Task, bids: List[Bid]) -> tuple: + """ + 评估竞标 + + Args: + task: 任务 + bids: 竞标列表 + + Returns: + (selected_agent, selected_bid, backup_agents) + """ + agents_dict = { + a.id: a for a in self.task_board.list_agents() + } + + best_bid, best_agent, scores = self.bid_evaluator.select_best_bid(bids, agents_dict) + + if best_agent: + backup_agents = self.bid_evaluator.get_backup_agents( + bids, agents_dict, best_agent.id + ) + + return (best_agent, best_bid, backup_agents) + + return (None, None, []) + + # === 任务执行 === + + def assign_and_execute( + self, + task: Task, + agent: AgentProfile, + bid: Bid, + backup_agents: List[AgentProfile] = None + ) -> Execution: + """ + 分配并执行任务 + + Args: + task: 任务 + agent: 执行Agent + bid: 竞标书 + backup_agents: 备选Agent + + Returns: + Execution对象 + """ + task.status = TaskStatus.ASSIGNED + + # 定义执行函数 + def execute_func(t, a, b): + try: + result = self.llm.execute_task(t.to_dict(), b.approach) + return result.get('result') or result + except Exception as e: + raise e + + # 创建监控器 + monitor = self.execution_manager.create_monitor( + task=task, + agent=agent, + bid=bid, + callbacks={ + 'complete': lambda exec: self._on_execution_complete(exec, task), + 'timeout': lambda exec: self._on_execution_timeout(exec, task, backup_agents), + 'error': lambda exec, e: self._on_execution_error(exec, task, e) + } + ) + + # 添加备选 + if backup_agents: + for backup in backup_agents: + monitor.add_backup_agent(backup) + + # 启动执行 + task.status = TaskStatus.EXECUTING + execution = monitor.start(execute_func) + + self.task_board.add_execution(execution) + + return execution + + def _on_execution_complete(self, execution: Execution, task: Task): + """执行完成回调""" + task.status = TaskStatus.VALIDATING + + # 验证结果 + validation = self.validate_result(task, execution.result) + + if validation.passed: + task.status = TaskStatus.COMPLETED + self.task_board.update_task_status(task.id, task.status) + + # 更新Agent统计 + duration = execution.end_time - execution.start_time + self.task_board.update_agent_stats( + execution.agent_id, + success=True, + duration=duration, + quality_score=validation.score + ) + else: + # 验证失败,尝试重试 + self._handle_validation_failure(task, execution, validation) + + def _on_execution_timeout(self, execution: Execution, task: Task, backup_agents: List[AgentProfile]): + """执行超时回调""" + # 记录失败 + attempt = Attempt( + task_id=task.id, + agent_id=execution.agent_id, + execution=execution, + success=False, + failure_type='timeout', + duration=execution.end_time - execution.start_time + ) + self.task_board.add_attempt(attempt) + + # 尝试备选Agent + if backup_agents and self.task_board.get_attempt_count(task.id) < self.max_total_attempts: + # 找一个备选Agent重新执行 + for backup in backup_agents: + attempts = [a for a in self.task_board.get_attempts(task.id) if a.agent_id == backup.id] + if len(attempts) < self.max_retry_per_agent: + # 创建新的竞标 + bid = Bid( + task_id=task.id, + agent_id=backup.id, + capability_match=0.5, + estimated_time=60, + confidence=0.5, + approach="备选执行" + ) + self.assign_and_execute(task, backup, bid, []) + return + + # 无备选或已达上限,任务失败 + task.status = TaskStatus.FAILED + self.task_board.update_task_status(task.id, task.status) + + def _on_execution_error(self, execution: Execution, task: Task, error: Exception): + """执行错误回调""" + # 记录失败 + attempt = Attempt( + task_id=task.id, + agent_id=execution.agent_id, + execution=execution, + success=False, + failure_type='error', + duration=time.time() - execution.start_time, + feedback=str(error) + ) + self.task_board.add_attempt(attempt) + + task.status = TaskStatus.FAILED + self.task_board.update_task_status(task.id, task.status) + + def _handle_validation_failure(self, task: Task, execution: Execution, validation: ValidationResult): + """处理验证失败""" + # 记录失败 + attempt = Attempt( + task_id=task.id, + agent_id=execution.agent_id, + execution=execution, + validation=validation, + success=False, + failure_type='validation', + duration=execution.end_time - execution.start_time, + feedback=json.dumps(validation.issues) + ) + self.task_board.add_attempt(attempt) + + # 检查重试次数 + agent_attempts = [a for a in self.task_board.get_attempts(task.id) if a.agent_id == execution.agent_id] + + if len(agent_attempts) < self.max_retry_per_agent: + # 给予重试机会 + task.status = TaskStatus.RETRY + self.task_board.update_task_status(task.id, task.status) + + # 重新执行(带上反馈) + # 简化处理,直接失败 + task.status = TaskStatus.FAILED + self.task_board.update_task_status(task.id, task.status) + else: + # 已达重试上限,任务失败 + task.status = TaskStatus.FAILED + self.task_board.update_task_status(task.id, task.status) + + # === 结果验证 === + + def validate_result(self, task: Task, result: Any) -> ValidationResult: + """ + 验证结果 + + Args: + task: 任务 + result: 执行结果 + + Returns: + ValidationResult + """ + try: + validation_data = self.llm.validate_result(task.to_dict(), result) + + validation = ValidationResult( + passed=validation_data.get('passed', False), + issues=validation_data.get('issues', []), + score=float(validation_data.get('score', 0.5)) + ) + + return validation + + except Exception as e: + # 简化:假设通过 + return ValidationResult(passed=True, score=0.5) + + # === 主流程 === + + def process_request(self, user_request: str) -> UserRequest: + """ + 处理用户请求(完整流程) + + Args: + user_request: 用户原始请求 + + Returns: + UserRequest对象 + """ + # 1. 创建请求记录 + request = self.task_board.create_request(user_request) + + try: + # 2. 意图理解 + intent = self.understand_intent(user_request) + + # 3. 如果需要澄清 + if intent.get('need_clarification'): + request.status = 'clarification_needed' + request.error = json.dumps(intent.get('questions', [])) + self.task_board.update_request(request) + return request + + # 4. 任务拆分 + task_graph = self.split_tasks(intent, user_request) + request.task_graph = task_graph + request.status = 'processing' + self.task_board.update_request(request) + + # 5. 按执行顺序处理任务 + execution_order = task_graph.get_execution_order() + completed = set() + results = {} + + for layer_tasks in execution_order: + # 并行处理当前层的任务 + layer_results = {} + + for task in layer_tasks: + # 发布任务 + capable_agents = self.publish_task(task) + + if not capable_agents: + task.status = TaskStatus.FAILED + continue + + # 收集竞标 + bids = self.collect_bids(task, capable_agents) + + if not bids: + task.status = TaskStatus.FAILED + continue + + # 评估竞标 + agent, bid, backup = self.evaluate_bids(task, bids) + + if not agent: + task.status = TaskStatus.FAILED + continue + + # 执行任务 + execution = self.assign_and_execute(task, agent, bid, backup) + + # 等待执行完成(简化:直接等待) + # 实际应该用异步等待或轮询 + import time + max_wait = task.timeout + 10 + waited = 0 + while waited < max_wait: + exec_record = self.task_board.get_execution(execution.id) + if exec_record and exec_record.status in ['completed', 'failed', 'timeout']: + break + time.sleep(1) + waited += 1 + + # 获取结果 + if task.status == TaskStatus.COMPLETED: + layer_results[task.id] = execution.result + completed.add(task.id) + else: + # 任务失败,整个请求失败 + request.status = 'failed' + request.error = f"任务 {task.id} 执行失败" + self.task_board.update_request(request) + return request + + results.update(layer_results) + + # 6. 整合结果 + request.final_result = results + request.status = 'completed' + self.task_board.update_request(request) + + return request + + except Exception as e: + request.status = 'failed' + request.error = str(e) + self.task_board.update_request(request) + return request + + def get_request_status(self, request_id: str) -> Dict: + """获取请求状态""" + request = self.task_board.get_request(request_id) + if request: + return request.to_dict() + return None \ No newline at end of file diff --git a/app/task_board.py b/app/task_board.py new file mode 100644 index 0000000..8237eaf --- /dev/null +++ b/app/task_board.py @@ -0,0 +1,238 @@ +""" +任务公告板 - 任务状态管理、信息存储 +""" + +import json +import time +import threading +from typing import Dict, List, Optional, Any +from pathlib import Path +from .models import Task, Bid, Execution, Attempt, TaskStatus, TaskGraph, UserRequest, AgentProfile + + +class TaskBoard: + """任务公告板 - 存储和管理所有任务状态""" + + def __init__(self, storage_path: str = None): + self.storage_path = Path(storage_path or Path.home() / ".multi_agent" / "task_board.json") + self.storage_path.parent.mkdir(parents=True, exist_ok=True) + + # 内存存储 + self.user_requests: Dict[str, UserRequest] = {} + self.tasks: Dict[str, Task] = {} + self.bids: Dict[str, Dict[str, List[Bid]]] = {} # task_id -> {bid_id: Bid} + self.executions: Dict[str, Execution] = {} + self.attempts: Dict[str, List[Attempt]] = {} # task_id -> [Attempt] + self.agents: Dict[str, AgentProfile] = {} + + # 锁 + self._lock = threading.Lock() + + # 加载存储 + self._load() + + def _load(self): + """从文件加载""" + if self.storage_path.exists(): + try: + with open(self.storage_path, 'r') as f: + data = json.load(f) + # 这里可以恢复数据,简化处理,只保留基本结构 + except Exception: + pass + + def _save(self): + """保存到文件""" + try: + data = { + 'user_requests': {k: v.to_dict() for k, v in self.user_requests.items()}, + 'tasks': {k: v.to_dict() for k, v in self.tasks.items()}, + 'agents': {k: v.to_dict() for k, v in self.agents.items()}, + 'saved_at': time.time() + } + with open(self.storage_path, 'w') as f: + json.dump(data, f, ensure_ascii=False, indent=2) + except Exception as e: + print(f"保存失败: {e}") + + # === 用户请求管理 === + + def create_request(self, content: str) -> UserRequest: + """创建用户请求""" + request = UserRequest(content=content) + with self._lock: + self.user_requests[request.id] = request + self._save() + return request + + def get_request(self, request_id: str) -> Optional[UserRequest]: + """获取用户请求""" + return self.user_requests.get(request_id) + + def update_request(self, request: UserRequest): + """更新用户请求""" + with self._lock: + self.user_requests[request.id] = request + self._save() + + def list_requests(self, limit: int = 50) -> List[UserRequest]: + """列出用户请求""" + requests = list(self.user_requests.values()) + requests.sort(key=lambda r: r.created_at, reverse=True) + return requests[:limit] + + # === 任务管理 === + + def add_task(self, task: Task): + """添加任务""" + with self._lock: + self.tasks[task.id] = task + self.bids[task.id] = {} + self.attempts[task.id] = [] + self._save() + + def get_task(self, task_id: str) -> Optional[Task]: + """获取任务""" + return self.tasks.get(task_id) + + def update_task_status(self, task_id: str, status: TaskStatus): + """更新任务状态""" + with self._lock: + if task_id in self.tasks: + self.tasks[task_id].status = status + self._save() + + def get_tasks_by_status(self, status: TaskStatus) -> List[Task]: + """获取指定状态的任务""" + return [t for t in self.tasks.values() if t.status == status] + + def list_tasks(self, limit: int = 100) -> List[Task]: + """列出所有任务""" + tasks = list(self.tasks.values()) + tasks.sort(key=lambda t: t.created_at, reverse=True) + return tasks[:limit] + + # === 竞标管理 === + + def add_bid(self, bid: Bid): + """添加竞标""" + with self._lock: + if bid.task_id in self.bids: + self.bids[bid.task_id][bid.id] = bid + + def get_bids(self, task_id: str) -> List[Bid]: + """获取任务的竞标列表""" + if task_id in self.bids: + return list(self.bids[task_id].values()) + return [] + + def clear_bids(self, task_id: str): + """清除任务的竞标""" + with self._lock: + if task_id in self.bids: + self.bids[task_id] = {} + + # === 执行管理 === + + def add_execution(self, execution: Execution): + """添加执行记录""" + with self._lock: + self.executions[execution.id] = execution + + def get_execution(self, execution_id: str) -> Optional[Execution]: + """获取执行记录""" + return self.executions.get(execution_id) + + def update_execution(self, execution: Execution): + """更新执行记录""" + with self._lock: + self.executions[execution.id] = execution + self._save() + + # === 尝试记录管理 === + + def add_attempt(self, attempt: Attempt): + """添加尝试记录""" + with self._lock: + if attempt.task_id in self.attempts: + self.attempts[attempt.task_id].append(attempt) + + def get_attempts(self, task_id: str) -> List[Attempt]: + """获取任务的尝试记录""" + if task_id in self.attempts: + return self.attempts[task_id] + return [] + + def get_attempt_count(self, task_id: str) -> int: + """获取任务的尝试次数""" + return len(self.get_attempts(task_id)) + + # === Agent管理 === + + def register_agent(self, agent: AgentProfile): + """注册Agent""" + with self._lock: + self.agents[agent.id] = agent + self._save() + + def unregister_agent(self, agent_id: str): + """注销Agent""" + with self._lock: + self.agents.pop(agent_id, None) + self._save() + + def get_agent(self, agent_id: str) -> Optional[AgentProfile]: + """获取Agent""" + return self.agents.get(agent_id) + + def list_agents(self) -> List[AgentProfile]: + """列出所有Agent""" + return list(self.agents.values()) + + def update_agent_stats( + self, + agent_id: str, + success: bool, + duration: float, + quality_score: float = 0.0 + ): + """更新Agent统计""" + with self._lock: + if agent_id in self.agents: + agent = self.agents[agent_id] + agent.total_tasks += 1 + if success: + agent.success_tasks += 1 + # 更新平均完成时间 + prev_total_time = agent.avg_completion_time * (agent.success_tasks - 1) + agent.avg_completion_time = (prev_total_time + duration) / agent.success_tasks + # 更新平均质量评分 + prev_total_score = agent.avg_quality_score * (agent.success_tasks - 1) + agent.avg_quality_score = (prev_total_score + quality_score) / agent.success_tasks + self._save() + + def find_capable_agents(self, task: Task) -> List[AgentProfile]: + """找到能处理该任务的Agent""" + capable = [] + for agent in self.agents.values(): + # 检查能力匹配 + # 简化:所有Agent都可以尝试所有任务 + # 实际应该根据task.input_data中的required_capabilities匹配 + capable.append(agent) + return capable + + # === 统计信息 === + + def get_stats(self) -> Dict: + """获取统计信息""" + return { + 'total_requests': len(self.user_requests), + 'total_tasks': len(self.tasks), + 'tasks_by_status': { + status.value: len([t for t in self.tasks.values() if t.status == status]) + for status in TaskStatus + }, + 'total_agents': len(self.agents), + 'total_executions': len(self.executions), + 'total_bids': sum(len(b) for b in self.bids.values()) + } \ No newline at end of file diff --git a/app/worker.py b/app/worker.py new file mode 100644 index 0000000..10dcd54 --- /dev/null +++ b/app/worker.py @@ -0,0 +1,202 @@ +""" +执行Agent (Worker) - 竞标任务、执行交付 +""" + +import time +import json +from typing import Dict, List, Optional, Any +from .models import Task, Bid, Execution, AgentProfile +from .task_board import TaskBoard +from .llm_client import LLMClient, default_client + + +class Worker: + """执行Agent""" + + def __init__( + self, + profile: AgentProfile, + task_board: TaskBoard, + llm_client: LLMClient = None + ): + self.profile = profile + self.task_board = task_board + self.llm = llm_client or default_client + + # 注册到任务公告板 + self.task_board.register_agent(profile) + + # 当前任务 + self.current_tasks: Dict[str, Execution] = {} + + # === 竞标 === + + def bid_for_task(self, task: Task) -> Optional[Bid]: + """ + 为任务竞标 + + Args: + task: 任务 + + Returns: + 竞标书或None + """ + try: + # 使用LLM评估任务并生成竞标 + bid_data = self.llm.generate_bid(task.to_dict(), self.profile.to_dict()) + + bid = Bid( + task_id=task.id, + agent_id=self.profile.id, + capability_match=float(bid_data.get('capability_match', 0.5)), + estimated_time=int(bid_data.get('estimated_time', 60)), + confidence=float(bid_data.get('confidence', 0.5)), + approach=bid_data.get('approach', ''), + prerequisites=bid_data.get('prerequisites', []), + alternative_approaches=bid_data.get('alternative_approaches', []) + ) + + return bid + + except Exception as e: + return None + + # === 执行 === + + def execute_task(self, task: Task, bid: Bid) -> Execution: + """ + 执行任务 + + Args: + task: 任务 + bid: 竞标书 + + Returns: + Execution对象 + """ + execution = Execution( + task_id=task.id, + agent_id=self.profile.id, + bid_id=bid.id, + status='running' + ) + + self.current_tasks[task.id] = execution + self.task_board.add_execution(execution) + + try: + # 使用LLM执行任务 + result = self.llm.execute_task(task.to_dict(), bid.approach) + + execution.status = 'completed' + execution.end_time = time.time() + execution.result = result.get('result') or result + + self.task_board.update_execution(execution) + + # 更新统计 + duration = execution.end_time - execution.start_time + self.task_board.update_agent_stats( + self.profile.id, + success=True, + duration=duration + ) + + except Exception as e: + execution.status = 'failed' + execution.end_time = time.time() + execution.error = str(e) + + self.task_board.update_execution(execution) + + # 更新统计 + self.task_board.update_agent_stats( + self.profile.id, + success=False, + duration=time.time() - execution.start_time + ) + + finally: + self.current_tasks.pop(task.id, None) + + return execution + + def get_status(self) -> Dict: + """获取Worker状态""" + return { + 'profile': self.profile.to_dict(), + 'current_tasks': len(self.current_tasks), + 'is_busy': len(self.current_tasks) >= self.profile.max_concurrent_tasks + } + + +class WorkerPool: + """Worker池""" + + def __init__(self, task_board: TaskBoard): + self.task_board = task_board + self.workers: Dict[str, Worker] = {} + + def create_worker(self, profile: AgentProfile, llm_client: LLMClient = None) -> Worker: + """创建Worker""" + worker = Worker(profile, self.task_board, llm_client) + self.workers[profile.id] = worker + return worker + + def get_worker(self, agent_id: str) -> Optional[Worker]: + """获取Worker""" + return self.workers.get(agent_id) + + def list_workers(self) -> List[Worker]: + """列出所有Worker""" + return list(self.workers.values()) + + def find_available_workers(self) -> List[Worker]: + """找到空闲的Worker""" + return [w for w in self.workers.values() if not w.get_status()['is_busy']] + + +def create_default_workers(task_board: TaskBoard, llm_client: LLMClient = None) -> WorkerPool: + """创建默认的Worker池""" + pool = WorkerPool(task_board) + + # 创建几个默认的Agent + default_profiles = [ + AgentProfile( + id="coder_agent", + name="代码专家", + description="擅长代码编写、调试、优化", + capabilities=["coding", "debugging", "optimization"], + max_concurrent_tasks=2, + preferred_task_types=["coding"] + ), + AgentProfile( + id="search_agent", + name="搜索专家", + description="擅长信息搜索、资料收集", + capabilities=["search", "research", "summarization"], + max_concurrent_tasks=3, + preferred_task_types=["search"] + ), + AgentProfile( + id="writer_agent", + name="写作专家", + description="擅长文档撰写、内容创作", + capabilities=["writing", "documentation", "translation"], + max_concurrent_tasks=2, + preferred_task_types=["writing"] + ), + AgentProfile( + id="analyst_agent", + name="分析专家", + description="擅长数据分析、报告生成", + capabilities=["analysis", "reporting", "visualization"], + max_concurrent_tasks=2, + preferred_task_types=["analysis"] + ) + ] + + for profile in default_profiles: + pool.create_worker(profile, llm_client) + + return pool \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..78ecef1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +flask>=3.0.0 +flask-cors>=4.0.0 +requests>=2.31.0 \ No newline at end of file diff --git a/start.sh b/start.sh new file mode 100755 index 0000000..ce04040 --- /dev/null +++ b/start.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +# 多智能体竞标调度系统启动脚本 + +cd "$(dirname "$0")" + +echo "启动多智能体竞标调度系统..." +echo "端口: 19015" +echo "LLM接口: http://192.168.2.17:19007/v1" + +python3 -m app.app --port 19015 --host 0.0.0.0 \ No newline at end of file diff --git a/templates/agents.html b/templates/agents.html new file mode 100644 index 0000000..9a5bdc6 --- /dev/null +++ b/templates/agents.html @@ -0,0 +1,213 @@ + + + + + + Agent管理 - 多智能体竞标调度系统 + + + + + + + +
+
+

Agent管理

+ +
+ +
+
+ +

加载中...

+
+
+
+ + + + + + + + \ No newline at end of file diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..96d7f59 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,279 @@ + + + + + + 多智能体竞标调度系统 + + + + + + + +
+
+

多智能体竞标调度系统

+

任务竞标 · 动态调度 · 智能容错

+
+ 代码专家 + 搜索专家 + 写作专家 + 分析专家 +
+
+
+ +
+ +
+
+
+ +

0

+

用户请求

+
+
+
+
+ +

0

+

任务总数

+
+
+
+
+ +

4

+

注册Agent

+
+
+
+
+ +

0

+

活跃执行

+
+
+
+ + +
+
+
+
发送请求
+
+ +
+ + +
+
快速测试:
+ + + +
+
+
+
+
+
执行日志
+
+等待输入... +
+
+
+
+ + +
+
+
+ +
竞标机制
+

Agent主动竞争任务,展示能力和方案,规划Agent综合评估选择最佳执行者。

+
+
+
+
+ +
动态调度
+

串行任务严格按依赖执行,并行任务最大化并发,智能分配执行顺序。

+
+
+
+
+ +
智能容错
+

超时自动切换备选Agent,验证失败给予重试机会,失败记录用于改进。

+
+
+
+
+ + + + + \ No newline at end of file diff --git a/templates/requests.html b/templates/requests.html new file mode 100644 index 0000000..19bf753 --- /dev/null +++ b/templates/requests.html @@ -0,0 +1,113 @@ + + + + + + 请求列表 - 多智能体竞标调度系统 + + + + + + + +
+
+

用户请求列表

+ +
+ +
+
+ +

加载中...

+
+
+
+ + + + + \ No newline at end of file diff --git a/templates/tasks.html b/templates/tasks.html new file mode 100644 index 0000000..e9591ef --- /dev/null +++ b/templates/tasks.html @@ -0,0 +1,139 @@ + + + + + + 任务列表 - 多智能体竞标调度系统 + + + + + + + +
+
+

任务列表

+
+ + +
+
+ +
+
+ +

加载中...

+
+
+
+ + + + + \ No newline at end of file