""" Matrix Bot 服务 - 使用 nio 库处理消息收发(支持加密) """ import asyncio import logging import os from typing import Optional, Callable from nio import AsyncClient, RoomMessageText, MegolmEvent from models import SessionLocal, User, Conversation, Message, SystemConfig from services.conversation_service import ConversationService from services import ai_service logger = logging.getLogger(__name__) MAIN_USER_ID = "main_user" STORE_PATH = "/home/xian/.openclaw/workspace-coder/works/ai-chat/matrix_store" class MatrixBot: def __init__(self): self.client: Optional[AsyncClient] = None self.homeserver: str = "" self.user_id: str = "" self.password: str = "" self.is_running: bool = False self.on_message_callback: Optional[Callable] = None self.last_room_id: str = "" self.processed_events: set = set() # 已处理的事件ID,防止重复处理 async def init_from_config(self): """从数据库配置初始化""" db = SessionLocal() try: configs = {c.key: c.value for c in db.query(SystemConfig).all()} self.homeserver = configs.get('matrix_homeserver', 'http://matrix.tphai.com') self.user_id = configs.get('matrix_username', '@tester:matrix.tphai.com') self.password = configs.get('matrix_password', 'tester12345@!') logger.info(f"Matrix配置: homeserver={self.homeserver}, user={self.user_id}") if self.user_id and self.password: # 创建存储目录 os.makedirs(STORE_PATH, exist_ok=True) # 使用加密存储 self.client = AsyncClient( self.homeserver, self.user_id, store_path=STORE_PATH ) self.is_running = True logger.info(f"Matrix nio 客户端已初始化,存储路径: {STORE_PATH}") return True finally: db.close() return False async def start_sync(self, message_handler: Callable = None): """开始同步消息""" if not self.is_running or not self.client: logger.warning("Matrix未连接") return self.on_message_callback = message_handler # 登录 try: login_resp = await self.client.login(self.password) logger.info(f"Matrix登录成功: {login_resp}") # 加载加密存储 logger.info("加载加密存储...") # 第一次同步获取房间密钥 sync_resp = await self.client.sync(full_state=True) logger.info(f"初始同步完成: next_batch={sync_resp.next_batch}") # 注册消息处理器 - 普通文本消息 self.client.add_event_callback(self._handle_nio_message, RoomMessageText) # 注册加密消息处理器(解密后) self.client.add_event_callback(self._handle_encrypted_message, MegolmEvent) logger.info("Matrix消息回调已注册") # 使用 sync_forever 自动处理事件(后台任务) asyncio.create_task(self._run_sync_forever()) logger.info("Matrix nio sync_forever 任务已启动") except Exception as e: logger.error(f"Matrix启动失败: {e}") import traceback logger.error(traceback.format_exc()) async def _run_sync_forever(self): """运行 sync_forever,自动处理所有事件""" try: logger.info("开始 Matrix sync_forever...") # 不使用 full_state,只做增量同步 await self.client.sync_forever(timeout=30000) except Exception as e: logger.error(f"sync_forever 错误: {e}") import traceback logger.error(traceback.format_exc()) self.is_running = False async def _handle_encrypted_message(self, room, event): """处理加密消息(MegolmEvent)""" logger.info(f"收到加密消息: [{room.room_id}] event_id={event.event_id}") # 检查是否已解密 logger.info(f"event.decrypted: {event.decrypted}") try: body = None sender = event.sender # 方法1: 检查 decrypted 属性 if event.decrypted: logger.info(f"消息已解密: {event.decrypted}") # 使用 parse_decrypted_event 获取解密后的完整事件 try: decrypted_event = event.parse_decrypted_event(event.decrypted) if decrypted_event: logger.info(f"解密事件类型: {type(decrypted_event)}") if hasattr(decrypted_event, 'body'): body = decrypted_event.body sender = decrypted_event.sender if hasattr(decrypted_event, 'sender') else event.sender logger.info(f"从 decrypted 获取: {body}") except Exception as e: logger.warning(f"parse_decrypted_event 失败: {e}") # 方法2: 从 source 获取(如果是自己发的消息) if not body and hasattr(event, 'source') and event.source: content = event.source.get('content', {}) # 检查是否有 plaintext body if 'body' in content: body = content.get('body', '') sender = event.source.get('sender', event.sender) logger.info(f"从source.content获取: {body[:50] if body else 'empty'}") # 方法3: 直接尝试 event.body (某些情况下可能有) if not body and hasattr(event, 'body') and event.body: body = event.body logger.info(f"从event.body获取: {body[:50]}") if body: logger.info(f"解密成功: sender={sender}, body={body[:50]}") await self._process_message(room, sender, body, event.event_id) else: logger.warning(f"加密消息无法解密: event_id={event.event_id}") logger.warning(f"event.decrypted={event.decrypted}, 需要密钥") except Exception as e: logger.error(f"处理加密消息失败: {e}") import traceback logger.error(traceback.format_exc()) async def _handle_nio_message(self, room, event): """处理 nio 收到的消息(普通文本)""" # 忽略自己发送的消息 sender = event.sender if sender == self.user_id: logger.debug(f"忽略自己发送的消息: {sender}") return message_text = event.body.strip() logger.info(f"Matrix收到文本消息: [{room.room_id}] {sender}: {message_text}") # 调用统一处理方法 await self._process_message(room, sender, message_text, event.event_id) async def _process_message(self, room, sender, message_text, event_id): """处理消息的核心逻辑""" # 检查是否已处理过此事件(防止重复处理) if event_id in self.processed_events: logger.debug(f"忽略已处理的事件: {event_id}") return # 标记为已处理 self.processed_events.add(event_id) logger.info(f"处理消息: event_id={event_id}") # 限制集合大小,保留最近的100个 if len(self.processed_events) > 100: # 清理旧的一半 self.processed_events = set(list(self.processed_events)[-50:]) # 忽略自己发送的消息 if sender == self.user_id: logger.debug(f"忽略自己发送的消息: {sender}") return # 保存房间ID self.last_room_id = room.room_id db = SessionLocal() try: conv_service = ConversationService(db) # 使用固定主用户 main_user = conv_service.get_or_create_user( MAIN_USER_ID, display_name="主用户", user_type='web' ) # 处理 /new 命令 if message_text == "/new": conversation = conv_service.create_conversation(main_user.id) await self.send_message(room.room_id, "✅ 已创建新会话") logger.info(f"Matrix创建新会话: {conversation.conversation_id}") if self.on_message_callback: await self.on_message_callback( action="new_conversation", conversation_id=conversation.conversation_id, room_id=room.room_id ) return # 获取最新会话 conversations = conv_service.get_user_conversations(main_user.id) if not conversations: conversation = conv_service.create_conversation(main_user.id) else: conversation = conversations[0] # 保存用户消息 user_msg = conv_service.add_message( conversation_id=conversation.id, role='user', content=message_text, source='matrix', extra_data={ 'event_id': event_id, 'room_id': room.room_id, 'sender': sender } ) # 立即通知网页端有新用户消息(通过回调) if self.on_message_callback: await self.on_message_callback( action="user_message", conversation_id=conversation.conversation_id, user_message=message_text, room_id=room.room_id, message_id=user_msg.id, sender=sender ) # 通知回调处理 AI 回复(不再在这里直接调用 AI) if self.on_message_callback: await self.on_message_callback( action="generate_ai_response", conversation_id=conversation.conversation_id, room_id=room.room_id ) except Exception as e: logger.error(f"处理Matrix消息失败: {e}") import traceback logger.error(traceback.format_exc()) await self.send_message(room.room_id, f"处理消息时出错: {str(e)}") try: await self.client.room_typing(room.room_id, typing_state=False) except: pass finally: db.close() async def send_message(self, room_id: str, message: str): """发送消息到Matrix房间""" if not self.client: logger.error("Matrix客户端未初始化") return False try: # 先尝试直接发送 await self.client.room_send( room_id, "m.room.message", { "msgtype": "m.text", "body": message } ) logger.info(f"Matrix消息发送成功: {room_id}") return True except Exception as e: error_str = str(e) if "not verified" in error_str or "OlmUnverifiedDeviceError" in error_str: logger.warning(f"设备未验证,尝试验证设备后发送: {e}") try: # 验证所有未验证的设备 for user_id, devices in self.client.device_store.items(): for device_id, device in devices.items(): if not device.verified: logger.info(f"验证设备: {user_id} {device_id}") self.client.verify_device(device) # 再次尝试发送 await self.client.room_send( room_id, "m.room.message", { "msgtype": "m.text", "body": message } ) logger.info(f"Matrix消息发送成功(验证设备后): {room_id}") return True except Exception as e2: logger.error(f"验证设备后发送仍失败: {e2}") # 最后尝试用 HTTP API 发送(不加密) try: import httpx async with httpx.AsyncClient() as http_client: txn_id = f"m{int(asyncio.get_event_loop().time() * 1000)}" resp = await http_client.put( f"{self.homeserver}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}", headers={"Authorization": f"Bearer {self.client.access_token}"}, json={"msgtype": "m.text", "body": message} ) if resp.status_code == 200: logger.info(f"Matrix消息通过HTTP发送成功: {room_id}") return True else: logger.error(f"HTTP发送失败: {resp.status_code}") return False except Exception as e3: logger.error(f"HTTP发送失败: {e3}") return False else: logger.error(f"发送Matrix消息错误: {e}") return False async def disconnect(self): """断开连接""" self.is_running = False if self.client: await self.client.close() # 全局实例 matrix_bot = MatrixBot()