"""Admin back-office module, V2."""
from datetime import datetime, date, timedelta
from functools import wraps
import json
import os

from flask import Blueprint, render_template, request, jsonify, redirect, url_for, session, flash
from sqlalchemy import func, desc

from models import (db, User, Translation, TranslationCache, GuestTranslation,
                    SystemConfig, OperationLog, DataPackage, UserPackage,
                    DynamicConfig, UserTypeConfig, MembershipPlanConfig,
                    BackupLLMConfig)
from config import USER_LIMITS, MEMBERSHIP_PLANS

admin_bp = Blueprint('admin', __name__, url_prefix='/admin')

# User types an administrator may assign (user list filter and upgrade API).
_ASSIGNABLE_USER_TYPES = ['free', 'vip_basic', 'vip_pro', 'vip_enterprise', 'admin']

# Text spellings accepted as "true" when a flag arrives as a string.
_TRUE_STRINGS = frozenset({'true', '1', 'on', 'yes'})


# ==================== Internal helpers ====================

def _coerce_bool(raw, default):
    """Interpret a flag submitted by an HTML form or a JSON body.

    Args:
        raw: Submitted value, or ``None`` when the field is absent.
        default: Result to use when ``raw`` is ``None``.

    Returns:
        ``default`` for ``None``; for strings, whether the text is one of
        ``true``/``1``/``on``/``yes`` (case-insensitive); ``bool(raw)``
        for anything else.
    """
    if raw is None:
        return default
    if isinstance(raw, str):
        return raw.strip().lower() in _TRUE_STRINGS
    return bool(raw)


def _remove_file_if_present(path):
    """Delete ``path`` from disk; an empty or already-missing path is not an error."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        # Already gone (possibly removed concurrently) - nothing left to do.
        pass


def _recent_days(count):
    """Return the last ``count`` calendar dates ending today, oldest first."""
    today = date.today()
    return [today - timedelta(days=offset) for offset in range(count - 1, -1, -1)]


def _llm_config_to_dict(config):
    """Project a ``BackupLLMConfig`` row onto the dict shape used by LLM callers."""
    return {
        'api_base': config.api_base,
        'api_key': config.api_key or '',
        'model': config.model,
        'max_tokens': config.max_tokens,
        'chunk_size': config.chunk_size,
        'timeout': config.timeout,
        'provider_name': config.provider_name,
    }


# ==================== Permission decorator ====================

def admin_required(f):
    """Restrict a view to logged-in administrators.

    Visitors without a ``user_id`` in the session are redirected to the
    ``login`` endpoint (with ``next`` set to the requested URL). Logged-in
    users that do not exist or are not admins get an error flash and are
    redirected to ``index``.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        user_id = session.get('user_id')
        if not user_id:
            return redirect(url_for('login', next=request.url))

        user = User.query.get(user_id)
        if not user or not user.is_admin:
            flash('需要管理员权限', 'error')
            return redirect(url_for('index'))

        return f(*args, **kwargs)
    return decorated_function


# ==================== Dashboard ====================

@admin_bp.route('/')
@admin_required
def dashboard():
    """Render the admin home page with headline statistics."""
    today = date.today()

    # User statistics
    total_users = User.query.count()
    new_users_today = User.query.filter(func.date(User.created_at) == today).count()
    vip_users = User.query.filter(User.user_type.startswith('vip')).count()

    # Translation statistics
    total_translations = Translation.query.count()
    today_translations = Translation.query.filter(
        func.date(Translation.created_at) == today
    ).count()

    # Cache statistics (SUM over an empty table yields NULL, hence ``or 0``)
    total_cache = TranslationCache.query.count()
    total_cache_hits = db.session.query(func.sum(TranslationCache.hit_count)).scalar() or 0
    total_cache_size = db.session.query(func.sum(TranslationCache.file_size)).scalar() or 0

    # Data-package sales statistics
    total_packages_sold = UserPackage.query.filter_by(payment_status='paid').count()
    total_revenue = db.session.query(func.sum(UserPackage.price_paid)).filter(
        UserPackage.payment_status == 'paid'
    ).scalar() or 0

    # Most recent translations and users
    recent_translations = Translation.query.order_by(desc(Translation.created_at)).limit(10).all()
    recent_users = User.query.order_by(desc(User.created_at)).limit(10).all()

    # Daily translation trend for the last 7 days, oldest first
    daily_stats = [
        {
            'date': day.strftime('%m-%d'),
            'count': Translation.query.filter(
                func.date(Translation.created_at) == day
            ).count(),
        }
        for day in _recent_days(7)
    ]

    return render_template(
        'admin/dashboard.html',
        total_users=total_users,
        new_users_today=new_users_today,
        vip_users=vip_users,
        total_translations=total_translations,
        today_translations=today_translations,
        total_cache=total_cache,
        total_cache_hits=total_cache_hits,
        total_cache_size=total_cache_size,
        total_packages_sold=total_packages_sold,
        total_revenue=total_revenue,
        recent_translations=recent_translations,
        recent_users=recent_users,
        daily_stats=daily_stats,
    )


# ==================== User management ====================

@admin_bp.route('/users')
@admin_required
def users():
    """List users, optionally filtered by a search term and a user type."""
    page = request.args.get('page', 1, type=int)
    search = request.args.get('search', '')
    user_type = request.args.get('type', '')

    query = User.query
    if search:
        query = query.filter(
            (User.username.contains(search)) | (User.email.contains(search))
        )
    if user_type:
        query = query.filter_by(user_type=user_type)

    pagination = query.order_by(desc(User.created_at)).paginate(page=page, per_page=20)

    return render_template(
        'admin/users.html',
        users=pagination,
        search=search,
        user_type=user_type,
        user_types=_ASSIGNABLE_USER_TYPES,
    )


