Files
multi-agent-bidding/app/models.py
hubian 05950a3c84 feat: 多智能体竞标调度系统 v1.0.0
核心组件:
- Orchestrator: 意图理解、任务拆分、竞标管理、结果验证
- Worker: 竞标任务、执行交付
- TaskBoard: 状态管理、信息存储
- BidEvaluator: 竞标评估算法
- ExecutionMonitor: 执行监控、超时处理
- LLMClient: 大模型接口调用

功能特性:
- 竞标机制:Agent主动竞争任务
- 动态调度:串行/并行任务智能调度
- 智能容错:超时切换、验证重试
- 质量保证:结果验证、历史追踪

Web界面:首页、请求列表、任务列表、Agent管理
API接口:请求/任务/Agent管理、测试接口
端口:19015
2026-04-12 01:54:15 +08:00

281 lines
9.4 KiB
Python

"""
多智能体竞标调度架构 - 核心数据模型
"""
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any
from enum import Enum
import time
import uuid
class TaskStatus(Enum):
"""任务状态"""
PENDING = "pending" # 待发布
PUBLISHED = "published" # 已发布,等待竞标
BIDDING = "bidding" # 竞标中
ASSIGNED = "assigned" # 已分配
EXECUTING = "executing" # 执行中
VALIDATING = "validating" # 验证中
RETRY = "retry" # 重试中
COMPLETED = "completed" # 已完成
FAILED = "failed" # 已失败
TERMINATED = "terminated" # 已终止
class TaskType(Enum):
"""任务类型"""
SERIAL = "serial" # 串行任务
PARALLEL = "parallel" # 并行任务
INDEPENDENT = "independent" # 独立任务
@dataclass
class Task:
"""任务定义"""
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
type: TaskType = TaskType.INDEPENDENT
description: str = ""
input_data: Dict = field(default_factory=dict)
output_schema: Dict = field(default_factory=dict)
validation_criteria: Dict = field(default_factory=dict)
timeout: int = 300 # 超时时间(秒)
priority: int = 0 # 优先级
dependencies: List[str] = field(default_factory=list) # 依赖的任务ID
status: TaskStatus = TaskStatus.PENDING
created_at: float = field(default_factory=time.time)
parent_request_id: Optional[str] = None # 原始用户请求ID
def to_dict(self) -> Dict:
return {
'id': self.id,
'type': self.type.value,
'description': self.description,
'input_data': self.input_data,
'output_schema': self.output_schema,
'validation_criteria': self.validation_criteria,
'timeout': self.timeout,
'priority': self.priority,
'dependencies': self.dependencies,
'status': self.status.value,
'created_at': self.created_at,
'parent_request_id': self.parent_request_id
}
@dataclass
class Bid:
"""竞标书"""
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
task_id: str = ""
agent_id: str = ""
capability_match: float = 0.5 # 能力匹配度 0-1
estimated_time: int = 60 # 预估完成时间(秒)
confidence: float = 0.5 # 自信度 0-1
approach: str = "" # 方案描述
prerequisites: List[str] = field(default_factory=list) # 前置条件
alternative_approaches: List[str] = field(default_factory=list)
created_at: float = field(default_factory=time.time)
def to_dict(self) -> Dict:
return {
'id': self.id,
'task_id': self.task_id,
'agent_id': self.agent_id,
'capability_match': self.capability_match,
'estimated_time': self.estimated_time,
'confidence': self.confidence,
'approach': self.approach,
'prerequisites': self.prerequisites,
'alternative_approaches': self.alternative_approaches,
'created_at': self.created_at
}
@dataclass
class Execution:
"""执行记录"""
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
task_id: str = ""
agent_id: str = ""
bid_id: str = ""
status: str = "running" # running | completed | failed | timeout
start_time: float = field(default_factory=time.time)
end_time: Optional[float] = None
result: Optional[Any] = None
error: Optional[str] = None
retry_count: int = 0
def to_dict(self) -> Dict:
return {
'id': self.id,
'task_id': self.task_id,
'agent_id': self.agent_id,
'bid_id': self.bid_id,
'status': self.status,
'start_time': self.start_time,
'end_time': self.end_time,
'result': self.result,
'error': self.error,
'retry_count': self.retry_count
}
@dataclass
class ValidationResult:
"""验证结果"""
passed: bool = False
checks: List[Dict] = field(default_factory=list)
issues: List[str] = field(default_factory=list)
score: float = 0.0 # 质量评分
def to_dict(self) -> Dict:
return {
'passed': self.passed,
'checks': self.checks,
'issues': self.issues,
'score': self.score
}
@dataclass
class Attempt:
"""一次完整的尝试记录"""
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
task_id: str = ""
agent_id: str = ""
execution: Optional[Execution] = None
validation: Optional[ValidationResult] = None
success: bool = False
failure_type: Optional[str] = None # timeout | validation | capability | error
duration: float = 0.0
feedback: Optional[str] = None # 给Agent的反馈
def to_dict(self) -> Dict:
return {
'id': self.id,
'task_id': self.task_id,
'agent_id': self.agent_id,
'execution': self.execution.to_dict() if self.execution else None,
'validation': self.validation.to_dict() if self.validation else None,
'success': self.success,
'failure_type': self.failure_type,
'duration': self.duration,
'feedback': self.feedback
}
@dataclass
class AgentProfile:
"""Agent档案"""
id: str = ""
name: str = ""
description: str = ""
capabilities: List[str] = field(default_factory=list) # 能力标签
max_concurrent_tasks: int = 1
preferred_task_types: List[str] = field(default_factory=list)
# 历史表现
total_tasks: int = 0
success_tasks: int = 0
avg_completion_time: float = 0.0
avg_quality_score: float = 0.0
@property
def success_rate(self) -> float:
if self.total_tasks > 0:
return self.success_tasks / self.total_tasks
return 0.0
def to_dict(self) -> Dict:
return {
'id': self.id,
'name': self.name,
'description': self.description,
'capabilities': self.capabilities,
'max_concurrent_tasks': self.max_concurrent_tasks,
'preferred_task_types': self.preferred_task_types,
'total_tasks': self.total_tasks,
'success_tasks': self.success_tasks,
'avg_completion_time': self.avg_completion_time,
'avg_quality_score': self.avg_quality_score,
'success_rate': self.success_rate
}
@dataclass
class TaskGraph:
"""任务依赖图"""
tasks: Dict[str, Task] = field(default_factory=dict)
dependencies: Dict[str, List[str]] = field(default_factory=dict) # task_id -> [dep_task_ids]
reverse_deps: Dict[str, List[str]] = field(default_factory=dict) # task_id -> [dependents]
def add_task(self, task: Task):
"""添加任务"""
self.tasks[task.id] = task
self.dependencies[task.id] = task.dependencies
# 更新反向依赖
for dep_id in task.dependencies:
if dep_id not in self.reverse_deps:
self.reverse_deps[dep_id] = []
self.reverse_deps[dep_id].append(task.id)
def get_ready_tasks(self, completed: set) -> List[Task]:
"""获取当前可执行的任务(依赖已满足)"""
ready = []
for task_id, deps in self.dependencies.items():
if task_id not in completed:
if all(dep in completed for dep in deps):
ready.append(self.tasks[task_id])
return ready
def get_execution_order(self) -> List[List[Task]]:
"""获取分层执行顺序(每层内的任务可并行)"""
order = []
completed = set()
while len(completed) < len(self.tasks):
ready = self.get_ready_tasks(completed)
if not ready:
# 可能存在循环依赖
remaining = [t for t in self.tasks.values() if t.id not in completed]
if remaining:
# 把剩余任务强行加入(可能有问题)
order.append(remaining)
completed.update(t.id for t in remaining)
break
order.append(ready)
completed.update(t.id for t in ready)
return order
def to_dict(self) -> Dict:
return {
'tasks': {k: v.to_dict() for k, v in self.tasks.items()},
'dependencies': self.dependencies,
'reverse_deps': self.reverse_deps
}
@dataclass
class UserRequest:
"""用户请求"""
id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
content: str = ""
status: str = "pending" # pending | processing | completed | failed
created_at: float = field(default_factory=time.time)
task_graph: Optional[TaskGraph] = None
final_result: Optional[Any] = None
error: Optional[str] = None
def to_dict(self) -> Dict:
return {
'id': self.id,
'content': self.content,
'status': self.status,
'created_at': self.created_at,
'task_graph': self.task_graph.to_dict() if self.task_graph else None,
'final_result': self.final_result,
'error': self.error
}