diff --git a/ai_chat.db b/ai_chat.db index bcf8637..3b7bd7a 100644 Binary files a/ai_chat.db and b/ai_chat.db differ diff --git a/services/__pycache__/matrix_service.cpython-310.pyc b/services/__pycache__/matrix_service.cpython-310.pyc index b69dd76..faece90 100644 Binary files a/services/__pycache__/matrix_service.cpython-310.pyc and b/services/__pycache__/matrix_service.cpython-310.pyc differ diff --git a/services/matrix_service.py b/services/matrix_service.py index ebafddf..67403e1 100644 --- a/services/matrix_service.py +++ b/services/matrix_service.py @@ -3,8 +3,9 @@ Matrix Bot 服务 - 使用 nio 库处理消息收发(支持加密) """ import asyncio import logging +import os from typing import Optional, Callable -from nio import AsyncClient, RoomMessageText, RoomMessageEmote +from nio import AsyncClient, RoomMessageText, MegolmEvent from models import SessionLocal, User, Conversation, Message, SystemConfig from services.conversation_service import ConversationService @@ -13,6 +14,7 @@ from services import ai_service logger = logging.getLogger(__name__) MAIN_USER_ID = "main_user" +STORE_PATH = "/home/xian/.openclaw/workspace-coder/works/ai-chat/matrix_store" class MatrixBot: @@ -37,9 +39,17 @@ class MatrixBot: logger.info(f"Matrix配置: homeserver={self.homeserver}, user={self.user_id}") if self.user_id and self.password: - self.client = AsyncClient(self.homeserver, self.user_id) + # 创建存储目录 + os.makedirs(STORE_PATH, exist_ok=True) + + # 使用加密存储 + self.client = AsyncClient( + self.homeserver, + self.user_id, + store_path=STORE_PATH + ) self.is_running = True - logger.info("Matrix nio 客户端已初始化") + logger.info(f"Matrix nio 客户端已初始化,存储路径: {STORE_PATH}") return True finally: db.close() @@ -53,44 +63,57 @@ class MatrixBot: self.on_message_callback = message_handler - # 注册消息处理器 - self.client.add_event_callback(self._handle_nio_message, RoomMessageText) - # 登录 try: login_resp = await self.client.login(self.password) logger.info(f"Matrix登录成功: {login_resp}") - # 开始同步(后台任务) - asyncio.create_task(self._sync_loop()) - logger.info("Matrix nio 同步任务已启动") + # 加载加密存储 + logger.info("加载加密存储...") + + # 第一次同步获取房间密钥 + sync_resp = await self.client.sync(full_state=True) + logger.info(f"初始同步完成: next_batch={sync_resp.next_batch}") + + # 注册消息处理器 - 普通文本消息 + self.client.add_event_callback(self._handle_nio_message, RoomMessageText) + # 注册加密消息处理器(解密后) + self.client.add_event_callback(self._handle_encrypted_message, MegolmEvent) + + logger.info("Matrix消息回调已注册") + + # 使用 sync_forever 自动处理事件(后台任务) + asyncio.create_task(self._run_sync_forever()) + logger.info("Matrix nio sync_forever 任务已启动") except Exception as e: - logger.error(f"Matrix登录失败: {e}") + logger.error(f"Matrix启动失败: {e}") + import traceback + logger.error(traceback.format_exc()) - async def _sync_loop(self): - """后台同步循环""" - while self.is_running: - try: - # 同步一次 - sync_resp = await self.client.sync(timeout=30000) - if sync_resp: - logger.debug(f"Matrix同步完成: next_batch={sync_resp.next_batch}") - - # 处理已加入的房间 - for room_id, room in self.client.rooms.items(): - self.last_room_id = room_id - logger.debug(f"房间: {room_id}, 名称: {room.display_name}") - - await asyncio.sleep(1) - except Exception as e: - logger.error(f"Matrix同步错误: {e}") - await asyncio.sleep(10) + async def _run_sync_forever(self): + """运行 sync_forever,自动处理所有事件""" + try: + logger.info("开始 Matrix sync_forever...") + await self.client.sync_forever(timeout=30000, full_state=True) + except Exception as e: + logger.error(f"sync_forever 错误: {e}") + self.is_running = False + + async def _handle_encrypted_message(self, room, event): + """处理加密消息(MegolmEvent)""" + logger.info(f"收到加密消息: [{room.room_id}] event_id={event.event_id}") + + # MegolmEvent 需要解密,nio 会自动解密并转换为普通消息事件 + # 解密后会触发 RoomMessageText 回调,所以这里只是记录 + # 如果解密失败,检查是否有密钥 + pass async def _handle_nio_message(self, room, event): """处理 nio 收到的消息""" # 忽略自己发送的消息 sender = event.sender if sender == self.user_id: + logger.debug(f"忽略自己发送的消息: {sender}") return message_text = event.body.strip() @@ -146,7 +169,10 @@ class MatrixBot: ) # 发送"正在输入"状态 - await self.client.room_typing(room.room_id, typing_state=True) + try: + await self.client.room_typing(room.room_id, typing_state=True) + except Exception as e: + logger.warning(f"发送输入状态失败: {e}") # 获取AI回复 messages = conv_service.get_messages(conversation.id) @@ -164,7 +190,10 @@ class MatrixBot: await self.send_message(room.room_id, ai_response) # 关闭"正在输入"状态 - await self.client.room_typing(room.room_id, typing_state=False) + try: + await self.client.room_typing(room.room_id, typing_state=False) + except Exception as e: + logger.warning(f"关闭输入状态失败: {e}") # 调用回调(用于网页同步) if self.on_message_callback: @@ -180,8 +209,13 @@ class MatrixBot: except Exception as e: logger.error(f"处理Matrix消息失败: {e}") + import traceback + logger.error(traceback.format_exc()) await self.send_message(room.room_id, f"处理消息时出错: {str(e)}") - await self.client.room_typing(room.room_id, typing_state=False) + try: + await self.client.room_typing(room.room_id, typing_state=False) + except: + pass finally: db.close() @@ -204,6 +238,8 @@ class MatrixBot: return True except Exception as e: logger.error(f"发送Matrix消息错误: {e}") + import traceback + logger.error(traceback.format_exc()) return False async def disconnect(self):