Files
pdf-translate-web/admin.py
coder 8c35a8741a feat: 系统配置支持动态增删用户类型和会员套餐
新增功能:
- UserTypeConfig 模型:用户类型配置支持动态增删
- MembershipPlanConfig 模型:会员套餐配置支持动态增删
- 用户类型管理页面:添加、编辑、删除、启用/禁用用户类型
- 会员套餐管理页面:添加、编辑、删除、上架/下架、推荐套餐
- 功能权限配置:支持选择功能列表
- 初始化默认配置功能

改进:
- settings.html 页面重构,提供配置入口链接
- 新增API接口支持增删改查操作
2026-04-11 10:25:03 +08:00

1220 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
后台管理模块 V2
"""
from datetime import datetime, date, timedelta
from functools import wraps
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, session, flash
from sqlalchemy import func, desc
import json
from models import (db, User, Translation, TranslationCache, GuestTranslation,
SystemConfig, OperationLog, DataPackage, UserPackage, DynamicConfig,
UserTypeConfig, MembershipPlanConfig)
from config import USER_LIMITS, MEMBERSHIP_PLANS
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
# ==================== 权限装饰器 ====================
def admin_required(f):
    """Restrict a view to logged-in administrators.

    Requests without a ``user_id`` in the session are redirected to the
    ``login`` endpoint with the current URL passed as ``next``. Requests
    whose user record is missing or lacks ``is_admin`` receive an error
    flash and are redirected to ``index``.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        current_id = session.get('user_id')
        if not current_id:
            return redirect(url_for('login', next=request.url))

        account = User.query.get(current_id)
        if account and account.is_admin:
            return f(*args, **kwargs)

        flash('需要管理员权限', 'error')
        return redirect(url_for('index'))

    return wrapper
# ==================== 后台首页 ====================
@admin_bp.route('/')
@admin_required
def dashboard():
    """Render the admin dashboard.

    The page receives user, translation, cache and paid-package totals,
    the ten most recent translations and users, and a per-day translation
    count for the last seven days (oldest first).
    """
    context = {}

    # User figures
    context['total_users'] = User.query.count()
    registered_today = func.date(User.created_at) == date.today()
    context['new_users_today'] = User.query.filter(registered_today).count()
    context['vip_users'] = User.query.filter(User.user_type.startswith('vip')).count()

    # Translation figures
    context['total_translations'] = Translation.query.count()
    translated_today = func.date(Translation.created_at) == date.today()
    context['today_translations'] = Translation.query.filter(translated_today).count()

    # Cache figures (SUM over an empty table yields NULL, hence the "or 0")
    context['total_cache'] = TranslationCache.query.count()
    context['total_cache_hits'] = (
        db.session.query(func.sum(TranslationCache.hit_count)).scalar() or 0
    )
    context['total_cache_size'] = (
        db.session.query(func.sum(TranslationCache.file_size)).scalar() or 0
    )

    # Paid data-package figures
    context['total_packages_sold'] = UserPackage.query.filter_by(payment_status='paid').count()
    paid_sum = db.session.query(func.sum(UserPackage.price_paid)).filter(
        UserPackage.payment_status == 'paid'
    )
    context['total_revenue'] = paid_sum.scalar() or 0

    # Latest activity
    context['recent_translations'] = (
        Translation.query.order_by(desc(Translation.created_at)).limit(10).all()
    )
    context['recent_users'] = User.query.order_by(desc(User.created_at)).limit(10).all()

    # Daily translation trend, six days ago through today
    trend = []
    for days_back in reversed(range(7)):
        day = date.today() - timedelta(days=days_back)
        per_day = Translation.query.filter(func.date(Translation.created_at) == day).count()
        trend.append({'date': day.strftime('%m-%d'), 'count': per_day})
    context['daily_stats'] = trend

    return render_template('admin/dashboard.html', **context)
# ==================== 用户管理 ====================
@admin_bp.route('/users')
@admin_required
def users():
    """List users, newest first, 20 per page.

    Query parameters: ``page``, ``search`` (substring matched against
    username or email) and ``type`` (exact user type).
    """
    args = request.args
    page = args.get('page', 1, type=int)
    search = args.get('search', '')
    user_type = args.get('type', '')

    query = User.query
    if search:
        name_match = User.username.contains(search)
        email_match = User.email.contains(search)
        query = query.filter(name_match | email_match)
    if user_type:
        query = query.filter_by(user_type=user_type)

    pagination = query.order_by(desc(User.created_at)).paginate(page=page, per_page=20)
    return render_template(
        'admin/users.html',
        users=pagination,
        search=search,
        user_type=user_type,
        user_types=['free', 'vip_basic', 'vip_pro', 'vip_enterprise', 'admin'],
    )
@admin_bp.route('/user/<int:user_id>', methods=['GET', 'POST'])
@admin_required
def user_detail(user_id):
    """Show a user's detail page (GET) or update the user (POST).

    POST accepts either a JSON body or form data. Text fields that are not
    supplied keep their current value. The ``is_active`` / ``is_admin``
    flags are true when the submitted value is ``'true'`` or ``True``:

    * form submission: a missing flag means "unchecked" and is stored as
      False;
    * JSON submission: a missing flag leaves the stored value untouched, so
      a partial update cannot accidentally deactivate the user or revoke
      admin rights.

    An unparseable ``membership_expire`` is ignored. The operation log
    records the submitted fields with ``new_password`` masked.

    Returns JSON ``{'success': True, 'user': ...}`` for JSON requests,
    otherwise redirects back to this page. Responds 404 when the user does
    not exist.
    """
    user = User.query.get_or_404(user_id)

    if request.method == 'POST':
        is_json = request.is_json
        data = request.json if is_json else request.form

        def read_flag(name, current):
            """Return the boolean submitted for *name*, or the fallback."""
            if name not in data:
                return current if is_json else False
            # ``in`` compares with ==, matching the original
            # "== 'true' or == True" test.
            return data.get(name) in ('true', True)

        user.username = data.get('username', user.username)
        user.email = data.get('email', user.email)
        user.user_type = data.get('user_type', user.user_type)
        user.is_active = read_flag('is_active', user.is_active)
        user.is_admin = read_flag('is_admin', user.is_admin)

        # Membership expiry
        expire_str = data.get('membership_expire')
        if expire_str:
            try:
                user.membership_expire = datetime.fromisoformat(expire_str)
            except (TypeError, ValueError):
                # Not an ISO date string: keep the stored expiry.
                pass

        # Password
        new_password = data.get('new_password')
        if new_password:
            user.set_password(new_password)

        db.session.commit()

        # Audit log; never persist the plaintext password.
        logged_fields = {
            key: ('***' if key == 'new_password' else value)
            for key, value in data.items()
        }
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='edit_user',
            target=user.username,
            detail=json.dumps(logged_fields, ensure_ascii=False)
        )
        db.session.add(log)
        db.session.commit()

        if is_json:
            return jsonify({'success': True, 'user': user.to_dict()})
        flash('用户信息已更新', 'success')
        return redirect(url_for('admin.user_detail', user_id=user_id))

    translations = (
        Translation.query.filter_by(user_id=user_id)
        .order_by(desc(Translation.created_at))
        .limit(20)
        .all()
    )
    packages = (
        UserPackage.query.filter_by(user_id=user_id)
        .order_by(desc(UserPackage.purchased_at))
        .all()
    )
    return render_template('admin/user_detail.html', user=user, translations=translations, packages=packages)
