feat: Matrix改用nio库支持加密消息

- 使用 matrix-nio 库替代 HTTP API
- 支持解密加密消息(MegolmEvent)
- 添加 matrix_password 配置项
- 发送'正在输入'状态提示
This commit is contained in:
2026-04-11 21:39:04 +08:00
parent 65297d7321
commit 0d25cdc344
3 changed files with 89 additions and 126 deletions

Binary file not shown.

View File

@@ -1,13 +1,14 @@
"""
Matrix Bot 服务 - 使用HTTP API处理消息收发
Matrix Bot 服务 - 使用 nio 库处理消息收发(支持加密)
"""
import asyncio
import httpx
import logging
from typing import Optional, Callable
from nio import AsyncClient, RoomMessageText, RoomMessageEmote
from models import SessionLocal, User, Conversation, Message, SystemConfig
from services.conversation_service import ConversationService
from services import ai_service
logger = logging.getLogger(__name__)
@@ -16,14 +17,13 @@ MAIN_USER_ID = "main_user"
class MatrixBot:
def __init__(self):
self.client: Optional[AsyncClient] = None
self.homeserver: str = ""
self.access_token: str = ""
self.user_id: str = ""
self.password: str = ""
self.is_running: bool = False
self.on_message_callback: Optional[Callable] = None
self.sync_token: str = ""
self.client: httpx.AsyncClient = None
self.last_room_id: str = "" # 最后收到消息的房间ID用于网页端同步
self.last_room_id: str = ""
async def init_from_config(self):
"""从数据库配置初始化"""
@@ -31,115 +31,74 @@ class MatrixBot:
try:
configs = {c.key: c.value for c in db.query(SystemConfig).all()}
self.homeserver = configs.get('matrix_homeserver', 'http://matrix.tphai.com')
self.access_token = configs.get('matrix_access_token', '')
self.user_id = configs.get('matrix_username', '')
self.user_id = configs.get('matrix_username', '@tester:matrix.tphai.com')
self.password = configs.get('matrix_password', 'tester12345@!')
logger.info(f"Matrix配置: homeserver={self.homeserver}, user={self.user_id}")
if self.access_token and self.user_id:
self.client = httpx.AsyncClient(timeout=10.0)
# 验证token
try:
resp = await self.client.get(
f"{self.homeserver}/_matrix/client/v3/account/whoami",
headers={"Authorization": f"Bearer {self.access_token}"}
)
if resp.status_code == 200:
if self.user_id and self.password:
self.client = AsyncClient(self.homeserver, self.user_id)
self.is_running = True
logger.info(f"Matrix HTTP API连接成功: {self.user_id}")
# 获取已加入的房间
rooms = await self.get_joined_rooms()
if rooms:
self.last_room_id = rooms[0]
logger.info(f"Matrix房间: {self.last_room_id}")
logger.info("Matrix nio 客户端已初始化")
return True
else:
logger.error(f"Matrix验证失败: status={resp.status_code}, body={resp.text}")
except Exception as e:
logger.error(f"Matrix连接错误: {type(e).__name__}: {str(e)}")
finally:
db.close()
return False
async def start_sync(self, message_handler: Callable = None):
"""开始同步消息"""
if not self.is_running:
if not self.is_running or not self.client:
logger.warning("Matrix未连接")
return
self.on_message_callback = message_handler
# 启动后台同步任务
# 注册消息处理器
self.client.add_event_callback(self._handle_nio_message, RoomMessageText)
# 登录
try:
login_resp = await self.client.login(self.password)
logger.info(f"Matrix登录成功: {login_resp}")
# 开始同步(后台任务)
asyncio.create_task(self._sync_loop())
logger.info("Matrix HTTP同步任务已启动")
logger.info("Matrix nio 同步任务已启动")
except Exception as e:
logger.error(f"Matrix登录失败: {e}")
async def _sync_loop(self):
"""后台同步循环"""
while self.is_running:
try:
await self.sync_events()
await asyncio.sleep(2) # 每2秒同步一次
# 同步一次
sync_resp = await self.client.sync(timeout=30000)
if sync_resp:
logger.debug(f"Matrix同步完成: next_batch={sync_resp.next_batch}")
# 处理已加入的房间
for room_id, room in self.client.rooms.items():
self.last_room_id = room_id
logger.debug(f"房间: {room_id}, 名称: {room.display_name}")
await asyncio.sleep(1)
except Exception as e:
logger.error(f"Matrix同步错误: {e}")
await asyncio.sleep(10)
async def sync_events(self):
"""同步Matrix事件"""
if not self.client:
return
try:
# 使用/sync API获取新事件
params = {"timeout": 5000}
if self.sync_token:
params["since"] = self.sync_token
resp = await self.client.get(
f"{self.homeserver}/_matrix/client/v3/sync",
headers={"Authorization": f"Bearer {self.access_token}"},
params=params
)
if resp.status_code != 200:
logger.error(f"Matrix同步失败: {resp.status_code}")
return
data = resp.json()
self.sync_token = data.get("next_batch", "")
# 处理房间消息
rooms = data.get("rooms", {})
join_rooms = rooms.get("join", {})
for room_id, room_data in join_rooms.items():
events = room_data.get("timeline", {}).get("events", [])
for event in events:
if event.get("type") == "m.room.message":
await self._handle_message(room_id, event)
except Exception as e:
logger.error(f"同步事件失败: {e}")
async def _handle_message(self, room_id: str, event: dict):
"""处理消息"""
async def _handle_nio_message(self, room, event):
"""处理 nio 收到的消息"""
# 忽略自己发送的消息
sender = event.get("sender", "")
sender = event.sender
if sender == self.user_id:
return
content = event.get("content", {})
msgtype = content.get("msgtype", "")
message_text = event.body.strip()
if msgtype != "m.text":
return
logger.info(f"Matrix收到消息: [{room.room_id}] {sender}: {message_text}")
message_text = content.get("body", "").strip()
event_id = event.get("event_id", "")
logger.info(f"Matrix收到消息: [{room_id}] {sender}: {message_text}")
# 保存房间ID用于网页端同步
self.last_room_id = room_id
# 保存房间ID
self.last_room_id = room.room_id
db = SessionLocal()
try:
@@ -155,14 +114,14 @@ class MatrixBot:
# 处理 /new 命令
if message_text == "/new":
conversation = conv_service.create_conversation(main_user.id)
await self.send_message(room_id, "✅ 已创建新会话")
await self.send_message(room.room_id, "✅ 已创建新会话")
logger.info(f"Matrix创建新会话: {conversation.conversation_id}")
if self.on_message_callback:
await self.on_message_callback(
action="new_conversation",
conversation_id=conversation.conversation_id,
room_id=room_id
room_id=room.room_id
)
return
@@ -180,74 +139,78 @@ class MatrixBot:
content=message_text,
source='matrix',
extra_data={
'event_id': event_id,
'room_id': room_id,
'event_id': event.event_id,
'room_id': room.room_id,
'sender': sender
}
)
# 调用消息处理器获取AI回复
# 发送"正在输入"状态
await self.client.room_typing(room.room_id, typing_state=True)
# 获取AI回复
messages = conv_service.get_messages(conversation.id)
ai_response = await ai_service.chat(messages)
# 保存AI回复
conv_service.add_message(
conversation_id=conversation.id,
role='assistant',
content=ai_response,
source='matrix'
)
# 发送回复
await self.send_message(room.room_id, ai_response)
# 关闭"正在输入"状态
await self.client.room_typing(room.room_id, typing_state=False)
# 调用回调(用于网页同步)
if self.on_message_callback:
await self.on_message_callback(
action="chat",
conversation_id=conversation.conversation_id,
user_message=message_text,
room_id=room_id,
room_id=room.room_id,
message_id=user_msg.id
)
logger.info(f"Matrix回复已发送: {ai_response[:50]}")
except Exception as e:
logger.error(f"处理Matrix消息失败: {e}")
await self.send_message(room.room_id, f"处理消息时出错: {str(e)}")
await self.client.room_typing(room.room_id, typing_state=False)
finally:
db.close()
async def send_message(self, room_id: str, message: str):
"""发送消息到Matrix房间"""
if not self.client or not self.access_token:
if not self.client:
logger.error("Matrix客户端未初始化")
return False
try:
# 生成 txn_id
txn_id = f"m{int(asyncio.get_event_loop().time() * 1000)}"
resp = await self.client.put(
f"{self.homeserver}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}",
headers={"Authorization": f"Bearer {self.access_token}"},
json={
await self.client.room_send(
room_id,
"m.room.message",
{
"msgtype": "m.text",
"body": message
}
)
if resp.status_code == 200:
logger.info(f"Matrix消息发送成功: {room_id}")
return True
else:
logger.error(f"Matrix发送失败: {resp.status_code} {resp.text}")
return False
except Exception as e:
logger.error(f"发送Matrix消息错误: {e}")
return False
async def get_joined_rooms(self):
"""获取已加入的房间列表"""
if not self.client:
return []
try:
resp = await self.client.get(
f"{self.homeserver}/_matrix/client/v3/joined_rooms",
headers={"Authorization": f"Bearer {self.access_token}"}
)
if resp.status_code == 200:
return resp.json().get("joined_rooms", [])
except Exception as e:
logger.error(f"获取房间列表失败: {e}")
return []
async def disconnect(self):
"""断开连接"""
self.is_running = False
if self.client:
await self.client.aclose()
await self.client.close()
# 全局实例