@admin_bp.route('/user/<int:user_id>', methods=['GET', 'POST'])
@admin_required
def user_detail(user_id):
    """Show a user (GET) or update it (POST, form or JSON).

    On POST the change is committed and then recorded in ``OperationLog``;
    a submitted ``new_password`` is masked in the logged payload.
    """
    user = User.query.get_or_404(user_id)

    if request.method == 'POST':
        is_json = request.is_json
        data = request.json if is_json else request.form

        user.username = data.get('username', user.username)
        user.email = data.get('email', user.email)
        user.user_type = data.get('user_type', user.user_type)

        # JSON clients may send partial updates, so a missing flag keeps the
        # stored value. HTML forms omit unchecked checkboxes, so for form
        # posts a missing flag means False.
        user.is_active = _coerce_bool(
            data.get('is_active'), user.is_active if is_json else False)
        user.is_admin = _coerce_bool(
            data.get('is_admin'), user.is_admin if is_json else False)

        # Membership expiry: best effort, a malformed value is ignored.
        expire_str = data.get('membership_expire')
        if expire_str:
            try:
                user.membership_expire = datetime.fromisoformat(expire_str)
            except (TypeError, ValueError):
                pass

        # Password
        new_password = data.get('new_password')
        if new_password:
            user.set_password(new_password)

        db.session.commit()

        # Audit log. Never persist the plaintext password in the log detail.
        logged = {
            key: ('***' if key == 'new_password' and value else value)
            for key, value in data.items()
        }
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='edit_user',
            target=user.username,
            detail=json.dumps(logged, ensure_ascii=False),
        )
        db.session.add(log)
        db.session.commit()

        if is_json:
            return jsonify({'success': True, 'user': user.to_dict()})
        flash('用户信息已更新', 'success')
        return redirect(url_for('admin.user_detail', user_id=user_id))

    recent_translations = (
        Translation.query.filter_by(user_id=user_id)
        .order_by(desc(Translation.created_at))
        .limit(20)
        .all()
    )
    owned_packages = (
        UserPackage.query.filter_by(user_id=user_id)
        .order_by(desc(UserPackage.purchased_at))
        .all()
    )

    return render_template(
        'admin/user_detail.html',
        user=user,
        translations=recent_translations,
        packages=owned_packages,
    )


@admin_bp.route('/user/<int:user_id>/delete', methods=['POST'])
@admin_required
def delete_user(user_id):
    """Delete a user; refuses (HTTP 400) to delete the last administrator."""
    user = User.query.get_or_404(user_id)

    if user.is_admin and User.query.filter_by(is_admin=True).count() <= 1:
        return jsonify({'error': '不能删除最后一个管理员'}), 400

    # Capture the name before the row is deleted, for the audit log.
    username = user.username
    db.session.delete(user)
    db.session.commit()

    log = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='delete_user',
        target=username,
    )
    db.session.add(log)
    db.session.commit()

    return jsonify({'success': True})


# ==================== Translation records ====================

@admin_bp.route('/translations')
@admin_required
def translations():
    """List translation records, filterable by status and original filename."""
    page = request.args.get('page', 1, type=int)
    status = request.args.get('status', '')
    search = request.args.get('search', '')

    query = Translation.query
    if status:
        query = query.filter_by(status=status)
    if search:
        query = query.filter(Translation.original_filename.contains(search))

    pagination = query.order_by(desc(Translation.created_at)).paginate(page=page, per_page=20)

    return render_template(
        'admin/translations.html',
        translations=pagination,
        status=status,
        search=search,
    )


@admin_bp.route('/translation/<int:trans_id>')
@admin_required
def translation_detail(trans_id):
    """Show one translation record together with its owner, if it has one."""
    translation = Translation.query.get_or_404(trans_id)
    user = User.query.get(translation.user_id) if translation.user_id else None

    return render_template(
        'admin/translation_detail.html',
        translation=translation,
        user=user,
    )


@admin_bp.route('/translation/<int:trans_id>/delete', methods=['POST'])
@admin_required
def delete_translation(trans_id):
    """Delete a translation record and its output file, if any."""
    translation = Translation.query.get_or_404(trans_id)

    _remove_file_if_present(translation.output_path)

    db.session.delete(translation)
    db.session.commit()

    return jsonify({'success': True})


# ==================== Cache management ====================

@admin_bp.route('/cache')
@admin_required
def cache_list():
    """List cache entries ordered by hit count, with size and hit totals."""
    page = request.args.get('page', 1, type=int)

    caches = TranslationCache.query.order_by(
        desc(TranslationCache.hit_count)
    ).paginate(page=page, per_page=20)

    total_size = db.session.query(func.sum(TranslationCache.file_size)).scalar() or 0
    total_hits = db.session.query(func.sum(TranslationCache.hit_count)).scalar() or 0

    return render_template(
        'admin/cache.html',
        caches=caches,
        total_size=total_size,
        total_hits=total_hits,
    )


@admin_bp.route('/cache/<int:cache_id>/delete', methods=['POST'])
@admin_required
def delete_cache(cache_id):
    """Delete one cache entry and its cached file, if any."""
    cache = TranslationCache.query.get_or_404(cache_id)

    _remove_file_if_present(cache.cache_path)

    db.session.delete(cache)
    db.session.commit()

    return jsonify({'success': True})


@admin_bp.route('/cache/clear', methods=['POST'])
@admin_required
def clear_cache():
    """Delete every cache entry and its file; reports how many rows were loaded."""
    caches = TranslationCache.query.all()
    for cache in caches:
        _remove_file_if_present(cache.cache_path)

    TranslationCache.query.delete()
    db.session.commit()

    return jsonify({'success': True, 'deleted': len(caches)})


# ==================== System settings V2 ====================

@admin_bp.route('/settings', methods=['GET', 'POST'])
@admin_required
def settings():
    """Render the system settings page (current LLM and site configuration)."""
    # Effective LLM configuration
    current_llm = get_llm_config()

    # Basic site configuration
    site_config = {
        'site_name': DynamicConfig.get('site_name', 'PDF翻译助手'),
        'site_footer': DynamicConfig.get('site_footer', '© 2026 PDF翻译助手'),
        'max_file_size': DynamicConfig.get('max_file_size', 50),
        'cache_expire_days': DynamicConfig.get('cache_expire_days', 30),
        'enable_email_notify': DynamicConfig.get('enable_email_notify', True),
        'enable_cache': DynamicConfig.get('enable_cache', True),
        'enable_guest': DynamicConfig.get('enable_guest', True),
        'default_source_lang': DynamicConfig.get('default_source_lang', 'en'),
        'default_target_lang': DynamicConfig.get('default_target_lang', 'zh'),
    }

    return render_template(
        'admin/settings.html',
        llm_config=current_llm,
        site_config=site_config,
    )


@admin_bp.route('/settings/site', methods=['POST'])
@admin_required
def save_site_settings():
    """Persist the basic site settings sent as a JSON object.

    NOTE(review): values are stored under ``site_<key>`` while ``settings()``
    reads unprefixed keys such as ``max_file_size`` - confirm which key
    naming the readers expect.
    """
    data = request.json
    acting_user = session.get('user_id')

    for key, value in data.items():
        config_key = f'site_{key}'
        if key in ['max_file_size', 'cache_expire_days']:
            DynamicConfig.set(config_key, int(value), category='site',
                              value_type='int', user_id=acting_user)
        elif key in ['enable_email_notify', 'enable_cache', 'enable_guest']:
            # bool('false') is True, so parse textual flags explicitly.
            DynamicConfig.set(config_key, _coerce_bool(value, False), category='site',
                              value_type='bool', user_id=acting_user)
        else:
            DynamicConfig.set(config_key, value, category='site', user_id=acting_user)

    # Audit log
    log = OperationLog(
        user_id=acting_user,
        username='admin',
        action='save_site_settings',
        detail='保存网站基础配置',
    )
    db.session.add(log)
    db.session.commit()

    return jsonify({'success': True})