@admin_bp.route('/user/<int:user_id>/delete', methods=['POST'])
@admin_required
def delete_user(user_id):
    """Delete a user and record the action in the operation log.

    Responds 404 when the user does not exist and 400 when the user is the
    only remaining administrator.
    """
    target = User.query.get_or_404(user_id)

    if target.is_admin:
        admin_total = User.query.filter_by(is_admin=True).count()
        if admin_total <= 1:
            return jsonify({'error': '不能删除最后一个管理员'}), 400

    # Capture the name before the row is removed.
    removed_name = target.username
    db.session.delete(target)
    db.session.commit()

    db.session.add(OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='delete_user',
        target=removed_name
    ))
    db.session.commit()
    return jsonify({'success': True})
# ==================== 翻译记录管理 ====================
@admin_bp.route('/translations')
@admin_required
def translations():
    """List translation records, newest first, 20 per page.

    Query parameters: ``page``, ``status`` (exact match) and ``search``
    (substring of the original file name).
    """
    args = request.args
    page = args.get('page', 1, type=int)
    status = args.get('status', '')
    search = args.get('search', '')

    query = Translation.query
    if status:
        query = query.filter_by(status=status)
    if search:
        query = query.filter(Translation.original_filename.contains(search))

    ordered = query.order_by(desc(Translation.created_at))
    pagination = ordered.paginate(page=page, per_page=20)
    return render_template(
        'admin/translations.html',
        translations=pagination,
        status=status,
        search=search,
    )
@admin_bp.route('/translation/<int:trans_id>')
@admin_required
def translation_detail(trans_id):
    """Show one translation record together with its owner, if any.

    Responds 404 when the translation does not exist.
    """
    record = Translation.query.get_or_404(trans_id)

    # Records without a user_id have no owner to look up.
    owner = None
    if record.user_id:
        owner = User.query.get(record.user_id)

    return render_template('admin/translation_detail.html', translation=record, user=owner)
@admin_bp.route('/translation/<int:trans_id>/delete', methods=['POST'])
@admin_required
def delete_translation(trans_id):
    """Delete a translation record and its output file, if present on disk.

    Responds 404 when the translation does not exist.
    """
    import os

    record = Translation.query.get_or_404(trans_id)

    output_file = record.output_path
    if output_file and os.path.exists(output_file):
        os.remove(output_file)

    db.session.delete(record)
    db.session.commit()
    return jsonify({'success': True})
# ==================== 缓存管理 ====================
@admin_bp.route('/cache')
@admin_required
def cache_list():
    """List cache entries ordered by hit count (highest first), 20 per page,
    together with the total cached size and total hit count."""
    page = request.args.get('page', 1, type=int)

    by_popularity = TranslationCache.query.order_by(desc(TranslationCache.hit_count))
    pagination = by_popularity.paginate(page=page, per_page=20)

    # SUM over an empty table yields NULL, so fall back to 0.
    size_sum = db.session.query(func.sum(TranslationCache.file_size)).scalar()
    hit_sum = db.session.query(func.sum(TranslationCache.hit_count)).scalar()

    return render_template(
        'admin/cache.html',
        caches=pagination,
        total_size=size_sum or 0,
        total_hits=hit_sum or 0,
    )
@admin_bp.route('/cache/<int:cache_id>/delete', methods=['POST'])
@admin_required
def delete_cache(cache_id):
    """Delete one cache entry and its cached file, if present on disk.

    Responds 404 when the cache entry does not exist.
    """
    import os

    entry = TranslationCache.query.get_or_404(cache_id)

    cached_file = entry.cache_path
    if cached_file and os.path.exists(cached_file):
        os.remove(cached_file)

    db.session.delete(entry)
    db.session.commit()
    return jsonify({'success': True})
@admin_bp.route('/cache/clear', methods=['POST'])
@admin_required
def clear_cache():
    """Remove every cache entry and any cached file that exists on disk.

    Returns JSON with ``deleted`` set to the number of entries that were
    loaded before deletion.
    """
    import os

    entries = TranslationCache.query.all()
    for entry in entries:
        cached_file = entry.cache_path
        if not cached_file:
            continue
        if os.path.exists(cached_file):
            os.remove(cached_file)

    # Bulk-delete the rows once the files are gone.
    TranslationCache.query.delete()
    db.session.commit()
    return jsonify({'success': True, 'deleted': len(entries)})
# ==================== 系统配置 V2 ====================
@admin_bp.route('/settings', methods=['GET', 'POST'])
@admin_required
def settings():
    """Show the system settings page; on POST, store each submitted
    key/value pair through ``SystemConfig.set`` first.

    JSON POSTs get a JSON success response. Form POSTs flash a confirmation
    and then fall through to render the page.
    """
    if request.method == 'POST':
        is_json = request.is_json
        submitted = request.json if is_json else request.form
        for key, value in submitted.items():
            SystemConfig.set(key, value)
        if is_json:
            return jsonify({'success': True})
        flash('配置已保存', 'success')

    # All stored system configuration as a plain key -> value mapping
    stored = {}
    for row in SystemConfig.query.all():
        stored[row.key] = row.value

    return render_template(
        'admin/settings.html',
        configs=stored,
        dynamic_configs=DynamicConfig.query.all(),
        user_limits=USER_LIMITS,
        membership_plans=MEMBERSHIP_PLANS,
        llm_config=get_llm_config(),
    )
