""" LLM服务 - 大模型池管理,支持思考功能 """ import httpx from typing import List, Dict, AsyncGenerator, Optional, Tuple import json import logging import re logger = logging.getLogger(__name__) class LLMService: """大模型调用服务,支持思考功能""" def __init__(self): self.providers_cache = {} # 缓存Provider配置 def load_provider(self, provider_config: dict): """加载Provider配置""" provider_id = provider_config.get('id') self.providers_cache[provider_id] = { 'api_base': provider_config.get('api_base'), 'api_key': provider_config.get('api_key'), 'supports_thinking': provider_config.get('supports_thinking', False), 'thinking_model': provider_config.get('thinking_model'), 'default_model': provider_config.get('default_model'), 'max_tokens': provider_config.get('max_tokens', 4096), 'temperature': provider_config.get('temperature', 0.7) } async def get_available_models(self, api_base: str, api_key: str) -> List[dict]: """从API获取可用模型列表""" if not api_base: return [] try: url = f"{api_base}/models" headers = {"Authorization": f"Bearer {api_key}"} async with httpx.AsyncClient(timeout=10.0) as client: response = await client.get(url, headers=headers) if response.status_code == 200: data = response.json() models = [] for m in data.get('data', []): model_id = m.get('id', '') if model_id: models.append({ "id": model_id, "name": m.get('name', model_id), "owned_by": m.get('owned_by', 'unknown') }) return models except Exception as e: logger.warning(f"获取模型列表失败: {e}") return [] async def test_connection(self, api_base: str, api_key: str, model: str) -> dict: """测试API连接""" try: url = f"{api_base}/chat/completions" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } payload = { "model": model, "messages": [{"role": "user", "content": "测试连接"}], "max_tokens": 50 } async with httpx.AsyncClient(timeout=15.0) as client: response = await client.post(url, headers=headers, json=payload) if response.status_code == 200: data = response.json() content = data['choices'][0]['message']['content'] return { "success": True, "message": f"连接成功!模型响应: {content[:100]}", "model": model } else: return { "success": False, "message": f"连接失败: HTTP {response.status_code}" } except httpx.ConnectError: return {"success": False, "message": f"无法连接到API地址"} except httpx.TimeoutException: return {"success": False, "message": "连接超时"} except Exception as e: return {"success": False, "message": f"连接失败: {str(e)}"} async def chat( self, messages: List[Dict], provider_config: dict, agent_config: dict, enable_thinking: bool = True, images: List[Dict] = None # 图片数据列表 [{'name', 'type', 'data': base64}] ) -> Tuple[str, Optional[str]]: """ 调用AI模型进行对话 Args: messages: 对话历史 provider_config: LLM Provider配置 agent_config: Agent配置 enable_thinking: 是否启用思考 images: 图片数据列表(用于多模态模型) Returns: Tuple[str, Optional[str]]: (回复内容, 思考过程) """ api_base = provider_config.get('api_base') api_key = provider_config.get('api_key') model = agent_config.get('model_override') or provider_config.get('default_model', 'auto') supports_thinking = provider_config.get('supports_thinking', False) thinking_model = provider_config.get('thinking_model') max_tokens = provider_config.get('max_tokens', 4096) temperature = agent_config.get('temperature_override') or provider_config.get('temperature', 0.7) # 构建消息 final_messages = messages.copy() # 添加系统提示 system_prompt = agent_config.get('system_prompt', '你是一个有用的AI助手。') if final_messages and final_messages[0]['role'] != 'system': final_messages.insert(0, {"role": "system", "content": system_prompt}) # 如果有图片,构建多模态消息(只修改最后一条用户消息) if images and 
    async def chat(
        self,
        messages: List[Dict],
        provider_config: dict,
        agent_config: dict,
        enable_thinking: bool = True,
        images: List[Dict] = None  # Image payloads: [{'name', 'type', 'data': base64 data URL}]
    ) -> Tuple[str, Optional[str]]:
        """
        Call the AI model for a chat completion.

        Args:
            messages: Conversation history
            provider_config: LLM provider configuration
            agent_config: Agent configuration
            enable_thinking: Whether to enable thinking
            images: Image payloads (for multimodal models)

        Returns:
            Tuple[str, Optional[str]]: (reply content, thinking process)
        """
        api_base = provider_config.get('api_base')
        api_key = provider_config.get('api_key')
        model = agent_config.get('model_override') or provider_config.get('default_model', 'auto')
        supports_thinking = provider_config.get('supports_thinking', False)
        thinking_model = provider_config.get('thinking_model')
        max_tokens = provider_config.get('max_tokens', 4096)
        # Use an explicit None check so a temperature override of 0.0 is honored
        temperature = agent_config.get('temperature_override')
        if temperature is None:
            temperature = provider_config.get('temperature', 0.7)

        # Build the message list
        final_messages = messages.copy()

        # Prepend the system prompt
        system_prompt = agent_config.get('system_prompt', 'You are a helpful AI assistant.')
        if final_messages and final_messages[0]['role'] != 'system':
            final_messages.insert(0, {"role": "system", "content": system_prompt})

        # If images are attached, build a multimodal message (only the last user message is modified)
        if images and len(images) > 0:
            # Find the last user message
            for i in range(len(final_messages) - 1, -1, -1):
                if final_messages[i]['role'] == 'user':
                    original_text = final_messages[i]['content']
                    # Build the multimodal content
                    multimodal_content = [{"type": "text", "text": original_text if original_text else "Please describe this image"}]
                    for img in images:
                        multimodal_content.append({
                            "type": "image_url",
                            "image_url": {"url": img['data']}  # base64 data URL
                        })
                    # Replace the dict rather than mutating it, so the caller's history is untouched
                    final_messages[i] = {**final_messages[i], "content": multimodal_content}
                    break

        thinking_content = None

        # Handle the thinking feature
        if enable_thinking and agent_config.get('enable_thinking', True):
            thinking_prompt = agent_config.get('thinking_prompt')

            if supports_thinking and thinking_model:
                # Use a dedicated thinking model
                thinking_messages = final_messages.copy()
                if thinking_prompt:
                    thinking_messages.append({"role": "system", "content": thinking_prompt})
                try:
                    thinking_content = await self._call_api(
                        api_base, api_key, thinking_model,
                        thinking_messages,
                        max_tokens=min(max_tokens, 1000),
                        temperature=0.3
                    )
                except Exception as e:
                    logger.warning(f"Thinking model call failed: {e}")
            elif thinking_prompt:
                # The agent configured a thinking prompt; fold it into the system prompt
                enhanced_system = f"{system_prompt}\n\n{thinking_prompt}"
                final_messages[0] = {"role": "system", "content": enhanced_system}

        # Call the main model
        try:
            response = await self._call_api(
                api_base, api_key, model,
                final_messages,
                max_tokens=max_tokens,
                temperature=temperature
            )

            # Try to extract thinking content from the reply (supports the thinking
            # modes of models such as DeepSeek R1 and GLM)
            if enable_thinking and agent_config.get('enable_thinking', True):
                thinking_prefix = agent_config.get('thinking_prefix', '')
                thinking_suffix = agent_config.get('thinking_suffix', '')

                # If no markers are configured, try common thinking markers
                if not thinking_prefix:
                    common_thinking_markers = [
                        ('<think>', '</think>'),
                        ('【思考】', '【回答】'),   # Chinese "thinking"/"answer" markers
                        ('Thought:', 'Answer:'),
                        ('思考:', '回答:'),        # Chinese "thinking:"/"answer:" markers
                    ]
                    for prefix, suffix in common_thinking_markers:
                        if prefix in response and suffix in response:
                            thinking_prefix = prefix
                            thinking_suffix = suffix
                            break

                # Extract the thinking section
                if thinking_prefix and thinking_suffix and thinking_prefix in response:
                    try:
                        start_idx = response.find(thinking_prefix)
                        end_idx = response.find(thinking_suffix, start_idx)
                        if end_idx > start_idx:
                            thinking_content = response[start_idx + len(thinking_prefix):end_idx].strip()
                            # Strip the thinking section, keeping only the reply
                            response = response[end_idx + len(thinking_suffix):].strip()
                    except Exception as e:
                        logger.warning(f"Failed to extract thinking content: {e}")

            return response, thinking_content

        except Exception as e:
            logger.error(f"LLM call failed: {e}")
            raise

    async def _call_api(
        self,
        api_base: str,
        api_key: str,
        model: str,
        messages: List[Dict],
        max_tokens: int = 4096,
        temperature: float = 0.7
    ) -> str:
        """Call the chat completions API and return the reply text."""
        url = f"{api_base.rstrip('/')}/chat/completions"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }

        # Log request details (debugging)
        logger.info(f"Calling LLM: url={url}, model={model}")
        logger.info(f"Message count: {len(messages)}, first message content type: {type(messages[0].get('content'))}")

        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(url, headers=headers, json=payload)

                # Check the HTTP status
                if response.status_code != 200:
                    logger.error(f"API returned an error: status={response.status_code}, body={response.text[:500]}")
                    response.raise_for_status()

                data = response.json()

                # Validate the response shape
                if 'choices' not in data or len(data['choices']) == 0:
                    logger.error(f"Malformed API response: {data}")
                    raise ValueError("Malformed API response: missing choices")

                return data['choices'][0]['message']['content']

        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error: {e.response.status_code}, {e.response.text}")
            raise
        except Exception as e:
            logger.error(f"API call exception: {type(e).__name__}: {e}")
            raise
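
    # Illustrative sketch (comments only, not executed): given a raw reply such as
    #
    #   "<think>The user greets me; keep the answer short.</think>Hello! How can I help?"
    #
    # chat() above would return the tuple
    #
    #   ("Hello! How can I help?", "The user greets me; keep the answer short.")
    #
    # The <think> markers are one of the common_thinking_markers fallbacks; an agent
    # may configure different markers via thinking_prefix/thinking_suffix.
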
    async def chat_stream(
        self,
        messages: List[Dict],
        provider_config: dict,
        agent_config: dict,
        enable_thinking: bool = True
    ) -> AsyncGenerator[dict, None]:
        """
        Stream a chat completion from the AI model.

        Yields:
            dict: {"type": "thinking"|"content", "text": "..."}
        """
        api_base = provider_config.get('api_base')
        api_key = provider_config.get('api_key')
        model = agent_config.get('model_override') or provider_config.get('default_model', 'auto')
        max_tokens = provider_config.get('max_tokens', 4096)
        temperature = agent_config.get('temperature_override')
        if temperature is None:
            temperature = provider_config.get('temperature', 0.7)

        # Build the message list
        final_messages = messages.copy()
        system_prompt = agent_config.get('system_prompt', 'You are a helpful AI assistant.')
        if final_messages and final_messages[0]['role'] != 'system':
            final_messages.insert(0, {"role": "system", "content": system_prompt})

        # If thinking is enabled but the model has no native support,
        # instruct it to wrap its reasoning in markers instead
        if enable_thinking and agent_config.get('enable_thinking', True):
            supports_thinking = provider_config.get('supports_thinking', False)
            thinking_prompt = agent_config.get('thinking_prompt')
            if not supports_thinking and thinking_prompt:
                thinking_prefix = agent_config.get('thinking_prefix', '<think>')
                thinking_suffix = agent_config.get('thinking_suffix', '</think>')
                enhanced_system = (
                    f"{system_prompt}\n\nBefore answering, think the question through first. "
                    f"Wrap your thinking in {thinking_prefix} and {thinking_suffix}, "
                    f"then give your formal answer."
                )
                enhanced_system += f"\nThinking guidance: {thinking_prompt}"
                final_messages[0] = {"role": "system", "content": enhanced_system}

        url = f"{api_base.rstrip('/')}/chat/completions"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": model,
            "messages": final_messages,
            "stream": True,
            "temperature": temperature,
            "max_tokens": max_tokens
        }

        thinking_prefix = agent_config.get('thinking_prefix', '<think>')
        thinking_suffix = agent_config.get('thinking_suffix', '</think>')
        buffer = ""  # Accumulates text to detect the thinking section

        async with httpx.AsyncClient(timeout=60.0) as client:
            async with client.stream("POST", url, headers=headers, json=payload) as response:
                if response.status_code != 200:
                    body = await response.aread()
                    logger.error(f"API returned an error: status={response.status_code}, body={body[:500]!r}")
                    response.raise_for_status()
                async for line in response.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    data_str = line[6:]
                    if data_str == "[DONE]":
                        break
                    try:
                        data = json.loads(data_str)
                    except json.JSONDecodeError:
                        continue
                    if 'choices' not in data or len(data['choices']) == 0:
                        continue
                    delta = data['choices'][0].get('delta', {})
                    if 'content' not in delta:
                        continue
                    buffer += delta['content']

                    if thinking_prefix and thinking_suffix:
                        if thinking_prefix in buffer:
                            start_idx = buffer.find(thinking_prefix)
                            end_idx = buffer.find(thinking_suffix, start_idx)
                            if end_idx > start_idx:
                                # Thinking section complete: emit it, then the content after it
                                thinking = buffer[start_idx + len(thinking_prefix):end_idx]
                                yield {"type": "thinking", "text": thinking}
                                remaining = buffer[end_idx + len(thinking_suffix):]
                                if remaining:
                                    yield {"type": "content", "text": remaining}
                                buffer = ""
                            else:
                                # Thinking section not finished: flush content before it,
                                # keep the rest buffered until the suffix arrives
                                if start_idx > 0:
                                    yield {"type": "content", "text": buffer[:start_idx]}
                                buffer = buffer[start_idx:]
                        else:
                            # No prefix yet: emit everything except a tail that could be
                            # the start of a prefix split across chunk boundaries
                            safe_len = len(buffer) - (len(thinking_prefix) - 1)
                            if safe_len > 0:
                                yield {"type": "content", "text": buffer[:safe_len]}
                                buffer = buffer[safe_len:]
                    else:
                        # No thinking markers configured: pass content straight through
                        yield {"type": "content", "text": buffer}
                        buffer = ""

        # Flush any remaining buffered text
        if buffer:
            yield {"type": "content", "text": buffer}
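
    # Illustrative consumption sketch for chat_stream (provider_cfg/agent_cfg and the
    # render_* callbacks are hypothetical placeholders):
    #
    #   async for chunk in llm_service.chat_stream(history, provider_cfg, agent_cfg):
    #       if chunk["type"] == "thinking":
    #           render_thinking(chunk["text"])
    #       else:
    #           render_content(chunk["text"])
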
    async def chat_with_tools(
        self,
        messages: List[Dict],
        provider_config: dict,
        agent_config: dict,
        tools: List[Dict] = None,
        enable_thinking: bool = True,
        images: List[Dict] = None
    ) -> Tuple[str, Optional[str], Optional[List[Dict]]]:
        """
        Chat with Function Calling support.

        Args:
            messages: Conversation history
            provider_config: LLM provider configuration
            agent_config: Agent configuration
            tools: Tool definitions (OpenAI Function Calling format)
            enable_thinking: Whether to enable thinking
            images: Image payloads

        Returns:
            Tuple[str, Optional[str], Optional[List[Dict]]]:
                (reply content, thinking process, tool call records)
        """
        api_base = provider_config.get('api_base')
        api_key = provider_config.get('api_key')
        model = agent_config.get('model_override') or provider_config.get('default_model', 'auto')
        supports_function_calling = provider_config.get('supports_function_calling', False)
        max_tokens = provider_config.get('max_tokens', 4096)
        temperature = agent_config.get('temperature_override')
        if temperature is None:
            temperature = provider_config.get('temperature', 0.7)

        # Fall back to a plain chat call if Function Calling is unsupported
        if not supports_function_calling or not tools:
            response, thinking = await self.chat(messages, provider_config, agent_config, enable_thinking, images)
            return response, thinking, None

        # Build the message list
        final_messages = messages.copy()
        system_prompt = agent_config.get('system_prompt', 'You are a helpful AI assistant.')
        if final_messages and final_messages[0]['role'] != 'system':
            final_messages.insert(0, {"role": "system", "content": system_prompt})

        # Handle images (multimodal)
        if images and len(images) > 0:
            for i in range(len(final_messages) - 1, -1, -1):
                if final_messages[i]['role'] == 'user':
                    original_text = final_messages[i]['content']
                    multimodal_content = [{"type": "text", "text": original_text if original_text else "Please describe this image"}]
                    for img in images:
                        multimodal_content.append({
                            "type": "image_url",
                            "image_url": {"url": img['data']}
                        })
                    final_messages[i] = {**final_messages[i], "content": multimodal_content}
                    break

        # First call: let the LLM decide whether to invoke tools
        url = f"{api_base.rstrip('/')}/chat/completions"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": model,
            "messages": final_messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            "tools": tools  # Pass in the tool definitions
        }

        logger.info(f"Function Calling request: url={url}, model={model}, tools={len(tools)}")

        tool_calls_record = []  # Records the requested tool calls

        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(url, headers=headers, json=payload)

                if response.status_code != 200:
                    logger.error(f"API returned an error: status={response.status_code}, body={response.text[:500]}")
                    response.raise_for_status()

                data = response.json()
                if 'choices' not in data or len(data['choices']) == 0:
                    raise ValueError("Malformed API response: missing choices")

                message = data['choices'][0]['message']

                # Check for tool calls
                if 'tool_calls' in message and message['tool_calls']:
                    logger.info(f"LLM requested tool calls: {len(message['tool_calls'])}")

                    # Append the LLM's tool-call message to the history
                    final_messages.append({
                        "role": "assistant",
                        "content": None,
                        "tool_calls": message['tool_calls']
                    })

                    # Record the tool calls
                    for tc in message['tool_calls']:
                        tool_calls_record.append({
                            "id": tc['id'],
                            "name": tc['function']['name'],
                            "arguments": json.loads(tc['function']['arguments'])
                        })

                    # Return the records; the caller executes the tools
                    return None, None, tool_calls_record

                # No tool calls; return the content directly
                content = message.get('content', '')

                # Thinking-content extraction could be added here if needed
                thinking_content = None

                return content, thinking_content, None

        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error: {e.response.status_code}, {e.response.text}")
            raise
        except Exception as e:
            logger.error(f"Function Calling exception: {type(e).__name__}: {e}")
            raise
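
    # Illustrative two-phase Function Calling flow (run_tool and the config dicts
    # are hypothetical; rebuilding the assistant tool-call message is the caller's
    # job, since chat_with_tools only returns the parsed records):
    #
    #   content, thinking, calls = await llm_service.chat_with_tools(
    #       history, provider_cfg, agent_cfg, tools=tool_defs)
    #   if calls:
    #       history.append({"role": "assistant", "content": None, "tool_calls": [
    #           {"id": c["id"], "type": "function", "function": {
    #               "name": c["name"], "arguments": json.dumps(c["arguments"])}}
    #           for c in calls]})
    #       results = [{"tool_call_id": c["id"],
    #                   "content": run_tool(c["name"], c["arguments"])} for c in calls]
    #       content, thinking = await llm_service.chat_with_tool_results(
    #           history, provider_cfg, agent_cfg, results)
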
    async def chat_with_tool_results(
        self,
        messages: List[Dict],
        provider_config: dict,
        agent_config: dict,
        tool_results: List[Dict],
        enable_thinking: bool = True
    ) -> Tuple[str, Optional[str]]:
        """
        Second-phase call: feed tool execution results back to the LLM.

        Args:
            messages: Conversation history (including the tool calls and results)
            provider_config: LLM provider configuration
            agent_config: Agent configuration
            tool_results: Tool execution results [{"tool_call_id": "xxx", "content": "..."}]

        Returns:
            Tuple[str, Optional[str]]: (reply content, thinking process)
        """
        api_base = provider_config.get('api_base')
        api_key = provider_config.get('api_key')
        model = agent_config.get('model_override') or provider_config.get('default_model', 'auto')
        max_tokens = provider_config.get('max_tokens', 4096)
        temperature = agent_config.get('temperature_override')
        if temperature is None:
            temperature = provider_config.get('temperature', 0.7)

        # Append the tool results to the message history
        final_messages = messages.copy()
        for result in tool_results:
            final_messages.append({
                "role": "tool",
                "tool_call_id": result['tool_call_id'],
                "content": result['content']
            })

        # Call the LLM to generate the final reply
        url = f"{api_base.rstrip('/')}/chat/completions"
        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": model,
            "messages": final_messages,
            "temperature": temperature,
            "max_tokens": max_tokens
        }

        logger.info(f"Returning tool results to LLM: url={url}, model={model}")

        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(url, headers=headers, json=payload)

                if response.status_code != 200:
                    logger.error(f"API returned an error: status={response.status_code}")
                    response.raise_for_status()

                data = response.json()
                content = data['choices'][0]['message']['content']
                return content, None

        except Exception as e:
            logger.error(f"Tool-result call exception: {e}")
            raise


# Global instance
llm_service = LLMService()
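
# Minimal usage sketch. The endpoint, key, and model below are placeholder
# assumptions; any OpenAI-compatible chat completions API should behave the same.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        provider_cfg = {
            "api_base": "https://api.example.com/v1",  # placeholder endpoint
            "api_key": "sk-...",                       # placeholder key
            "default_model": "gpt-4o-mini",            # placeholder model name
        }
        agent_cfg = {"system_prompt": "You are a helpful AI assistant."}
        reply, thinking = await llm_service.chat(
            [{"role": "user", "content": "Hello!"}],
            provider_cfg,
            agent_cfg,
        )
        print("thinking:", thinking)
        print("reply:", reply)

    asyncio.run(_demo())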