feat: 后台管理添加用户管理功能 + 前端用户注册登录调用backend API

This commit is contained in:
2026-04-27 17:16:44 +08:00
parent 24ba04b3e3
commit e9357577cb
4 changed files with 646 additions and 80 deletions

View File

@@ -147,6 +147,25 @@ def init_db():
)
''')
# 用户表(前端注册用户)
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
phone TEXT NOT NULL UNIQUE,
email TEXT,
avatar TEXT DEFAULT '👤',
signature TEXT,
gender TEXT,
age INTEGER,
region TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_login_at TIMESTAMP
)
''')
# 初始化默认大模型配置
cursor.execute('SELECT COUNT(*) FROM llm_configs')
if cursor.fetchone()[0] == 0:
@@ -295,6 +314,273 @@ def admin_login():
return jsonify({'error': '用户名或密码错误'}), 401
# ==================== 用户管理 ====================
@app.route('/api/register', methods=['POST'])
def user_register():
"""用户注册"""
data = request.json
username = data.get('username')
password = data.get('password')
phone = data.get('phone')
email = data.get('email')
code = data.get('code') # 验证码(暂时只校验格式)
# 参数验证
if not username or not password or not phone:
return jsonify({'error': '请填写完整信息'}), 400
# 用户名长度检查
if len(username) < 2 or len(username) > 20:
return jsonify({'error': '用户名长度应为2-20字符'}), 400
# 手机号格式检查
import re
if not re.match(r'^1[3-9]\d{9}$', phone):
return jsonify({'error': '手机号格式不正确'}), 400
# 验证码检查模拟6位数字
if not code or not re.match(r'^\d{6}$', code):
return jsonify({'error': '验证码格式不正确'}), 400
# 邮箱格式检查(可选)
if email and not re.match(r'^[^\s@]+@[^\s@]+\.[^\s@]+$', email):
return jsonify({'error': '邮箱格式不正确'}), 400
conn = get_db()
cursor = conn.cursor()
# 检查用户名是否已存在
cursor.execute('SELECT id FROM users WHERE username = ?', (username,))
if cursor.fetchone():
conn.close()
return jsonify({'error': '用户名已存在'}), 400
# 检查手机号是否已存在
cursor.execute('SELECT id FROM users WHERE phone = ?', (phone,))
if cursor.fetchone():
conn.close()
return jsonify({'error': '手机号已注册'}), 400
# 创建用户
password_hash = hashlib.sha256(password.encode()).hexdigest()
cursor.execute('''
INSERT INTO users (username, password_hash, phone, email)
VALUES (?, ?, ?, ?)
''', (username, password_hash, phone, email))
# 记录注册统计
cursor.execute('INSERT INTO stats_logs (log_type, log_key) VALUES (?, ?)', ('user_register', 'new'))
conn.commit()
user_id = cursor.lastrowid
conn.close()
return jsonify({'success': True, 'user_id': user_id})
@app.route('/api/login', methods=['POST'])
def user_login():
"""用户登录"""
data = request.json
username = data.get('username')
password = data.get('password')
if not username or not password:
return jsonify({'error': '请输入用户名和密码'}), 400
password_hash = hashlib.sha256(password.encode()).hexdigest()
conn = get_db()
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE username = ? AND password_hash = ?', (username, password_hash))
user = cursor.fetchone()
if user:
# 更新最后登录时间
cursor.execute('UPDATE users SET last_login_at = CURRENT_TIMESTAMP WHERE id = ?', (user['id'],))
conn.commit()
conn.close()
return jsonify({
'success': True,
'user': {
'id': user['id'],
'username': user['username'],
'phone': user['phone'],
'email': user['email'],
'avatar': user['avatar'],
'signature': user['signature'],
'gender': user['gender'],
'age': user['age'],
'region': user['region']
}
})
else:
conn.close()
return jsonify({'error': '用户名或密码错误'}), 401
@app.route('/api/admin/users', methods=['GET'])
def get_users():
"""获取用户列表"""
conn = get_db()
cursor = conn.cursor()
# 支持搜索
search = request.args.get('search', '')
if search:
cursor.execute('''
SELECT id, username, phone, email, avatar, signature, gender, age, region,
created_at, last_login_at FROM users
WHERE username LIKE ? OR phone LIKE ? OR email LIKE ?
ORDER BY created_at DESC
''', (f'%{search}%', f'%{search}%', f'%{search}%'))
else:
cursor.execute('''
SELECT id, username, phone, email, avatar, signature, gender, age, region,
created_at, last_login_at FROM users ORDER BY created_at DESC
''')
users = [dict(row) for row in cursor.fetchall()]
conn.close()
return jsonify(users)
@app.route('/api/admin/users/<int:id>', methods=['GET'])
def get_user(id):
"""获取单个用户"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('SELECT * FROM users WHERE id = ?', (id,))
user = cursor.fetchone()
conn.close()
if user:
return jsonify(dict(user))
else:
return jsonify({'error': '用户不存在'}), 404
@app.route('/api/admin/users/<int:id>', methods=['PUT'])
def update_user(id):
"""更新用户信息"""
data = request.json
conn = get_db()
cursor = conn.cursor()
# 检查用户是否存在
cursor.execute('SELECT id FROM users WHERE id = ?', (id,))
if not cursor.fetchone():
conn.close()
return jsonify({'error': '用户不存在'}), 404
# 检查用户名是否重复(如果要修改用户名)
if data.get('username'):
cursor.execute('SELECT id FROM users WHERE username = ? AND id != ?', (data['username'], id))
if cursor.fetchone():
conn.close()
return jsonify({'error': '用户名已存在'}), 400
# 更新用户信息
cursor.execute('''
UPDATE users SET username=?, email=?, avatar=?, signature=?, gender=?, age=?, region=?,
updated_at=CURRENT_TIMESTAMP WHERE id=?
''', (data.get('username'), data.get('email'), data.get('avatar', '👤'),
data.get('signature'), data.get('gender'), data.get('age'),
data.get('region'), id))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/admin/users/<int:id>', methods=['DELETE'])
def delete_user(id):
"""删除用户"""
conn = get_db()
cursor = conn.cursor()
cursor.execute('DELETE FROM users WHERE id = ?', (id,))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/admin/users/<int:id>/password', methods=['PUT'])
def reset_user_password(id):
"""重置用户密码"""
data = request.json
new_password = data.get('password')
if not new_password or len(new_password) < 6:
return jsonify({'error': '密码长度至少6位'}), 400
password_hash = hashlib.sha256(new_password.encode()).hexdigest()
conn = get_db()
cursor = conn.cursor()
cursor.execute('UPDATE users SET password_hash=?, updated_at=CURRENT_TIMESTAMP WHERE id=?',
(password_hash, id))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/user/<int:id>', methods=['PUT'])
def update_user_profile(id):
"""用户更新自己的资料"""
data = request.json
conn = get_db()
cursor = conn.cursor()
# 检查用户是否存在
cursor.execute('SELECT id FROM users WHERE id = ?', (id,))
if not cursor.fetchone():
conn.close()
return jsonify({'error': '用户不存在'}), 404
# 更新用户资料
cursor.execute('''
UPDATE users SET avatar=?, signature=?, gender=?, age=?, region=?, email=?,
updated_at=CURRENT_TIMESTAMP WHERE id=?
''', (data.get('avatar', '👤'), data.get('signature'), data.get('gender'),
data.get('age'), data.get('region'), data.get('email'), id))
conn.commit()
conn.close()
return jsonify({'success': True})
@app.route('/api/user/<int:id>/password', methods=['PUT'])
def change_user_password(id):
"""用户修改自己的密码"""
data = request.json
old_password = data.get('old_password')
new_password = data.get('new_password')
if not old_password or not new_password:
return jsonify({'error': '请输入旧密码和新密码'}), 400
if len(new_password) < 6:
return jsonify({'error': '新密码长度至少6位'}), 400
old_hash = hashlib.sha256(old_password.encode()).hexdigest()
new_hash = hashlib.sha256(new_password.encode()).hexdigest()
conn = get_db()
cursor = conn.cursor()
# 验证旧密码
cursor.execute('SELECT id FROM users WHERE id = ? AND password_hash = ?', (id, old_hash))
if not cursor.fetchone():
conn.close()
return jsonify({'error': '旧密码不正确'}), 400
# 更新密码
cursor.execute('UPDATE users SET password_hash=?, updated_at=CURRENT_TIMESTAMP WHERE id=?',
(new_hash, id))
conn.commit()
conn.close()
return jsonify({'success': True})
# ==================== 大模型接口管理 ====================
@app.route('/api/admin/llm', methods=['GET'])