fix: Matrix改用HTTP API,修复HTTPS不可用问题,网页端消息同步到Matrix

This commit is contained in:
2026-04-11 12:56:00 +08:00
parent d3236413f3
commit 65297d7321
6 changed files with 149 additions and 95 deletions

View File

@@ -1,117 +1,145 @@
"""
Matrix Bot 服务 - 处理Matrix消息收发
所有消息都同步到主用户的最新会话
Matrix Bot 服务 - 使用HTTP API处理消息收发
"""
import asyncio
import json
from typing import Optional, Callable
from nio import AsyncClient, MatrixRoom, RoomMessageText, LoginResponse
import httpx
import logging
from typing import Optional, Callable
from models import SessionLocal, User, Conversation, Message, SystemConfig
from services.conversation_service import ConversationService
logger = logging.getLogger(__name__)
# 主用户ID
MAIN_USER_ID = "main_user"
class MatrixBot:
def __init__(self):
self.client: Optional[AsyncClient] = None
self.homeserver: str = ""
self.username: str = ""
self.password: str = ""
self.access_token: str = ""
self.user_id: str = ""
self.is_running: bool = False
self.on_message_callback: Optional[Callable] = None
self.sync_token: str = ""
self.client: httpx.AsyncClient = None
self.last_room_id: str = "" # 最后收到消息的房间ID用于网页端同步
async def init_from_config(self):
"""从数据库配置初始化"""
db = SessionLocal()
try:
configs = {c.key: c.value for c in db.query(SystemConfig).all()}
self.homeserver = configs.get('matrix_homeserver', 'https://matrix.tphai.com')
self.username = configs.get('matrix_username', '')
self.password = configs.get('matrix_password', '')
self.homeserver = configs.get('matrix_homeserver', 'http://matrix.tphai.com')
self.access_token = configs.get('matrix_access_token', '')
self.user_id = configs.get('matrix_username', '')
if self.username and (self.password or self.access_token):
await self.connect()
logger.info(f"Matrix配置: homeserver={self.homeserver}, user={self.user_id}")
if self.access_token and self.user_id:
self.client = httpx.AsyncClient(timeout=10.0)
# 验证token
try:
resp = await self.client.get(
f"{self.homeserver}/_matrix/client/v3/account/whoami",
headers={"Authorization": f"Bearer {self.access_token}"}
)
if resp.status_code == 200:
self.is_running = True
logger.info(f"Matrix HTTP API连接成功: {self.user_id}")
# 获取已加入的房间
rooms = await self.get_joined_rooms()
if rooms:
self.last_room_id = rooms[0]
logger.info(f"Matrix房间: {self.last_room_id}")
return True
else:
logger.error(f"Matrix验证失败: status={resp.status_code}, body={resp.text}")
except Exception as e:
logger.error(f"Matrix连接错误: {type(e).__name__}: {str(e)}")
finally:
db.close()
async def connect(self):
"""连接到Matrix服务器"""
if not self.homeserver or not self.username:
logger.warning("Matrix配置不完整跳过连接")
return False
try:
self.client = AsyncClient(
self.homeserver,
self.username,
store_path="/tmp/matrix_store"
)
if self.access_token:
self.client.access_token = self.access_token
self.client.user_id = self.username
logger.info(f"Matrix已设置access_token: {self.username}")
self.is_running = True
return True
response = await self.client.login(self.password)
if isinstance(response, LoginResponse):
logger.info(f"Matrix连接成功: {self.username}")
self.is_running = True
return True
else:
logger.error(f"Matrix登录失败: {response}")
return False
except Exception as e:
logger.error(f"Matrix连接错误: {e}")
return False
return False
async def start_sync(self, message_handler: Callable = None):
"""开始同步消息"""
if not self.client:
logger.warning("Matrix客户端未初始化")
if not self.is_running:
logger.warning("Matrix未连接")
return
self.on_message_callback = message_handler
# 注册消息处理器
self.client.add_event_callback(self._handle_room_message, RoomMessageText)
try:
await self.client.sync(timeout=10000)
logger.info("Matrix初始同步完成")
except Exception as e:
logger.warning(f"Matrix初始同步失败: {e}")
# 启动后台同步任务
asyncio.create_task(self._sync_loop())
logger.info("Matrix同步任务已启动")
logger.info("Matrix HTTP同步任务已启动")
async def _sync_loop(self):
"""后台同步循环"""
while self.is_running:
try:
await self.client.sync(timeout=5000)
await asyncio.sleep(1)
await self.sync_events()
await asyncio.sleep(2) # 每2秒同步一次
except Exception as e:
logger.warning(f"Matrix同步错误: {e}")
await asyncio.sleep(5)
logger.error(f"Matrix同步错误: {e}")
await asyncio.sleep(10)
async def _handle_room_message(self, room: MatrixRoom, event: RoomMessageText):
"""处理Matrix消息 - 所有消息同步到主用户最新会话"""
# 忽略自己发送的消息
if event.sender == self.username:
async def sync_events(self):
"""同步Matrix事件"""
if not self.client:
return
message_text = event.body.strip()
try:
# 使用/sync API获取新事件
params = {"timeout": 5000}
if self.sync_token:
params["since"] = self.sync_token
resp = await self.client.get(
f"{self.homeserver}/_matrix/client/v3/sync",
headers={"Authorization": f"Bearer {self.access_token}"},
params=params
)
if resp.status_code != 200:
logger.error(f"Matrix同步失败: {resp.status_code}")
return
data = resp.json()
self.sync_token = data.get("next_batch", "")
# 处理房间消息
rooms = data.get("rooms", {})
join_rooms = rooms.get("join", {})
for room_id, room_data in join_rooms.items():
events = room_data.get("timeline", {}).get("events", [])
for event in events:
if event.get("type") == "m.room.message":
await self._handle_message(room_id, event)
except Exception as e:
logger.error(f"同步事件失败: {e}")
async def _handle_message(self, room_id: str, event: dict):
"""处理消息"""
# 忽略自己发送的消息
sender = event.get("sender", "")
if sender == self.user_id:
return
content = event.get("content", {})
msgtype = content.get("msgtype", "")
if msgtype != "m.text":
return
message_text = content.get("body", "").strip()
event_id = event.get("event_id", "")
logger.info(f"Matrix收到消息: [{room_id}] {sender}: {message_text}")
# 保存房间ID用于网页端同步
self.last_room_id = room_id
db = SessionLocal()
try:
@@ -124,28 +152,26 @@ class MatrixBot:
user_type='web'
)
# 处理 /new 命令 - 创建新会话,不回复
# 处理 /new 命令
if message_text == "/new":
conversation = conv_service.create_conversation(main_user.id)
await self.send_message(room.room_id, "✅ 已创建新会话")
await self.send_message(room_id, "✅ 已创建新会话")
logger.info(f"Matrix创建新会话: {conversation.conversation_id}")
# 通知WebSocket客户端
if self.on_message_callback:
await self.on_message_callback(
action="new_conversation",
conversation_id=conversation.conversation_id,
room_id=room.room_id
room_id=room_id
)
return
# 获取最新会话
conversations = conv_service.get_user_conversations(main_user.id)
if not conversations:
# 如果没有会话,创建一个
conversation = conv_service.create_conversation(main_user.id)
else:
conversation = conversations[0] # 最新会话
conversation = conversations[0]
# 保存用户消息
user_msg = conv_service.add_message(
@@ -154,21 +180,19 @@ class MatrixBot:
content=message_text,
source='matrix',
extra_data={
'event_id': event.event_id,
'room_id': room.room_id,
'sender': event.sender
'event_id': event_id,
'room_id': room_id,
'sender': sender
}
)
logger.info(f"Matrix消息保存到会话 {conversation.conversation_id}")
# 调用消息处理器获取AI回复并同步
# 调用消息处理器获取AI回复
if self.on_message_callback:
await self.on_message_callback(
action="chat",
conversation_id=conversation.conversation_id,
user_message=message_text,
room_id=room.room_id,
room_id=room_id,
message_id=user_msg.id
)
finally:
@@ -176,32 +200,54 @@ class MatrixBot:
async def send_message(self, room_id: str, message: str):
"""发送消息到Matrix房间"""
if not self.client:
if not self.client or not self.access_token:
logger.error("Matrix客户端未初始化")
return False
try:
await self.client.room_send(
room_id=room_id,
message_type="m.room.message",
content={
# 生成 txn_id
txn_id = f"m{int(asyncio.get_event_loop().time() * 1000)}"
resp = await self.client.put(
f"{self.homeserver}/_matrix/client/v3/rooms/{room_id}/send/m.room.message/{txn_id}",
headers={"Authorization": f"Bearer {self.access_token}"},
json={
"msgtype": "m.text",
"body": message
}
)
return True
if resp.status_code == 200:
logger.info(f"Matrix消息发送成功: {room_id}")
return True
else:
logger.error(f"Matrix发送失败: {resp.status_code} {resp.text}")
return False
except Exception as e:
logger.error(f"发送Matrix消息失败: {e}")
logger.error(f"发送Matrix消息错误: {e}")
return False
async def get_joined_rooms(self):
"""获取已加入的房间列表"""
if not self.client:
return []
try:
resp = await self.client.get(
f"{self.homeserver}/_matrix/client/v3/joined_rooms",
headers={"Authorization": f"Bearer {self.access_token}"}
)
if resp.status_code == 200:
return resp.json().get("joined_rooms", [])
except Exception as e:
logger.error(f"获取房间列表失败: {e}")
return []
async def disconnect(self):
"""断开连接"""
self.is_running = False
if self.client:
try:
await self.client.logout()
await self.client.close()
except:
pass
self.is_running = False
await self.client.aclose()
# 全局实例