# ==================== Per-user-type limits ====================

@admin_bp.route('/settings/user-limits', methods=['GET', 'POST'])
@admin_required
def user_limits_settings():
    """Show (GET) or save (POST, JSON ``{user_type: {limit: value}}``) user limits."""
    if request.method == 'POST':
        data = request.json
        acting_user = session.get('user_id')

        # One dynamic-config entry per (user type, limit) pair
        for user_type, limits in data.items():
            for key, value in limits.items():
                DynamicConfig.set(
                    f"user_limit_{user_type}_{key}",
                    value,
                    category='user_limits',
                    value_type='int' if isinstance(value, int) else 'string',
                    user_id=acting_user,
                )

        return jsonify({'success': True})

    # Current configuration: stored override, else config-file value, else literal
    limits_config = {}
    for user_type in ['guest', 'free', 'vip_basic', 'vip_pro', 'vip_enterprise']:
        defaults = USER_LIMITS.get(user_type, {})
        limits_config[user_type] = {
            'daily_translations': DynamicConfig.get(
                f'user_limit_{user_type}_daily_translations',
                defaults.get('daily_translations', 10)),
            'max_pages': DynamicConfig.get(
                f'user_limit_{user_type}_max_pages',
                defaults.get('max_pages', 50)),
            'max_file_size': DynamicConfig.get(
                f'user_limit_{user_type}_max_file_size',
                defaults.get('max_file_size', 30*1024*1024)),
        }

    return render_template(
        'admin/user_limits.html',
        limits_config=limits_config,
        default_limits=USER_LIMITS,
    )


# ==================== Membership plan settings ====================

@admin_bp.route('/settings/membership', methods=['GET', 'POST'])
@admin_required
def membership_settings():
    """Show (GET) or save (POST, JSON ``{plan_key: {field: value}}``) plan settings."""
    if request.method == 'POST':
        data = request.json
        acting_user = session.get('user_id')

        for plan_key, plan_data in data.items():
            for key, value in plan_data.items():
                if isinstance(value, int):
                    value_type = 'int'
                elif isinstance(value, float):
                    value_type = 'float'
                else:
                    value_type = 'string'
                DynamicConfig.set(
                    f"membership_{plan_key}_{key}",
                    value,
                    category='membership',
                    value_type=value_type,
                    user_id=acting_user,
                )

        return jsonify({'success': True})

    # Current configuration: stored override, else config-file value
    plans_config = {}
    for plan_key in ['vip_basic', 'vip_pro', 'vip_enterprise']:
        default = MEMBERSHIP_PLANS.get(plan_key, {})
        plans_config[plan_key] = {
            'name': DynamicConfig.get(
                f'membership_{plan_key}_name', default.get('name', plan_key)),
            'price': DynamicConfig.get(
                f'membership_{plan_key}_price', default.get('price', 0)),
            'period': DynamicConfig.get(
                f'membership_{plan_key}_period', default.get('period', 'month')),
            'description': DynamicConfig.get(
                f'membership_{plan_key}_description', default.get('description', '')),
        }

    return render_template(
        'admin/membership.html',
        plans_config=plans_config,
        default_plans=MEMBERSHIP_PLANS,
    )


# ==================== Data package management ====================

@admin_bp.route('/packages')
@admin_required
def packages():
    """List data packages ordered by ``sort_order``."""
    all_packages = DataPackage.query.order_by(DataPackage.sort_order).all()
    return render_template('admin/packages.html', packages=all_packages)


@admin_bp.route('/package/add', methods=['GET', 'POST'])
@admin_required
def add_package():
    """Show the package form (GET) or create a data package (POST, form or JSON)."""
    if request.method == 'POST':
        data = request.json if request.is_json else request.form

        original_price = data.get('original_price')
        package = DataPackage(
            name=data.get('name'),
            description=data.get('description'),
            translation_count=int(data.get('translation_count', 0)),
            price=float(data.get('price', 0)),
            original_price=float(original_price) if original_price else None,
            valid_days=int(data.get('valid_days', 30)),
            sort_order=int(data.get('sort_order', 0)),
            is_active=_coerce_bool(data.get('is_active'), True),
            is_recommended=_coerce_bool(data.get('is_recommended'), False),
        )

        db.session.add(package)
        db.session.commit()

        if request.is_json:
            return jsonify({'success': True, 'package': package.to_dict()})
        flash('数据包套餐已添加', 'success')
        return redirect(url_for('admin.packages'))

    return render_template('admin/package_form.html', package=None)


