347 lines
14 KiB
Python
347 lines
14 KiB
Python
"""
|
||
Matrix Bot 服务 - 使用 nio 库处理消息收发(支持加密)
|
||
"""
|
||
import asyncio
|
||
import logging
|
||
import os
|
||
from typing import Optional, Callable
|
||
from nio import AsyncClient, RoomMessageText, MegolmEvent
|
||
|
||
from models import SessionLocal, User, Conversation, Message, SystemConfig
|
||
from services.conversation_service import ConversationService
|
||
from services import ai_service
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
MAIN_USER_ID = "main_user"
|
||
STORE_PATH = "/home/xian/.openclaw/workspace-coder/works/ai-chat/matrix_store"
|
||
|
||
|
||
class MatrixBot:
|
||
def __init__(self):
|
||
self.client: Optional[AsyncClient] = None
|
||
self.homeserver: str = ""
|
||
self.user_id: str = ""
|
||
self.password: str = ""
|
||
self.is_running: bool = False
|
||
self.on_message_callback: Optional[Callable] = None
|
||
self.last_room_id: str = ""
|
||
self.processed_events: set = set() # 已处理的事件ID,防止重复处理
|
||
|
||
async def init_from_config(self):
|
||
"""从数据库配置初始化"""
|
||
db = SessionLocal()
|
||
try:
|
||
configs = {c.key: c.value for c in db.query(SystemConfig).all()}
|
||
self.homeserver = configs.get('matrix_homeserver', 'http://matrix.tphai.com')
|
||
self.user_id = configs.get('matrix_username', '@tester:matrix.tphai.com')
|
||
self.password = configs.get('matrix_password', 'tester12345@!')
|
||
|
||
logger.info(f"Matrix配置: homeserver={self.homeserver}, user={self.user_id}")
|
||
|
||
if self.user_id and self.password:
|
||
# 创建存储目录
|
||
os.makedirs(STORE_PATH, exist_ok=True)
|
||
|
||
# 使用加密存储
|
||
self.client = AsyncClient(
|
||
self.homeserver,
|
||
self.user_id,
|
||
store_path=STORE_PATH
|
||
)
|
||
self.is_running = True
|
||
logger.info(f"Matrix nio 客户端已初始化,存储路径: {STORE_PATH}")
|
||
return True
|
||
finally:
|
||
db.close()
|
||
return False
|
||
|
||
async def start_sync(self, message_handler: Callable = None):
|
||
"""开始同步消息"""
|
||
if not self.is_running or not self.client:
|
||
logger.warning("Matrix未连接")
|
||
return
|
||
|
||
self.on_message_callback = message_handler
|
||
|
||
# 登录
|
||
try:
|
||
login_resp = await self.client.login(self.password)
|
||
logger.info(f"Matrix登录成功: {login_resp}")
|
||
|
||
# 加载加密存储
|
||
logger.info("加载加密存储...")
|
||
|
||
# 第一次同步获取房间密钥
|
||
sync_resp = await self.client.sync(full_state=True)
|
||
logger.info(f"初始同步完成: next_batch={sync_resp.next_batch}")
|
||
|
||
# 注册消息处理器 - 普通文本消息
|
||
self.client.add_event_callback(self._handle_nio_message, RoomMessageText)
|
||
# 注册加密消息处理器(解密后)
|
||
self.client.add_event_callback(self._handle_encrypted_message, MegolmEvent)
|
||
|
||
logger.info("Matrix消息回调已注册")
|
||
|
||
# 使用 sync_forever 自动处理事件(后台任务)
|
||
asyncio.create_task(self._run_sync_forever())
|
||
logger.info("Matrix nio sync_forever 任务已启动")
|
||
except Exception as e:
|
||
logger.error(f"Matrix启动失败: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
|
||
async def _run_sync_forever(self):
|
||
"""运行 sync_forever,自动处理所有事件"""
|
||
try:
|
||
logger.info("开始 Matrix sync_forever...")
|
||
# 不使用 full_state,只做增量同步
|
||
await self.client.sync_forever(timeout=30000)
|
||
except Exception as e:
|
||
logger.error(f"sync_forever 错误: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
self.is_running = False
|
||
|
||
async def _handle_encrypted_message(self, room, event):
|
||
"""处理加密消息(MegolmEvent)"""
|
||
logger.info(f"收到加密消息: [{room.room_id}] event_id={event.event_id}")
|
||
|
||
# 检查是否已解密
|
||
logger.info(f"event.decrypted: {event.decrypted}")
|
||
|
||
try:
|
||
body = None
|
||
sender = event.sender
|
||
|
||
# 方法1: 检查 decrypted 属性
|
||
if event.decrypted:
|
||
logger.info(f"消息已解密: {event.decrypted}")
|
||
# 使用 parse_decrypted_event 获取解密后的完整事件
|
||
try:
|
||
decrypted_event = event.parse_decrypted_event(event.decrypted)
|
||
if decrypted_event:
|
||
logger.info(f"解密事件类型: {type(decrypted_event)}")
|
||
if hasattr(decrypted_event, 'body'):
|
||
body = decrypted_event.body
|
||
sender = decrypted_event.sender if hasattr(decrypted_event, 'sender') else event.sender
|
||
logger.info(f"从 decrypted 获取: {body}")
|
||
except Exception as e:
|
||
logger.warning(f"parse_decrypted_event 失败: {e}")
|
||
|
||
# 方法2: 从 source 获取(如果是自己发的消息)
|
||
if not body and hasattr(event, 'source') and event.source:
|
||
content = event.source.get('content', {})
|
||
# 检查是否有 plaintext body
|
||
if 'body' in content:
|
||
body = content.get('body', '')
|
||
sender = event.source.get('sender', event.sender)
|
||
logger.info(f"从source.content获取: {body[:50] if body else 'empty'}")
|
||
|
||
# 方法3: 直接尝试 event.body (某些情况下可能有)
|
||
if not body and hasattr(event, 'body') and event.body:
|
||
body = event.body
|
||
logger.info(f"从event.body获取: {body[:50]}")
|
||
|
||
if body:
|
||
logger.info(f"解密成功: sender={sender}, body={body[:50]}")
|
||
await self._process_message(room, sender, body, event.event_id)
|
||
else:
|
||
logger.warning(f"加密消息无法解密: event_id={event.event_id}")
|
||
logger.warning(f"event.decrypted={event.decrypted}, 需要密钥")
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理加密消息失败: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
|
||
async def _handle_nio_message(self, room, event):
|
||
"""处理 nio 收到的消息(普通文本)"""
|
||
# 忽略自己发送的消息
|
||
sender = event.sender
|
||
if sender == self.user_id:
|
||
logger.debug(f"忽略自己发送的消息: {sender}")
|
||
return
|
||
|
||
message_text = event.body.strip()
|
||
|
||
logger.info(f"Matrix收到文本消息: [{room.room_id}] {sender}: {message_text}")
|
||
|
||
# 调用统一处理方法
|
||
await self._process_message(room, sender, message_text, event.event_id)
|
||
|
||
async def _process_message(self, room, sender, message_text, event_id):
|
||
"""处理消息的核心逻辑"""
|
||
# 检查是否已处理过此事件(防止重复处理)
|
||
if event_id in self.processed_events:
|
||
logger.debug(f"忽略已处理的事件: {event_id}")
|
||
return
|
||
|
||
# 标记为已处理
|
||
self.processed_events.add(event_id)
|
||
logger.info(f"处理消息: event_id={event_id}")
|
||
|
||
# 限制集合大小,保留最近的100个
|
||
if len(self.processed_events) > 100:
|
||
# 清理旧的一半
|
||
self.processed_events = set(list(self.processed_events)[-50:])
|
||
|
||
# 忽略自己发送的消息
|
||
if sender == self.user_id:
|
||
logger.debug(f"忽略自己发送的消息: {sender}")
|
||
return
|
||
|
||
# 保存房间ID
|
||
self.last_room_id = room.room_id
|
||
|
||
db = SessionLocal()
|
||
try:
|
||
conv_service = ConversationService(db)
|
||
|
||
# 使用固定主用户
|
||
main_user = conv_service.get_or_create_user(
|
||
MAIN_USER_ID,
|
||
display_name="主用户",
|
||
user_type='web'
|
||
)
|
||
|
||
# 处理 /new 命令
|
||
if message_text == "/new":
|
||
conversation = conv_service.create_conversation(main_user.id)
|
||
await self.send_message(room.room_id, "✅ 已创建新会话")
|
||
logger.info(f"Matrix创建新会话: {conversation.conversation_id}")
|
||
|
||
if self.on_message_callback:
|
||
await self.on_message_callback(
|
||
action="new_conversation",
|
||
conversation_id=conversation.conversation_id,
|
||
room_id=room.room_id
|
||
)
|
||
return
|
||
|
||
# 获取最新会话
|
||
conversations = conv_service.get_user_conversations(main_user.id)
|
||
if not conversations:
|
||
conversation = conv_service.create_conversation(main_user.id)
|
||
else:
|
||
conversation = conversations[0]
|
||
|
||
# 保存用户消息
|
||
user_msg = conv_service.add_message(
|
||
conversation_id=conversation.id,
|
||
role='user',
|
||
content=message_text,
|
||
source='matrix',
|
||
extra_data={
|
||
'event_id': event_id,
|
||
'room_id': room.room_id,
|
||
'sender': sender
|
||
}
|
||
)
|
||
|
||
# 立即通知网页端有新用户消息(通过回调)
|
||
if self.on_message_callback:
|
||
await self.on_message_callback(
|
||
action="user_message",
|
||
conversation_id=conversation.conversation_id,
|
||
user_message=message_text,
|
||
room_id=room.room_id,
|
||
message_id=user_msg.id,
|
||
sender=sender
|
||
)
|
||
|
||
# 通知回调处理 AI 回复(不再在这里直接调用 AI)
|
||
if self.on_message_callback:
|
||
await self.on_message_callback(
|
||
action="generate_ai_response",
|
||
conversation_id=conversation.conversation_id,
|
||
room_id=room.room_id
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理Matrix消息失败: {e}")
|
||
import traceback
|
||
logger.error(traceback.format_exc())
|
||
await self.send_message(room.room_id, f"处理消息时出错: {str(e)}")
|
||
try:
|
||
await self.client.room_typing(room.room_id, typing_state=False)
|
||
except:
|
||
pass
|
||
finally:
|
||
db.close()
|
||
|
||
async def send_message(self, room_id: str, message: str):
|
||
"""发送消息到Matrix房间"""
|
||
if not self.client:
|
||
logger.error("Matrix客户端未初始化")
|
||
return False
|
||
|
||
try:
|
||
# 先尝试直接发送
|
||
await self.client.room_send(
|
||
room_id,
|
||
"m.room.message",
|
||
{
|
||
"msgtype": "m.text",
|
||
"body": message
|
||
}
|
||
)
|
||
logger.info(f"Matrix消息发送成功: {room_id}")
|
||
return True
|
||
except Exception as e:
|
||
error_str = str(e)
|
||
if "not verified" in error_str or "OlmUnverifiedDeviceError" in error_str:
|
||
logger.warning(f"设备未验证,尝试验证设备后发送: {e}")
|
||
try:
|
||
# 验证所有未验证的设备
|
||
for user_id, devices in self.client.device_store.items():
|
||
for device_id, device in devices.items():
|
||
if not device.verified:
|
||
logger.info(f"验证设备: {user_id} {device_id}")
|
||
self.client.verify_device(device)
|
||
|
||
# 再次尝试发送
|
||
await self.client.room_send(
|
||
room_id,
|
||
"m.room.message",
|
||
{
|
||
"msgtype": "m.text",
|
||
"body": message
|
||
}
|
||
)
|
||
logger.info(f"Matrix消息发送成功(验证设备后): {room_id}")
|
||
return True
|
||
except Exception as e2:
|
||
logger.error(f"验证设备后发送仍失败: {e2}")
|
||
# 最后尝试用 HTTP API 发送(不加密)
|
||
try:
|
||
import httpx
|
||
async with httpx.AsyncClient() as http_client:
|
||
txn_id = f"m{int(asyncio.get_event_loop().time() * 1000)}"
|
||
resp = await http_client.put(
|
||
f"{self.homeserver}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}",
|
||
headers={"Authorization": f"Bearer {self.client.access_token}"},
|
||
json={"msgtype": "m.text", "body": message}
|
||
)
|
||
if resp.status_code == 200:
|
||
logger.info(f"Matrix消息通过HTTP发送成功: {room_id}")
|
||
return True
|
||
else:
|
||
logger.error(f"HTTP发送失败: {resp.status_code}")
|
||
return False
|
||
except Exception as e3:
|
||
logger.error(f"HTTP发送失败: {e3}")
|
||
return False
|
||
else:
|
||
logger.error(f"发送Matrix消息错误: {e}")
|
||
return False
|
||
|
||
async def disconnect(self):
|
||
"""断开连接"""
|
||
self.is_running = False
|
||
if self.client:
|
||
await self.client.close()
|
||
|
||
|
||
# 全局实例
|
||
matrix_bot = MatrixBot() |