""" Matrix Bot 服务 v2.0 - 使用Agent架构 支持渠道独立Agent绑定 """ import asyncio import logging import os from typing import Optional, Callable from nio import AsyncClient, RoomMessageText, MegolmEvent from models_v2 import SessionLocal, User, Conversation, Message, SystemConfig, Agent, Channel, ChannelAgentMapping from services.conversation_service import ConversationService from services.agent_service import AgentService, ChannelService from services.llm_service import llm_service logger = logging.getLogger(__name__) MAIN_USER_ID = "main_user" STORE_PATH = "/home/xian/.openclaw/workspace-coder/works/ai-chat/matrix_store" class MatrixBotV2: """Matrix Bot v2.0 - 支持Agent架构""" def __init__(self): self.client: Optional[AsyncClient] = None self.homeserver: str = "" self.user_id: str = "" self.password: str = "" self.is_running: bool = False self.on_message_callback: Optional[Callable] = None self.last_room_id: str = "" self.processed_events: set = set() self.channel_id: Optional[int] = None # Matrix渠道ID async def init_from_config(self): """从渠道配置初始化""" db = SessionLocal() try: # 获取Matrix渠道配置 channel_service = ChannelService(db) channel = channel_service.get_channel_by_type('matrix') if not channel or not channel.is_active: logger.warning("Matrix渠道未配置或未启用") return False self.channel_id = channel.id # 从渠道配置获取Matrix参数 config = channel.config or {} self.homeserver = config.get('homeserver', 'http://matrix.tphai.com') self.user_id = config.get('username', '@tester:matrix.tphai.com') self.password = config.get('password', 'tester12345@!') logger.info(f"Matrix配置: homeserver={self.homeserver}, user={self.user_id}") if self.user_id and self.password: os.makedirs(STORE_PATH, exist_ok=True) self.client = AsyncClient( self.homeserver, self.user_id, store_path=STORE_PATH ) self.is_running = True logger.info(f"Matrix nio 客户端已初始化") return True finally: db.close() return False async def start_sync(self, message_handler: Callable = None): """开始同步消息""" if not self.is_running or not self.client: logger.warning("Matrix未连接") return self.on_message_callback = message_handler try: login_resp = await self.client.login(self.password) logger.info(f"Matrix登录成功: {login_resp}") sync_resp = await self.client.sync(full_state=True) logger.info(f"初始同步完成: next_batch={sync_resp.next_batch}") self.client.add_event_callback(self._handle_nio_message, RoomMessageText) self.client.add_event_callback(self._handle_encrypted_message, MegolmEvent) logger.info("Matrix消息回调已注册") asyncio.create_task(self._run_sync_forever()) logger.info("Matrix nio sync_forever 任务已启动") except Exception as e: logger.error(f"Matrix启动失败: {e}") import traceback logger.error(traceback.format_exc()) async def _run_sync_forever(self): """运行 sync_forever""" try: logger.info("开始 Matrix sync_forever...") await self.client.sync_forever(timeout=30000) except Exception as e: logger.error(f"sync_forever 错误: {e}") self.is_running = False async def _handle_encrypted_message(self, room, event): """处理加密消息""" logger.info(f"收到加密消息: [{room.room_id}] event_id={event.event_id}") try: body = None sender = event.sender if event.decrypted: try: decrypted_event = event.parse_decrypted_event(event.decrypted) if decrypted_event and hasattr(decrypted_event, 'body'): body = decrypted_event.body sender = decrypted_event.sender if hasattr(decrypted_event, 'sender') else event.sender except Exception as e: logger.warning(f"parse_decrypted_event 失败: {e}") if not body and hasattr(event, 'source') and event.source: content = event.source.get('content', {}) if 'body' in content: body = content.get('body', '') sender = event.source.get('sender', event.sender) if not body and hasattr(event, 'body') and event.body: body = event.body if body: logger.info(f"解密成功: sender={sender}, body={body[:50]}") await self._process_message(room, sender, body, event.event_id) else: logger.warning(f"加密消息无法解密: event_id={event.event_id}") except Exception as e: logger.error(f"处理加密消息失败: {e}") async def _handle_nio_message(self, room, event): """处理普通文本消息""" sender = event.sender if sender == self.user_id: return message_text = event.body.strip() logger.info(f"Matrix收到文本消息: [{room.room_id}] {sender}: {message_text}") await self._process_message(room, sender, message_text, event.event_id) async def _process_message(self, room, sender, message_text, event_id): """处理消息核心逻辑""" if event_id in self.processed_events: return self.processed_events.add(event_id) if len(self.processed_events) > 100: self.processed_events = set(list(self.processed_events)[-50:]) if sender == self.user_id: return self.last_room_id = room.room_id db = SessionLocal() try: conv_service = ConversationService(db) agent_service = AgentService(db) channel_service = ChannelService(db) # 获取或创建主用户 main_user = conv_service.get_or_create_user( MAIN_USER_ID, display_name="主用户", user_type='web' ) # 获取Matrix渠道绑定的Agent agent = agent_service.get_agent_for_channel(self.channel_id) if not agent: agent = agent_service.get_default_agent() agent_config = agent_service.get_agent_config(agent.id) if agent else None # 处理命令 if message_text == "/new": conversation = conv_service.create_conversation( main_user.id, channel_id=self.channel_id ) await self.send_message(room.room_id, "✅ 已创建新会话") logger.info(f"Matrix创建新会话: {conversation.conversation_id}") if self.on_message_callback: await self.on_message_callback( action="new_conversation", conversation_id=conversation.conversation_id, room_id=room.room_id ) return # 处理Agent切换命令 if message_text.startswith("/agent "): agent_name = message_text[7:].strip() new_agent = agent_service.get_agent_by_name(agent_name) if new_agent and new_agent.is_active: await self.send_message(room.room_id, f"✅ 已切换到Agent: {new_agent.display_name}") # TODO: 更新会话Agent else: await self.send_message(room.room_id, f"❌ Agent '{agent_name}' 不存在或未启用") return # 处理思考开关命令 enable_thinking = True if message_text == "/thinking off": enable_thinking = False await self.send_message(room.room_id, "✅ 思考功能已关闭") return elif message_text == "/thinking on": enable_thinking = True await self.send_message(room.room_id, "✅ 思考功能已开启") return # 获取最新会话 conversations = conv_service.get_user_conversations(main_user.id) if not conversations: conversation = conv_service.create_conversation(main_user.id, channel_id=self.channel_id) else: conversation = conversations[0] # 保存用户消息 user_msg = conv_service.add_message( conversation_id=conversation.id, role='user', content=message_text, source='matrix', extra_data={ 'event_id': event_id, 'room_id': room.room_id, 'sender': sender } ) # 通知网页端 if self.on_message_callback: await self.on_message_callback( action="user_message", conversation_id=conversation.conversation_id, user_message=message_text, room_id=room.room_id, message_id=user_msg.id, sender=sender ) # 调用AI生成回复 if agent_config and agent_config.get('provider'): try: await self.client.room_typing(room.room_id, typing_state=True) history = conv_service.get_conversation_history( conversation.conversation_id, limit=agent_config['agent'].get('max_history', 20) ) # 调用LLM response, thinking_content = await llm_service.chat( messages=history, provider_config=agent_config['provider'], agent_config=agent_config['agent'], enable_thinking=enable_thinking and agent_config['agent'].get('enable_thinking', True) ) # 保存AI回复 assistant_msg = conv_service.add_message( conversation_id=conversation.id, role='assistant', content=response, source='matrix', thinking_content=thinking_content, agent_id=agent.id, model_used=agent_config['provider'].get('default_model') ) # 发送回复到Matrix await self.send_message(room.room_id, response) await self.client.room_typing(room.room_id, typing_state=False) # 同步到网页端 if self.on_message_callback: await self.on_message_callback( action="assistant_message", conversation_id=conversation.conversation_id, room_id=room.room_id, message_id=assistant_msg.id, content=response, thinking_content=thinking_content, agent_id=agent.id, agent_name=agent_config['agent'].get('display_name') ) logger.info(f"Matrix AI回复已发送: {response[:50]}") except Exception as e: logger.error(f"AI调用失败: {e}") await self.send_message(room.room_id, f"处理消息时出错: {str(e)}") await self.client.room_typing(room.room_id, typing_state=False) else: await self.send_message(room.room_id, "❌ Agent配置不完整,无法生成回复") except Exception as e: logger.error(f"处理Matrix消息失败: {e}") import traceback logger.error(traceback.format_exc()) await self.send_message(room.room_id, f"处理消息时出错: {str(e)}") finally: db.close() async def send_message(self, room_id: str, message: str): """发送消息到Matrix房间""" if not self.client: logger.error("Matrix客户端未初始化") return False try: await self.client.room_send( room_id, "m.room.message", {"msgtype": "m.text", "body": message} ) logger.info(f"Matrix消息发送成功: {room_id}") return True except Exception as e: error_str = str(e) if "not verified" in error_str or "OlmUnverifiedDeviceError" in error_str: logger.warning(f"设备未验证: {e}") try: for user_id, devices in self.client.device_store.items(): for device_id, device in devices.items(): if not device.verified: logger.info(f"验证设备: {user_id} {device_id}") self.client.verify_device(device) await self.client.room_send( room_id, "m.room.message", {"msgtype": "m.text", "body": message} ) return True except Exception as e2: logger.error(f"验证设备后发送仍失败: {e2}") return False else: logger.error(f"发送Matrix消息错误: {e}") return False async def disconnect(self): """断开连接""" self.is_running = False if self.client: await self.client.close() # 全局实例 matrix_bot = MatrixBotV2()