@admin_bp.route('/package/<int:package_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_package(package_id):
    """Show the package form (GET) or update a data package (POST, form or JSON)."""
    package = DataPackage.query.get_or_404(package_id)

    if request.method == 'POST':
        is_json = request.is_json
        data = request.json if is_json else request.form

        package.name = data.get('name', package.name)
        package.description = data.get('description', package.description)
        package.translation_count = int(data.get('translation_count', package.translation_count))
        package.price = float(data.get('price', package.price))
        # An empty or missing original price clears the stored one.
        original_price = data.get('original_price')
        package.original_price = float(original_price) if original_price else None
        package.valid_days = int(data.get('valid_days', package.valid_days))
        package.sort_order = int(data.get('sort_order', package.sort_order))
        # JSON partial updates keep the stored flag when the key is missing;
        # form posts keep their previous defaults (active, not recommended).
        package.is_active = _coerce_bool(
            data.get('is_active'), package.is_active if is_json else True)
        package.is_recommended = _coerce_bool(
            data.get('is_recommended'), package.is_recommended if is_json else False)

        db.session.commit()

        if is_json:
            return jsonify({'success': True, 'package': package.to_dict()})
        flash('数据包套餐已更新', 'success')
        return redirect(url_for('admin.packages'))

    return render_template('admin/package_form.html', package=package)


@admin_bp.route('/package/<int:package_id>/delete', methods=['POST'])
@admin_required
def delete_package(package_id):
    """Delete a data package."""
    package = DataPackage.query.get_or_404(package_id)
    db.session.delete(package)
    db.session.commit()
    return jsonify({'success': True})


@admin_bp.route('/package/<int:package_id>/toggle', methods=['POST'])
@admin_required
def toggle_package(package_id):
    """Flip a data package's ``is_active`` flag and return the new value."""
    package = DataPackage.query.get_or_404(package_id)
    package.is_active = not package.is_active
    db.session.commit()
    return jsonify({'success': True, 'is_active': package.is_active})


# ==================== Operation log ====================

@admin_bp.route('/logs')
@admin_required
def logs():
    """List operation-log entries, newest first, optionally filtered by action."""
    page = request.args.get('page', 1, type=int)
    action = request.args.get('action', '')

    query = OperationLog.query
    if action:
        query = query.filter_by(action=action)

    pagination = query.order_by(desc(OperationLog.created_at)).paginate(page=page, per_page=50)

    return render_template('admin/logs.html', logs=pagination, action=action)


# ==================== Statistics ====================

@admin_bp.route('/stats')
@admin_required
def stats():
    """Render the statistics report (30-day trends, distributions, top lists)."""
    days = _recent_days(30)

    # New users per day
    user_growth = [
        {
            'date': day.strftime('%Y-%m-%d'),
            'count': User.query.filter(func.date(User.created_at) == day).count(),
        }
        for day in days
    ]

    # Translations per day
    translation_growth = [
        {
            'date': day.strftime('%Y-%m-%d'),
            'count': Translation.query.filter(
                func.date(Translation.created_at) == day
            ).count(),
        }
        for day in days
    ]

    # Paid revenue per day
    revenue_growth = []
    for day in days:
        revenue = db.session.query(func.sum(UserPackage.price_paid)).filter(
            UserPackage.payment_status == 'paid',
            func.date(UserPackage.purchased_at) == day
        ).scalar() or 0
        revenue_growth.append({'date': day.strftime('%Y-%m-%d'), 'revenue': float(revenue)})

    # Users per user type
    user_distribution = db.session.query(
        User.user_type, func.count(User.id)
    ).group_by(User.user_type).all()

    # Translations per status
    status_distribution = db.session.query(
        Translation.status, func.count(Translation.id)
    ).group_by(Translation.status).all()

    # Top 10 users by translation count
    top_users = db.session.query(
        User.username, func.count(Translation.id).label('count')
    ).join(Translation).group_by(User.id).order_by(desc('count')).limit(10).all()

    # Top 10 data packages by paid sales
    top_packages = db.session.query(
        DataPackage.name,
        func.count(UserPackage.id).label('count'),
        func.sum(UserPackage.price_paid).label('revenue')
    ).join(UserPackage).filter(
        UserPackage.payment_status == 'paid'
    ).group_by(DataPackage.id).order_by(desc('count')).limit(10).all()

    return render_template(
        'admin/stats.html',
        user_growth=user_growth,
        translation_growth=translation_growth,
        revenue_growth=revenue_growth,
        user_distribution=user_distribution,
        status_distribution=status_distribution,
        top_users=top_users,
        top_packages=top_packages,
    )


# ==================== JSON API ====================

@admin_bp.route('/api/stats/summary')
@admin_required
def api_stats_summary():
    """API: return headline statistics as JSON."""
    today = date.today()
    cache_hits = db.session.query(func.sum(TranslationCache.hit_count)).scalar() or 0
    total_revenue = db.session.query(func.sum(UserPackage.price_paid)).filter(
        UserPackage.payment_status == 'paid'
    ).scalar() or 0

    return jsonify({
        'total_users': User.query.count(),
        'today_users': User.query.filter(func.date(User.created_at) == today).count(),
        'total_translations': Translation.query.count(),
        'today_translations': Translation.query.filter(
            func.date(Translation.created_at) == today
        ).count(),
        'total_cache': TranslationCache.query.count(),
        # Some database drivers return SUM() as Decimal; emit a plain number.
        'cache_hits': int(cache_hits),
        'total_packages': DataPackage.query.filter_by(is_active=True).count(),
        'packages_sold': UserPackage.query.filter_by(payment_status='paid').count(),
        'total_revenue': float(total_revenue),
    })


@admin_bp.route('/api/user/<int:user_id>/upgrade', methods=['POST'])
@admin_required
def api_user_upgrade(user_id):
    """API: change a user's type; VIP types also get a new expiry.

    Expects JSON ``{"user_type": ..., "months": N}``. For ``vip*`` types the
    membership expires ``30 * months`` days from now (UTC). Responds 400 for
    an unknown user type or a ``months`` value that is not a positive integer.
    """
    user = User.query.get_or_404(user_id)
    data = request.json

    user_type = data.get('user_type')
    if user_type not in _ASSIGNABLE_USER_TYPES:
        return jsonify({'error': '无效的用户类型'}), 400

    # Validate before touching the user so a bad request changes nothing.
    expire_at = None
    if user_type.startswith('vip'):
        try:
            months = int(data.get('months', 1))
        except (TypeError, ValueError):
            months = 0
        if months < 1:
            return jsonify({'error': '无效的月数'}), 400
        expire_at = datetime.utcnow() + timedelta(days=30 * months)

    user.user_type = user_type
    if expire_at is not None:
        user.membership_expire = expire_at

    db.session.commit()

    return jsonify({'success': True, 'user': user.to_dict()})


@admin_bp.route('/api/user/<int:user_id>/add-package', methods=['POST'])
@admin_required
def api_user_add_package(user_id):
    """API: grant a data package to a user free of charge (recorded as paid)."""
    user = User.query.get_or_404(user_id)
    data = request.json

    package = DataPackage.query.get(data.get('package_id'))
    if not package:
        return jsonify({'error': '数据包不存在'}), 404

    # A non-positive validity period means the grant never expires.
    expire_at = None
    if package.valid_days > 0:
        expire_at = datetime.utcnow() + timedelta(days=package.valid_days)

    user_package = UserPackage(
        user_id=user.id,
        package_id=package.id,
        package_name=package.name,
        translation_count=package.translation_count,
        remaining_count=package.translation_count,
        expire_at=expire_at,
        price_paid=0,  # granted by an administrator
        payment_status='paid',
    )

    db.session.add(user_package)
    db.session.commit()

    return jsonify({'success': True, 'package': {
        'id': user_package.id,
        'name': user_package.package_name,
        'remaining': user_package.remaining_count,
    }})


# ==================== LLM configuration ====================

@admin_bp.route('/llm_config')
@admin_required
def llm_config():
    """Render the LLM configuration page, seeding defaults when the table is empty."""
    llm_configs = BackupLLMConfig.query.order_by(BackupLLMConfig.sort_order).all()

    # Nothing stored yet: create the default entries, then reload
    if not llm_configs:
        init_default_backup_llm()
        llm_configs = BackupLLMConfig.query.order_by(BackupLLMConfig.sort_order).all()

    return render_template(
        'admin/llm_config.html',
        llm_configs=[c.to_dict() for c in llm_configs],
    )


@admin_bp.route('/llm_config/save', methods=['POST'])
@admin_required
def save_llm_config():
    """Persist the LLM settings sent as JSON into ``DynamicConfig`` (category ``llm``)."""
    data = request.json
    acting_user = session.get('user_id')

    for field in ('api_base', 'api_key', 'model'):
        DynamicConfig.set(f'llm_{field}', data.get(field),
                          category='llm', user_id=acting_user)
    for field in ('max_tokens', 'chunk_size', 'timeout'):
        DynamicConfig.set(f'llm_{field}', data.get(field),
                          category='llm', value_type='int', user_id=acting_user)

    # Audit log
    log = OperationLog(
        user_id=acting_user,
        username='admin',
        action='update_llm_config',
        detail='更新大模型配置',
    )
    db.session.add(log)
    db.session.commit()

    return jsonify({'success': True})


@admin_bp.route('/llm_config/test', methods=['POST'])
@admin_required
def test_llm_connection():
    """Send a tiny chat request to the submitted endpoint and report the outcome.

    Always answers HTTP 200; failures are reported in the JSON body as
    ``{"success": false, "error": ...}``.
    """
    data = request.json

    try:
        from openai import OpenAI

        client = OpenAI(
            api_key=data.get('api_key', 'sk-test'),
            base_url=data.get('api_base'),
        )

        # Minimal probe request
        response = client.chat.completions.create(
            model=data.get('model'),
            messages=[{"role": "user", "content": "Hello"}],
            max_tokens=10,
            timeout=10,
        )

        preview = 'OK'
        if response.choices:
            # ``content`` can be None; that must not turn a working
            # connection into a reported failure.
            preview = (response.choices[0].message.content or '')[:50]

        return jsonify({
            'success': True,
            'model': data.get('model'),
            'response': preview,
        })
    except Exception as e:
        # Boundary handler: any client/network error becomes a JSON failure.
        return jsonify({'success': False, 'error': str(e)})


@admin_bp.route('/llm_config/reset', methods=['POST'])
@admin_required
def reset_llm_config():
    """Delete all stored ``DynamicConfig`` rows in the ``llm`` category."""
    DynamicConfig.query.filter_by(category='llm').delete()
    db.session.commit()

    return jsonify({'success': True})


# ==================== Current LLM configuration (used by other modules) ====================

def get_llm_config():
    """Return the LLM configuration currently in effect.

    Resolution order: the active entry flagged as default, else the first
    active entry by ``sort_order``, else ``LLM_CONFIG`` from the config file.
    """
    default_config = BackupLLMConfig.query.filter_by(is_default=True, is_active=True).first()
    if default_config:
        return _llm_config_to_dict(default_config)

    # No default flagged: fall back to the first enabled entry
    first_config = BackupLLMConfig.query.filter_by(is_active=True).order_by(
        BackupLLMConfig.sort_order
    ).first()
    if first_config:
        return _llm_config_to_dict(first_config)

    # Nothing in the database: use the config-file defaults
    from config import LLM_CONFIG
    return LLM_CONFIG


# ==================== User type configuration (dynamic add/remove) ====================

@admin_bp.route('/user-types')
@admin_required
def user_types():
    """List user type configurations, seeding defaults when the table is empty."""
    type_configs = UserTypeConfig.query.order_by(UserTypeConfig.sort_order).all()

    # Nothing stored yet: create the default entries, then reload
    if not type_configs:
        init_default_user_types()
        type_configs = UserTypeConfig.query.order_by(UserTypeConfig.sort_order).all()

    return render_template('admin/user_types.html', user_types=type_configs)
# Feature switches selectable on the user-type form.
_USER_TYPE_FEATURES = [
    {'key': 'basic_translate', 'name': '基础翻译'},
    {'key': 'compare_view', 'name': '对照查看'},
    {'key': 'retranslate', 'name': '重新翻译'},
    {'key': 'history', 'name': '历史记录'},
    {'key': 'priority_queue', 'name': '优先队列'},
    {'key': 'export_pdf', 'name': '导出PDF'},
    {'key': 'batch_translate', 'name': '批量翻译'},
    {'key': 'custom_terms', 'name': '自定义术语'},
]


def _request_flag(data, key, default):
    """Read boolean field ``key`` from a form or JSON payload.

    Returns ``default`` when the field is absent (or JSON null); for strings,
    whether the text is one of ``true``/``1``/``on``/``yes``
    (case-insensitive); ``bool(value)`` for anything else.
    """
    raw = data.get(key)
    if raw is None:
        return default
    if isinstance(raw, str):
        return raw.strip().lower() in ('true', '1', 'on', 'yes')
    return bool(raw)


def _features_as_text(raw):
    """Serialise a feature list to JSON text; other values pass through unchanged.

    The default-type initialiser stores ``features`` as ``json.dumps`` output,
    so a list arriving in a JSON request is stored the same way.
    """
    if isinstance(raw, (list, tuple)):
        return json.dumps(list(raw), ensure_ascii=False)
    return raw


def _limits_from_type_config(config):
    """Project a ``UserTypeConfig`` row onto the limits dict used by callers."""
    return {
        'daily_translations': config.daily_translations,
        'max_pages': config.max_pages,
        'max_file_size': config.max_file_size,
        'features': config.get_features(),
    }


@admin_bp.route('/user-types/add', methods=['GET', 'POST'])
@admin_required
def add_user_type():
    """Show the user-type form (GET) or create a user type (POST, form or JSON).

    Responds 400 when ``type_key`` is empty or already exists.
    """
    if request.method == 'POST':
        data = request.json if request.is_json else request.form

        type_key = data.get('type_key')
        if not type_key:
            return jsonify({'error': '类型标识不能为空'}), 400

        # type_key must be unique
        if UserTypeConfig.query.filter_by(type_key=type_key).first():
            return jsonify({'error': '类型标识已存在'}), 400

        user_type = UserTypeConfig(
            type_key=type_key,
            display_name=data.get('display_name'),
            daily_translations=int(data.get('daily_translations', 10)),
            max_pages=int(data.get('max_pages', 50)),
            # The form works in megabytes; the model stores bytes.
            max_file_size=int(data.get('max_file_size_mb', 30)) * 1024 * 1024,
            features=_features_as_text(data.get('features', '[]')),
            sort_order=int(data.get('sort_order', 0)),
            is_active=_request_flag(data, 'is_active', True),
            is_system=False,
        )

        db.session.add(user_type)
        db.session.commit()

        # Audit log
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='add_user_type',
            target=user_type.type_key,
            detail=json.dumps(user_type.to_dict(), ensure_ascii=False),
        )
        db.session.add(log)
        db.session.commit()

        if request.is_json:
            return jsonify({'success': True, 'user_type': user_type.to_dict()})
        flash('用户类型已添加', 'success')
        return redirect(url_for('admin.user_types'))

    return render_template(
        'admin/user_type_form.html',
        user_type=None,
        all_features=_USER_TYPE_FEATURES,
    )


