diff --git a/ai_chat.db b/ai_chat.db index eed130e..bcf8637 100644 Binary files a/ai_chat.db and b/ai_chat.db differ diff --git a/services/__pycache__/matrix_service.cpython-310.pyc b/services/__pycache__/matrix_service.cpython-310.pyc index 23a5729..b69dd76 100644 Binary files a/services/__pycache__/matrix_service.cpython-310.pyc and b/services/__pycache__/matrix_service.cpython-310.pyc differ diff --git a/services/matrix_service.py b/services/matrix_service.py index 5920778..ebafddf 100644 --- a/services/matrix_service.py +++ b/services/matrix_service.py @@ -1,13 +1,14 @@ """ -Matrix Bot 服务 - 使用HTTP API处理消息收发 +Matrix Bot 服务 - 使用 nio 库处理消息收发(支持加密) """ import asyncio -import httpx import logging from typing import Optional, Callable +from nio import AsyncClient, RoomMessageText, RoomMessageEmote from models import SessionLocal, User, Conversation, Message, SystemConfig from services.conversation_service import ConversationService +from services import ai_service logger = logging.getLogger(__name__) @@ -16,14 +17,13 @@ MAIN_USER_ID = "main_user" class MatrixBot: def __init__(self): + self.client: Optional[AsyncClient] = None self.homeserver: str = "" - self.access_token: str = "" self.user_id: str = "" + self.password: str = "" self.is_running: bool = False self.on_message_callback: Optional[Callable] = None - self.sync_token: str = "" - self.client: httpx.AsyncClient = None - self.last_room_id: str = "" # 最后收到消息的房间ID(用于网页端同步) + self.last_room_id: str = "" async def init_from_config(self): """从数据库配置初始化""" @@ -31,115 +31,74 @@ class MatrixBot: try: configs = {c.key: c.value for c in db.query(SystemConfig).all()} self.homeserver = configs.get('matrix_homeserver', 'http://matrix.tphai.com') - self.access_token = configs.get('matrix_access_token', '') - self.user_id = configs.get('matrix_username', '') + self.user_id = configs.get('matrix_username', '@tester:matrix.tphai.com') + self.password = configs.get('matrix_password', 'tester12345@!') logger.info(f"Matrix配置: homeserver={self.homeserver}, user={self.user_id}") - if self.access_token and self.user_id: - self.client = httpx.AsyncClient(timeout=10.0) - # 验证token - try: - resp = await self.client.get( - f"{self.homeserver}/_matrix/client/v3/account/whoami", - headers={"Authorization": f"Bearer {self.access_token}"} - ) - if resp.status_code == 200: - self.is_running = True - logger.info(f"Matrix HTTP API连接成功: {self.user_id}") - # 获取已加入的房间 - rooms = await self.get_joined_rooms() - if rooms: - self.last_room_id = rooms[0] - logger.info(f"Matrix房间: {self.last_room_id}") - return True - else: - logger.error(f"Matrix验证失败: status={resp.status_code}, body={resp.text}") - except Exception as e: - logger.error(f"Matrix连接错误: {type(e).__name__}: {str(e)}") + if self.user_id and self.password: + self.client = AsyncClient(self.homeserver, self.user_id) + self.is_running = True + logger.info("Matrix nio 客户端已初始化") + return True finally: db.close() return False async def start_sync(self, message_handler: Callable = None): """开始同步消息""" - if not self.is_running: + if not self.is_running or not self.client: logger.warning("Matrix未连接") return self.on_message_callback = message_handler - # 启动后台同步任务 - asyncio.create_task(self._sync_loop()) - logger.info("Matrix HTTP同步任务已启动") + # 注册消息处理器 + self.client.add_event_callback(self._handle_nio_message, RoomMessageText) + + # 登录 + try: + login_resp = await self.client.login(self.password) + logger.info(f"Matrix登录成功: {login_resp}") + + # 开始同步(后台任务) + asyncio.create_task(self._sync_loop()) + logger.info("Matrix nio 同步任务已启动") + except Exception as e: + logger.error(f"Matrix登录失败: {e}") async def _sync_loop(self): """后台同步循环""" while self.is_running: try: - await self.sync_events() - await asyncio.sleep(2) # 每2秒同步一次 + # 同步一次 + sync_resp = await self.client.sync(timeout=30000) + if sync_resp: + logger.debug(f"Matrix同步完成: next_batch={sync_resp.next_batch}") + + # 处理已加入的房间 + for room_id, room in self.client.rooms.items(): + self.last_room_id = room_id + logger.debug(f"房间: {room_id}, 名称: {room.display_name}") + + await asyncio.sleep(1) except Exception as e: logger.error(f"Matrix同步错误: {e}") await asyncio.sleep(10) - async def sync_events(self): - """同步Matrix事件""" - if not self.client: - return - - try: - # 使用/sync API获取新事件 - params = {"timeout": 5000} - if self.sync_token: - params["since"] = self.sync_token - - resp = await self.client.get( - f"{self.homeserver}/_matrix/client/v3/sync", - headers={"Authorization": f"Bearer {self.access_token}"}, - params=params - ) - - if resp.status_code != 200: - logger.error(f"Matrix同步失败: {resp.status_code}") - return - - data = resp.json() - self.sync_token = data.get("next_batch", "") - - # 处理房间消息 - rooms = data.get("rooms", {}) - join_rooms = rooms.get("join", {}) - - for room_id, room_data in join_rooms.items(): - events = room_data.get("timeline", {}).get("events", []) - for event in events: - if event.get("type") == "m.room.message": - await self._handle_message(room_id, event) - - except Exception as e: - logger.error(f"同步事件失败: {e}") - - async def _handle_message(self, room_id: str, event: dict): - """处理消息""" + async def _handle_nio_message(self, room, event): + """处理 nio 收到的消息""" # 忽略自己发送的消息 - sender = event.get("sender", "") + sender = event.sender if sender == self.user_id: return - content = event.get("content", {}) - msgtype = content.get("msgtype", "") + message_text = event.body.strip() - if msgtype != "m.text": - return + logger.info(f"Matrix收到消息: [{room.room_id}] {sender}: {message_text}") - message_text = content.get("body", "").strip() - event_id = event.get("event_id", "") - - logger.info(f"Matrix收到消息: [{room_id}] {sender}: {message_text}") - - # 保存房间ID(用于网页端同步) - self.last_room_id = room_id + # 保存房间ID + self.last_room_id = room.room_id db = SessionLocal() try: @@ -147,22 +106,22 @@ class MatrixBot: # 使用固定主用户 main_user = conv_service.get_or_create_user( - MAIN_USER_ID, - display_name="主用户", + MAIN_USER_ID, + display_name="主用户", user_type='web' ) # 处理 /new 命令 if message_text == "/new": conversation = conv_service.create_conversation(main_user.id) - await self.send_message(room_id, "✅ 已创建新会话") + await self.send_message(room.room_id, "✅ 已创建新会话") logger.info(f"Matrix创建新会话: {conversation.conversation_id}") if self.on_message_callback: await self.on_message_callback( action="new_conversation", conversation_id=conversation.conversation_id, - room_id=room_id + room_id=room.room_id ) return @@ -180,74 +139,78 @@ class MatrixBot: content=message_text, source='matrix', extra_data={ - 'event_id': event_id, - 'room_id': room_id, + 'event_id': event.event_id, + 'room_id': room.room_id, 'sender': sender } ) - # 调用消息处理器获取AI回复 + # 发送"正在输入"状态 + await self.client.room_typing(room.room_id, typing_state=True) + + # 获取AI回复 + messages = conv_service.get_messages(conversation.id) + ai_response = await ai_service.chat(messages) + + # 保存AI回复 + conv_service.add_message( + conversation_id=conversation.id, + role='assistant', + content=ai_response, + source='matrix' + ) + + # 发送回复 + await self.send_message(room.room_id, ai_response) + + # 关闭"正在输入"状态 + await self.client.room_typing(room.room_id, typing_state=False) + + # 调用回调(用于网页同步) if self.on_message_callback: await self.on_message_callback( action="chat", conversation_id=conversation.conversation_id, user_message=message_text, - room_id=room_id, + room_id=room.room_id, message_id=user_msg.id ) + + logger.info(f"Matrix回复已发送: {ai_response[:50]}") + + except Exception as e: + logger.error(f"处理Matrix消息失败: {e}") + await self.send_message(room.room_id, f"处理消息时出错: {str(e)}") + await self.client.room_typing(room.room_id, typing_state=False) finally: db.close() async def send_message(self, room_id: str, message: str): """发送消息到Matrix房间""" - if not self.client or not self.access_token: + if not self.client: logger.error("Matrix客户端未初始化") return False try: - # 生成 txn_id - txn_id = f"m{int(asyncio.get_event_loop().time() * 1000)}" - - resp = await self.client.put( - f"{self.homeserver}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}", - headers={"Authorization": f"Bearer {self.access_token}"}, - json={ + await self.client.room_send( + room_id, + "m.room.message", + { "msgtype": "m.text", "body": message } ) - - if resp.status_code == 200: - logger.info(f"Matrix消息发送成功: {room_id}") - return True - else: - logger.error(f"Matrix发送失败: {resp.status_code} {resp.text}") - return False + logger.info(f"Matrix消息发送成功: {room_id}") + return True except Exception as e: logger.error(f"发送Matrix消息错误: {e}") return False - async def get_joined_rooms(self): - """获取已加入的房间列表""" - if not self.client: - return [] - - try: - resp = await self.client.get( - f"{self.homeserver}/_matrix/client/v3/joined_rooms", - headers={"Authorization": f"Bearer {self.access_token}"} - ) - if resp.status_code == 200: - return resp.json().get("joined_rooms", []) - except Exception as e: - logger.error(f"获取房间列表失败: {e}") - return [] - async def disconnect(self): """断开连接""" self.is_running = False if self.client: - await self.client.aclose() + await self.client.close() # 全局实例