功能特点: - 支持8种方言(普通话、四川话、粤语、上海话、客家话、闽南话、东北话、河南话) - 用户注册登录系统(用户名、手机、邮箱可选、密码确认) - 对话功能(文字输入、语音识别、文件图片上传) - 多对话管理(新建、切换、删除) - 移动端适配(响应式设计、触摸友好) 技术栈: - Flask后端API - 原生HTML/CSS/JS前端 - Web Speech API语音识别 - JWT用户认证 - 大模型: qwen3.5-4b 端口: 19002
475 lines
14 KiB
Python
475 lines
14 KiB
Python
"""
|
||
方言版AI对话助手 - 后端API
|
||
"""
|
||
|
||
from flask import Flask, request, jsonify, send_from_directory, send_file
|
||
from flask_cors import CORS
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
import jwt
|
||
import datetime
|
||
import json
|
||
import os
|
||
import uuid
|
||
import base64
|
||
import requests
|
||
from pathlib import Path
|
||
|
||
app = Flask(__name__, static_folder='../frontend', static_url_path='')
|
||
CORS(app)
|
||
|
||
# 配置
|
||
SECRET_KEY = 'dialect-chat-secret-key-2026'
|
||
LLM_BASE_URL = 'http://192.168.2.5:1234/v1'
|
||
LLM_API_KEY = 'sk-lm-fuP5tGU8:Hi7YU87jHyDP6Ay8Tl2j'
|
||
LLM_MODEL = 'qwen3.5-4b'
|
||
|
||
# 数据目录
|
||
DATA_DIR = Path(__file__).parent.parent / 'data'
|
||
USERS_FILE = DATA_DIR / 'users.json'
|
||
CHATS_FILE = DATA_DIR / 'chats.json'
|
||
UPLOAD_DIR = DATA_DIR / 'uploads'
|
||
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 方言配置
|
||
DIALECTS = {
|
||
'mandarin': {
|
||
'name': '普通话',
|
||
'prompt': '请用标准的普通话回复。',
|
||
'greeting': '你好!有什么我可以帮助你的吗?'
|
||
},
|
||
'sichuan': {
|
||
'name': '四川话',
|
||
'prompt': '请用四川话回复,使用四川方言的特色表达,比如"巴适"、"要得"、"撒"等。',
|
||
'greeting': '你好哇!有啥子事要得嘛?'
|
||
},
|
||
'cantonese': {
|
||
'name': '粤语',
|
||
'prompt': '请用粤语回复,使用粤语的特色表达,比如"嘅"、"系"、"唔"、"咁"等。',
|
||
'greeting': '你好!有乜嘢可以帮你?'
|
||
},
|
||
'shanghai': {
|
||
'name': '上海话',
|
||
'prompt': '请用上海话回复,使用上海方言的特色表达。',
|
||
'greeting': '侬好!有啥事体伐?'
|
||
},
|
||
'hakka': {
|
||
'name': '客家话',
|
||
'prompt': '请用客家话回复,使用客家方言的特色表达。',
|
||
'greeting': '你好!有么个可以帮你?'
|
||
},
|
||
'minnan': {
|
||
'name': '闽南话',
|
||
'prompt': '请用闽南话回复,使用闽南方言的特色表达。',
|
||
'greeting': '你好!有啥米代志?'
|
||
},
|
||
'northeast': {
|
||
'name': '东北话',
|
||
'prompt': '请用东北话回复,使用东北方言的特色表达,比如"咋整"、"嘎哈"、"整挺好"等。',
|
||
'greeting': '哎呀妈呀,来了!有啥事儿啊?'
|
||
},
|
||
'henan': {
|
||
'name': '河南话',
|
||
'prompt': '请用河南话回复,使用河南方言的特色表达,比如"中"、"弄啥嘞"、"可得劲"等。',
|
||
'greeting': '中!你有啥事儿说呗!'
|
||
}
|
||
}
|
||
|
||
# 初始化数据文件
|
||
def init_data():
|
||
if not USERS_FILE.exists():
|
||
USERS_FILE.write_text(json.dumps({}, ensure_ascii=False))
|
||
if not CHATS_FILE.exists():
|
||
CHATS_FILE.write_text(json.dumps({}, ensure_ascii=False))
|
||
|
||
init_data()
|
||
|
||
# 辅助函数
|
||
def load_users():
|
||
return json.loads(USERS_FILE.read_text(encoding='utf-8'))
|
||
|
||
def save_users(users):
|
||
USERS_FILE.write_text(json.dumps(users, ensure_ascii=False, indent=2), encoding='utf-8')
|
||
|
||
def load_chats():
|
||
return json.loads(CHATS_FILE.read_text(encoding='utf-8'))
|
||
|
||
def save_chats(chats):
|
||
CHATS_FILE.write_text(json.dumps(chats, ensure_ascii=False, indent=2), encoding='utf-8')
|
||
|
||
def generate_token(user_id):
|
||
return jwt.encode({
|
||
'user_id': user_id,
|
||
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=7)
|
||
}, SECRET_KEY, algorithm='HS256')
|
||
|
||
def verify_token(token):
|
||
try:
|
||
return jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
|
||
except:
|
||
return None
|
||
|
||
def get_current_user():
|
||
token = request.headers.get('Authorization', '').replace('Bearer ', '')
|
||
if not token:
|
||
return None
|
||
data = verify_token(token)
|
||
if not data:
|
||
return None
|
||
users = load_users()
|
||
return users.get(data['user_id'])
|
||
|
||
def call_llm(messages, dialect='mandarin'):
|
||
"""调用大模型API"""
|
||
dialect_info = DIALECTS.get(dialect, DIALECTS['mandarin'])
|
||
|
||
# 添加系统提示
|
||
system_message = {
|
||
'role': 'system',
|
||
'content': f'你是一个友好的AI助手。{dialect_info["prompt"]}'
|
||
}
|
||
|
||
full_messages = [system_message] + messages[-20:] # 保留最近20条消息
|
||
|
||
try:
|
||
response = requests.post(
|
||
f'{LLM_BASE_URL}/chat/completions',
|
||
headers={
|
||
'Content-Type': 'application/json',
|
||
'Authorization': f'Bearer {LLM_API_KEY}'
|
||
},
|
||
json={
|
||
'model': LLM_MODEL,
|
||
'messages': full_messages,
|
||
'max_tokens': 1024,
|
||
'temperature': 0.8
|
||
},
|
||
timeout=60
|
||
)
|
||
|
||
if response.status_code == 200:
|
||
result = response.json()
|
||
return result['choices'][0]['message']['content']
|
||
else:
|
||
return f'抱歉,我遇到了一些问题,请稍后再试。'
|
||
except Exception as e:
|
||
return f'连接出现问题了,请稍后再试试。'
|
||
|
||
# ============ 页面路由 ============
|
||
|
||
@app.route('/')
|
||
def index():
|
||
return send_file('../frontend/index.html')
|
||
|
||
@app.route('/chat')
|
||
def chat_page():
|
||
return send_file('../frontend/chat.html')
|
||
|
||
# ============ API路由 ============
|
||
|
||
@app.route('/api/dialects')
|
||
def api_dialects():
|
||
"""获取支持的方言列表"""
|
||
return jsonify({k: {'name': v['name'], 'greeting': v['greeting']} for k, v in DIALECTS.items()})
|
||
|
||
@app.route('/api/register', methods=['POST'])
|
||
def api_register():
|
||
"""用户注册"""
|
||
data = request.json
|
||
|
||
username = data.get('username', '').strip()
|
||
password = data.get('password', '')
|
||
confirm_password = data.get('confirm_password', '')
|
||
phone = data.get('phone', '').strip()
|
||
email = data.get('email', '').strip()
|
||
|
||
# 验证
|
||
if not username or len(username) < 2:
|
||
return jsonify({'error': '用户名至少2个字符'}), 400
|
||
if not password or len(password) < 6:
|
||
return jsonify({'error': '密码至少6个字符'}), 400
|
||
if password != confirm_password:
|
||
return jsonify({'error': '两次密码不一致'}), 400
|
||
if not phone or len(phone) < 11:
|
||
return jsonify({'error': '请输入正确的手机号'}), 400
|
||
|
||
users = load_users()
|
||
|
||
# 检查用户名是否存在
|
||
for uid, user in users.items():
|
||
if user['username'] == username:
|
||
return jsonify({'error': '用户名已存在'}), 400
|
||
if user['phone'] == phone:
|
||
return jsonify({'error': '手机号已注册'}), 400
|
||
|
||
# 创建用户
|
||
user_id = str(uuid.uuid4())
|
||
users[user_id] = {
|
||
'id': user_id,
|
||
'username': username,
|
||
'password': generate_password_hash(password),
|
||
'phone': phone,
|
||
'email': email,
|
||
'created_at': datetime.datetime.now().isoformat(),
|
||
'chats': []
|
||
}
|
||
|
||
save_users(users)
|
||
|
||
token = generate_token(user_id)
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'token': token,
|
||
'user': {
|
||
'id': user_id,
|
||
'username': username,
|
||
'phone': phone,
|
||
'email': email
|
||
}
|
||
})
|
||
|
||
@app.route('/api/login', methods=['POST'])
|
||
def api_login():
|
||
"""用户登录"""
|
||
data = request.json
|
||
|
||
username = data.get('username', '').strip()
|
||
password = data.get('password', '')
|
||
|
||
if not username or not password:
|
||
return jsonify({'error': '请输入用户名和密码'}), 400
|
||
|
||
users = load_users()
|
||
|
||
for user_id, user in users.items():
|
||
if user['username'] == username:
|
||
if check_password_hash(user['password'], password):
|
||
token = generate_token(user_id)
|
||
return jsonify({
|
||
'success': True,
|
||
'token': token,
|
||
'user': {
|
||
'id': user_id,
|
||
'username': user['username'],
|
||
'phone': user['phone'],
|
||
'email': user.get('email', '')
|
||
}
|
||
})
|
||
else:
|
||
return jsonify({'error': '密码错误'}), 400
|
||
|
||
return jsonify({'error': '用户不存在'}), 400
|
||
|
||
@app.route('/api/user')
|
||
def api_user():
|
||
"""获取当前用户信息"""
|
||
user = get_current_user()
|
||
if not user:
|
||
return jsonify({'error': '未登录'}), 401
|
||
|
||
return jsonify({
|
||
'id': user['id'],
|
||
'username': user['username'],
|
||
'phone': user['phone'],
|
||
'email': user.get('email', '')
|
||
})
|
||
|
||
@app.route('/api/chats', methods=['GET'])
|
||
def api_get_chats():
|
||
"""获取用户的对话列表"""
|
||
user = get_current_user()
|
||
if not user:
|
||
return jsonify({'error': '未登录'}), 401
|
||
|
||
chats = load_chats()
|
||
user_chats = []
|
||
|
||
for chat_id in user.get('chats', []):
|
||
if chat_id in chats:
|
||
chat = chats[chat_id]
|
||
user_chats.append({
|
||
'id': chat_id,
|
||
'title': chat['title'],
|
||
'dialect': chat['dialect'],
|
||
'created_at': chat['created_at'],
|
||
'updated_at': chat.get('updated_at', chat['created_at'])
|
||
})
|
||
|
||
# 按更新时间倒序
|
||
user_chats.sort(key=lambda x: x['updated_at'], reverse=True)
|
||
|
||
return jsonify(user_chats)
|
||
|
||
@app.route('/api/chats', methods=['POST'])
|
||
def api_create_chat():
|
||
"""创建新对话"""
|
||
user = get_current_user()
|
||
if not user:
|
||
return jsonify({'error': '未登录'}), 401
|
||
|
||
data = request.json
|
||
dialect = data.get('dialect', 'mandarin')
|
||
|
||
chat_id = str(uuid.uuid4())
|
||
|
||
chats = load_chats()
|
||
chats[chat_id] = {
|
||
'id': chat_id,
|
||
'user_id': user['id'],
|
||
'title': '新对话',
|
||
'dialect': dialect,
|
||
'messages': [],
|
||
'created_at': datetime.datetime.now().isoformat(),
|
||
'updated_at': datetime.datetime.now().isoformat()
|
||
}
|
||
save_chats(chats)
|
||
|
||
# 更新用户的对话列表
|
||
users = load_users()
|
||
if chat_id not in users[user['id']]['chats']:
|
||
users[user['id']]['chats'].append(chat_id)
|
||
save_users(users)
|
||
|
||
return jsonify({
|
||
'id': chat_id,
|
||
'title': '新对话',
|
||
'dialect': dialect
|
||
})
|
||
|
||
@app.route('/api/chats/<chat_id>', methods=['GET'])
|
||
def api_get_chat(chat_id):
|
||
"""获取对话详情"""
|
||
user = get_current_user()
|
||
if not user:
|
||
return jsonify({'error': '未登录'}), 401
|
||
|
||
chats = load_chats()
|
||
chat = chats.get(chat_id)
|
||
|
||
if not chat or chat['user_id'] != user['id']:
|
||
return jsonify({'error': '对话不存在'}), 404
|
||
|
||
return jsonify(chat)
|
||
|
||
@app.route('/api/chats/<chat_id>', methods=['DELETE'])
|
||
def api_delete_chat(chat_id):
|
||
"""删除对话"""
|
||
user = get_current_user()
|
||
if not user:
|
||
return jsonify({'error': '未登录'}), 401
|
||
|
||
chats = load_chats()
|
||
chat = chats.get(chat_id)
|
||
|
||
if not chat or chat['user_id'] != user['id']:
|
||
return jsonify({'error': '对话不存在'}), 404
|
||
|
||
del chats[chat_id]
|
||
save_chats(chats)
|
||
|
||
# 更新用户的对话列表
|
||
users = load_users()
|
||
if chat_id in users[user['id']]['chats']:
|
||
users[user['id']]['chats'].remove(chat_id)
|
||
save_users(users)
|
||
|
||
return jsonify({'success': True})
|
||
|
||
@app.route('/api/chats/<chat_id>/send', methods=['POST'])
|
||
def api_send_message(chat_id):
|
||
"""发送消息"""
|
||
user = get_current_user()
|
||
if not user:
|
||
return jsonify({'error': '未登录'}), 401
|
||
|
||
data = request.json
|
||
content = data.get('content', '').strip()
|
||
|
||
if not content:
|
||
return jsonify({'error': '消息不能为空'}), 400
|
||
|
||
chats = load_chats()
|
||
chat = chats.get(chat_id)
|
||
|
||
if not chat or chat['user_id'] != user['id']:
|
||
return jsonify({'error': '对话不存在'}), 404
|
||
|
||
# 添加用户消息
|
||
user_message = {
|
||
'role': 'user',
|
||
'content': content,
|
||
'time': datetime.datetime.now().isoformat()
|
||
}
|
||
chat['messages'].append(user_message)
|
||
|
||
# 调用大模型
|
||
messages = [{'role': m['role'], 'content': m['content']} for m in chat['messages']]
|
||
ai_response = call_llm(messages, chat['dialect'])
|
||
|
||
# 添加AI回复
|
||
ai_message = {
|
||
'role': 'assistant',
|
||
'content': ai_response,
|
||
'time': datetime.datetime.now().isoformat()
|
||
}
|
||
chat['messages'].append(ai_message)
|
||
|
||
# 更新对话标题(第一条消息的前20字)
|
||
if len(chat['messages']) <= 2:
|
||
chat['title'] = content[:20] + ('...' if len(content) > 20 else '')
|
||
|
||
chat['updated_at'] = datetime.datetime.now().isoformat()
|
||
save_chats(chats)
|
||
|
||
return jsonify({
|
||
'user_message': user_message,
|
||
'ai_message': ai_message
|
||
})
|
||
|
||
@app.route('/api/upload', methods=['POST'])
|
||
def api_upload():
|
||
"""上传文件"""
|
||
user = get_current_user()
|
||
if not user:
|
||
return jsonify({'error': '未登录'}), 401
|
||
|
||
if 'file' not in request.files:
|
||
return jsonify({'error': '没有上传文件'}), 400
|
||
|
||
file = request.files['file']
|
||
if file.filename == '':
|
||
return jsonify({'error': '没有选择文件'}), 400
|
||
|
||
# 保存文件
|
||
file_id = str(uuid.uuid4())
|
||
filename = f"{file_id}_{file.filename}"
|
||
filepath = UPLOAD_DIR / filename
|
||
file.save(filepath)
|
||
|
||
# 如果是图片,返回base64预览
|
||
is_image = file.content_type.startswith('image/')
|
||
preview = None
|
||
if is_image:
|
||
preview = f"/api/uploads/{filename}"
|
||
|
||
return jsonify({
|
||
'success': True,
|
||
'file_id': file_id,
|
||
'filename': file.filename,
|
||
'is_image': is_image,
|
||
'preview': preview
|
||
})
|
||
|
||
@app.route('/api/uploads/<filename>')
|
||
def api_get_upload(filename):
|
||
"""获取上传的文件"""
|
||
return send_file(UPLOAD_DIR / filename)
|
||
|
||
if __name__ == '__main__':
|
||
print("=" * 50)
|
||
print("方言版AI对话助手")
|
||
print("=" * 50)
|
||
print(f"访问地址: http://localhost:19002")
|
||
print("=" * 50)
|
||
|
||
app.run(host='0.0.0.0', port=19002, debug=True) |