@admin_bp.route('/user-types/<int:type_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_user_type(type_id):
    """Show the user-type form (GET) or update a user type (POST, form or JSON)."""
    user_type = UserTypeConfig.query.get_or_404(type_id)

    if request.method == 'POST':
        is_json = request.is_json
        data = request.json if is_json else request.form

        user_type.display_name = data.get('display_name', user_type.display_name)
        user_type.daily_translations = int(
            data.get('daily_translations', user_type.daily_translations))
        user_type.max_pages = int(data.get('max_pages', user_type.max_pages))
        # The form works in megabytes; the model stores bytes.
        user_type.max_file_size = int(
            data.get('max_file_size_mb', user_type.max_file_size // 1024 // 1024)
        ) * 1024 * 1024
        user_type.features = _features_as_text(data.get('features', user_type.features))
        user_type.sort_order = int(data.get('sort_order', user_type.sort_order))
        # JSON partial updates keep the stored flag when the key is missing;
        # HTML forms omit unchecked checkboxes, so missing means False there.
        user_type.is_active = _request_flag(
            data, 'is_active', user_type.is_active if is_json else False)

        db.session.commit()

        # Audit log
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='edit_user_type',
            target=user_type.type_key,
            detail=json.dumps(user_type.to_dict(), ensure_ascii=False),
        )
        db.session.add(log)
        db.session.commit()

        if is_json:
            return jsonify({'success': True, 'user_type': user_type.to_dict()})
        flash('用户类型已更新', 'success')
        return redirect(url_for('admin.user_types'))

    return render_template(
        'admin/user_type_form.html',
        user_type=user_type,
        all_features=_USER_TYPE_FEATURES,
    )


@admin_bp.route('/user-types/<int:type_id>/delete', methods=['POST'])
@admin_required
def delete_user_type(type_id):
    """Delete a user type unless it is built in or still assigned to users."""
    user_type = UserTypeConfig.query.get_or_404(type_id)

    # Built-in types cannot be deleted
    if user_type.is_system:
        return jsonify({'error': '系统内置类型不可删除'}), 400

    # Refuse while any user still has this type
    users_count = User.query.filter_by(user_type=user_type.type_key).count()
    if users_count > 0:
        return jsonify({'error': f'有 {users_count} 个用户使用此类型,请先修改用户类型'}), 400

    # Capture the key before the row is deleted, for the audit log.
    type_key = user_type.type_key
    db.session.delete(user_type)
    db.session.commit()

    # Audit log
    log = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='delete_user_type',
        target=type_key,
    )
    db.session.add(log)
    db.session.commit()

    return jsonify({'success': True})