# ==================== 用户权限配置 ====================
@admin_bp.route('/settings/user-limits', methods=['GET', 'POST'])
@admin_required
def user_limits_settings():
    """View or save per-user-type limits.

    POST expects JSON shaped ``{user_type: {limit_name: value}}``; every
    value is stored under the key ``user_limit_<user_type>_<limit_name>``
    in the ``user_limits`` category. GET renders the current limits, using
    ``USER_LIMITS`` (then a built-in constant) as fallback.
    """
    if request.method == 'POST':
        editor_id = session.get('user_id')
        for user_type, limits in request.json.items():
            for name, value in limits.items():
                kind = 'int' if isinstance(value, int) else 'string'
                DynamicConfig.set(
                    f"user_limit_{user_type}_{name}",
                    value,
                    category='user_limits',
                    value_type=kind,
                    user_id=editor_id
                )
        return jsonify({'success': True})

    # (limit name, last-resort default) in display order
    limit_fields = (
        ('daily_translations', 10),
        ('max_pages', 50),
        ('max_file_size', 30*1024*1024),
    )
    limits_config = {}
    for user_type in ['guest', 'free', 'vip_basic', 'vip_pro', 'vip_enterprise']:
        static_limits = USER_LIMITS.get(user_type, {})
        limits_config[user_type] = {
            name: DynamicConfig.get(
                f'user_limit_{user_type}_{name}',
                static_limits.get(name, fallback),
            )
            for name, fallback in limit_fields
        }

    return render_template(
        'admin/user_limits.html',
        limits_config=limits_config,
        default_limits=USER_LIMITS,
    )
# ==================== 会员套餐配置 ====================
@admin_bp.route('/settings/membership', methods=['GET', 'POST'])
@admin_required
def membership_settings():
    """View or save membership plan settings.

    POST expects JSON shaped ``{plan_key: {field: value}}``; every value is
    stored under ``membership_<plan_key>_<field>`` in the ``membership``
    category. GET renders name, price, period and description for the three
    VIP plans, falling back to ``MEMBERSHIP_PLANS``.
    """
    if request.method == 'POST':
        editor_id = session.get('user_id')
        for plan_key, plan_data in request.json.items():
            for field, value in plan_data.items():
                if isinstance(value, int):
                    kind = 'int'
                elif isinstance(value, float):
                    kind = 'float'
                else:
                    kind = 'string'
                DynamicConfig.set(
                    f"membership_{plan_key}_{field}",
                    value,
                    category='membership',
                    value_type=kind,
                    user_id=editor_id
                )
        return jsonify({'success': True})

    plans_config = {}
    for plan_key in ['vip_basic', 'vip_pro', 'vip_enterprise']:
        static_plan = MEMBERSHIP_PLANS.get(plan_key, {})
        # (field, last-resort default) in display order
        field_defaults = (
            ('name', plan_key),
            ('price', 0),
            ('period', 'month'),
            ('description', ''),
        )
        plans_config[plan_key] = {
            field: DynamicConfig.get(
                f'membership_{plan_key}_{field}',
                static_plan.get(field, fallback),
            )
            for field, fallback in field_defaults
        }

    return render_template(
        'admin/membership.html',
        plans_config=plans_config,
        default_plans=MEMBERSHIP_PLANS,
    )
# ==================== 数据包套餐管理 ====================
@admin_bp.route('/packages')
@admin_required
def packages():
    """List all data packages ordered by ``sort_order``."""
    ordered = DataPackage.query.order_by(DataPackage.sort_order)
    return render_template('admin/packages.html', packages=ordered.all())
@admin_bp.route('/package/add', methods=['GET', 'POST'])
@admin_required
def add_package():
    """Show the empty package form (GET) or create a data package (POST).

    POST accepts JSON or form data. String flags are true only when equal to
    ``'true'``; a missing ``is_active`` defaults to True and a missing
    ``is_recommended`` to False.
    """
    if request.method != 'POST':
        return render_template('admin/package_form.html', package=None)

    is_json = request.is_json
    data = request.json if is_json else request.form

    raw_original = data.get('original_price')
    original_price = float(raw_original) if raw_original else None

    raw_active = data.get('is_active')
    if isinstance(raw_active, str):
        active = raw_active == 'true'
    else:
        active = data.get('is_active', True)

    raw_recommended = data.get('is_recommended')
    if isinstance(raw_recommended, str):
        recommended = raw_recommended == 'true'
    else:
        recommended = data.get('is_recommended', False)

    package = DataPackage(
        name=data.get('name'),
        description=data.get('description'),
        translation_count=int(data.get('translation_count', 0)),
        price=float(data.get('price', 0)),
        original_price=original_price,
        valid_days=int(data.get('valid_days', 30)),
        sort_order=int(data.get('sort_order', 0)),
        is_active=active,
        is_recommended=recommended,
    )
    db.session.add(package)
    db.session.commit()

    if is_json:
        return jsonify({'success': True, 'package': package.to_dict()})
    flash('数据包套餐已添加', 'success')
    return redirect(url_for('admin.packages'))
