Files
multi-agent-bidding/app/worker.py
hubian 05950a3c84 feat: 多智能体竞标调度系统 v1.0.0
核心组件:
- Orchestrator: 意图理解、任务拆分、竞标管理、结果验证
- Worker: 竞标任务、执行交付
- TaskBoard: 状态管理、信息存储
- BidEvaluator: 竞标评估算法
- ExecutionMonitor: 执行监控、超时处理
- LLMClient: 大模型接口调用

功能特性:
- 竞标机制:Agent主动竞争任务
- 动态调度:串行/并行任务智能调度
- 智能容错:超时切换、验证重试
- 质量保证:结果验证、历史追踪

Web界面:首页、请求列表、任务列表、Agent管理
API接口:请求/任务/Agent管理、测试接口
端口:19015
2026-04-12 01:54:15 +08:00

202 lines
6.1 KiB
Python

"""
执行Agent (Worker) - 竞标任务、执行交付
"""
import time
import json
from typing import Dict, List, Optional, Any
from .models import Task, Bid, Execution, AgentProfile
from .task_board import TaskBoard
from .llm_client import LLMClient, default_client
class Worker:
"""执行Agent"""
def __init__(
self,
profile: AgentProfile,
task_board: TaskBoard,
llm_client: LLMClient = None
):
self.profile = profile
self.task_board = task_board
self.llm = llm_client or default_client
# 注册到任务公告板
self.task_board.register_agent(profile)
# 当前任务
self.current_tasks: Dict[str, Execution] = {}
# === 竞标 ===
def bid_for_task(self, task: Task) -> Optional[Bid]:
"""
为任务竞标
Args:
task: 任务
Returns:
竞标书或None
"""
try:
# 使用LLM评估任务并生成竞标
bid_data = self.llm.generate_bid(task.to_dict(), self.profile.to_dict())
bid = Bid(
task_id=task.id,
agent_id=self.profile.id,
capability_match=float(bid_data.get('capability_match', 0.5)),
estimated_time=int(bid_data.get('estimated_time', 60)),
confidence=float(bid_data.get('confidence', 0.5)),
approach=bid_data.get('approach', ''),
prerequisites=bid_data.get('prerequisites', []),
alternative_approaches=bid_data.get('alternative_approaches', [])
)
return bid
except Exception as e:
return None
# === 执行 ===
def execute_task(self, task: Task, bid: Bid) -> Execution:
"""
执行任务
Args:
task: 任务
bid: 竞标书
Returns:
Execution对象
"""
execution = Execution(
task_id=task.id,
agent_id=self.profile.id,
bid_id=bid.id,
status='running'
)
self.current_tasks[task.id] = execution
self.task_board.add_execution(execution)
try:
# 使用LLM执行任务
result = self.llm.execute_task(task.to_dict(), bid.approach)
execution.status = 'completed'
execution.end_time = time.time()
execution.result = result.get('result') or result
self.task_board.update_execution(execution)
# 更新统计
duration = execution.end_time - execution.start_time
self.task_board.update_agent_stats(
self.profile.id,
success=True,
duration=duration
)
except Exception as e:
execution.status = 'failed'
execution.end_time = time.time()
execution.error = str(e)
self.task_board.update_execution(execution)
# 更新统计
self.task_board.update_agent_stats(
self.profile.id,
success=False,
duration=time.time() - execution.start_time
)
finally:
self.current_tasks.pop(task.id, None)
return execution
def get_status(self) -> Dict:
"""获取Worker状态"""
return {
'profile': self.profile.to_dict(),
'current_tasks': len(self.current_tasks),
'is_busy': len(self.current_tasks) >= self.profile.max_concurrent_tasks
}
class WorkerPool:
"""Worker池"""
def __init__(self, task_board: TaskBoard):
self.task_board = task_board
self.workers: Dict[str, Worker] = {}
def create_worker(self, profile: AgentProfile, llm_client: LLMClient = None) -> Worker:
"""创建Worker"""
worker = Worker(profile, self.task_board, llm_client)
self.workers[profile.id] = worker
return worker
def get_worker(self, agent_id: str) -> Optional[Worker]:
"""获取Worker"""
return self.workers.get(agent_id)
def list_workers(self) -> List[Worker]:
"""列出所有Worker"""
return list(self.workers.values())
def find_available_workers(self) -> List[Worker]:
"""找到空闲的Worker"""
return [w for w in self.workers.values() if not w.get_status()['is_busy']]
def create_default_workers(task_board: TaskBoard, llm_client: LLMClient = None) -> WorkerPool:
"""创建默认的Worker池"""
pool = WorkerPool(task_board)
# 创建几个默认的Agent
default_profiles = [
AgentProfile(
id="coder_agent",
name="代码专家",
description="擅长代码编写、调试、优化",
capabilities=["coding", "debugging", "optimization"],
max_concurrent_tasks=2,
preferred_task_types=["coding"]
),
AgentProfile(
id="search_agent",
name="搜索专家",
description="擅长信息搜索、资料收集",
capabilities=["search", "research", "summarization"],
max_concurrent_tasks=3,
preferred_task_types=["search"]
),
AgentProfile(
id="writer_agent",
name="写作专家",
description="擅长文档撰写、内容创作",
capabilities=["writing", "documentation", "translation"],
max_concurrent_tasks=2,
preferred_task_types=["writing"]
),
AgentProfile(
id="analyst_agent",
name="分析专家",
description="擅长数据分析、报告生成",
capabilities=["analysis", "reporting", "visualization"],
max_concurrent_tasks=2,
preferred_task_types=["analysis"]
)
]
for profile in default_profiles:
pool.create_worker(profile, llm_client)
return pool