@admin_bp.route('/user-types/<int:type_id>/toggle', methods=['POST'])
@admin_required
def toggle_user_type(type_id):
    """Flip a user type's ``is_active`` flag and return the new value."""
    user_type = UserTypeConfig.query.get_or_404(type_id)
    user_type.is_active = not user_type.is_active
    db.session.commit()
    return jsonify({'success': True, 'is_active': user_type.is_active})


@admin_bp.route('/user-types/init', methods=['POST'])
@admin_required
def init_user_types():
    """Create any missing default user types."""
    init_default_user_types()
    return jsonify({'success': True})


def init_default_user_types():
    """Insert the built-in user types that do not exist yet.

    Limits come from ``USER_LIMITS`` in the config file; existing rows are
    left untouched.
    """
    defaults = [
        ('guest', '访客', USER_LIMITS.get('guest', {}), 0, True),
        ('free', '免费用户', USER_LIMITS.get('free', {}), 1, True),
        ('vip_basic', '基础会员', USER_LIMITS.get('vip_basic', {}), 2, True),
        ('vip_pro', '专业会员', USER_LIMITS.get('vip_pro', {}), 3, True),
        ('vip_enterprise', '企业会员', USER_LIMITS.get('vip_enterprise', {}), 4, True),
    ]

    for type_key, display_name, limits, sort_order, is_system in defaults:
        if UserTypeConfig.query.filter_by(type_key=type_key).first():
            continue
        db.session.add(UserTypeConfig(
            type_key=type_key,
            display_name=display_name,
            daily_translations=limits.get('daily_translations', 10),
            max_pages=limits.get('max_pages', 50),
            max_file_size=limits.get('max_file_size', 30*1024*1024),
            features=json.dumps(limits.get('features', []), ensure_ascii=False),
            sort_order=sort_order,
            is_active=True,
            is_system=is_system,
        ))

    db.session.commit()


# ==================== Membership plan configuration (dynamic add/remove) ====================

@admin_bp.route('/membership-plans')
@admin_required
def membership_plans():
    """List membership plans, seeding defaults when the table is empty."""
    plans = MembershipPlanConfig.query.order_by(MembershipPlanConfig.sort_order).all()

    # Nothing stored yet: create the default entries, then reload
    if not plans:
        init_default_membership_plans()
        plans = MembershipPlanConfig.query.order_by(MembershipPlanConfig.sort_order).all()

    # Active user types, offered as choices in the template
    user_types = UserTypeConfig.query.filter_by(is_active=True).all()

    return render_template('admin/membership_plans.html', plans=plans, user_types=user_types)


@admin_bp.route('/membership-plans/add', methods=['GET', 'POST'])
@admin_required
def add_membership_plan():
    """Show the plan form (GET) or create a membership plan (POST, form or JSON).

    Responds 400 when ``plan_key`` is empty or already exists.
    """
    if request.method == 'POST':
        data = request.json if request.is_json else request.form

        plan_key = data.get('plan_key')
        if not plan_key:
            return jsonify({'error': '套餐标识不能为空'}), 400

        # plan_key must be unique
        if MembershipPlanConfig.query.filter_by(plan_key=plan_key).first():
            return jsonify({'error': '套餐标识已存在'}), 400

        original_price = data.get('original_price')
        plan = MembershipPlanConfig(
            plan_key=plan_key,
            display_name=data.get('display_name'),
            price=float(data.get('price', 0)),
            original_price=float(original_price) if original_price else None,
            period=data.get('period', 'month'),
            period_days=int(data.get('period_days', 30)),
            description=data.get('description'),
            user_type_key=data.get('user_type_key'),
            sort_order=int(data.get('sort_order', 0)),
            is_active=_request_flag(data, 'is_active', True),
            is_recommended=_request_flag(data, 'is_recommended', False),
            is_system=False,
        )

        db.session.add(plan)
        db.session.commit()

        # Audit log
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='add_membership_plan',
            target=plan.plan_key,
            detail=json.dumps(plan.to_dict(), ensure_ascii=False),
        )
        db.session.add(log)
        db.session.commit()

        if request.is_json:
            return jsonify({'success': True, 'plan': plan.to_dict()})
        flash('会员套餐已添加', 'success')
        return redirect(url_for('admin.membership_plans'))

    # Active user types, offered as choices in the form
    user_types = UserTypeConfig.query.filter_by(is_active=True).all()

    return render_template('admin/membership_plan_form.html', plan=None, user_types=user_types)