@admin_bp.route('/package/<int:package_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_package(package_id):
    """Show the package form (GET) or update a data package (POST).

    POST accepts JSON or form data; fields that are not supplied keep their
    current value. For JSON requests this now also holds for ``is_active``,
    ``is_recommended`` and ``original_price``: previously a partial JSON
    update re-enabled the package, cleared the recommended flag and wiped
    the original price. Form submissions behave as before (missing
    ``is_active`` -> True, missing ``is_recommended`` -> False, empty
    ``original_price`` -> None).

    Responds 404 when the package does not exist.
    """
    package = DataPackage.query.get_or_404(package_id)
    if request.method == 'POST':
        is_json = request.is_json
        data = request.json if is_json else request.form

        def read_flag(name, current, form_default):
            """Return the submitted boolean for *name*, or the fallback."""
            raw = data.get(name)
            if raw is None:
                return current if is_json else form_default
            if isinstance(raw, str):
                return raw == 'true'
            return raw

        package.name = data.get('name', package.name)
        package.description = data.get('description', package.description)
        package.translation_count = int(data.get('translation_count', package.translation_count))
        package.price = float(data.get('price', package.price))
        # A form always carries the field (possibly empty, which clears the
        # price); a JSON body only changes it when the key is present.
        if not is_json or 'original_price' in data:
            raw_original = data.get('original_price')
            package.original_price = float(raw_original) if raw_original else None
        package.valid_days = int(data.get('valid_days', package.valid_days))
        package.sort_order = int(data.get('sort_order', package.sort_order))
        package.is_active = read_flag('is_active', package.is_active, True)
        package.is_recommended = read_flag('is_recommended', package.is_recommended, False)
        db.session.commit()

        if is_json:
            return jsonify({'success': True, 'package': package.to_dict()})
        flash('数据包套餐已更新', 'success')
        return redirect(url_for('admin.packages'))

    return render_template('admin/package_form.html', package=package)
@admin_bp.route('/package/<int:package_id>/delete', methods=['POST'])
@admin_required
def delete_package(package_id):
    """Delete a data package; responds 404 when it does not exist."""
    doomed = DataPackage.query.get_or_404(package_id)
    db.session.delete(doomed)
    db.session.commit()
    return jsonify({'success': True})
@admin_bp.route('/package/<int:package_id>/toggle', methods=['POST'])
@admin_required
def toggle_package(package_id):
    """Flip a data package's ``is_active`` flag and return the new value.

    Responds 404 when the package does not exist.
    """
    item = DataPackage.query.get_or_404(package_id)
    now_active = not item.is_active
    item.is_active = now_active
    db.session.commit()
    return jsonify({'success': True, 'is_active': item.is_active})
# ==================== 操作日志 ====================
@admin_bp.route('/logs')
@admin_required
def logs():
    """List operation log entries, newest first, 50 per page, optionally
    filtered by the ``action`` query parameter."""
    page = request.args.get('page', 1, type=int)
    action = request.args.get('action', '')

    entries = OperationLog.query
    if action:
        entries = entries.filter_by(action=action)

    pagination = entries.order_by(desc(OperationLog.created_at)).paginate(page=page, per_page=50)
    return render_template('admin/logs.html', logs=pagination, action=action)
# ==================== 统计报表 ====================
@admin_bp.route('/stats')
@admin_required
def stats():
    """Render the statistics report.

    Provides 30-day daily series (oldest first) for new users, translations
    and paid package revenue, the user-type and translation-status
    distributions, the ten users with the most translations and the ten
    best-selling paid packages.
    """
    def recent_days():
        """Yield the last 30 calendar days, ending with today."""
        for offset in range(29, -1, -1):
            yield date.today() - timedelta(days=offset)

    # New users per day
    user_growth = []
    for day in recent_days():
        registered = User.query.filter(func.date(User.created_at) == day).count()
        user_growth.append({'date': day.strftime('%Y-%m-%d'), 'count': registered})

    # Translations per day
    translation_growth = []
    for day in recent_days():
        translated = Translation.query.filter(func.date(Translation.created_at) == day).count()
        translation_growth.append({'date': day.strftime('%Y-%m-%d'), 'count': translated})

    # Paid revenue per day (SUM over no rows yields NULL -> 0)
    revenue_growth = []
    for day in recent_days():
        day_total = db.session.query(func.sum(UserPackage.price_paid)).filter(
            UserPackage.payment_status == 'paid',
            func.date(UserPackage.purchased_at) == day
        ).scalar() or 0
        revenue_growth.append({'date': day.strftime('%Y-%m-%d'), 'revenue': float(day_total)})

    # Users per user type
    user_distribution = (
        db.session.query(User.user_type, func.count(User.id))
        .group_by(User.user_type)
        .all()
    )

    # Translations per status
    status_distribution = (
        db.session.query(Translation.status, func.count(Translation.id))
        .group_by(Translation.status)
        .all()
    )

    # Ten users with the most translations
    top_users = (
        db.session.query(User.username, func.count(Translation.id).label('count'))
        .join(Translation)
        .group_by(User.id)
        .order_by(desc('count'))
        .limit(10)
        .all()
    )

    # Ten packages with the most paid purchases
    top_packages = (
        db.session.query(
            DataPackage.name,
            func.count(UserPackage.id).label('count'),
            func.sum(UserPackage.price_paid).label('revenue'),
        )
        .join(UserPackage)
        .filter(UserPackage.payment_status == 'paid')
        .group_by(DataPackage.id)
        .order_by(desc('count'))
        .limit(10)
        .all()
    )

    return render_template(
        'admin/stats.html',
        user_growth=user_growth,
        translation_growth=translation_growth,
        revenue_growth=revenue_growth,
        user_distribution=user_distribution,
        status_distribution=status_distribution,
        top_users=top_users,
        top_packages=top_packages,
    )
# ==================== API接口 ====================
@admin_bp.route('/api/stats/summary')
@admin_required
def api_stats_summary():
    """API: return headline totals (users, translations, cache, packages,
    paid revenue) as JSON."""
    summary = {}

    summary['total_users'] = User.query.count()
    summary['today_users'] = User.query.filter(
        func.date(User.created_at) == date.today()
    ).count()

    summary['total_translations'] = Translation.query.count()
    summary['today_translations'] = Translation.query.filter(
        func.date(Translation.created_at) == date.today()
    ).count()

    summary['total_cache'] = TranslationCache.query.count()
    hit_sum = db.session.query(func.sum(TranslationCache.hit_count)).scalar()
    summary['cache_hits'] = hit_sum or 0

    summary['total_packages'] = DataPackage.query.filter_by(is_active=True).count()
    summary['packages_sold'] = UserPackage.query.filter_by(payment_status='paid').count()

    paid_sum = db.session.query(func.sum(UserPackage.price_paid)).filter(
        UserPackage.payment_status == 'paid'
    ).scalar()
    summary['total_revenue'] = float(paid_sum or 0)

    return jsonify(summary)
@admin_bp.route('/api/user/<int:user_id>/upgrade', methods=['POST'])
@admin_required
def api_user_upgrade(user_id):
    """API: change a user's type and, for VIP types, set the expiry date.

    Expects JSON ``{'user_type': ..., 'months': ...}``. ``months`` defaults
    to 1 and is only used for ``vip*`` types, where the expiry becomes
    "now (UTC) + 30 days per month".

    Responds 404 when the user does not exist and 400 when ``user_type`` is
    not one of the accepted values or, for VIP types, when ``months`` is not
    a positive number that yields a representable date. Nothing is modified
    on a 400 response.
    """
    user = User.query.get_or_404(user_id)

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Missing or non-object body: treat as empty so the type check
        # below rejects the request with a 400 instead of crashing.
        data = {}

    user_type = data.get('user_type')
    valid_types = ['free', 'vip_basic', 'vip_pro', 'vip_enterprise', 'admin']
    if user_type not in valid_types:
        return jsonify({'error': '无效的用户类型'}), 400

    expire_at = None
    if user_type.startswith('vip'):
        try:
            months = float(data.get('months', 1))
        except (TypeError, ValueError):
            return jsonify({'error': '无效的会员时长'}), 400
        # "not > 0" also rejects NaN.
        if not months > 0:
            return jsonify({'error': '无效的会员时长'}), 400
        try:
            expire_at = datetime.utcnow() + timedelta(days=30 * months)
        except OverflowError:
            return jsonify({'error': '无效的会员时长'}), 400

    user.user_type = user_type
    if expire_at is not None:
        user.membership_expire = expire_at
    db.session.commit()
    return jsonify({'success': True, 'user': user.to_dict()})
@admin_bp.route('/api/user/<int:user_id>/add-package', methods=['POST'])
@admin_required
def api_user_add_package(user_id):
    """API: grant a data package to a user free of charge.

    Expects JSON ``{'package_id': ...}``. The grant is recorded as paid with
    a price of 0; it expires ``valid_days`` after now (UTC), or never when
    ``valid_days`` is not positive. Responds 404 when the user or the
    package does not exist.
    """
    user = User.query.get_or_404(user_id)
    payload = request.json

    package = DataPackage.query.get(payload.get('package_id'))
    if not package:
        return jsonify({'error': '数据包不存在'}), 404

    if package.valid_days > 0:
        expires = datetime.utcnow() + timedelta(days=package.valid_days)
    else:
        expires = None

    grant = UserPackage(
        user_id=user.id,
        package_id=package.id,
        package_name=package.name,
        translation_count=package.translation_count,
        remaining_count=package.translation_count,
        expire_at=expires,
        price_paid=0,  # granted by an administrator
        payment_status='paid'
    )
    db.session.add(grant)
    db.session.commit()

    granted = {
        'id': grant.id,
        'name': grant.package_name,
        'remaining': grant.remaining_count
    }
    return jsonify({'success': True, 'package': granted})
# ==================== LLM大模型配置 ====================
@admin_bp.route('/llm_config')
@admin_required
def llm_config():
    """Render the LLM configuration page.

    The values shown are the stored dynamic settings, each falling back to
    the matching ``LLM_CONFIG`` default; ``get_llm_config`` in this module
    builds exactly that mapping.
    """
    return render_template('admin/llm_config.html', config=get_llm_config())
@admin_bp.route('/llm_config/save', methods=['POST'])
@admin_required
def save_llm_config():
    """Store the submitted LLM settings and log the change.

    Expects JSON with ``api_base``, ``api_key``, ``model``, ``max_tokens``,
    ``chunk_size`` and ``timeout``; each is saved as ``llm_<name>`` in the
    ``llm`` category, the last three with value type ``int``.
    """
    data = request.json

    for name in ('api_base', 'api_key', 'model'):
        DynamicConfig.set(
            f'llm_{name}', data.get(name),
            category='llm', user_id=session.get('user_id'),
        )
    for name in ('max_tokens', 'chunk_size', 'timeout'):
        DynamicConfig.set(
            f'llm_{name}', data.get(name),
            category='llm', value_type='int', user_id=session.get('user_id'),
        )

    # Audit log
    db.session.add(OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='update_llm_config',
        detail='更新大模型配置'
    ))
    db.session.commit()
    return jsonify({'success': True})
