"""
LLM service - model pool management, with support for "thinking" output.
"""
|
||
import httpx
|
||
from typing import List, Dict, AsyncGenerator, Optional, Tuple
|
||
import json
|
||
import logging
|
||
import re
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class LLMService:
|
||
"""大模型调用服务,支持思考功能"""
|
||
|
||
def __init__(self):
|
||
self.providers_cache = {} # 缓存Provider配置
|
||
|
||
def load_provider(self, provider_config: dict):
|
||
"""加载Provider配置"""
|
||
provider_id = provider_config.get('id')
|
||
self.providers_cache[provider_id] = {
|
||
'api_base': provider_config.get('api_base'),
|
||
'api_key': provider_config.get('api_key'),
|
||
'supports_thinking': provider_config.get('supports_thinking', False),
|
||
'thinking_model': provider_config.get('thinking_model'),
|
||
'default_model': provider_config.get('default_model'),
|
||
'max_tokens': provider_config.get('max_tokens', 4096),
|
||
'temperature': provider_config.get('temperature', 0.7)
|
||
}
|
||
|
||
async def get_available_models(self, api_base: str, api_key: str) -> List[dict]:
|
||
"""从API获取可用模型列表"""
|
||
if not api_base:
|
||
return []
|
||
|
||
try:
|
||
url = f"{api_base}/models"
|
||
headers = {"Authorization": f"Bearer {api_key}"}
|
||
|
||
async with httpx.AsyncClient(timeout=10.0) as client:
|
||
response = await client.get(url, headers=headers)
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
models = []
|
||
for m in data.get('data', []):
|
||
model_id = m.get('id', '')
|
||
if model_id:
|
||
models.append({
|
||
"id": model_id,
|
||
"name": m.get('name', model_id),
|
||
"owned_by": m.get('owned_by', 'unknown')
|
||
})
|
||
return models
|
||
except Exception as e:
|
||
logger.warning(f"获取模型列表失败: {e}")
|
||
|
||
return []
|
||
|
||
async def test_connection(self, api_base: str, api_key: str, model: str) -> dict:
|
||
"""测试API连接"""
|
||
try:
|
||
url = f"{api_base}/chat/completions"
|
||
headers = {
|
||
"Authorization": f"Bearer {api_key}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
payload = {
|
||
"model": model,
|
||
"messages": [{"role": "user", "content": "测试连接"}],
|
||
"max_tokens": 50
|
||
}
|
||
|
||
async with httpx.AsyncClient(timeout=15.0) as client:
|
||
response = await client.post(url, headers=headers, json=payload)
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
content = data['choices'][0]['message']['content']
|
||
return {
|
||
"success": True,
|
||
"message": f"连接成功!模型响应: {content[:100]}",
|
||
"model": model
|
||
}
|
||
else:
|
||
return {
|
||
"success": False,
|
||
"message": f"连接失败: HTTP {response.status_code}"
|
||
}
|
||
except httpx.ConnectError:
|
||
return {"success": False, "message": f"无法连接到API地址"}
|
||
except httpx.TimeoutException:
|
||
return {"success": False, "message": "连接超时"}
|
||
except Exception as e:
|
||
return {"success": False, "message": f"连接失败: {str(e)}"}
|
||
|
||
async def chat(
|
||
self,
|
||
messages: List[Dict],
|
||
provider_config: dict,
|
||
agent_config: dict,
|
||
enable_thinking: bool = True
|
||
) -> Tuple[str, Optional[str]]:
|
||
"""
|
||
调用AI模型进行对话
|
||
|
||
Returns:
|
||
Tuple[str, Optional[str]]: (回复内容, 思考过程)
|
||
"""
|
||
api_base = provider_config.get('api_base')
|
||
api_key = provider_config.get('api_key')
|
||
model = agent_config.get('model_override') or provider_config.get('default_model', 'auto')
|
||
supports_thinking = provider_config.get('supports_thinking', False)
|
||
thinking_model = provider_config.get('thinking_model')
|
||
|
||
max_tokens = provider_config.get('max_tokens', 4096)
|
||
temperature = agent_config.get('temperature_override') or provider_config.get('temperature', 0.7)
|
||
|
||
# 构建消息
|
||
final_messages = messages.copy()
|
||
|
||
# 添加系统提示
|
||
system_prompt = agent_config.get('system_prompt', '你是一个有用的AI助手。')
|
||
if final_messages and final_messages[0]['role'] != 'system':
|
||
final_messages.insert(0, {"role": "system", "content": system_prompt})
|
||
|
||
thinking_content = None
|
||
|
||
# 处理思考功能
|
||
if enable_thinking and agent_config.get('enable_thinking', True):
|
||
thinking_prompt = agent_config.get('thinking_prompt')
|
||
thinking_prefix = agent_config.get('thinking_prefix', '')
|
||
thinking_suffix = agent_config.get('thinking_suffix', '')
|
||
|
||
if supports_thinking and thinking_model:
|
||
# 使用专门的思考模型
|
||
thinking_messages = final_messages.copy()
|
||
if thinking_prompt:
|
||
thinking_messages.append({"role": "system", "content": thinking_prompt})
|
||
|
||
try:
|
||
thinking_result = await self._call_api(
|
||
api_base, api_key, thinking_model, thinking_messages,
|
||
max_tokens=min(max_tokens, 1000),
|
||
temperature=0.3
|
||
)
|
||
thinking_content = thinking_result
|
||
except Exception as e:
|
||
logger.warning(f"思考模型调用失败: {e}")
|
||
|
||
elif thinking_prompt:
|
||
# Agent配置了思考提示词,添加到系统提示中
|
||
enhanced_system = f"{system_prompt}\n\n{thinking_prompt}"
|
||
final_messages[0] = {"role": "system", "content": enhanced_system}
|
||
|
||
# 调用主模型
|
||
try:
|
||
response = await self._call_api(
|
||
api_base, api_key, model, final_messages,
|
||
max_tokens=max_tokens,
|
||
temperature=temperature
|
||
)
|
||
|
||
# 尝试从回复中提取思考内容(支持DeepSeek R1、GLM等模型的思考模式)
|
||
if enable_thinking and agent_config.get('enable_thinking', True):
|
||
thinking_prefix = agent_config.get('thinking_prefix', '')
|
||
thinking_suffix = agent_config.get('thinking_suffix', '')
|
||
|
||
# 如果没有配置前缀后缀,使用常见的思考标记
|
||
if not thinking_prefix:
|
||
# 尝试常见的思考标记
|
||
common_thinking_markers = [
|
||
('<think>', '</think>'),
|
||
('【思考】', '【回答】'),
|
||
('Thought:', 'Answer:'),
|
||
('思考:', '回答:'),
|
||
]
|
||
for prefix, suffix in common_thinking_markers:
|
||
if prefix in response and suffix in response:
|
||
thinking_prefix = prefix
|
||
thinking_suffix = suffix
|
||
break
|
||
|
||
# 提取思考部分
|
||
if thinking_prefix and thinking_suffix and thinking_prefix in response:
|
||
try:
|
||
start_idx = response.find(thinking_prefix)
|
||
end_idx = response.find(thinking_suffix, start_idx)
|
||
if end_idx > start_idx:
|
||
thinking_content = response[start_idx + len(thinking_prefix):end_idx].strip()
|
||
# 移除思考部分,只保留回复
|
||
response = response[end_idx + len(thinking_suffix):].strip()
|
||
except Exception as e:
|
||
logger.warning(f"提取思考内容失败: {e}")
|
||
|
||
return response, thinking_content
|
||
|
||
except Exception as e:
|
||
logger.error(f"LLM调用失败: {e}")
|
||
raise
|
||
|
||
async def _call_api(
|
||
self,
|
||
api_base: str,
|
||
api_key: str,
|
||
model: str,
|
||
messages: List[Dict],
|
||
max_tokens: int = 4096,
|
||
temperature: float = 0.7
|
||
) -> str:
|
||
"""调用API"""
|
||
url = f"{api_base}/chat/completions"
|
||
headers = {
|
||
"Authorization": f"Bearer {api_key}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
payload = {
|
||
"model": model,
|
||
"messages": messages,
|
||
"temperature": temperature,
|
||
"max_tokens": max_tokens
|
||
}
|
||
|
||
logger.info(f"调用LLM: url={url}, model={model}")
|
||
|
||
async with httpx.AsyncClient(timeout=60.0) as client:
|
||
response = await client.post(url, headers=headers, json=payload)
|
||
response.raise_for_status()
|
||
data = response.json()
|
||
return data['choices'][0]['message']['content']
|
||
|
||
async def chat_stream(
|
||
self,
|
||
messages: List[Dict],
|
||
provider_config: dict,
|
||
agent_config: dict,
|
||
enable_thinking: bool = True
|
||
) -> AsyncGenerator[dict, None]:
|
||
"""
|
||
流式调用AI模型
|
||
|
||
Yields:
|
||
dict: {"type": "thinking"|"content", "text": "..."}
|
||
"""
|
||
api_base = provider_config.get('api_base')
|
||
api_key = provider_config.get('api_key')
|
||
model = agent_config.get('model_override') or provider_config.get('default_model', 'auto')
|
||
max_tokens = provider_config.get('max_tokens', 4096)
|
||
temperature = agent_config.get('temperature_override') or provider_config.get('temperature', 0.7)
|
||
|
||
# 构建消息
|
||
final_messages = messages.copy()
|
||
system_prompt = agent_config.get('system_prompt', '你是一个有用的AI助手。')
|
||
if final_messages and final_messages[0]['role'] != 'system':
|
||
final_messages.insert(0, {"role": "system", "content": system_prompt})
|
||
|
||
# 如果启用思考但模型不支持
|
||
if enable_thinking and agent_config.get('enable_thinking', True):
|
||
supports_thinking = provider_config.get('supports_thinking', False)
|
||
thinking_prompt = agent_config.get('thinking_prompt')
|
||
|
||
if not supports_thinking and thinking_prompt:
|
||
thinking_prefix = agent_config.get('thinking_prefix', '')
|
||
thinking_suffix = agent_config.get('thinking_suffix', '')
|
||
enhanced_system = f"{system_prompt}\n\n在回答之前,请先思考问题。思考过程请用{thinking_prefix}和{thinking_suffix}包裹,然后再给出正式回答。"
|
||
if thinking_prompt:
|
||
enhanced_system += f"\n思考指导:{thinking_prompt}"
|
||
final_messages[0] = {"role": "system", "content": enhanced_system}
|
||
|
||
url = f"{api_base}/chat/completions"
|
||
headers = {
|
||
"Authorization": f"Bearer {api_key}",
|
||
"Content-Type": "application/json"
|
||
}
|
||
payload = {
|
||
"model": model,
|
||
"messages": final_messages,
|
||
"stream": True,
|
||
"temperature": temperature,
|
||
"max_tokens": max_tokens
|
||
}
|
||
|
||
thinking_prefix = agent_config.get('thinking_prefix', '')
|
||
thinking_suffix = agent_config.get('thinking_suffix', '')
|
||
buffer = "" # 用于累积和检测思考部分
|
||
|
||
async with httpx.AsyncClient(timeout=60.0) as client:
|
||
async with client.stream("POST", url, headers=headers, json=payload) as response:
|
||
async for line in response.aiter_lines():
|
||
if line.startswith("data: "):
|
||
data_str = line[6:]
|
||
if data_str == "[DONE]":
|
||
break
|
||
try:
|
||
data = json.loads(data_str)
|
||
if 'choices' in data and len(data['choices']) > 0:
|
||
delta = data['choices'][0].get('delta', {})
|
||
if 'content' in delta:
|
||
text = delta['content']
|
||
buffer += text
|
||
|
||
# 检测思考部分(简化逻辑)
|
||
if thinking_prefix and thinking_suffix and thinking_prefix in buffer:
|
||
# 尝试解析思考部分
|
||
try:
|
||
start_idx = buffer.find(thinking_prefix)
|
||
if start_idx >= 0:
|
||
# 找到思考开始,继续找结束
|
||
end_idx = buffer.find(thinking_suffix, start_idx)
|
||
if end_idx > start_idx:
|
||
# 思考部分完整,发送思考然后发送内容
|
||
thinking = buffer[start_idx + len(thinking_prefix):end_idx]
|
||
yield {"type": "thinking", "text": thinking}
|
||
# 发送思考后的内容
|
||
remaining = buffer[end_idx + len(thinking_suffix):]
|
||
if remaining:
|
||
yield {"type": "content", "text": remaining}
|
||
buffer = ""
|
||
else:
|
||
# 思考部分还没结束,先发送之前的内容
|
||
if start_idx > 0:
|
||
yield {"type": "content", "text": buffer[:start_idx]}
|
||
# 等待更多数据来完成思考部分
|
||
buffer = buffer[start_idx:]
|
||
else:
|
||
# 没有思考标记,直接发送内容
|
||
yield {"type": "content", "text": text}
|
||
buffer = ""
|
||
except:
|
||
yield {"type": "content", "text": text}
|
||
else:
|
||
# 没有思考标记配置,直接发送内容
|
||
yield {"type": "content", "text": text}
|
||
except json.JSONDecodeError:
|
||
continue
|
||
|
||
# 处理剩余buffer
|
||
if buffer:
|
||
yield {"type": "content", "text": buffer}
|
||
|
||
|
||
# Module-level singleton shared by importers of this module.
llm_service = LLMService()