""" 方言版AI对话助手 - 后端API """ from flask import Flask, request, jsonify, send_from_directory, send_file from flask_cors import CORS from werkzeug.security import generate_password_hash, check_password_hash import jwt import datetime import json import os import uuid import base64 import requests from pathlib import Path app = Flask(__name__, static_folder='../frontend', static_url_path='') CORS(app) # 配置 SECRET_KEY = 'dialect-chat-secret-key-2026' LLM_BASE_URL = 'http://192.168.2.5:1234/v1' LLM_API_KEY = 'sk-lm-fuP5tGU8:Hi7YU87jHyDP6Ay8Tl2j' LLM_MODEL = 'qwen3.5-4b' # 数据目录 DATA_DIR = Path(__file__).parent.parent / 'data' USERS_FILE = DATA_DIR / 'users.json' CHATS_FILE = DATA_DIR / 'chats.json' UPLOAD_DIR = DATA_DIR / 'uploads' UPLOAD_DIR.mkdir(parents=True, exist_ok=True) # 方言配置 DIALECTS = { 'mandarin': { 'name': '普通话', 'prompt': '请用标准的普通话回复。', 'greeting': '你好!有什么我可以帮助你的吗?' }, 'sichuan': { 'name': '四川话', 'prompt': '请用四川话回复,使用四川方言的特色表达,比如"巴适"、"要得"、"撒"等。', 'greeting': '你好哇!有啥子事要得嘛?' }, 'cantonese': { 'name': '粤语', 'prompt': '请用粤语回复,使用粤语的特色表达,比如"嘅"、"系"、"唔"、"咁"等。', 'greeting': '你好!有乜嘢可以帮你?' }, 'shanghai': { 'name': '上海话', 'prompt': '请用上海话回复,使用上海方言的特色表达。', 'greeting': '侬好!有啥事体伐?' }, 'hakka': { 'name': '客家话', 'prompt': '请用客家话回复,使用客家方言的特色表达。', 'greeting': '你好!有么个可以帮你?' }, 'minnan': { 'name': '闽南话', 'prompt': '请用闽南话回复,使用闽南方言的特色表达。', 'greeting': '你好!有啥米代志?' }, 'northeast': { 'name': '东北话', 'prompt': '请用东北话回复,使用东北方言的特色表达,比如"咋整"、"嘎哈"、"整挺好"等。', 'greeting': '哎呀妈呀,来了!有啥事儿啊?' }, 'henan': { 'name': '河南话', 'prompt': '请用河南话回复,使用河南方言的特色表达,比如"中"、"弄啥嘞"、"可得劲"等。', 'greeting': '中!你有啥事儿说呗!' } } # 初始化数据文件 def init_data(): if not USERS_FILE.exists(): USERS_FILE.write_text(json.dumps({}, ensure_ascii=False)) if not CHATS_FILE.exists(): CHATS_FILE.write_text(json.dumps({}, ensure_ascii=False)) init_data() # 辅助函数 def load_users(): return json.loads(USERS_FILE.read_text(encoding='utf-8')) def save_users(users): USERS_FILE.write_text(json.dumps(users, ensure_ascii=False, indent=2), encoding='utf-8') def load_chats(): return json.loads(CHATS_FILE.read_text(encoding='utf-8')) def save_chats(chats): CHATS_FILE.write_text(json.dumps(chats, ensure_ascii=False, indent=2), encoding='utf-8') def generate_token(user_id): return jwt.encode({ 'user_id': user_id, 'exp': datetime.datetime.utcnow() + datetime.timedelta(days=7) }, SECRET_KEY, algorithm='HS256') def verify_token(token): try: return jwt.decode(token, SECRET_KEY, algorithms=['HS256']) except: return None def get_current_user(): token = request.headers.get('Authorization', '').replace('Bearer ', '') if not token: return None data = verify_token(token) if not data: return None users = load_users() return users.get(data['user_id']) def call_llm(messages, dialect='mandarin'): """调用大模型API""" dialect_info = DIALECTS.get(dialect, DIALECTS['mandarin']) # 添加系统提示 system_message = { 'role': 'system', 'content': f'你是一个友好的AI助手。{dialect_info["prompt"]}' } full_messages = [system_message] + messages[-20:] # 保留最近20条消息 try: response = requests.post( f'{LLM_BASE_URL}/chat/completions', headers={ 'Content-Type': 'application/json', 'Authorization': f'Bearer {LLM_API_KEY}' }, json={ 'model': LLM_MODEL, 'messages': full_messages, 'max_tokens': 1024, 'temperature': 0.8 }, timeout=60 ) if response.status_code == 200: result = response.json() return result['choices'][0]['message']['content'] else: return f'抱歉,我遇到了一些问题,请稍后再试。' except Exception as e: return f'连接出现问题了,请稍后再试试。' # ============ 页面路由 ============ @app.route('/') def index(): return send_file('../frontend/index.html') @app.route('/chat') def chat_page(): return send_file('../frontend/chat.html') # ============ API路由 ============ @app.route('/api/dialects') def api_dialects(): """获取支持的方言列表""" return jsonify({k: {'name': v['name'], 'greeting': v['greeting']} for k, v in DIALECTS.items()}) @app.route('/api/register', methods=['POST']) def api_register(): """用户注册""" data = request.json username = data.get('username', '').strip() password = data.get('password', '') confirm_password = data.get('confirm_password', '') phone = data.get('phone', '').strip() email = data.get('email', '').strip() # 验证 if not username or len(username) < 2: return jsonify({'error': '用户名至少2个字符'}), 400 if not password or len(password) < 6: return jsonify({'error': '密码至少6个字符'}), 400 if password != confirm_password: return jsonify({'error': '两次密码不一致'}), 400 if not phone or len(phone) < 11: return jsonify({'error': '请输入正确的手机号'}), 400 users = load_users() # 检查用户名是否存在 for uid, user in users.items(): if user['username'] == username: return jsonify({'error': '用户名已存在'}), 400 if user['phone'] == phone: return jsonify({'error': '手机号已注册'}), 400 # 创建用户 user_id = str(uuid.uuid4()) users[user_id] = { 'id': user_id, 'username': username, 'password': generate_password_hash(password), 'phone': phone, 'email': email, 'created_at': datetime.datetime.now().isoformat(), 'chats': [] } save_users(users) token = generate_token(user_id) return jsonify({ 'success': True, 'token': token, 'user': { 'id': user_id, 'username': username, 'phone': phone, 'email': email } }) @app.route('/api/login', methods=['POST']) def api_login(): """用户登录""" data = request.json username = data.get('username', '').strip() password = data.get('password', '') if not username or not password: return jsonify({'error': '请输入用户名和密码'}), 400 users = load_users() for user_id, user in users.items(): if user['username'] == username: if check_password_hash(user['password'], password): token = generate_token(user_id) return jsonify({ 'success': True, 'token': token, 'user': { 'id': user_id, 'username': user['username'], 'phone': user['phone'], 'email': user.get('email', '') } }) else: return jsonify({'error': '密码错误'}), 400 return jsonify({'error': '用户不存在'}), 400 @app.route('/api/user') def api_user(): """获取当前用户信息""" user = get_current_user() if not user: return jsonify({'error': '未登录'}), 401 return jsonify({ 'id': user['id'], 'username': user['username'], 'phone': user['phone'], 'email': user.get('email', '') }) @app.route('/api/chats', methods=['GET']) def api_get_chats(): """获取用户的对话列表""" user = get_current_user() if not user: return jsonify({'error': '未登录'}), 401 chats = load_chats() user_chats = [] for chat_id in user.get('chats', []): if chat_id in chats: chat = chats[chat_id] user_chats.append({ 'id': chat_id, 'title': chat['title'], 'dialect': chat['dialect'], 'created_at': chat['created_at'], 'updated_at': chat.get('updated_at', chat['created_at']) }) # 按更新时间倒序 user_chats.sort(key=lambda x: x['updated_at'], reverse=True) return jsonify(user_chats) @app.route('/api/chats', methods=['POST']) def api_create_chat(): """创建新对话""" user = get_current_user() if not user: return jsonify({'error': '未登录'}), 401 data = request.json dialect = data.get('dialect', 'mandarin') chat_id = str(uuid.uuid4()) chats = load_chats() chats[chat_id] = { 'id': chat_id, 'user_id': user['id'], 'title': '新对话', 'dialect': dialect, 'messages': [], 'created_at': datetime.datetime.now().isoformat(), 'updated_at': datetime.datetime.now().isoformat() } save_chats(chats) # 更新用户的对话列表 users = load_users() if chat_id not in users[user['id']]['chats']: users[user['id']]['chats'].append(chat_id) save_users(users) return jsonify({ 'id': chat_id, 'title': '新对话', 'dialect': dialect }) @app.route('/api/chats/', methods=['GET']) def api_get_chat(chat_id): """获取对话详情""" user = get_current_user() if not user: return jsonify({'error': '未登录'}), 401 chats = load_chats() chat = chats.get(chat_id) if not chat or chat['user_id'] != user['id']: return jsonify({'error': '对话不存在'}), 404 return jsonify(chat) @app.route('/api/chats/', methods=['DELETE']) def api_delete_chat(chat_id): """删除对话""" user = get_current_user() if not user: return jsonify({'error': '未登录'}), 401 chats = load_chats() chat = chats.get(chat_id) if not chat or chat['user_id'] != user['id']: return jsonify({'error': '对话不存在'}), 404 del chats[chat_id] save_chats(chats) # 更新用户的对话列表 users = load_users() if chat_id in users[user['id']]['chats']: users[user['id']]['chats'].remove(chat_id) save_users(users) return jsonify({'success': True}) @app.route('/api/chats//send', methods=['POST']) def api_send_message(chat_id): """发送消息""" user = get_current_user() if not user: return jsonify({'error': '未登录'}), 401 data = request.json content = data.get('content', '').strip() if not content: return jsonify({'error': '消息不能为空'}), 400 chats = load_chats() chat = chats.get(chat_id) if not chat or chat['user_id'] != user['id']: return jsonify({'error': '对话不存在'}), 404 # 添加用户消息 user_message = { 'role': 'user', 'content': content, 'time': datetime.datetime.now().isoformat() } chat['messages'].append(user_message) # 调用大模型 messages = [{'role': m['role'], 'content': m['content']} for m in chat['messages']] ai_response = call_llm(messages, chat['dialect']) # 添加AI回复 ai_message = { 'role': 'assistant', 'content': ai_response, 'time': datetime.datetime.now().isoformat() } chat['messages'].append(ai_message) # 更新对话标题(第一条消息的前20字) if len(chat['messages']) <= 2: chat['title'] = content[:20] + ('...' if len(content) > 20 else '') chat['updated_at'] = datetime.datetime.now().isoformat() save_chats(chats) return jsonify({ 'user_message': user_message, 'ai_message': ai_message }) @app.route('/api/upload', methods=['POST']) def api_upload(): """上传文件""" user = get_current_user() if not user: return jsonify({'error': '未登录'}), 401 if 'file' not in request.files: return jsonify({'error': '没有上传文件'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': '没有选择文件'}), 400 # 保存文件 file_id = str(uuid.uuid4()) filename = f"{file_id}_{file.filename}" filepath = UPLOAD_DIR / filename file.save(filepath) # 如果是图片,返回base64预览 is_image = file.content_type.startswith('image/') preview = None if is_image: preview = f"/api/uploads/{filename}" return jsonify({ 'success': True, 'file_id': file_id, 'filename': file.filename, 'is_image': is_image, 'preview': preview }) @app.route('/api/uploads/') def api_get_upload(filename): """获取上传的文件""" return send_file(UPLOAD_DIR / filename) if __name__ == '__main__': print("=" * 50) print("方言版AI对话助手") print("=" * 50) print(f"访问地址: http://localhost:19002") print("=" * 50) app.run(host='0.0.0.0', port=19002, debug=True)