@admin_bp.route('/llm_config/test', methods=['POST'])
@admin_required
def test_llm_connection():
    """Send a tiny chat request with the submitted LLM settings.

    Expects JSON with ``api_base``, ``model`` and optionally ``api_key``.
    Any failure (including a missing ``openai`` package) is reported as
    ``{'success': False, 'error': <message>}`` rather than raised.
    """
    payload = request.json
    try:
        from openai import OpenAI

        client = OpenAI(
            api_key=payload.get('api_key', 'sk-test'),
            base_url=payload.get('api_base'),
        )
        # Minimal probe request
        completion = client.chat.completions.create(
            model=payload.get('model'),
            messages=[{"role": "user", "content": "Hello"}],
            max_tokens=10,
            timeout=10,
        )
        if completion.choices:
            reply = completion.choices[0].message.content[:50]
        else:
            reply = 'OK'
        return jsonify({
            'success': True,
            'model': payload.get('model'),
            'response': reply
        })
    except Exception as exc:
        return jsonify({'success': False, 'error': str(exc)})
@admin_bp.route('/llm_config/reset', methods=['POST'])
@admin_required
def reset_llm_config():
    """Restore the default LLM configuration.

    Deletes every stored dynamic setting in the ``llm`` category. No
    defaults need to be loaded here, so the previously unused import of
    ``LLM_CONFIG`` has been dropped.
    """
    DynamicConfig.query.filter_by(category='llm').delete()
    db.session.commit()
    return jsonify({'success': True})
# ==================== 获取当前LLM配置供其他模块使用 ====================
def get_llm_config():
    """Return the effective LLM configuration as a dict.

    Each of ``api_base``, ``api_key``, ``model``, ``max_tokens``,
    ``chunk_size`` and ``timeout`` is read from the dynamic setting
    ``llm_<name>``, falling back to the same key in ``LLM_CONFIG``.
    """
    from config import LLM_CONFIG

    setting_names = ('api_base', 'api_key', 'model', 'max_tokens', 'chunk_size', 'timeout')
    return {
        name: DynamicConfig.get(f'llm_{name}', LLM_CONFIG.get(name))
        for name in setting_names
    }
# ==================== 用户类型配置管理(动态增删) ====================
@admin_bp.route('/user-types')
@admin_required
def user_types():
    """List user type configurations ordered by ``sort_order``.

    When the table is empty the default types are created first and the
    list is loaded again.
    """
    def load_all():
        return UserTypeConfig.query.order_by(UserTypeConfig.sort_order).all()

    configured = load_all()
    if not configured:
        # Nothing stored yet: seed the defaults, then re-read.
        init_default_user_types()
        configured = load_all()

    return render_template('admin/user_types.html', user_types=configured)
@admin_bp.route('/user-types/add', methods=['GET', 'POST'])
@admin_required
def add_user_type():
    """Show the empty user-type form (GET) or create a user type (POST).

    POST accepts JSON or form data:

    * ``type_key`` is required and must be unused (400 otherwise);
    * ``max_file_size_mb`` is given in MiB and stored in bytes;
    * ``is_active`` may be a boolean or the strings ``'true'``/``'false'``
      (forms submit strings) and defaults to True when missing;
    * ``features`` may be a JSON-encoded string or a list; lists are
      JSON-encoded before storing, matching how the default types store
      their features.

    New types are never marked as system types. The creation is recorded in
    the operation log.
    """
    if request.method == 'POST':
        is_json = request.is_json
        data = request.json if is_json else request.form

        type_key = data.get('type_key')
        if not type_key:
            return jsonify({'error': '类型标识不能为空'}), 400

        # Reject duplicate keys
        existing = UserTypeConfig.query.filter_by(type_key=type_key).first()
        if existing:
            return jsonify({'error': '类型标识已存在'}), 400

        # Normalise the flag: form values arrive as strings.
        raw_active = data.get('is_active')
        if raw_active is None:
            is_active = True
        elif isinstance(raw_active, str):
            is_active = raw_active == 'true'
        else:
            is_active = bool(raw_active)

        # Features are stored as a JSON string.
        features = data.get('features', '[]')
        if features is None:
            features = '[]'
        elif not isinstance(features, str):
            features = json.dumps(features, ensure_ascii=False)

        user_type = UserTypeConfig(
            type_key=type_key,
            display_name=data.get('display_name'),
            daily_translations=int(data.get('daily_translations', 10)),
            max_pages=int(data.get('max_pages', 50)),
            max_file_size=int(data.get('max_file_size_mb', 30)) * 1024 * 1024,
            features=features,
            sort_order=int(data.get('sort_order', 0)),
            is_active=is_active,
            is_system=False,
        )
        db.session.add(user_type)
        db.session.commit()

        # Audit log
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='add_user_type',
            target=user_type.type_key,
            detail=json.dumps(user_type.to_dict(), ensure_ascii=False)
        )
        db.session.add(log)
        db.session.commit()

        if is_json:
            return jsonify({'success': True, 'user_type': user_type.to_dict()})
        flash('用户类型已添加', 'success')
        return redirect(url_for('admin.user_types'))

    # Every feature that can be granted to a user type
    all_features = [
        {'key': 'basic_translate', 'name': '基础翻译'},
        {'key': 'compare_view', 'name': '对照查看'},
        {'key': 'retranslate', 'name': '重新翻译'},
        {'key': 'history', 'name': '历史记录'},
        {'key': 'priority_queue', 'name': '优先队列'},
        {'key': 'export_pdf', 'name': '导出PDF'},
        {'key': 'batch_translate', 'name': '批量翻译'},
        {'key': 'custom_terms', 'name': '自定义术语'},
    ]
    return render_template('admin/user_type_form.html', user_type=None, all_features=all_features)
