feat: 网页端固定主用户,Matrix /new创建新会话,实时同步

This commit is contained in:
2026-04-11 12:29:11 +08:00
parent b05a03e198
commit b228283133
5 changed files with 206 additions and 128 deletions

Binary file not shown.

View File

@@ -25,8 +25,15 @@ WARNING:nio.clieWARNING:nio.client.async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
async_client:Timed out, sleeping for 25s
WARNING:nio.client.async_client:Timed out, sleeping for 51s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s
WARNING:nio.client.async_client:Timed out, sleeping for 60s

141
main.py
View File

@@ -83,15 +83,14 @@ async def admin(request: Request):
# ==================== API路由 ====================
# 固定主用户ID与Matrix AI用户关联
MAIN_USER_ID = "main_user"
@app.get("/api/conversations")
async def get_conversations(user_id: str = None, db: Session = Depends(get_db)):
"""获取会话列表"""
if not user_id:
# 临时用户ID实际应用中应该从session获取
user_id = "web_anonymous"
async def get_conversations(db: Session = Depends(get_db)):
"""获取会话列表(使用固定主用户)"""
conv_service = ConversationService(db)
user = conv_service.get_or_create_user(user_id, user_type='web')
user = conv_service.get_or_create_user(MAIN_USER_ID, display_name="主用户", user_type='web')
conversations = conv_service.get_user_conversations(user.id)
return {
@@ -106,15 +105,30 @@ async def get_conversations(user_id: str = None, db: Session = Depends(get_db)):
]
}
@app.get("/api/conversations/latest")
async def get_latest_conversation(db: Session = Depends(get_db)):
"""获取最新会话用于Matrix同步"""
conv_service = ConversationService(db)
user = conv_service.get_or_create_user(MAIN_USER_ID, display_name="主用户", user_type='web')
conversations = conv_service.get_user_conversations(user.id)
if conversations:
latest = conversations[0] # 已按更新时间倒序
return {
"conversation": {
"id": latest.conversation_id,
"title": latest.title or "新对话",
"updated_at": latest.updated_at.isoformat()
}
}
return {"conversation": None}
@app.post("/api/conversations")
async def create_conversation(user_id: str = None, db: Session = Depends(get_db)):
"""创建新会话"""
if not user_id:
user_id = "web_anonymous"
async def create_conversation(db: Session = Depends(get_db)):
"""创建新会话(使用固定主用户)"""
conv_service = ConversationService(db)
user = conv_service.get_or_create_user(user_id, user_type='web')
user = conv_service.get_or_create_user(MAIN_USER_ID, display_name="主用户", user_type='web')
conversation = conv_service.create_conversation(user.id)
return {
@@ -165,10 +179,12 @@ async def delete_conversation(conversation_id: str, db: Session = Depends(get_db
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str, db: Session = Depends(get_db)):
"""WebSocket连接 - 实时对话"""
await manager.connect(websocket, user_id)
"""WebSocket连接 - 实时对话(所有连接使用主用户)"""
# 统一使用主用户ID
actual_user_id = MAIN_USER_ID
await manager.connect(websocket, actual_user_id)
conv_service = ConversationService(db)
user = conv_service.get_or_create_user(user_id, user_type='web')
user = conv_service.get_or_create_user(MAIN_USER_ID, display_name="主用户", user_type='web')
current_conversation_id = None
@@ -227,8 +243,8 @@ async def websocket_endpoint(websocket: WebSocket, user_id: str, db: Session = D
source='web'
)
# 通知用户消息已收到
await manager.send_to_user(user_id, {
# 广播用户消息(同步到所有客户端)
await manager.send_to_user(MAIN_USER_ID, {
"type": "user_message",
"conversation_id": conversation_id,
"message": {
@@ -255,14 +271,15 @@ async def websocket_endpoint(websocket: WebSocket, user_id: str, db: Session = D
source='web'
)
# 发送AI回复
await manager.send_to_user(user_id, {
# 广播AI回复(同步到所有客户端)
await manager.send_to_user(MAIN_USER_ID, {
"type": "assistant_message",
"conversation_id": conversation_id,
"message": {
"id": assistant_msg.id,
"role": "assistant",
"content": ai_response,
"source": "web",
"created_at": assistant_msg.created_at.isoformat()
}
})
@@ -397,42 +414,58 @@ async def update_config(data: dict, db: Session = Depends(get_db)):
# ==================== Matrix消息处理回调 ====================
async def handle_matrix_message(conversation_id: str, user_message: str, user_id: str, room_id: str):
async def handle_matrix_message(action: str, conversation_id: str = None, user_message: str = None, room_id: str = None, message_id: int = None):
"""处理从Matrix收到的消息"""
db = SessionLocal()
try:
conv_service = ConversationService(db)
# 获取会话历史
history = conv_service.get_conversation_history(conversation_id, limit=20)
# 调用AI
ai_response = await ai_service.chat(history)
# 保存AI回复
conversation = conv_service.get_conversation(conversation_id)
if conversation:
conv_service.add_message(
conversation_id=conversation.id,
role='assistant',
content=ai_response,
source='matrix'
)
# 发送到Matrix
await matrix_bot.send_message(room_id, ai_response)
# 同时推送到网页端
await manager.send_to_user(user_id, {
"type": "assistant_message",
"conversation_id": conversation_id,
"message": {
"role": "assistant",
"content": ai_response,
"source": "matrix",
"created_at": datetime.utcnow().isoformat()
}
if action == "new_conversation":
# 创建新会话 - 通知WebSocket客户端
await manager.send_to_user(MAIN_USER_ID, {
"type": "new_conversation",
"conversation_id": conversation_id
})
return
if action == "chat":
db = SessionLocal()
try:
conv_service = ConversationService(db)
# 获取会话历史
history = conv_service.get_conversation_history(conversation_id, limit=20)
# 调用AI
ai_response = await ai_service.chat(history)
# 保存AI回复
conversation = conv_service.get_conversation(conversation_id)
if conversation:
assistant_msg = conv_service.add_message(
conversation_id=conversation.id,
role='assistant',
content=ai_response,
source='matrix'
)
# 发送到Matrix
await matrix_bot.send_message(room_id, ai_response)
# 同步到网页端WebSocket
await manager.send_to_user(MAIN_USER_ID, {
"type": "assistant_message",
"conversation_id": conversation_id,
"message": {
"id": assistant_msg.id,
"role": "assistant",
"content": ai_response,
"source": "matrix",
"created_at": assistant_msg.created_at.isoformat()
}
})
except Exception as e:
logger.error(f"处理Matrix消息失败: {e}")
await matrix_bot.send_message(room_id, f"处理消息时出错: {str(e)}")
finally:
db.close()
except Exception as e:
logger.error(f"处理Matrix消息失败: {e}")

View File

@@ -1,17 +1,21 @@
"""
Matrix Bot 服务 - 处理Matrix消息收发
所有消息都同步到主用户的最新会话
"""
import asyncio
import json
from typing import Optional, Callable
from nio import AsyncClient, MatrixRoom, RoomMessageText, LoginResponse, SyncResponse
from nio import AsyncClient, MatrixRoom, RoomMessageText, LoginResponse
import logging
from models import SessionLocal, User, Conversation, Message, MatrixRoomMapping, SystemConfig
from models import SessionLocal, User, Conversation, Message, SystemConfig
from services.conversation_service import ConversationService
logger = logging.getLogger(__name__)
# 主用户ID
MAIN_USER_ID = "main_user"
class MatrixBot:
def __init__(self):
@@ -45,14 +49,12 @@ class MatrixBot:
return False
try:
# 创建客户端
self.client = AsyncClient(
self.homeserver,
self.username,
store_path="/tmp/matrix_store"
)
# 如果有access_token直接设置
if self.access_token:
self.client.access_token = self.access_token
self.client.user_id = self.username
@@ -60,7 +62,6 @@ class MatrixBot:
self.is_running = True
return True
# 否则使用密码登录
response = await self.client.login(self.password)
if isinstance(response, LoginResponse):
@@ -85,14 +86,12 @@ class MatrixBot:
# 注册消息处理器
self.client.add_event_callback(self._handle_room_message, RoomMessageText)
# 首先执行一次同步获取房间信息
try:
sync_response = await self.client.sync(timeout=10000)
logger.info(f"Matrix初始同步完成")
await self.client.sync(timeout=10000)
logger.info("Matrix初始同步完成")
except Exception as e:
logger.warning(f"Matrix初始同步失败: {e}, 将尝试继续")
logger.warning(f"Matrix初始同步失败: {e}")
# 启动后台同步任务(不阻塞)
asyncio.create_task(self._sync_loop())
logger.info("Matrix同步任务已启动")
@@ -100,65 +99,77 @@ class MatrixBot:
"""后台同步循环"""
while self.is_running:
try:
# 使用较短的超时,避免长时间阻塞
await self.client.sync(timeout=5000)
await asyncio.sleep(1) # 每秒同步一次
await asyncio.sleep(1)
except Exception as e:
logger.warning(f"Matrix同步错误: {e}")
await asyncio.sleep(5) # 出错后等待5秒再重试
await asyncio.sleep(5)
async def _handle_room_message(self, room: MatrixRoom, event: RoomMessageText):
"""处理收到的房间消息"""
"""处理Matrix消息 - 所有消息同步到主用户最新会话"""
# 忽略自己发送的消息
if event.sender == self.username:
return
message_text = event.body.strip()
db = SessionLocal()
try:
conv_service = ConversationService(db)
# 获取或创建Matrix用户
matrix_user_id = event.sender
user = conv_service.get_or_create_user(
user_id=matrix_user_id,
display_name=room.users.get(matrix_user_id, {}).get('display_name', matrix_user_id),
user_type='matrix',
matrix_user_id=matrix_user_id
# 使用固定主用户
main_user = conv_service.get_or_create_user(
MAIN_USER_ID,
display_name="主用户",
user_type='web'
)
# 获取或创建房间映射
mapping = db.query(MatrixRoomMapping).filter(
MatrixRoomMapping.room_id == room.room_id
).first()
# 处理 /new 命令 - 创建新会话,不回复
if message_text == "/new":
conversation = conv_service.create_conversation(main_user.id)
await self.send_message(room.room_id, "✅ 已创建新会话")
logger.info(f"Matrix创建新会话: {conversation.conversation_id}")
# 通知WebSocket客户端
if self.on_message_callback:
await self.on_message_callback(
action="new_conversation",
conversation_id=conversation.conversation_id,
room_id=room.room_id
)
return
if not mapping:
# 为这个房间创建新会话
conversation = conv_service.create_conversation(user.id, title=f"Matrix: {room.display_name}")
mapping = MatrixRoomMapping(
room_id=room.room_id,
user_id=user.id,
conversation_id=conversation.id
)
db.add(mapping)
db.commit()
db.refresh(mapping)
# 获取最新会话
conversations = conv_service.get_user_conversations(main_user.id)
if not conversations:
# 如果没有会话,创建一个
conversation = conv_service.create_conversation(main_user.id)
else:
conversation = conversations[0] # 最新会话
# 保存用户消息
conv_service.add_message(
conversation_id=mapping.conversation_id,
user_msg = conv_service.add_message(
conversation_id=conversation.id,
role='user',
content=event.body,
content=message_text,
source='matrix',
extra_data={'event_id': event.event_id, 'room_id': room.room_id}
extra_data={
'event_id': event.event_id,
'room_id': room.room_id,
'sender': event.sender
}
)
# 调用外部消息处理器
logger.info(f"Matrix消息保存到会话 {conversation.conversation_id}")
# 调用消息处理器获取AI回复并同步
if self.on_message_callback:
await self.on_message_callback(
conversation_id=mapping.conversation.conversation_id,
user_message=event.body,
user_id=user.user_id,
room_id=room.room_id
action="chat",
conversation_id=conversation.conversation_id,
user_message=message_text,
room_id=room.room_id,
message_id=user_msg.id
)
finally:
db.close()
@@ -182,28 +193,12 @@ class MatrixBot:
logger.error(f"发送Matrix消息失败: {e}")
return False
async def get_room_id_for_user(self, user_matrix_id: str) -> Optional[str]:
"""获取与用户的对话房间ID"""
db = SessionLocal()
try:
user = db.query(User).filter(User.matrix_user_id == user_matrix_id).first()
if user:
mapping = db.query(MatrixRoomMapping).filter(
MatrixRoomMapping.user_id == user.id
).first()
if mapping:
return mapping.room_id
return None
finally:
db.close()
async def disconnect(self):
"""断开连接"""
if self.client:
await self.client.logout()
await self.client.close()
self.is_running = False
# 全局实例
matrix_bot = MatrixBot()
try:
await self.client.logout()
await self.client.close()
except:
pass
self.is_running = False

View File

@@ -341,20 +341,39 @@
</div>
<script>
// 全局变量
// 全局变量 - 使用固定主用户与Matrix AI用户相同
let ws = null;
let userId = 'web_' + Math.random().toString(36).substr(2, 9);
let userId = 'main_user'; // 固定主用户ID
let currentConversationId = null;
let conversations = [];
// 初始化
document.addEventListener('DOMContentLoaded', () => {
document.getElementById('userIdDisplay').textContent = `用户: ${userId}`;
document.getElementById('userIdDisplay').textContent = '主用户模式';
connectWebSocket();
loadConversations();
setupTextarea();
// 监听最新会话更新
setInterval(checkLatestConversation, 2000);
});
// 检查最新会话用于Matrix同步
async function checkLatestConversation() {
try {
const response = await fetch('/api/conversations/latest');
if (response.ok) {
const data = await response.json();
if (data.conversation && data.conversation.id !== currentConversationId) {
// 有新会话,自动切换
selectConversation(data.conversation.id);
}
}
} catch (error) {
// 忽略错误
}
}
// WebSocket连接
function connectWebSocket() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
@@ -391,8 +410,27 @@
loadConversations();
break;
case 'new_conversation':
// Matrix创建了新会话自动切换
currentConversationId = data.conversation_id;
loadConversations();
// 清空消息区域
const container = document.getElementById('messagesContainer');
container.innerHTML = `
<div class="welcome">
<h2>👋 开始新对话</h2>
<p>输入您的问题开始对话</p>
</div>
`;
break;
case 'user_message':
appendMessage('user', data.message.content, data.message.source);
// 如果消息来自Matrix切换到对应会话
if (data.message.source === 'matrix' && data.conversation_id) {
currentConversationId = data.conversation_id;
renderConversations();
}
break;
case 'assistant_message':
@@ -451,10 +489,15 @@
// 加载会话列表
async function loadConversations() {
try {
const response = await fetch('/api/conversations?user_id=' + userId);
const response = await fetch('/api/conversations');
const data = await response.json();
conversations = data.conversations;
renderConversations();
// 如果没有当前会话且有会话列表,自动选择最新的
if (!currentConversationId && conversations.length > 0) {
selectConversation(conversations[0].id);
}
} catch (error) {
console.error('加载会话失败:', error);
}
@@ -496,7 +539,7 @@
// 创建新会话
async function createNewConversation() {
try {
const response = await fetch('/api/conversations?user_id=' + userId, {
const response = await fetch('/api/conversations', {
method: 'POST'
});
const data = await response.json();