@admin_bp.route('/membership-plans/<int:plan_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_membership_plan(plan_id):
    """Show the plan form (GET) or update a membership plan (POST, form or JSON)."""
    plan = MembershipPlanConfig.query.get_or_404(plan_id)

    if request.method == 'POST':
        is_json = request.is_json
        data = request.json if is_json else request.form

        plan.display_name = data.get('display_name', plan.display_name)
        plan.price = float(data.get('price', plan.price))
        # An empty or missing original price clears the stored one.
        original_price = data.get('original_price')
        plan.original_price = float(original_price) if original_price else None
        plan.period = data.get('period', plan.period)
        plan.period_days = int(data.get('period_days', plan.period_days))
        plan.description = data.get('description', plan.description)
        plan.user_type_key = data.get('user_type_key', plan.user_type_key)
        plan.sort_order = int(data.get('sort_order', plan.sort_order))
        # JSON partial updates keep the stored flags when a key is missing;
        # HTML forms omit unchecked checkboxes, so missing means False there.
        plan.is_active = _request_flag(
            data, 'is_active', plan.is_active if is_json else False)
        plan.is_recommended = _request_flag(
            data, 'is_recommended', plan.is_recommended if is_json else False)

        db.session.commit()

        # Audit log
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='edit_membership_plan',
            target=plan.plan_key,
            detail=json.dumps(plan.to_dict(), ensure_ascii=False),
        )
        db.session.add(log)
        db.session.commit()

        if is_json:
            return jsonify({'success': True, 'plan': plan.to_dict()})
        flash('会员套餐已更新', 'success')
        return redirect(url_for('admin.membership_plans'))

    # Active user types, offered as choices in the form
    user_types = UserTypeConfig.query.filter_by(is_active=True).all()

    return render_template('admin/membership_plan_form.html', plan=plan, user_types=user_types)


@admin_bp.route('/membership-plans/<int:plan_id>/delete', methods=['POST'])
@admin_required
def delete_membership_plan(plan_id):
    """Delete a membership plan unless it is built in."""
    plan = MembershipPlanConfig.query.get_or_404(plan_id)

    # Built-in plans cannot be deleted
    if plan.is_system:
        return jsonify({'error': '系统内置套餐不可删除'}), 400

    # Capture the key before the row is deleted, for the audit log.
    plan_key = plan.plan_key
    db.session.delete(plan)
    db.session.commit()

    # Audit log
    log = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='delete_membership_plan',
        target=plan_key,
    )
    db.session.add(log)
    db.session.commit()

    return jsonify({'success': True})


@admin_bp.route('/membership-plans/<int:plan_id>/toggle', methods=['POST'])
@admin_required
def toggle_membership_plan(plan_id):
    """Flip a membership plan's ``is_active`` flag and return the new value."""
    plan = MembershipPlanConfig.query.get_or_404(plan_id)
    plan.is_active = not plan.is_active
    db.session.commit()
    return jsonify({'success': True, 'is_active': plan.is_active})


@admin_bp.route('/membership-plans/<int:plan_id>/recommend', methods=['POST'])
@admin_required
def recommend_membership_plan(plan_id):
    """Flip a membership plan's ``is_recommended`` flag and return the new value."""
    plan = MembershipPlanConfig.query.get_or_404(plan_id)
    plan.is_recommended = not plan.is_recommended
    db.session.commit()
    return jsonify({'success': True, 'is_recommended': plan.is_recommended})


@admin_bp.route('/membership-plans/init', methods=['POST'])
@admin_required
def init_membership_plans():
    """Create any missing default membership plans."""
    init_default_membership_plans()
    return jsonify({'success': True})


def init_default_membership_plans():
    """Insert the built-in membership plans that do not exist yet.

    Plan data comes from ``MEMBERSHIP_PLANS`` in the config file; existing
    rows are left untouched.
    """
    defaults = [
        ('vip_basic', MEMBERSHIP_PLANS.get('vip_basic', {}), 'vip_basic', 0, True),
        ('vip_pro', MEMBERSHIP_PLANS.get('vip_pro', {}), 'vip_pro', 1, True),
        ('vip_enterprise', MEMBERSHIP_PLANS.get('vip_enterprise', {}), 'vip_enterprise', 2, True),
    ]
    days_by_period = {'month': 30, 'year': 365}

    for plan_key, plan_data, user_type_key, sort_order, is_system in defaults:
        if MembershipPlanConfig.query.filter_by(plan_key=plan_key).first():
            continue
        # Derive the day count from the period actually stored, so a plan
        # without an explicit period is a 30-day month; any period other
        # than month/year maps to 90 days.
        period = plan_data.get('period', 'month')
        db.session.add(MembershipPlanConfig(
            plan_key=plan_key,
            display_name=plan_data.get('name', plan_key),
            price=plan_data.get('price', 0),
            original_price=None,
            period=period,
            period_days=days_by_period.get(period, 90),
            description=plan_data.get('description', ''),
            user_type_key=user_type_key,
            sort_order=sort_order,
            is_active=True,
            is_recommended=False,
            is_system=is_system,
        ))

    db.session.commit()


# ==================== Limit / plan lookups (used by other modules) ====================

def get_user_limits(user_type_key):
    """Return the limits for one user type.

    Uses the active ``UserTypeConfig`` row when present; otherwise falls back
    to ``USER_LIMITS`` for that key, then to the ``free`` limits, then ``{}``.
    """
    config = UserTypeConfig.query.filter_by(type_key=user_type_key, is_active=True).first()
    if config:
        return _limits_from_type_config(config)

    # Not configured in the database: use the config-file defaults
    return USER_LIMITS.get(user_type_key, USER_LIMITS.get('free', {}))


def get_all_user_types():
    """Return ``{type_key: limits}`` for every active user type, by ``sort_order``."""
    types = UserTypeConfig.query.filter_by(is_active=True).order_by(
        UserTypeConfig.sort_order
    ).all()
    # Built from the rows already loaded, avoiding one extra query per type.
    return {t.type_key: _limits_from_type_config(t) for t in types}


def get_membership_plan(plan_key):
    """Return one active membership plan as a dict, else the config-file entry or ``{}``."""
    plan = MembershipPlanConfig.query.filter_by(plan_key=plan_key, is_active=True).first()
    if plan:
        return plan.to_dict()

    return MEMBERSHIP_PLANS.get(plan_key, {})


def get_all_membership_plans():
    """Return all active membership plans as dicts, ordered by ``sort_order``."""
    plans = MembershipPlanConfig.query.filter_by(is_active=True).order_by(
        MembershipPlanConfig.sort_order
    ).all()
    return [p.to_dict() for p in plans]


# ==================== Backup LLM endpoints ====================