@admin_bp.route('/user-types/<int:type_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_user_type(type_id):
    """Show the user-type form (GET) or update a user type (POST).

    POST accepts JSON or form data; fields that are not supplied keep their
    current value. ``max_file_size_mb`` is given in MiB and stored in bytes.

    ``is_active`` is taken as-is when boolean, otherwise it is true only for
    the string ``'true'``. A form that omits it therefore stores False
    (unchecked checkbox), but a JSON body that omits it now leaves the
    stored flag unchanged; previously such a partial update deactivated the
    type. A list-valued ``features`` is JSON-encoded before storing.

    Responds 404 when the type does not exist. The edit is recorded in the
    operation log.
    """
    user_type = UserTypeConfig.query.get_or_404(type_id)
    if request.method == 'POST':
        is_json = request.is_json
        data = request.json if is_json else request.form

        user_type.display_name = data.get('display_name', user_type.display_name)
        user_type.daily_translations = int(data.get('daily_translations', user_type.daily_translations))
        user_type.max_pages = int(data.get('max_pages', user_type.max_pages))
        current_mb = user_type.max_file_size // 1024 // 1024
        user_type.max_file_size = int(data.get('max_file_size_mb', current_mb)) * 1024 * 1024

        # Features are stored as a JSON string.
        features = data.get('features', user_type.features)
        if features is not None and not isinstance(features, str):
            features = json.dumps(features, ensure_ascii=False)
        user_type.features = features

        user_type.sort_order = int(data.get('sort_order', user_type.sort_order))

        raw_active = data.get('is_active')
        if raw_active is None and is_json:
            pass  # not supplied in a JSON update: keep the stored flag
        elif isinstance(raw_active, bool):
            user_type.is_active = raw_active
        else:
            user_type.is_active = raw_active == 'true'
        db.session.commit()

        # Audit log
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='edit_user_type',
            target=user_type.type_key,
            detail=json.dumps(user_type.to_dict(), ensure_ascii=False)
        )
        db.session.add(log)
        db.session.commit()

        if is_json:
            return jsonify({'success': True, 'user_type': user_type.to_dict()})
        flash('用户类型已更新', 'success')
        return redirect(url_for('admin.user_types'))

    # Every feature that can be granted to a user type
    all_features = [
        {'key': 'basic_translate', 'name': '基础翻译'},
        {'key': 'compare_view', 'name': '对照查看'},
        {'key': 'retranslate', 'name': '重新翻译'},
        {'key': 'history', 'name': '历史记录'},
        {'key': 'priority_queue', 'name': '优先队列'},
        {'key': 'export_pdf', 'name': '导出PDF'},
        {'key': 'batch_translate', 'name': '批量翻译'},
        {'key': 'custom_terms', 'name': '自定义术语'},
    ]
    return render_template('admin/user_type_form.html', user_type=user_type, all_features=all_features)
@admin_bp.route('/user-types/<int:type_id>/delete', methods=['POST'])
@admin_required
def delete_user_type(type_id):
    """Delete a user type and record the action in the operation log.

    Responds 404 when the type does not exist, and 400 when it is a system
    type or when any user is still assigned to it.
    """
    doomed = UserTypeConfig.query.get_or_404(type_id)

    # Built-in types are protected.
    if doomed.is_system:
        return jsonify({'error': '系统内置类型不可删除'}), 400

    # Refuse while users still reference this type.
    users_count = User.query.filter_by(user_type=doomed.type_key).count()
    if users_count > 0:
        return jsonify({'error': f'{users_count} 个用户使用此类型,请先修改用户类型'}), 400

    # Capture the key before the row is removed.
    removed_key = doomed.type_key
    db.session.delete(doomed)
    db.session.commit()

    db.session.add(OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='delete_user_type',
        target=removed_key
    ))
    db.session.commit()
    return jsonify({'success': True})
@admin_bp.route('/user-types/<int:type_id>/toggle', methods=['POST'])
@admin_required
def toggle_user_type(type_id):
    """Flip a user type's ``is_active`` flag and return the new value.

    Responds 404 when the type does not exist.
    """
    entry = UserTypeConfig.query.get_or_404(type_id)
    flipped = not entry.is_active
    entry.is_active = flipped
    db.session.commit()
    return jsonify({'success': True, 'is_active': entry.is_active})
@admin_bp.route('/user-types/init', methods=['POST'])
@admin_required
def init_user_types():
    """Create any missing default user types and report success as JSON."""
    init_default_user_types()
    response = jsonify({'success': True})
    return response
def init_default_user_types():
    """Create the built-in user types that are not in the database yet.

    The five defaults (guest, free and the three VIP tiers) take their
    limits and features from ``USER_LIMITS``, are numbered 0-4 in
    ``sort_order`` and are flagged as active system types. Existing rows
    are left untouched.
    """
    from config import USER_LIMITS

    # (type key, display name) in sort order
    builtin_types = [
        ('guest', '访客'),
        ('free', '免费用户'),
        ('vip_basic', '基础会员'),
        ('vip_pro', '专业会员'),
        ('vip_enterprise', '企业会员'),
    ]

    for position, (type_key, display_name) in enumerate(builtin_types):
        if UserTypeConfig.query.filter_by(type_key=type_key).first():
            continue

        limits = USER_LIMITS.get(type_key, {})
        db.session.add(UserTypeConfig(
            type_key=type_key,
            display_name=display_name,
            daily_translations=limits.get('daily_translations', 10),
            max_pages=limits.get('max_pages', 50),
            max_file_size=limits.get('max_file_size', 30*1024*1024),
            features=json.dumps(limits.get('features', []), ensure_ascii=False),
            sort_order=position,
            is_active=True,
            is_system=True,
        ))

    db.session.commit()
