Files
dialect-chat/backend/app.py
hubian 376cdacf5e 初始化方言版AI对话助手
功能特点:
- 支持8种方言(普通话、四川话、粤语、上海话、客家话、闽南话、东北话、河南话)
- 用户注册登录系统(用户名、手机、邮箱可选、密码确认)
- 对话功能(文字输入、语音识别、文件图片上传)
- 多对话管理(新建、切换、删除)
- 移动端适配(响应式设计、触摸友好)

技术栈:
- Flask后端API
- 原生HTML/CSS/JS前端
- Web Speech API语音识别
- JWT用户认证
- 大模型: qwen3.5-4b

端口: 19002
2026-04-08 12:09:46 +08:00

475 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
方言版AI对话助手 - 后端API
"""
from flask import Flask, request, jsonify, send_from_directory, send_file
from flask_cors import CORS
from werkzeug.security import generate_password_hash, check_password_hash
import jwt
import datetime
import json
import os
import uuid
import base64
import requests
from pathlib import Path
app = Flask(__name__, static_folder='../frontend', static_url_path='')
CORS(app)
# 配置
SECRET_KEY = 'dialect-chat-secret-key-2026'
LLM_BASE_URL = 'http://192.168.2.5:1234/v1'
LLM_API_KEY = 'sk-lm-fuP5tGU8:Hi7YU87jHyDP6Ay8Tl2j'
LLM_MODEL = 'qwen3.5-4b'
# 数据目录
DATA_DIR = Path(__file__).parent.parent / 'data'
USERS_FILE = DATA_DIR / 'users.json'
CHATS_FILE = DATA_DIR / 'chats.json'
UPLOAD_DIR = DATA_DIR / 'uploads'
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# 方言配置
DIALECTS = {
'mandarin': {
'name': '普通话',
'prompt': '请用标准的普通话回复。',
'greeting': '你好!有什么我可以帮助你的吗?'
},
'sichuan': {
'name': '四川话',
'prompt': '请用四川话回复,使用四川方言的特色表达,比如"巴适""要得"""等。',
'greeting': '你好哇!有啥子事要得嘛?'
},
'cantonese': {
'name': '粤语',
'prompt': '请用粤语回复,使用粤语的特色表达,比如""""""""等。',
'greeting': '你好!有乜嘢可以帮你?'
},
'shanghai': {
'name': '上海话',
'prompt': '请用上海话回复,使用上海方言的特色表达。',
'greeting': '侬好!有啥事体伐?'
},
'hakka': {
'name': '客家话',
'prompt': '请用客家话回复,使用客家方言的特色表达。',
'greeting': '你好!有么个可以帮你?'
},
'minnan': {
'name': '闽南话',
'prompt': '请用闽南话回复,使用闽南方言的特色表达。',
'greeting': '你好!有啥米代志?'
},
'northeast': {
'name': '东北话',
'prompt': '请用东北话回复,使用东北方言的特色表达,比如"咋整""嘎哈""整挺好"等。',
'greeting': '哎呀妈呀,来了!有啥事儿啊?'
},
'henan': {
'name': '河南话',
'prompt': '请用河南话回复,使用河南方言的特色表达,比如"""弄啥嘞""可得劲"等。',
'greeting': '中!你有啥事儿说呗!'
}
}
# 初始化数据文件
def init_data():
if not USERS_FILE.exists():
USERS_FILE.write_text(json.dumps({}, ensure_ascii=False))
if not CHATS_FILE.exists():
CHATS_FILE.write_text(json.dumps({}, ensure_ascii=False))
init_data()
# 辅助函数
def load_users():
return json.loads(USERS_FILE.read_text(encoding='utf-8'))
def save_users(users):
USERS_FILE.write_text(json.dumps(users, ensure_ascii=False, indent=2), encoding='utf-8')
def load_chats():
return json.loads(CHATS_FILE.read_text(encoding='utf-8'))
def save_chats(chats):
CHATS_FILE.write_text(json.dumps(chats, ensure_ascii=False, indent=2), encoding='utf-8')
def generate_token(user_id):
return jwt.encode({
'user_id': user_id,
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=7)
}, SECRET_KEY, algorithm='HS256')
def verify_token(token):
try:
return jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
except:
return None
def get_current_user():
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not token:
return None
data = verify_token(token)
if not data:
return None
users = load_users()
return users.get(data['user_id'])
def call_llm(messages, dialect='mandarin'):
"""调用大模型API"""
dialect_info = DIALECTS.get(dialect, DIALECTS['mandarin'])
# 添加系统提示
system_message = {
'role': 'system',
'content': f'你是一个友好的AI助手。{dialect_info["prompt"]}'
}
full_messages = [system_message] + messages[-20:] # 保留最近20条消息
try:
response = requests.post(
f'{LLM_BASE_URL}/chat/completions',
headers={
'Content-Type': 'application/json',
'Authorization': f'Bearer {LLM_API_KEY}'
},
json={
'model': LLM_MODEL,
'messages': full_messages,
'max_tokens': 1024,
'temperature': 0.8
},
timeout=60
)
if response.status_code == 200:
result = response.json()
return result['choices'][0]['message']['content']
else:
return f'抱歉,我遇到了一些问题,请稍后再试。'
except Exception as e:
return f'连接出现问题了,请稍后再试试。'
# ============ 页面路由 ============
@app.route('/')
def index():
return send_file('../frontend/index.html')
@app.route('/chat')
def chat_page():
return send_file('../frontend/chat.html')
# ============ API路由 ============
@app.route('/api/dialects')
def api_dialects():
"""获取支持的方言列表"""
return jsonify({k: {'name': v['name'], 'greeting': v['greeting']} for k, v in DIALECTS.items()})
@app.route('/api/register', methods=['POST'])
def api_register():
"""用户注册"""
data = request.json
username = data.get('username', '').strip()
password = data.get('password', '')
confirm_password = data.get('confirm_password', '')
phone = data.get('phone', '').strip()
email = data.get('email', '').strip()
# 验证
if not username or len(username) < 2:
return jsonify({'error': '用户名至少2个字符'}), 400
if not password or len(password) < 6:
return jsonify({'error': '密码至少6个字符'}), 400
if password != confirm_password:
return jsonify({'error': '两次密码不一致'}), 400
if not phone or len(phone) < 11:
return jsonify({'error': '请输入正确的手机号'}), 400
users = load_users()
# 检查用户名是否存在
for uid, user in users.items():
if user['username'] == username:
return jsonify({'error': '用户名已存在'}), 400
if user['phone'] == phone:
return jsonify({'error': '手机号已注册'}), 400
# 创建用户
user_id = str(uuid.uuid4())
users[user_id] = {
'id': user_id,
'username': username,
'password': generate_password_hash(password),
'phone': phone,
'email': email,
'created_at': datetime.datetime.now().isoformat(),
'chats': []
}
save_users(users)
token = generate_token(user_id)
return jsonify({
'success': True,
'token': token,
'user': {
'id': user_id,
'username': username,
'phone': phone,
'email': email
}
})
@app.route('/api/login', methods=['POST'])
def api_login():
"""用户登录"""
data = request.json
username = data.get('username', '').strip()
password = data.get('password', '')
if not username or not password:
return jsonify({'error': '请输入用户名和密码'}), 400
users = load_users()
for user_id, user in users.items():
if user['username'] == username:
if check_password_hash(user['password'], password):
token = generate_token(user_id)
return jsonify({
'success': True,
'token': token,
'user': {
'id': user_id,
'username': user['username'],
'phone': user['phone'],
'email': user.get('email', '')
}
})
else:
return jsonify({'error': '密码错误'}), 400
return jsonify({'error': '用户不存在'}), 400
@app.route('/api/user')
def api_user():
"""获取当前用户信息"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
return jsonify({
'id': user['id'],
'username': user['username'],
'phone': user['phone'],
'email': user.get('email', '')
})
@app.route('/api/chats', methods=['GET'])
def api_get_chats():
"""获取用户的对话列表"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
chats = load_chats()
user_chats = []
for chat_id in user.get('chats', []):
if chat_id in chats:
chat = chats[chat_id]
user_chats.append({
'id': chat_id,
'title': chat['title'],
'dialect': chat['dialect'],
'created_at': chat['created_at'],
'updated_at': chat.get('updated_at', chat['created_at'])
})
# 按更新时间倒序
user_chats.sort(key=lambda x: x['updated_at'], reverse=True)
return jsonify(user_chats)
@app.route('/api/chats', methods=['POST'])
def api_create_chat():
"""创建新对话"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
data = request.json
dialect = data.get('dialect', 'mandarin')
chat_id = str(uuid.uuid4())
chats = load_chats()
chats[chat_id] = {
'id': chat_id,
'user_id': user['id'],
'title': '新对话',
'dialect': dialect,
'messages': [],
'created_at': datetime.datetime.now().isoformat(),
'updated_at': datetime.datetime.now().isoformat()
}
save_chats(chats)
# 更新用户的对话列表
users = load_users()
if chat_id not in users[user['id']]['chats']:
users[user['id']]['chats'].append(chat_id)
save_users(users)
return jsonify({
'id': chat_id,
'title': '新对话',
'dialect': dialect
})
@app.route('/api/chats/<chat_id>', methods=['GET'])
def api_get_chat(chat_id):
"""获取对话详情"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
chats = load_chats()
chat = chats.get(chat_id)
if not chat or chat['user_id'] != user['id']:
return jsonify({'error': '对话不存在'}), 404
return jsonify(chat)
@app.route('/api/chats/<chat_id>', methods=['DELETE'])
def api_delete_chat(chat_id):
"""删除对话"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
chats = load_chats()
chat = chats.get(chat_id)
if not chat or chat['user_id'] != user['id']:
return jsonify({'error': '对话不存在'}), 404
del chats[chat_id]
save_chats(chats)
# 更新用户的对话列表
users = load_users()
if chat_id in users[user['id']]['chats']:
users[user['id']]['chats'].remove(chat_id)
save_users(users)
return jsonify({'success': True})
@app.route('/api/chats/<chat_id>/send', methods=['POST'])
def api_send_message(chat_id):
"""发送消息"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
data = request.json
content = data.get('content', '').strip()
if not content:
return jsonify({'error': '消息不能为空'}), 400
chats = load_chats()
chat = chats.get(chat_id)
if not chat or chat['user_id'] != user['id']:
return jsonify({'error': '对话不存在'}), 404
# 添加用户消息
user_message = {
'role': 'user',
'content': content,
'time': datetime.datetime.now().isoformat()
}
chat['messages'].append(user_message)
# 调用大模型
messages = [{'role': m['role'], 'content': m['content']} for m in chat['messages']]
ai_response = call_llm(messages, chat['dialect'])
# 添加AI回复
ai_message = {
'role': 'assistant',
'content': ai_response,
'time': datetime.datetime.now().isoformat()
}
chat['messages'].append(ai_message)
# 更新对话标题第一条消息的前20字
if len(chat['messages']) <= 2:
chat['title'] = content[:20] + ('...' if len(content) > 20 else '')
chat['updated_at'] = datetime.datetime.now().isoformat()
save_chats(chats)
return jsonify({
'user_message': user_message,
'ai_message': ai_message
})
@app.route('/api/upload', methods=['POST'])
def api_upload():
"""上传文件"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
if 'file' not in request.files:
return jsonify({'error': '没有上传文件'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': '没有选择文件'}), 400
# 保存文件
file_id = str(uuid.uuid4())
filename = f"{file_id}_{file.filename}"
filepath = UPLOAD_DIR / filename
file.save(filepath)
# 如果是图片返回base64预览
is_image = file.content_type.startswith('image/')
preview = None
if is_image:
preview = f"/api/uploads/{filename}"
return jsonify({
'success': True,
'file_id': file_id,
'filename': file.filename,
'is_image': is_image,
'preview': preview
})
@app.route('/api/uploads/<filename>')
def api_get_upload(filename):
"""获取上传的文件"""
return send_file(UPLOAD_DIR / filename)
if __name__ == '__main__':
print("=" * 50)
print("方言版AI对话助手")
print("=" * 50)
print(f"访问地址: http://localhost:19002")
print("=" * 50)
app.run(host='0.0.0.0', port=19002, debug=True)