初始化方言版AI对话助手

功能特点:
- 支持8种方言(普通话、四川话、粤语、上海话、客家话、闽南话、东北话、河南话)
- 用户注册登录系统(用户名、手机、邮箱可选、密码确认)
- 对话功能(文字输入、语音识别、文件图片上传)
- 多对话管理(新建、切换、删除)
- 移动端适配(响应式设计、触摸友好)

技术栈:
- Flask后端API
- 原生HTML/CSS/JS前端
- Web Speech API语音识别
- JWT用户认证
- 大模型: qwen3.5-4b

端口: 19002
This commit is contained in:
2026-04-08 12:09:46 +08:00
commit 376cdacf5e
7 changed files with 1354 additions and 0 deletions

475
backend/app.py Normal file
View File

@@ -0,0 +1,475 @@
"""
方言版AI对话助手 - 后端API
"""
from flask import Flask, request, jsonify, send_from_directory, send_file
from flask_cors import CORS
from werkzeug.security import generate_password_hash, check_password_hash
import jwt
import datetime
import json
import os
import uuid
import base64
import requests
from pathlib import Path
app = Flask(__name__, static_folder='../frontend', static_url_path='')
CORS(app)
# 配置
SECRET_KEY = 'dialect-chat-secret-key-2026'
LLM_BASE_URL = 'http://192.168.2.5:1234/v1'
LLM_API_KEY = 'sk-lm-fuP5tGU8:Hi7YU87jHyDP6Ay8Tl2j'
LLM_MODEL = 'qwen3.5-4b'
# 数据目录
DATA_DIR = Path(__file__).parent.parent / 'data'
USERS_FILE = DATA_DIR / 'users.json'
CHATS_FILE = DATA_DIR / 'chats.json'
UPLOAD_DIR = DATA_DIR / 'uploads'
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# 方言配置
DIALECTS = {
'mandarin': {
'name': '普通话',
'prompt': '请用标准的普通话回复。',
'greeting': '你好!有什么我可以帮助你的吗?'
},
'sichuan': {
'name': '四川话',
'prompt': '请用四川话回复,使用四川方言的特色表达,比如"巴适""要得"""等。',
'greeting': '你好哇!有啥子事要得嘛?'
},
'cantonese': {
'name': '粤语',
'prompt': '请用粤语回复,使用粤语的特色表达,比如""""""""等。',
'greeting': '你好!有乜嘢可以帮你?'
},
'shanghai': {
'name': '上海话',
'prompt': '请用上海话回复,使用上海方言的特色表达。',
'greeting': '侬好!有啥事体伐?'
},
'hakka': {
'name': '客家话',
'prompt': '请用客家话回复,使用客家方言的特色表达。',
'greeting': '你好!有么个可以帮你?'
},
'minnan': {
'name': '闽南话',
'prompt': '请用闽南话回复,使用闽南方言的特色表达。',
'greeting': '你好!有啥米代志?'
},
'northeast': {
'name': '东北话',
'prompt': '请用东北话回复,使用东北方言的特色表达,比如"咋整""嘎哈""整挺好"等。',
'greeting': '哎呀妈呀,来了!有啥事儿啊?'
},
'henan': {
'name': '河南话',
'prompt': '请用河南话回复,使用河南方言的特色表达,比如"""弄啥嘞""可得劲"等。',
'greeting': '中!你有啥事儿说呗!'
}
}
# 初始化数据文件
def init_data():
if not USERS_FILE.exists():
USERS_FILE.write_text(json.dumps({}, ensure_ascii=False))
if not CHATS_FILE.exists():
CHATS_FILE.write_text(json.dumps({}, ensure_ascii=False))
init_data()
# 辅助函数
def load_users():
return json.loads(USERS_FILE.read_text(encoding='utf-8'))
def save_users(users):
USERS_FILE.write_text(json.dumps(users, ensure_ascii=False, indent=2), encoding='utf-8')
def load_chats():
return json.loads(CHATS_FILE.read_text(encoding='utf-8'))
def save_chats(chats):
CHATS_FILE.write_text(json.dumps(chats, ensure_ascii=False, indent=2), encoding='utf-8')
def generate_token(user_id):
return jwt.encode({
'user_id': user_id,
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=7)
}, SECRET_KEY, algorithm='HS256')
def verify_token(token):
try:
return jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
except:
return None
def get_current_user():
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not token:
return None
data = verify_token(token)
if not data:
return None
users = load_users()
return users.get(data['user_id'])
def call_llm(messages, dialect='mandarin'):
"""调用大模型API"""
dialect_info = DIALECTS.get(dialect, DIALECTS['mandarin'])
# 添加系统提示
system_message = {
'role': 'system',
'content': f'你是一个友好的AI助手。{dialect_info["prompt"]}'
}
full_messages = [system_message] + messages[-20:] # 保留最近20条消息
try:
response = requests.post(
f'{LLM_BASE_URL}/chat/completions',
headers={
'Content-Type': 'application/json',
'Authorization': f'Bearer {LLM_API_KEY}'
},
json={
'model': LLM_MODEL,
'messages': full_messages,
'max_tokens': 1024,
'temperature': 0.8
},
timeout=60
)
if response.status_code == 200:
result = response.json()
return result['choices'][0]['message']['content']
else:
return f'抱歉,我遇到了一些问题,请稍后再试。'
except Exception as e:
return f'连接出现问题了,请稍后再试试。'
# ============ 页面路由 ============
@app.route('/')
def index():
return send_file('../frontend/index.html')
@app.route('/chat')
def chat_page():
return send_file('../frontend/chat.html')
# ============ API路由 ============
@app.route('/api/dialects')
def api_dialects():
"""获取支持的方言列表"""
return jsonify({k: {'name': v['name'], 'greeting': v['greeting']} for k, v in DIALECTS.items()})
@app.route('/api/register', methods=['POST'])
def api_register():
"""用户注册"""
data = request.json
username = data.get('username', '').strip()
password = data.get('password', '')
confirm_password = data.get('confirm_password', '')
phone = data.get('phone', '').strip()
email = data.get('email', '').strip()
# 验证
if not username or len(username) < 2:
return jsonify({'error': '用户名至少2个字符'}), 400
if not password or len(password) < 6:
return jsonify({'error': '密码至少6个字符'}), 400
if password != confirm_password:
return jsonify({'error': '两次密码不一致'}), 400
if not phone or len(phone) < 11:
return jsonify({'error': '请输入正确的手机号'}), 400
users = load_users()
# 检查用户名是否存在
for uid, user in users.items():
if user['username'] == username:
return jsonify({'error': '用户名已存在'}), 400
if user['phone'] == phone:
return jsonify({'error': '手机号已注册'}), 400
# 创建用户
user_id = str(uuid.uuid4())
users[user_id] = {
'id': user_id,
'username': username,
'password': generate_password_hash(password),
'phone': phone,
'email': email,
'created_at': datetime.datetime.now().isoformat(),
'chats': []
}
save_users(users)
token = generate_token(user_id)
return jsonify({
'success': True,
'token': token,
'user': {
'id': user_id,
'username': username,
'phone': phone,
'email': email
}
})
@app.route('/api/login', methods=['POST'])
def api_login():
"""用户登录"""
data = request.json
username = data.get('username', '').strip()
password = data.get('password', '')
if not username or not password:
return jsonify({'error': '请输入用户名和密码'}), 400
users = load_users()
for user_id, user in users.items():
if user['username'] == username:
if check_password_hash(user['password'], password):
token = generate_token(user_id)
return jsonify({
'success': True,
'token': token,
'user': {
'id': user_id,
'username': user['username'],
'phone': user['phone'],
'email': user.get('email', '')
}
})
else:
return jsonify({'error': '密码错误'}), 400
return jsonify({'error': '用户不存在'}), 400
@app.route('/api/user')
def api_user():
"""获取当前用户信息"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
return jsonify({
'id': user['id'],
'username': user['username'],
'phone': user['phone'],
'email': user.get('email', '')
})
@app.route('/api/chats', methods=['GET'])
def api_get_chats():
"""获取用户的对话列表"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
chats = load_chats()
user_chats = []
for chat_id in user.get('chats', []):
if chat_id in chats:
chat = chats[chat_id]
user_chats.append({
'id': chat_id,
'title': chat['title'],
'dialect': chat['dialect'],
'created_at': chat['created_at'],
'updated_at': chat.get('updated_at', chat['created_at'])
})
# 按更新时间倒序
user_chats.sort(key=lambda x: x['updated_at'], reverse=True)
return jsonify(user_chats)
@app.route('/api/chats', methods=['POST'])
def api_create_chat():
"""创建新对话"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
data = request.json
dialect = data.get('dialect', 'mandarin')
chat_id = str(uuid.uuid4())
chats = load_chats()
chats[chat_id] = {
'id': chat_id,
'user_id': user['id'],
'title': '新对话',
'dialect': dialect,
'messages': [],
'created_at': datetime.datetime.now().isoformat(),
'updated_at': datetime.datetime.now().isoformat()
}
save_chats(chats)
# 更新用户的对话列表
users = load_users()
if chat_id not in users[user['id']]['chats']:
users[user['id']]['chats'].append(chat_id)
save_users(users)
return jsonify({
'id': chat_id,
'title': '新对话',
'dialect': dialect
})
@app.route('/api/chats/<chat_id>', methods=['GET'])
def api_get_chat(chat_id):
"""获取对话详情"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
chats = load_chats()
chat = chats.get(chat_id)
if not chat or chat['user_id'] != user['id']:
return jsonify({'error': '对话不存在'}), 404
return jsonify(chat)
@app.route('/api/chats/<chat_id>', methods=['DELETE'])
def api_delete_chat(chat_id):
"""删除对话"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
chats = load_chats()
chat = chats.get(chat_id)
if not chat or chat['user_id'] != user['id']:
return jsonify({'error': '对话不存在'}), 404
del chats[chat_id]
save_chats(chats)
# 更新用户的对话列表
users = load_users()
if chat_id in users[user['id']]['chats']:
users[user['id']]['chats'].remove(chat_id)
save_users(users)
return jsonify({'success': True})
@app.route('/api/chats/<chat_id>/send', methods=['POST'])
def api_send_message(chat_id):
"""发送消息"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
data = request.json
content = data.get('content', '').strip()
if not content:
return jsonify({'error': '消息不能为空'}), 400
chats = load_chats()
chat = chats.get(chat_id)
if not chat or chat['user_id'] != user['id']:
return jsonify({'error': '对话不存在'}), 404
# 添加用户消息
user_message = {
'role': 'user',
'content': content,
'time': datetime.datetime.now().isoformat()
}
chat['messages'].append(user_message)
# 调用大模型
messages = [{'role': m['role'], 'content': m['content']} for m in chat['messages']]
ai_response = call_llm(messages, chat['dialect'])
# 添加AI回复
ai_message = {
'role': 'assistant',
'content': ai_response,
'time': datetime.datetime.now().isoformat()
}
chat['messages'].append(ai_message)
# 更新对话标题第一条消息的前20字
if len(chat['messages']) <= 2:
chat['title'] = content[:20] + ('...' if len(content) > 20 else '')
chat['updated_at'] = datetime.datetime.now().isoformat()
save_chats(chats)
return jsonify({
'user_message': user_message,
'ai_message': ai_message
})
@app.route('/api/upload', methods=['POST'])
def api_upload():
"""上传文件"""
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
if 'file' not in request.files:
return jsonify({'error': '没有上传文件'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': '没有选择文件'}), 400
# 保存文件
file_id = str(uuid.uuid4())
filename = f"{file_id}_{file.filename}"
filepath = UPLOAD_DIR / filename
file.save(filepath)
# 如果是图片返回base64预览
is_image = file.content_type.startswith('image/')
preview = None
if is_image:
preview = f"/api/uploads/{filename}"
return jsonify({
'success': True,
'file_id': file_id,
'filename': file.filename,
'is_image': is_image,
'preview': preview
})
@app.route('/api/uploads/<filename>')
def api_get_upload(filename):
"""获取上传的文件"""
return send_file(UPLOAD_DIR / filename)
if __name__ == '__main__':
print("=" * 50)
print("方言版AI对话助手")
print("=" * 50)
print(f"访问地址: http://localhost:19002")
print("=" * 50)
app.run(host='0.0.0.0', port=19002, debug=True)