Files
ai-chat-system/services/matrix_service.py

347 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Matrix Bot 服务 - 使用 nio 库处理消息收发(支持加密)
"""
import asyncio
import logging
import os
from typing import Optional, Callable
from nio import AsyncClient, RoomMessageText, MegolmEvent
from models import SessionLocal, User, Conversation, Message, SystemConfig
from services.conversation_service import ConversationService
from services import ai_service
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Fixed identifier of the single local user that Matrix messages are filed under.
MAIN_USER_ID = "main_user"

# Directory passed to nio as its store path. The built-in default is an
# absolute path inside one specific home directory, so deployments elsewhere
# can point MATRIX_STORE_PATH at a different location; when the variable is
# unset the original path is used unchanged.
STORE_PATH = os.environ.get(
    "MATRIX_STORE_PATH",
    "/home/xian/.openclaw/workspace-coder/works/ai-chat/matrix_store",
)
class MatrixBot:
def __init__(self):
self.client: Optional[AsyncClient] = None
self.homeserver: str = ""
self.user_id: str = ""
self.password: str = ""
self.is_running: bool = False
self.on_message_callback: Optional[Callable] = None
self.last_room_id: str = ""
self.processed_events: set = set() # 已处理的事件ID防止重复处理
async def init_from_config(self):
    """Load Matrix settings from the database and construct the nio client.

    Every ``SystemConfig`` row is read into a key/value mapping, from which
    ``matrix_homeserver``, ``matrix_username`` and ``matrix_password`` are
    taken; keys that are absent fall back to built-in defaults. If both the
    user ID and the password end up non-empty, ``STORE_PATH`` is created, an
    ``AsyncClient`` is built with that directory as its store path and
    ``is_running`` is set. No login is attempted here.

    Returns:
        bool: True if the client was created, otherwise False.
    """
    db = SessionLocal()
    try:
        configs = {c.key: c.value for c in db.query(SystemConfig).all()}
        self.homeserver = configs.get('matrix_homeserver', 'http://matrix.tphai.com')
        self.user_id = configs.get('matrix_username', '@tester:matrix.tphai.com')
        # SECURITY NOTE(review): a fallback password is embedded in source.
        # It is kept so existing deployments keep working, but it should be
        # removed once every deployment stores its own credentials.
        self.password = configs.get('matrix_password', 'tester12345@!')
        logger.info(f"Matrix配置: homeserver={self.homeserver}, user={self.user_id}")

        if 'matrix_password' not in configs:
            # Make the silent fallback visible to operators (the password
            # itself is never logged).
            logger.warning("matrix_password 未配置, 正在使用内置默认密码, 请在系统配置中设置")

        if self.user_id and self.password:
            # The store directory must exist before the client is created.
            os.makedirs(STORE_PATH, exist_ok=True)
            self.client = AsyncClient(
                self.homeserver,
                self.user_id,
                store_path=STORE_PATH
            )
            self.is_running = True
            logger.info(f"Matrix nio 客户端已初始化,存储路径: {STORE_PATH}")
            return True
    finally:
        db.close()
    return False
async def start_sync(self, message_handler: Optional[Callable] = None):
    """Log in, run one initial sync, register callbacks and start the sync loop.

    Args:
        message_handler: Optional coroutine function stored as
            ``on_message_callback`` and awaited by the message handlers.

    Returns:
        None. Failures are logged rather than raised; when login or the
        initial sync fails, no callbacks are registered and no background
        task is started.
    """
    if not self.is_running or not self.client:
        logger.warning("Matrix未连接")
        return
    self.on_message_callback = message_handler
    try:
        # nio reports most failures by returning an error response object
        # instead of raising, so each result has to be checked explicitly.
        from nio import LoginError, SyncError

        login_resp = await self.client.login(self.password)
        if isinstance(login_resp, LoginError):
            logger.error(f"Matrix登录失败: {login_resp}")
            return
        logger.info(f"Matrix登录成功: {login_resp}")

        logger.info("加载加密存储...")
        # The initial full-state sync runs before any callback is registered,
        # so events it delivers are not passed to the handlers below.
        sync_resp = await self.client.sync(full_state=True)
        if isinstance(sync_resp, SyncError):
            logger.error(f"Matrix初始同步失败: {sync_resp}")
            return
        logger.info(f"初始同步完成: next_batch={sync_resp.next_batch}")

        # Plain text messages.
        self.client.add_event_callback(self._handle_nio_message, RoomMessageText)
        # Megolm-encrypted events.
        self.client.add_event_callback(self._handle_encrypted_message, MegolmEvent)
        logger.info("Matrix消息回调已注册")

        # Keep a reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected
        # while it is still running.
        self._sync_task = asyncio.create_task(self._run_sync_forever())
        logger.info("Matrix nio sync_forever 任务已启动")
    except Exception as e:
        # logger.exception records the traceback together with the message.
        logger.exception(f"Matrix启动失败: {e}")
async def _run_sync_forever(self):
    """Run nio's ``sync_forever`` loop until it fails.

    Any exception raised by the loop is logged together with its traceback,
    and the bot is then marked as no longer running.
    """
    try:
        logger.info("开始 Matrix sync_forever...")
        # No full_state here: only incremental syncs, with a 30000 ms timeout.
        await self.client.sync_forever(timeout=30000)
    except Exception as exc:
        import traceback

        failure_trace = traceback.format_exc()
        logger.error(f"sync_forever 错误: {exc}")
        logger.error(failure_trace)
        self.is_running = False
async def _handle_encrypted_message(self, room, event):
    """Callback for ``MegolmEvent``: try to recover the text and process it.

    Three sources are tried in order until one yields a non-empty body:

    1. ``self.client.decrypt_event(event)``, decrypting with whatever keys
       the client currently holds;
    2. a ``body`` key in the raw ``event.source['content']``;
    3. a ``body`` attribute directly on the event.

    A recovered body is passed to ``_process_message``; otherwise a warning
    is logged. Exceptions are logged and never propagate to the caller.

    Args:
        room: Room object the event arrived in (``room.room_id`` is read).
        event: The encrypted event delivered by nio.
    """
    logger.info(f"收到加密消息: [{room.room_id}] event_id={event.event_id}")
    logger.info(f"event.decrypted: {event.decrypted}")
    try:
        body = None
        sender = event.sender

        # Source 1: ask the client to decrypt the event.
        # ``event.decrypted`` is only a flag, not the decrypted payload, so it
        # cannot be handed to ``parse_decrypted_event``; decryption has to go
        # through the client. A failure (for example missing session keys)
        # is expected and simply moves on to the fallbacks.
        decrypted_event = None
        try:
            decrypted_event = self.client.decrypt_event(event)
        except Exception as e:
            logger.warning(f"decrypt_event 失败: {e}")

        if decrypted_event is not None:
            logger.info(f"解密事件类型: {type(decrypted_event)}")
            decrypted_body = getattr(decrypted_event, 'body', None)
            if decrypted_body:
                body = decrypted_body
                sender = getattr(decrypted_event, 'sender', None) or event.sender
                logger.info(f"从 decrypted 获取: {body}")

        # Source 2: a plaintext body inside the raw event source, if any.
        if not body and hasattr(event, 'source') and event.source:
            content = event.source.get('content', {})
            if 'body' in content:
                body = content.get('body', '')
                sender = event.source.get('sender', event.sender)
                logger.info(f"从source.content获取: {body[:50] if body else 'empty'}")

        # Source 3: a body attribute on the event object itself.
        if not body and hasattr(event, 'body') and event.body:
            body = event.body
            logger.info(f"从event.body获取: {body[:50]}")

        if body:
            logger.info(f"解密成功: sender={sender}, body={body[:50]}")
            await self._process_message(room, sender, body, event.event_id)
        else:
            logger.warning(f"加密消息无法解密: event_id={event.event_id}")
            logger.warning(f"event.decrypted={event.decrypted}, 需要密钥")
    except Exception as e:
        logger.error(f"处理加密消息失败: {e}")
        import traceback
        logger.error(traceback.format_exc())
async def _handle_nio_message(self, room, event):
    """Callback for plain-text room messages.

    Messages sent by the bot's own user ID are dropped; anything else has
    its body stripped of surrounding whitespace and is handed to
    ``_process_message``.
    """
    origin = event.sender

    # Skip messages sent by this bot's own account.
    if origin == self.user_id:
        logger.debug(f"忽略自己发送的消息: {origin}")
        return

    text = event.body.strip()
    logger.info(f"Matrix收到文本消息: [{room.room_id}] {origin}: {text}")

    # Shared processing path, also used by the encrypted-message callback.
    await self._process_message(room, origin, text, event.event_id)
async def _process_message(self, room, sender, message_text, event_id):
    """Core handling shared by the plain-text and encrypted callbacks.

    Steps:
      1. Drop messages from the bot's own user ID and events already seen.
      2. Record ``room.room_id`` as ``last_room_id``.
      3. ``/new`` creates a fresh conversation, confirms it in the room and
         notifies the callback with ``action="new_conversation"``.
      4. Any other text is stored as a user message in the most recent
         conversation (created if none exists); the callback is then
         awaited with ``action="user_message"`` and again with
         ``action="generate_ai_response"``.

    Args:
        room: Room object the message arrived in (``room.room_id`` is read).
        sender: Matrix user ID of the author.
        message_text: Message body.
        event_id: Matrix event ID, used for de-duplication.

    Errors are logged and reported to the room; nothing is raised.
    """
    # Own messages are dropped first so they never occupy a de-duplication slot.
    if sender == self.user_id:
        logger.debug(f"忽略自己发送的消息: {sender}")
        return

    # Skip events that were already handled.
    if event_id in self.processed_events:
        logger.debug(f"忽略已处理的事件: {event_id}")
        return

    # A set has no ordering, so "keep the newest 50" cannot be derived from
    # it: slicing list(set) keeps an arbitrary half and may even discard the
    # ID just added. Arrival order is therefore tracked in a companion deque
    # (created lazily so this method does not depend on __init__).
    from collections import deque
    arrival_order = getattr(self, "_processed_order", None)
    if arrival_order is None:
        arrival_order = self._processed_order = deque(self.processed_events)
    self.processed_events.add(event_id)
    arrival_order.append(event_id)
    logger.info(f"处理消息: event_id={event_id}")

    # Once more than 100 IDs are held, forget the oldest until 50 remain.
    if len(arrival_order) > 100:
        while len(arrival_order) > 50:
            arrival_order.popleft()
        self.processed_events = set(arrival_order)

    # The plain-text callback already strips the body; doing it here as well
    # makes commands such as "/new" behave the same on the encrypted path.
    if isinstance(message_text, str):
        message_text = message_text.strip()

    # Remember the room this message came from.
    self.last_room_id = room.room_id

    db = SessionLocal()
    try:
        conv_service = ConversationService(db)
        # All Matrix traffic is attributed to one fixed user.
        main_user = conv_service.get_or_create_user(
            MAIN_USER_ID,
            display_name="主用户",
            user_type='web'
        )

        # "/new" command: start a fresh conversation and stop here.
        if message_text == "/new":
            conversation = conv_service.create_conversation(main_user.id)
            await self.send_message(room.room_id, "✅ 已创建新会话")
            logger.info(f"Matrix创建新会话: {conversation.conversation_id}")
            if self.on_message_callback:
                await self.on_message_callback(
                    action="new_conversation",
                    conversation_id=conversation.conversation_id,
                    room_id=room.room_id
                )
            return

        # Use the first conversation returned for the user, or create one.
        # NOTE(review): presumably get_user_conversations returns newest
        # first; confirm against ConversationService.
        conversations = conv_service.get_user_conversations(main_user.id)
        if not conversations:
            conversation = conv_service.create_conversation(main_user.id)
        else:
            conversation = conversations[0]

        # Persist the incoming user message.
        user_msg = conv_service.add_message(
            conversation_id=conversation.id,
            role='user',
            content=message_text,
            source='matrix',
            extra_data={
                'event_id': event_id,
                'room_id': room.room_id,
                'sender': sender
            }
        )

        # First callback: announce the new user message.
        if self.on_message_callback:
            await self.on_message_callback(
                action="user_message",
                conversation_id=conversation.conversation_id,
                user_message=message_text,
                room_id=room.room_id,
                message_id=user_msg.id,
                sender=sender
            )

        # Second callback: ask the handler to produce the AI reply (the AI
        # is no longer called directly from here).
        if self.on_message_callback:
            await self.on_message_callback(
                action="generate_ai_response",
                conversation_id=conversation.conversation_id,
                room_id=room.room_id
            )
    except Exception as e:
        logger.error(f"处理Matrix消息失败: {e}")
        import traceback
        logger.error(traceback.format_exc())
        # NOTE(review): the raw exception text is sent to the room, which may
        # expose internal details to room members.
        await self.send_message(room.room_id, f"处理消息时出错: {str(e)}")
        try:
            await self.client.room_typing(room.room_id, typing_state=False)
        except Exception as typing_err:
            # Best effort only: failing to clear the typing indicator must
            # not mask the original error, but it should not be invisible.
            logger.debug(f"重置输入状态失败: {typing_err}")
    finally:
        db.close()
async def send_message(self, room_id: str, message: str):
    """Send a text message to a Matrix room.

    The message is first sent through the nio client. If that raises an
    error mentioning unverified devices, every unverified device in the
    client's device store is marked verified and the send is retried. If
    the retry raises as well, the message is PUT directly to the
    homeserver's HTTP API without encryption.

    Args:
        room_id: Target room ID.
        message: Plain-text body.

    Returns:
        bool: True if one of the attempts succeeded, otherwise False.
    """
    if not self.client:
        logger.error("Matrix客户端未初始化")
        return False

    # nio returns an error response object for server-side failures instead
    # of raising, so a returned value alone does not mean success.
    from nio import ErrorResponse

    content = {"msgtype": "m.text", "body": message}

    try:
        resp = await self.client.room_send(room_id, "m.room.message", dict(content))
        if isinstance(resp, ErrorResponse):
            logger.error(f"发送Matrix消息错误: {resp}")
            return False
        logger.info(f"Matrix消息发送成功: {room_id}")
        return True
    except Exception as e:
        error_str = str(e)
        if "not verified" not in error_str and "OlmUnverifiedDeviceError" not in error_str:
            logger.error(f"发送Matrix消息错误: {e}")
            return False
        logger.warning(f"设备未验证,尝试验证设备后发送: {e}")

    try:
        # Mark every not-yet-verified device as verified, then retry.
        for user_id, devices in self.client.device_store.items():
            for device_id, device in devices.items():
                if not device.verified:
                    logger.info(f"验证设备: {user_id} {device_id}")
                    self.client.verify_device(device)

        resp = await self.client.room_send(room_id, "m.room.message", dict(content))
        if isinstance(resp, ErrorResponse):
            # The server rejected the event; an unencrypted resend would be
            # rejected for the same reason, so stop here.
            logger.error(f"验证设备后发送仍失败: {resp}")
            return False
        logger.info(f"Matrix消息发送成功验证设备后: {room_id}")
        return True
    except Exception as e2:
        logger.error(f"验证设备后发送仍失败: {e2}")

    return await self._send_plain_http(room_id, content)

async def _send_plain_http(self, room_id: str, content: dict) -> bool:
    """Last-resort sender: PUT the event to the homeserver without encryption."""
    # NOTE(review): in an encrypted room this delivers the text in plaintext.
    try:
        import uuid
        import httpx

        # A random transaction ID cannot repeat across restarts; a reused ID
        # could make the server treat the request as a retransmission.
        txn_id = f"m{uuid.uuid4().hex}"
        async with httpx.AsyncClient() as http_client:
            resp = await http_client.put(
                f"{self.homeserver}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}",
                headers={"Authorization": f"Bearer {self.client.access_token}"},
                json=content
            )
        if resp.status_code == 200:
            logger.info(f"Matrix消息通过HTTP发送成功: {room_id}")
            return True
        logger.error(f"HTTP发送失败: {resp.status_code}")
        return False
    except Exception as e3:
        logger.error(f"HTTP发送失败: {e3}")
        return False
async def disconnect(self):
"""断开连接"""
self.is_running = False
if self.client:
await self.client.close()
# Shared module-level instance, created at import time.
matrix_bot = MatrixBot()