# ==================== 会员套餐配置管理(动态增删) ====================
@admin_bp.route('/membership-plans')
@admin_required
def membership_plans():
    """List membership plans ordered by ``sort_order``, together with the
    active user types offered for selection.

    When no plan is stored yet the defaults are created first and the list
    is loaded again.
    """
    def load_plans():
        return MembershipPlanConfig.query.order_by(MembershipPlanConfig.sort_order).all()

    plans = load_plans()
    if not plans:
        # Nothing stored yet: seed the defaults, then re-read.
        init_default_membership_plans()
        plans = load_plans()

    selectable_types = UserTypeConfig.query.filter_by(is_active=True).all()
    return render_template('admin/membership_plans.html', plans=plans, user_types=selectable_types)
@admin_bp.route('/membership-plans/add', methods=['GET', 'POST'])
@admin_required
def add_membership_plan():
    """Show the empty plan form (GET) or create a membership plan (POST).

    POST accepts JSON or form data:

    * ``plan_key`` is required and must be unused (400 otherwise);
    * an empty or missing ``original_price`` is stored as None;
    * ``is_active`` (default True) and ``is_recommended`` (default False)
      may be booleans or the strings ``'true'``/``'false'``; forms submit
      strings, which were previously stored unparsed.

    New plans are never marked as system plans. The creation is recorded in
    the operation log.
    """
    if request.method == 'POST':
        is_json = request.is_json
        data = request.json if is_json else request.form

        def read_flag(name, default):
            """Return the submitted boolean for *name*, or *default*."""
            raw = data.get(name)
            if raw is None:
                return default
            if isinstance(raw, str):
                return raw == 'true'
            return bool(raw)

        plan_key = data.get('plan_key')
        if not plan_key:
            return jsonify({'error': '套餐标识不能为空'}), 400

        # Reject duplicate keys
        existing = MembershipPlanConfig.query.filter_by(plan_key=plan_key).first()
        if existing:
            return jsonify({'error': '套餐标识已存在'}), 400

        raw_original = data.get('original_price')
        plan = MembershipPlanConfig(
            plan_key=plan_key,
            display_name=data.get('display_name'),
            price=float(data.get('price', 0)),
            original_price=float(raw_original) if raw_original else None,
            period=data.get('period', 'month'),
            period_days=int(data.get('period_days', 30)),
            description=data.get('description'),
            user_type_key=data.get('user_type_key'),
            sort_order=int(data.get('sort_order', 0)),
            is_active=read_flag('is_active', True),
            is_recommended=read_flag('is_recommended', False),
            is_system=False,
        )
        db.session.add(plan)
        db.session.commit()

        # Audit log
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='add_membership_plan',
            target=plan.plan_key,
            detail=json.dumps(plan.to_dict(), ensure_ascii=False)
        )
        db.session.add(log)
        db.session.commit()

        if is_json:
            return jsonify({'success': True, 'plan': plan.to_dict()})
        flash('会员套餐已添加', 'success')
        return redirect(url_for('admin.membership_plans'))

    # Active user types offered for selection in the form
    user_types = UserTypeConfig.query.filter_by(is_active=True).all()
    return render_template('admin/membership_plan_form.html', plan=None, user_types=user_types)
@admin_bp.route('/membership-plans/<int:plan_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_membership_plan(plan_id):
    """Show the plan form (GET) or update a membership plan (POST).

    POST accepts JSON or form data; fields that are not supplied keep their
    current value. For JSON requests this now also holds for ``is_active``,
    ``is_recommended`` and ``original_price``; previously a partial JSON
    update reset both flags to False and cleared the original price. Form
    submissions behave as before: a missing flag is stored as False and an
    empty ``original_price`` as None.

    Responds 404 when the plan does not exist. The edit is recorded in the
    operation log.
    """
    plan = MembershipPlanConfig.query.get_or_404(plan_id)
    if request.method == 'POST':
        is_json = request.is_json
        data = request.json if is_json else request.form

        def read_flag(name, current):
            """Return the submitted boolean for *name*, or the fallback."""
            raw = data.get(name)
            if raw is None and is_json:
                return current
            if isinstance(raw, bool):
                return raw
            return raw == 'true'

        plan.display_name = data.get('display_name', plan.display_name)
        plan.price = float(data.get('price', plan.price))
        # A form always carries the field (possibly empty, which clears the
        # price); a JSON body only changes it when the key is present.
        if not is_json or 'original_price' in data:
            raw_original = data.get('original_price')
            plan.original_price = float(raw_original) if raw_original else None
        plan.period = data.get('period', plan.period)
        plan.period_days = int(data.get('period_days', plan.period_days))
        plan.description = data.get('description', plan.description)
        plan.user_type_key = data.get('user_type_key', plan.user_type_key)
        plan.sort_order = int(data.get('sort_order', plan.sort_order))
        plan.is_active = read_flag('is_active', plan.is_active)
        plan.is_recommended = read_flag('is_recommended', plan.is_recommended)
        db.session.commit()

        # Audit log
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='edit_membership_plan',
            target=plan.plan_key,
            detail=json.dumps(plan.to_dict(), ensure_ascii=False)
        )
        db.session.add(log)
        db.session.commit()

        if is_json:
            return jsonify({'success': True, 'plan': plan.to_dict()})
        flash('会员套餐已更新', 'success')
        return redirect(url_for('admin.membership_plans'))

    # Active user types offered for selection in the form
    user_types = UserTypeConfig.query.filter_by(is_active=True).all()
    return render_template('admin/membership_plan_form.html', plan=plan, user_types=user_types)
@admin_bp.route('/membership-plans/<int:plan_id>/delete', methods=['POST'])
@admin_required
def delete_membership_plan(plan_id):
    """Delete a membership plan unless it is flagged as a system plan.

    Args:
        plan_id: Primary key of the plan; an unknown id results in a 404.

    Returns:
        ``{'success': True}`` as JSON once the plan is removed and the
        operation is logged, or a JSON error with HTTP status 400 when the
        plan has ``is_system`` set.
    """
    target_plan = MembershipPlanConfig.query.get_or_404(plan_id)

    # Built-in (system) plans are protected from deletion.
    if target_plan.is_system:
        return jsonify({'error': '系统内置套餐不可删除'}), 400

    # Remember the key before the row is deleted so it can still be logged.
    removed_key = target_plan.plan_key
    db.session.delete(target_plan)
    db.session.commit()

    # Write the operation log entry in its own commit.
    db.session.add(
        OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='delete_membership_plan',
            target=removed_key,
        )
    )
    db.session.commit()

    return jsonify({'success': True})
