Files
ai-chat-system/services/matrix_service_v2.py

370 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Matrix Bot 服务 v2.0 - 使用Agent架构
支持渠道独立Agent绑定
"""
import asyncio
import logging
import os
from typing import Optional, Callable
from nio import AsyncClient, RoomMessageText, MegolmEvent
from models_v2 import SessionLocal, User, Conversation, Message, SystemConfig, Agent, Channel, ChannelAgentMapping
from services.conversation_service import ConversationService
from services.agent_service import AgentService, ChannelService
from services.llm_service import llm_service
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Internal user ID passed to get_or_create_user for every incoming Matrix
# message (see MatrixBotV2._process_message).
MAIN_USER_ID = "main_user"
# Directory given to nio's AsyncClient as store_path; created on demand in
# MatrixBotV2.init_from_config.
# NOTE(review): absolute path inside one user's home directory - presumably
# this should come from configuration; confirm before deploying elsewhere.
STORE_PATH = "/home/xian/.openclaw/workspace-coder/works/ai-chat/matrix_store"
class MatrixBotV2:
    """Matrix bot v2.0 built on the Agent architecture.

    Wraps a matrix-nio ``AsyncClient``: it reads its connection settings from
    the ``matrix`` channel record, logs in, syncs, and answers incoming room
    messages with the Agent bound to that channel (or the default Agent).

    Chat commands understood in a room:
        ``/new``              create a new conversation
        ``/agent <name>``     acknowledge an Agent switch (switch itself is TODO)
        ``/thinking on|off``  toggle the thinking feature for later replies
    """

    # De-duplication window: once more than _MAX_TRACKED_EVENTS event IDs are
    # remembered, only the newest _EVENTS_KEPT_AFTER_TRIM are kept.
    _MAX_TRACKED_EVENTS = 100
    _EVENTS_KEPT_AFTER_TRIM = 50

    def __init__(self):
        """Create an unconfigured bot; call ``init_from_config`` next."""
        self.client: Optional[AsyncClient] = None
        self.homeserver: str = ""
        self.user_id: str = ""
        self.password: str = ""
        self.is_running: bool = False
        self.on_message_callback: Optional[Callable] = None
        self.last_room_id: str = ""
        self.processed_events: set = set()
        self.channel_id: Optional[int] = None  # ID of the Matrix channel
        # State of the "/thinking on|off" switch, applied to later LLM calls.
        self.enable_thinking: bool = True
        # Event IDs in arrival order, so trimming drops the oldest ones
        # (a set alone has no usable order).
        self._event_order: list = []
        # Strong reference to the background sync task; asyncio itself only
        # keeps weak references to tasks.
        self._sync_task: Optional[asyncio.Task] = None

    async def init_from_config(self):
        """Initialise the nio client from the ``matrix`` channel configuration.

        Returns:
            True when a client was created; False when the channel is missing
            or inactive, or when no username/password is available.
        """
        db = SessionLocal()
        try:
            # Look up the Matrix channel configuration.
            channel = ChannelService(db).get_channel_by_type('matrix')
            if not channel or not channel.is_active:
                logger.warning("Matrix渠道未配置或未启用")
                return False
            self.channel_id = channel.id

            # Read the Matrix parameters from the channel configuration.
            # NOTE(review): the fallbacks below embed a real-looking account
            # and password in source code; kept for backward compatibility,
            # but they should presumably be removed - confirm with the owner.
            config = channel.config or {}
            self.homeserver = config.get('homeserver', 'http://matrix.tphai.com')
            self.user_id = config.get('username', '@tester:matrix.tphai.com')
            self.password = config.get('password', 'tester12345@!')
            logger.info(f"Matrix配置: homeserver={self.homeserver}, user={self.user_id}")

            if self.user_id and self.password:
                os.makedirs(STORE_PATH, exist_ok=True)
                self.client = AsyncClient(
                    self.homeserver,
                    self.user_id,
                    store_path=STORE_PATH,
                )
                self.is_running = True
                logger.info("Matrix nio 客户端已初始化")
                return True
        finally:
            db.close()
        return False

    async def start_sync(self, message_handler: Optional[Callable] = None):
        """Log in, run the initial sync and start the background sync loop.

        Args:
            message_handler: optional async callable, invoked with keyword
                arguments (``action=...`` etc.) whenever a conversation or
                message is created, so another front end can mirror it.

        Failures are logged, never raised.
        """
        if not self.is_running or not self.client:
            logger.warning("Matrix未连接")
            return
        self.on_message_callback = message_handler

        # Local import: the name is only needed here.
        from nio import LoginError

        try:
            login_resp = await self.client.login(self.password)
            # nio reports a rejected login by returning a LoginError object
            # instead of raising, so success must be checked explicitly.
            if isinstance(login_resp, LoginError):
                logger.error(f"Matrix启动失败: {login_resp}")
                return
            logger.info(f"Matrix登录成功: {login_resp}")

            sync_resp = await self.client.sync(full_state=True)
            logger.info(f"初始同步完成: next_batch={sync_resp.next_batch}")

            self.client.add_event_callback(self._handle_nio_message, RoomMessageText)
            self.client.add_event_callback(self._handle_encrypted_message, MegolmEvent)
            logger.info("Matrix消息回调已注册")

            # Keep the task referenced so it cannot be garbage-collected
            # while still running.
            self._sync_task = asyncio.create_task(self._run_sync_forever())
            logger.info("Matrix nio sync_forever 任务已启动")
        except Exception as e:
            # logger.exception appends the traceback to the record.
            logger.exception(f"Matrix启动失败: {e}")

    async def _run_sync_forever(self):
        """Run nio's ``sync_forever``; mark the bot as stopped if it fails."""
        try:
            logger.info("开始 Matrix sync_forever...")
            await self.client.sync_forever(timeout=30000)
        except Exception as e:
            logger.error(f"sync_forever 错误: {e}")
            self.is_running = False

    async def _handle_encrypted_message(self, room, event):
        """Handle a ``MegolmEvent``: try to obtain its text, then process it.

        The body is looked for in three places, in order: the parsed
        decrypted event, the raw ``event.source`` content, and ``event.body``.
        """
        logger.info(f"收到加密消息: [{room.room_id}] event_id={event.event_id}")
        try:
            body = None
            sender = event.sender

            if event.decrypted:
                try:
                    decrypted_event = event.parse_decrypted_event(event.decrypted)
                    if decrypted_event and hasattr(decrypted_event, 'body'):
                        body = decrypted_event.body
                        sender = getattr(decrypted_event, 'sender', event.sender)
                except Exception as e:
                    logger.warning(f"parse_decrypted_event 失败: {e}")

            if not body and hasattr(event, 'source') and event.source:
                content = event.source.get('content', {})
                if 'body' in content:
                    body = content.get('body', '')
                    sender = event.source.get('sender', event.sender)

            if not body and hasattr(event, 'body') and event.body:
                body = event.body

            if body:
                logger.info(f"解密成功: sender={sender}, body={body[:50]}")
                # Strip like the plain-text handler does, so commands with
                # surrounding whitespace are recognised on both paths.
                text = body.strip() if isinstance(body, str) else body
                await self._process_message(room, sender, text, event.event_id)
            else:
                logger.warning(f"加密消息无法解密: event_id={event.event_id}")
        except Exception as e:
            logger.error(f"处理加密消息失败: {e}")

    async def _handle_nio_message(self, room, event):
        """Handle a plain ``RoomMessageText`` event."""
        sender = event.sender
        if sender == self.user_id:
            return  # ignore the bot's own messages
        message_text = event.body.strip()
        logger.info(f"Matrix收到文本消息: [{room.room_id}] {sender}: {message_text}")
        await self._process_message(room, sender, message_text, event.event_id)

    def _remember_event(self, event_id) -> bool:
        """Record ``event_id``; return False if it was already processed."""
        if event_id in self.processed_events:
            return False
        self.processed_events.add(event_id)
        self._event_order.append(event_id)
        if len(self._event_order) > self._MAX_TRACKED_EVENTS:
            # Drop the oldest IDs and rebuild the lookup set from what is left.
            del self._event_order[:-self._EVENTS_KEPT_AFTER_TRIM]
            self.processed_events = set(self._event_order)
        return True

    async def _process_message(self, room, sender, message_text, event_id):
        """Core message pipeline: de-duplicate, run commands, store, reply.

        Args:
            room: nio room object; only ``room.room_id`` is used.
            sender: Matrix user ID of the author.
            message_text: message body.
            event_id: Matrix event ID, used for de-duplication.
        """
        if not self._remember_event(event_id):
            return
        if sender == self.user_id:
            return

        self.last_room_id = room.room_id
        db = SessionLocal()
        try:
            conv_service = ConversationService(db)
            agent_service = AgentService(db)

            # Get or create the main user.
            main_user = conv_service.get_or_create_user(
                MAIN_USER_ID,
                display_name="主用户",
                user_type='web',
            )

            # Agent bound to the Matrix channel, else the default Agent.
            agent = agent_service.get_agent_for_channel(self.channel_id)
            if not agent:
                agent = agent_service.get_default_agent()
            agent_config = agent_service.get_agent_config(agent.id) if agent else None

            if await self._handle_command(
                room, message_text, conv_service, agent_service, main_user
            ):
                return

            # Use the first conversation returned for the user (the original
            # comment calls it the latest one), or create one.
            conversations = conv_service.get_user_conversations(main_user.id)
            if conversations:
                conversation = conversations[0]
            else:
                conversation = conv_service.create_conversation(
                    main_user.id, channel_id=self.channel_id
                )

            # Store the user's message.
            user_msg = conv_service.add_message(
                conversation_id=conversation.id,
                role='user',
                content=message_text,
                source='matrix',
                extra_data={
                    'event_id': event_id,
                    'room_id': room.room_id,
                    'sender': sender,
                },
            )

            # Notify the web front end.
            if self.on_message_callback:
                await self.on_message_callback(
                    action="user_message",
                    conversation_id=conversation.conversation_id,
                    user_message=message_text,
                    room_id=room.room_id,
                    message_id=user_msg.id,
                    sender=sender,
                )

            # Ask the LLM for a reply.
            if agent_config and agent_config.get('provider'):
                await self._generate_reply(
                    room, conv_service, conversation, agent, agent_config
                )
            else:
                await self.send_message(room.room_id, "❌ Agent配置不完整无法生成回复")
        except Exception as e:
            logger.exception(f"处理Matrix消息失败: {e}")
            await self.send_message(room.room_id, f"处理消息时出错: {str(e)}")
        finally:
            db.close()

    async def _handle_command(self, room, message_text, conv_service,
                              agent_service, main_user) -> bool:
        """Execute a slash command; return True if ``message_text`` was one."""
        if message_text == "/new":
            conversation = conv_service.create_conversation(
                main_user.id,
                channel_id=self.channel_id,
            )
            await self.send_message(room.room_id, "✅ 已创建新会话")
            logger.info(f"Matrix创建新会话: {conversation.conversation_id}")
            if self.on_message_callback:
                await self.on_message_callback(
                    action="new_conversation",
                    conversation_id=conversation.conversation_id,
                    room_id=room.room_id,
                )
            return True

        # Agent switch command.
        if message_text.startswith("/agent "):
            agent_name = message_text[len("/agent "):].strip()
            new_agent = agent_service.get_agent_by_name(agent_name)
            if new_agent and new_agent.is_active:
                await self.send_message(
                    room.room_id, f"✅ 已切换到Agent: {new_agent.display_name}"
                )
                # TODO: update the conversation's Agent
            else:
                await self.send_message(
                    room.room_id, f"❌ Agent '{agent_name}' 不存在或未启用"
                )
            return True

        # Thinking switch. The state is kept on the instance so that it
        # actually affects the replies generated afterwards.
        if message_text == "/thinking off":
            self.enable_thinking = False
            await self.send_message(room.room_id, "✅ 思考功能已关闭")
            return True
        if message_text == "/thinking on":
            self.enable_thinking = True
            await self.send_message(room.room_id, "✅ 思考功能已开启")
            return True

        return False

    async def _generate_reply(self, room, conv_service, conversation, agent,
                              agent_config):
        """Call the LLM, store its answer, send it to the room and notify."""
        try:
            await self.client.room_typing(room.room_id, typing_state=True)

            agent_settings = agent_config['agent']
            provider = agent_config['provider']
            history = conv_service.get_conversation_history(
                conversation.conversation_id,
                limit=agent_settings.get('max_history', 20),
            )

            # Call the LLM.
            response, thinking_content = await llm_service.chat(
                messages=history,
                provider_config=provider,
                agent_config=agent_settings,
                enable_thinking=(
                    self.enable_thinking
                    and agent_settings.get('enable_thinking', True)
                ),
            )

            # Store the assistant's reply.
            assistant_msg = conv_service.add_message(
                conversation_id=conversation.id,
                role='assistant',
                content=response,
                source='matrix',
                thinking_content=thinking_content,
                agent_id=agent.id,
                model_used=provider.get('default_model'),
            )

            # Send the reply to Matrix.
            await self.send_message(room.room_id, response)

            # Mirror it to the web front end.
            if self.on_message_callback:
                await self.on_message_callback(
                    action="assistant_message",
                    conversation_id=conversation.conversation_id,
                    room_id=room.room_id,
                    message_id=assistant_msg.id,
                    content=response,
                    thinking_content=thinking_content,
                    agent_id=agent.id,
                    agent_name=agent_settings.get('display_name'),
                )
            logger.info(f"Matrix AI回复已发送: {response[:50]}")
        except Exception as e:
            logger.error(f"AI调用失败: {e}")
            await self.send_message(room.room_id, f"处理消息时出错: {str(e)}")
        finally:
            # Always clear the typing indicator; a failure here must not
            # trigger a second error message to the room.
            try:
                await self.client.room_typing(room.room_id, typing_state=False)
            except Exception as e:
                logger.warning(f"room_typing(False) failed: {e}")

    async def _send_text(self, room_id, message):
        """Send one ``m.text`` message event to ``room_id``."""
        await self.client.room_send(
            room_id,
            "m.room.message",
            {"msgtype": "m.text", "body": message},
        )

    async def send_message(self, room_id: str, message: str):
        """Send a text message to a Matrix room.

        If sending fails with an unverified-device error, every unverified
        device in the client's device store is marked verified and the send
        is retried once.

        Returns:
            True on success, False otherwise (errors are logged, not raised).
        """
        if not self.client:
            logger.error("Matrix客户端未初始化")
            return False

        try:
            await self._send_text(room_id, message)
            logger.info(f"Matrix消息发送成功: {room_id}")
            return True
        except Exception as e:
            error_str = str(e)
            if "not verified" not in error_str and "OlmUnverifiedDeviceError" not in error_str:
                logger.error(f"发送Matrix消息错误: {e}")
                return False
            logger.warning(f"设备未验证: {e}")

        try:
            # NOTE(review): this blindly trusts every unverified device, which
            # defeats device verification; kept as the existing behaviour -
            # confirm this is acceptable for the deployment.
            for user_id, devices in self.client.device_store.items():
                for device_id, device in devices.items():
                    if not device.verified:
                        logger.info(f"验证设备: {user_id} {device_id}")
                        self.client.verify_device(device)
            await self._send_text(room_id, message)
            return True
        except Exception as e2:
            logger.error(f"验证设备后发送仍失败: {e2}")
            return False

    async def disconnect(self):
        """Stop the bot: cancel the background sync task and close the client."""
        self.is_running = False
        task = self._sync_task
        self._sync_task = None
        # Never cancel the task we are currently running in.
        if task is not None and not task.done() and task is not asyncio.current_task():
            task.cancel()
        if self.client:
            await self.client.close()
# Global bot instance, created when this module is imported.
matrix_bot = MatrixBotV2()