@admin_bp.route('/backup-llm')
@admin_required
def backup_llm_list():
    """Backup LLM endpoint list - redirects to the LLM configuration page."""
    return redirect(url_for('admin.llm_config'))


@admin_bp.route('/backup-llm/add', methods=['POST'])
@admin_required
def add_backup_llm():
    """Create an LLM endpoint from a JSON body; it is never created as the default."""
    data = request.json

    config = BackupLLMConfig(
        provider_name=data.get('provider_name'),
        api_base=data.get('api_base'),
        api_key=data.get('api_key'),
        model=data.get('model'),
        max_tokens=int(data.get('max_tokens', 8000)),
        chunk_size=int(data.get('chunk_size', 2000)),
        timeout=int(data.get('timeout', 180)),
        is_active=_request_flag(data, 'is_active', True),
        is_default=False,
        sort_order=int(data.get('sort_order', 0)),
        description=data.get('description'),
    )

    db.session.add(config)
    db.session.commit()

    # Audit log
    # NOTE(review): if to_dict() includes api_key, the secret is written to
    # the operation log and returned to the client - confirm and redact.
    log = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='add_llm_config',
        target=config.provider_name,
        detail=json.dumps(config.to_dict(), ensure_ascii=False),
    )
    db.session.add(log)
    db.session.commit()

    return jsonify({'success': True, 'config': config.to_dict()})


@admin_bp.route('/backup-llm/<int:config_id>/edit', methods=['POST'])
@admin_required
def edit_backup_llm(config_id):
    """Edit an LLM endpoint."""
config = BackupLLMConfig.query.get_or_404(config_id) data = request.json config.provider_name = data.get('provider_name', config.provider_name) config.api_base = data.get('api_base', config.api_base) config.api_key = data.get('api_key', config.api_key) config.model = data.get('model', config.model) config.max_tokens = int(data.get('max_tokens', config.max_tokens)) config.chunk_size = int(data.get('chunk_size', config.chunk_size)) config.timeout = int(data.get('timeout', config.timeout)) config.is_active = data.get('is_active', True) config.sort_order = int(data.get('sort_order', config.sort_order)) config.description = data.get('description', config.description) db.session.commit() # 记录日志 log = OperationLog( user_id=session.get('user_id'), username='admin', action='edit_llm_config', target=config.provider_name, detail=json.dumps(config.to_dict(), ensure_ascii=False) ) db.session.add(log) db.session.commit() return jsonify({'success': True, 'config': config.to_dict()}) @admin_bp.route('/backup-llm//delete', methods=['POST']) @admin_required def delete_backup_llm(config_id): """删除备用大模型接口""" config = BackupLLMConfig.query.get_or_404(config_id) provider_name = config.provider_name db.session.delete(config) db.session.commit() # 记录日志 log = OperationLog( user_id=session.get('user_id'), username='admin', action='delete_backup_llm', target=provider_name ) db.session.add(log) db.session.commit() return jsonify({'success': True}) @admin_bp.route('/backup-llm//toggle', methods=['POST']) @admin_required def toggle_backup_llm(config_id): """切换大模型接口状态""" config = BackupLLMConfig.query.get_or_404(config_id) config.is_active = not config.is_active db.session.commit() return jsonify({'success': True, 'is_active': config.is_active}) @admin_bp.route('/backup-llm//set-default', methods=['POST']) @admin_required def set_default_llm(config_id): """设置默认大模型接口""" config = BackupLLMConfig.query.get_or_404(config_id) # 先清除所有默认标记 BackupLLMConfig.query.update({'is_default': False}) # 设置当前为默认 
config.is_default = True config.is_active = True # 默认的必须启用 db.session.commit() # 记录日志 log = OperationLog( user_id=session.get('user_id'), username='admin', action='set_default_llm', target=config.provider_name, detail=f'设置 {config.provider_name} 为默认大模型' ) db.session.add(log) db.session.commit() return jsonify({'success': True, 'config': config.to_dict()}) @admin_bp.route('/backup-llm//test', methods=['POST']) @admin_required def test_backup_llm(config_id): """测试备用大模型接口""" config = BackupLLMConfig.query.get_or_404(config_id) try: from openai import OpenAI client = OpenAI( api_key=config.api_key or 'sk-test', base_url=config.api_base, ) model = config.model or 'default' # 发送简单测试请求 response = client.chat.completions.create( model=model, messages=[{"role": "user", "content": "Hello"}], max_tokens=10, timeout=10, ) return jsonify({ 'success': True, 'provider': config.provider_name, 'model': model, 'response': response.choices[0].message.content[:50] if response.choices else 'OK' }) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @admin_bp.route('/backup-llm/init', methods=['POST']) @admin_required def init_backup_llm(): """初始化默认备用大模型""" init_default_backup_llm() return jsonify({'success': True}) def init_default_backup_llm(): """初始化默认备用大模型接口配置""" defaults = [ ('本地LM Studio', 'http://localhost:1234/v1', None, None, 0), ('OpenAI', 'https://api.openai.com/v1', None, 'gpt-4', 1), ('DeepSeek', 'https://api.deepseek.com/v1', None, 'deepseek-chat', 2), ('阿里百炼', 'https://dashscope.aliyuncs.com/compatible-mode/v1', None, 'qwen-turbo', 3), ('SiliconFlow', 'https://api.siliconflow.cn/v1', None, 'Qwen/Qwen2.5-72B-Instruct', 4), ] for provider_name, api_base, api_key, model, sort_order in defaults: existing = BackupLLMConfig.query.filter_by(provider_name=provider_name).first() if not existing: config = BackupLLMConfig( provider_name=provider_name, api_base=api_base, api_key=api_key, model=model, is_active=True, sort_order=sort_order, 
description=f'{provider_name} 默认接口', ) db.session.add(config) db.session.commit() def get_backup_llm_configs(): """获取所有备用大模型配置(供其他模块使用)""" configs = BackupLLMConfig.query.filter_by(is_active=True).order_by(BackupLLMConfig.sort_order).all() return [c.to_dict() for c in configs] # ==================== 获取网站基础配置(供其他模块使用) ==================== def get_site_config(): """获取网站基础配置""" return { 'site_name': DynamicConfig.get('site_name', 'PDF翻译助手'), 'site_footer': DynamicConfig.get('site_footer', '© 2026 PDF翻译助手'), 'max_file_size': DynamicConfig.get('max_file_size', 50), 'cache_expire_days': DynamicConfig.get('cache_expire_days', 30), 'enable_email_notify': DynamicConfig.get('enable_email_notify', True), 'enable_cache': DynamicConfig.get('enable_cache', True), 'enable_guest': DynamicConfig.get('enable_guest', True), 'default_source_lang': DynamicConfig.get('default_source_lang', 'en'), 'default_target_lang': DynamicConfig.get('default_target_lang', 'zh'), }