@admin_bp.route('/membership-plans/<int:plan_id>/toggle', methods=['POST'])
@admin_required
def toggle_membership_plan(plan_id):
    """Flip the ``is_active`` flag of a membership plan.

    Args:
        plan_id: Primary key of the plan; an unknown id results in a 404.

    Returns:
        JSON with ``success`` and the plan's ``is_active`` value after the
        change has been committed.
    """
    target_plan = MembershipPlanConfig.query.get_or_404(plan_id)

    flipped = not target_plan.is_active
    target_plan.is_active = flipped
    db.session.commit()

    payload = {'success': True, 'is_active': target_plan.is_active}
    return jsonify(payload)
@admin_bp.route('/membership-plans/<int:plan_id>/recommend', methods=['POST'])
@admin_required
def recommend_membership_plan(plan_id):
    """Flip the ``is_recommended`` flag of a membership plan.

    Args:
        plan_id: Primary key of the plan; an unknown id results in a 404.

    Returns:
        JSON with ``success`` and the plan's ``is_recommended`` value after
        the change has been committed.
    """
    target_plan = MembershipPlanConfig.query.get_or_404(plan_id)

    flipped = not target_plan.is_recommended
    target_plan.is_recommended = flipped
    db.session.commit()

    payload = {'success': True, 'is_recommended': target_plan.is_recommended}
    return jsonify(payload)
@admin_bp.route('/membership-plans/init', methods=['POST'])
@admin_required
def init_membership_plans():
    """Run the default membership plan initialisation on request.

    Returns:
        ``{'success': True}`` as JSON after ``init_default_membership_plans``
        has run.
    """
    init_default_membership_plans()
    result = {'success': True}
    return jsonify(result)
def init_default_membership_plans():
    """Insert the default membership plans that are not yet in the database.

    For each of ``vip_basic``, ``vip_pro`` and ``vip_enterprise`` a
    ``MembershipPlanConfig`` row is created from the matching entry in
    ``config.MEMBERSHIP_PLANS`` unless a row with that ``plan_key`` already
    exists. Created rows are marked ``is_system=True`` and are committed
    together at the end.
    """
    from config import MEMBERSHIP_PLANS

    # Billing period -> length in days; any other period falls back to 90.
    days_by_period = {'month': 30, 'year': 365}

    defaults = [
        ('vip_basic', MEMBERSHIP_PLANS.get('vip_basic', {}), 'vip_basic', 0, True),
        ('vip_pro', MEMBERSHIP_PLANS.get('vip_pro', {}), 'vip_pro', 1, True),
        ('vip_enterprise', MEMBERSHIP_PLANS.get('vip_enterprise', {}), 'vip_enterprise', 2, True),
    ]

    for plan_key, plan_data, user_type_key, sort_order, is_system in defaults:
        existing = MembershipPlanConfig.query.filter_by(plan_key=plan_key).first()
        if existing:
            continue

        # Resolve the period once so that period and period_days always agree,
        # including when the config entry has no 'period' key.
        period = plan_data.get('period', 'month')

        plan = MembershipPlanConfig(
            plan_key=plan_key,
            display_name=plan_data.get('name', plan_key),
            price=plan_data.get('price', 0),
            original_price=None,
            period=period,
            period_days=days_by_period.get(period, 90),
            description=plan_data.get('description', ''),
            user_type_key=user_type_key,
            sort_order=sort_order,
            is_active=True,
            is_recommended=False,
            is_system=is_system,
        )
        db.session.add(plan)

    db.session.commit()
# ==================== API: 获取用户限制配置(供其他模块使用) ====================
def get_user_limits(user_type_key):
    """Return the limit settings for one user type.

    An active ``UserTypeConfig`` row with the given ``type_key`` is used when
    one exists. Otherwise the value comes from ``config.USER_LIMITS``, using
    its ``'free'`` entry (or an empty dict) when the key is unknown there.

    Args:
        user_type_key: Key identifying the user type.

    Returns:
        For a database-backed type, a dict with ``daily_translations``,
        ``max_pages``, ``max_file_size`` and ``features``; otherwise whatever
        ``USER_LIMITS`` holds for the key.
    """
    type_config = UserTypeConfig.query.filter_by(
        type_key=user_type_key,
        is_active=True,
    ).first()

    if not type_config:
        # Nothing usable in the database: fall back to the static defaults.
        from config import USER_LIMITS
        free_limits = USER_LIMITS.get('free', {})
        return USER_LIMITS.get(user_type_key, free_limits)

    limits = {}
    limits['daily_translations'] = type_config.daily_translations
    limits['max_pages'] = type_config.max_pages
    limits['max_file_size'] = type_config.max_file_size
    limits['features'] = type_config.get_features()
    return limits
def get_all_user_types():
    """Return the limit settings of every active user type.

    Returns:
        A dict mapping each active type's ``type_key`` to the result of
        ``get_user_limits`` for that key, built in ``sort_order`` order.
    """
    active_types = (
        UserTypeConfig.query
        .filter_by(is_active=True)
        .order_by(UserTypeConfig.sort_order)
        .all()
    )

    limits_by_type = {}
    for user_type in active_types:
        # NOTE: get_user_limits runs its own query for every type.
        limits_by_type[user_type.type_key] = get_user_limits(user_type.type_key)
    return limits_by_type
def get_membership_plan(plan_key):
    """Return the configuration of one membership plan.

    An active ``MembershipPlanConfig`` row with the given ``plan_key`` is
    used when one exists; otherwise the entry from
    ``config.MEMBERSHIP_PLANS`` is returned.

    Args:
        plan_key: Key identifying the plan.

    Returns:
        The row's ``to_dict()`` result, or the ``MEMBERSHIP_PLANS`` entry for
        the key (an empty dict when the key is unknown there).
    """
    active_plan = MembershipPlanConfig.query.filter_by(
        plan_key=plan_key,
        is_active=True,
    ).first()

    if not active_plan:
        # Nothing usable in the database: fall back to the static defaults.
        from config import MEMBERSHIP_PLANS
        return MEMBERSHIP_PLANS.get(plan_key, {})

    return active_plan.to_dict()
def get_all_membership_plans():
    """Return every active membership plan as a list of dicts.

    Returns:
        The ``to_dict()`` result of each active ``MembershipPlanConfig`` row,
        ordered by ``sort_order``.
    """
    active_plans = (
        MembershipPlanConfig.query
        .filter_by(is_active=True)
        .order_by(MembershipPlanConfig.sort_order)
        .all()
    )

    serialized = []
    for active_plan in active_plans:
        serialized.append(active_plan.to_dict())
    return serialized