""" 后台管理模块 V2 """ from datetime import datetime, date, timedelta from functools import wraps from flask import Blueprint, render_template, request, jsonify, redirect, url_for, session, flash from sqlalchemy import func, desc import json from models import (db, User, Translation, TranslationCache, GuestTranslation, SystemConfig, OperationLog, DataPackage, UserPackage, DynamicConfig) from config import USER_LIMITS, MEMBERSHIP_PLANS admin_bp = Blueprint('admin', __name__, url_prefix='/admin') # ==================== 权限装饰器 ==================== def admin_required(f): """管理员权限装饰器""" @wraps(f) def decorated_function(*args, **kwargs): user_id = session.get('user_id') if not user_id: return redirect(url_for('login', next=request.url)) user = User.query.get(user_id) if not user or not user.is_admin: flash('需要管理员权限', 'error') return redirect(url_for('index')) return f(*args, **kwargs) return decorated_function # ==================== 后台首页 ==================== @admin_bp.route('/') @admin_required def dashboard(): """后台首页 - 数据概览""" # 用户统计 total_users = User.query.count() new_users_today = User.query.filter( func.date(User.created_at) == date.today() ).count() vip_users = User.query.filter(User.user_type.startswith('vip')).count() # 翻译统计 total_translations = Translation.query.count() today_translations = Translation.query.filter( func.date(Translation.created_at) == date.today() ).count() # 缓存统计 total_cache = TranslationCache.query.count() total_cache_hits = db.session.query(func.sum(TranslationCache.hit_count)).scalar() or 0 total_cache_size = db.session.query(func.sum(TranslationCache.file_size)).scalar() or 0 # 数据包销售统计 total_packages_sold = UserPackage.query.filter_by(payment_status='paid').count() total_revenue = db.session.query(func.sum(UserPackage.price_paid)).filter( UserPackage.payment_status == 'paid' ).scalar() or 0 # 最近翻译 recent_translations = Translation.query.order_by(desc(Translation.created_at)).limit(10).all() # 最近用户 recent_users = User.query.order_by(desc(User.created_at)).limit(10).all() # 每日翻译趋势(最近7天) daily_stats = [] for i in range(6, -1, -1): d = date.today() - timedelta(days=i) count = Translation.query.filter(func.date(Translation.created_at) == d).count() daily_stats.append({'date': d.strftime('%m-%d'), 'count': count}) return render_template('admin/dashboard.html', total_users=total_users, new_users_today=new_users_today, vip_users=vip_users, total_translations=total_translations, today_translations=today_translations, total_cache=total_cache, total_cache_hits=total_cache_hits, total_cache_size=total_cache_size, total_packages_sold=total_packages_sold, total_revenue=total_revenue, recent_translations=recent_translations, recent_users=recent_users, daily_stats=daily_stats, ) # ==================== 用户管理 ==================== @admin_bp.route('/users') @admin_required def users(): """用户列表""" page = request.args.get('page', 1, type=int) search = request.args.get('search', '') user_type = request.args.get('type', '') query = User.query if search: query = query.filter( (User.username.contains(search)) | (User.email.contains(search)) ) if user_type: query = query.filter_by(user_type=user_type) users = query.order_by(desc(User.created_at)).paginate(page=page, per_page=20) return render_template('admin/users.html', users=users, search=search, user_type=user_type, user_types=['free', 'vip_basic', 'vip_pro', 'vip_enterprise', 'admin'] ) @admin_bp.route('/user/', methods=['GET', 'POST']) @admin_required def user_detail(user_id): """用户详情/编辑""" user = User.query.get_or_404(user_id) if request.method == 'POST': data = request.json if request.is_json else request.form user.username = data.get('username', user.username) user.email = data.get('email', user.email) user.user_type = data.get('user_type', user.user_type) user.is_active = data.get('is_active', user.is_active) == 'true' or data.get('is_active') == True user.is_admin = data.get('is_admin', user.is_admin) == 'true' or data.get('is_admin') == True # 会员到期时间 expire_str = data.get('membership_expire') if expire_str: try: user.membership_expire = datetime.fromisoformat(expire_str) except: pass # 密码 new_password = data.get('new_password') if new_password: user.set_password(new_password) db.session.commit() # 记录日志 log = OperationLog( user_id=session.get('user_id'), username='admin', action='edit_user', target=user.username, detail=json.dumps(data, ensure_ascii=False) if isinstance(data, dict) else str(data) ) db.session.add(log) db.session.commit() if request.is_json: return jsonify({'success': True, 'user': user.to_dict()}) flash('用户信息已更新', 'success') return redirect(url_for('admin.user_detail', user_id=user_id)) translations = Translation.query.filter_by(user_id=user_id).order_by(desc(Translation.created_at)).limit(20).all() packages = UserPackage.query.filter_by(user_id=user_id).order_by(desc(UserPackage.purchased_at)).all() return render_template('admin/user_detail.html', user=user, translations=translations, packages=packages) @admin_bp.route('/user//delete', methods=['POST']) @admin_required def delete_user(user_id): """删除用户""" user = User.query.get_or_404(user_id) if user.is_admin and User.query.filter_by(is_admin=True).count() <= 1: return jsonify({'error': '不能删除最后一个管理员'}), 400 username = user.username db.session.delete(user) db.session.commit() log = OperationLog( user_id=session.get('user_id'), username='admin', action='delete_user', target=username ) db.session.add(log) db.session.commit() return jsonify({'success': True}) # ==================== 翻译记录管理 ==================== @admin_bp.route('/translations') @admin_required def translations(): """翻译记录列表""" page = request.args.get('page', 1, type=int) status = request.args.get('status', '') search = request.args.get('search', '') query = Translation.query if status: query = query.filter_by(status=status) if search: query = query.filter(Translation.original_filename.contains(search)) translations = query.order_by(desc(Translation.created_at)).paginate(page=page, per_page=20) return render_template('admin/translations.html', translations=translations, status=status, search=search ) @admin_bp.route('/translation/') @admin_required def translation_detail(trans_id): """翻译详情""" translation = Translation.query.get_or_404(trans_id) user = User.query.get(translation.user_id) if translation.user_id else None return render_template('admin/translation_detail.html', translation=translation, user=user ) @admin_bp.route('/translation//delete', methods=['POST']) @admin_required def delete_translation(trans_id): """删除翻译记录""" translation = Translation.query.get_or_404(trans_id) if translation.output_path: import os if os.path.exists(translation.output_path): os.remove(translation.output_path) db.session.delete(translation) db.session.commit() return jsonify({'success': True}) # ==================== 缓存管理 ==================== @admin_bp.route('/cache') @admin_required def cache_list(): """缓存列表""" page = request.args.get('page', 1, type=int) caches = TranslationCache.query.order_by(desc(TranslationCache.hit_count)).paginate(page=page, per_page=20) total_size = db.session.query(func.sum(TranslationCache.file_size)).scalar() or 0 total_hits = db.session.query(func.sum(TranslationCache.hit_count)).scalar() or 0 return render_template('admin/cache.html', caches=caches, total_size=total_size, total_hits=total_hits ) @admin_bp.route('/cache//delete', methods=['POST']) @admin_required def delete_cache(cache_id): """删除缓存""" cache = TranslationCache.query.get_or_404(cache_id) if cache.cache_path: import os if os.path.exists(cache.cache_path): os.remove(cache.cache_path) db.session.delete(cache) db.session.commit() return jsonify({'success': True}) @admin_bp.route('/cache/clear', methods=['POST']) @admin_required def clear_cache(): """清空缓存""" import os caches = TranslationCache.query.all() for cache in caches: if cache.cache_path and os.path.exists(cache.cache_path): os.remove(cache.cache_path) TranslationCache.query.delete() db.session.commit() return jsonify({'success': True, 'deleted': len(caches)}) # ==================== 系统配置 V2 ==================== @admin_bp.route('/settings', methods=['GET', 'POST']) @admin_required def settings(): """系统配置""" if request.method == 'POST': data = request.json if request.is_json else request.form for key, value in data.items(): SystemConfig.set(key, value) if request.is_json: return jsonify({'success': True}) flash('配置已保存', 'success') # 获取所有配置 configs = SystemConfig.query.all() config_dict = {c.key: c.value for c in configs} # 获取动态配置 dynamic_configs = DynamicConfig.query.all() return render_template('admin/settings.html', configs=config_dict, dynamic_configs=dynamic_configs, user_limits=USER_LIMITS, membership_plans=MEMBERSHIP_PLANS ) # ==================== 用户权限配置 ==================== @admin_bp.route('/settings/user-limits', methods=['GET', 'POST']) @admin_required def user_limits_settings(): """用户权限配置""" if request.method == 'POST': data = request.json # 保存每个用户类型的配置 for user_type, limits in data.items(): for key, value in limits.items(): config_key = f"user_limit_{user_type}_{key}" DynamicConfig.set( config_key, value, category='user_limits', value_type='int' if isinstance(value, int) else 'string', user_id=session.get('user_id') ) return jsonify({'success': True}) # 获取当前配置 limits_config = {} for user_type in ['guest', 'free', 'vip_basic', 'vip_pro', 'vip_enterprise']: limits_config[user_type] = { 'daily_translations': DynamicConfig.get(f'user_limit_{user_type}_daily_translations', USER_LIMITS.get(user_type, {}).get('daily_translations', 10)), 'max_pages': DynamicConfig.get(f'user_limit_{user_type}_max_pages', USER_LIMITS.get(user_type, {}).get('max_pages', 50)), 'max_file_size': DynamicConfig.get(f'user_limit_{user_type}_max_file_size', USER_LIMITS.get(user_type, {}).get('max_file_size', 30*1024*1024)), } return render_template('admin/user_limits.html', limits_config=limits_config, default_limits=USER_LIMITS ) # ==================== 会员套餐配置 ==================== @admin_bp.route('/settings/membership', methods=['GET', 'POST']) @admin_required def membership_settings(): """会员套餐配置""" if request.method == 'POST': data = request.json for plan_key, plan_data in data.items(): for key, value in plan_data.items(): config_key = f"membership_{plan_key}_{key}" value_type = 'int' if isinstance(value, int) else 'float' if isinstance(value, float) else 'string' DynamicConfig.set( config_key, value, category='membership', value_type=value_type, user_id=session.get('user_id') ) return jsonify({'success': True}) # 获取当前配置 plans_config = {} for plan_key in ['vip_basic', 'vip_pro', 'vip_enterprise']: default = MEMBERSHIP_PLANS.get(plan_key, {}) plans_config[plan_key] = { 'name': DynamicConfig.get(f'membership_{plan_key}_name', default.get('name', plan_key)), 'price': DynamicConfig.get(f'membership_{plan_key}_price', default.get('price', 0)), 'period': DynamicConfig.get(f'membership_{plan_key}_period', default.get('period', 'month')), 'description': DynamicConfig.get(f'membership_{plan_key}_description', default.get('description', '')), } return render_template('admin/membership.html', plans_config=plans_config, default_plans=MEMBERSHIP_PLANS ) # ==================== 数据包套餐管理 ==================== @admin_bp.route('/packages') @admin_required def packages(): """数据包套餐列表""" packages = DataPackage.query.order_by(DataPackage.sort_order).all() return render_template('admin/packages.html', packages=packages) @admin_bp.route('/package/add', methods=['GET', 'POST']) @admin_required def add_package(): """添加数据包套餐""" if request.method == 'POST': data = request.json if request.is_json else request.form package = DataPackage( name=data.get('name'), description=data.get('description'), translation_count=int(data.get('translation_count', 0)), price=float(data.get('price', 0)), original_price=float(data.get('original_price')) if data.get('original_price') else None, valid_days=int(data.get('valid_days', 30)), sort_order=int(data.get('sort_order', 0)), is_active=data.get('is_active', 'true') == 'true' if isinstance(data.get('is_active'), str) else data.get('is_active', True), is_recommended=data.get('is_recommended', 'true') == 'true' if isinstance(data.get('is_recommended'), str) else data.get('is_recommended', False), ) db.session.add(package) db.session.commit() if request.is_json: return jsonify({'success': True, 'package': package.to_dict()}) flash('数据包套餐已添加', 'success') return redirect(url_for('admin.packages')) return render_template('admin/package_form.html', package=None) @admin_bp.route('/package//edit', methods=['GET', 'POST']) @admin_required def edit_package(package_id): """编辑数据包套餐""" package = DataPackage.query.get_or_404(package_id) if request.method == 'POST': data = request.json if request.is_json else request.form package.name = data.get('name', package.name) package.description = data.get('description', package.description) package.translation_count = int(data.get('translation_count', package.translation_count)) package.price = float(data.get('price', package.price)) package.original_price = float(data.get('original_price')) if data.get('original_price') else None package.valid_days = int(data.get('valid_days', package.valid_days)) package.sort_order = int(data.get('sort_order', package.sort_order)) package.is_active = data.get('is_active', 'true') == 'true' if isinstance(data.get('is_active'), str) else data.get('is_active', True) package.is_recommended = data.get('is_recommended', 'true') == 'true' if isinstance(data.get('is_recommended'), str) else data.get('is_recommended', False) db.session.commit() if request.is_json: return jsonify({'success': True, 'package': package.to_dict()}) flash('数据包套餐已更新', 'success') return redirect(url_for('admin.packages')) return render_template('admin/package_form.html', package=package) @admin_bp.route('/package//delete', methods=['POST']) @admin_required def delete_package(package_id): """删除数据包套餐""" package = DataPackage.query.get_or_404(package_id) db.session.delete(package) db.session.commit() return jsonify({'success': True}) @admin_bp.route('/package//toggle', methods=['POST']) @admin_required def toggle_package(package_id): """切换数据包套餐状态""" package = DataPackage.query.get_or_404(package_id) package.is_active = not package.is_active db.session.commit() return jsonify({'success': True, 'is_active': package.is_active}) # ==================== 操作日志 ==================== @admin_bp.route('/logs') @admin_required def logs(): """操作日志""" page = request.args.get('page', 1, type=int) action = request.args.get('action', '') query = OperationLog.query if action: query = query.filter_by(action=action) logs = query.order_by(desc(OperationLog.created_at)).paginate(page=page, per_page=50) return render_template('admin/logs.html', logs=logs, action=action) # ==================== 统计报表 ==================== @admin_bp.route('/stats') @admin_required def stats(): """统计报表""" # 用户增长趋势(最近30天) user_growth = [] for i in range(29, -1, -1): d = date.today() - timedelta(days=i) count = User.query.filter(func.date(User.created_at) == d).count() user_growth.append({'date': d.strftime('%Y-%m-%d'), 'count': count}) # 翻译量趋势(最近30天) translation_growth = [] for i in range(29, -1, -1): d = date.today() - timedelta(days=i) count = Translation.query.filter(func.date(Translation.created_at) == d).count() translation_growth.append({'date': d.strftime('%Y-%m-%d'), 'count': count}) # 收入趋势(最近30天) revenue_growth = [] for i in range(29, -1, -1): d = date.today() - timedelta(days=i) revenue = db.session.query(func.sum(UserPackage.price_paid)).filter( UserPackage.payment_status == 'paid', func.date(UserPackage.purchased_at) == d ).scalar() or 0 revenue_growth.append({'date': d.strftime('%Y-%m-%d'), 'revenue': float(revenue)}) # 用户类型分布 user_distribution = db.session.query( User.user_type, func.count(User.id) ).group_by(User.user_type).all() # 翻译状态分布 status_distribution = db.session.query( Translation.status, func.count(Translation.id) ).group_by(Translation.status).all() # Top用户 top_users = db.session.query( User.username, func.count(Translation.id).label('count') ).join(Translation).group_by(User.id).order_by(desc('count')).limit(10).all() # 数据包销售排行 top_packages = db.session.query( DataPackage.name, func.count(UserPackage.id).label('count'), func.sum(UserPackage.price_paid).label('revenue') ).join(UserPackage).filter( UserPackage.payment_status == 'paid' ).group_by(DataPackage.id).order_by(desc('count')).limit(10).all() return render_template('admin/stats.html', user_growth=user_growth, translation_growth=translation_growth, revenue_growth=revenue_growth, user_distribution=user_distribution, status_distribution=status_distribution, top_users=top_users, top_packages=top_packages ) # ==================== API接口 ==================== @admin_bp.route('/api/stats/summary') @admin_required def api_stats_summary(): """API: 统计摘要""" return jsonify({ 'total_users': User.query.count(), 'today_users': User.query.filter(func.date(User.created_at) == date.today()).count(), 'total_translations': Translation.query.count(), 'today_translations': Translation.query.filter(func.date(Translation.created_at) == date.today()).count(), 'total_cache': TranslationCache.query.count(), 'cache_hits': db.session.query(func.sum(TranslationCache.hit_count)).scalar() or 0, 'total_packages': DataPackage.query.filter_by(is_active=True).count(), 'packages_sold': UserPackage.query.filter_by(payment_status='paid').count(), 'total_revenue': float(db.session.query(func.sum(UserPackage.price_paid)).filter( UserPackage.payment_status == 'paid' ).scalar() or 0), }) @admin_bp.route('/api/user//upgrade', methods=['POST']) @admin_required def api_user_upgrade(user_id): """API: 升级用户会员""" user = User.query.get_or_404(user_id) data = request.json user_type = data.get('user_type') months = data.get('months', 1) valid_types = ['free', 'vip_basic', 'vip_pro', 'vip_enterprise', 'admin'] if user_type not in valid_types: return jsonify({'error': '无效的用户类型'}), 400 user.user_type = user_type if user_type.startswith('vip'): user.membership_expire = datetime.utcnow() + timedelta(days=30 * months) db.session.commit() return jsonify({'success': True, 'user': user.to_dict()}) @admin_bp.route('/api/user//add-package', methods=['POST']) @admin_required def api_user_add_package(user_id): """API: 为用户添加数据包""" user = User.query.get_or_404(user_id) data = request.json package_id = data.get('package_id') package = DataPackage.query.get(package_id) if not package: return jsonify({'error': '数据包不存在'}), 404 user_package = UserPackage( user_id=user.id, package_id=package.id, package_name=package.name, translation_count=package.translation_count, remaining_count=package.translation_count, expire_at=datetime.utcnow() + timedelta(days=package.valid_days) if package.valid_days > 0 else None, price_paid=0, # 管理员赠送 payment_status='paid' ) db.session.add(user_package) db.session.commit() return jsonify({'success': True, 'package': { 'id': user_package.id, 'name': user_package.package_name, 'remaining': user_package.remaining_count }}) # ==================== LLM大模型配置 ==================== @admin_bp.route('/llm_config') @admin_required def llm_config(): """LLM配置页面""" from config import LLM_CONFIG # 从数据库获取配置,如果没有则使用默认值 config = { 'api_base': DynamicConfig.get('llm_api_base', LLM_CONFIG.get('api_base')), 'api_key': DynamicConfig.get('llm_api_key', LLM_CONFIG.get('api_key')), 'model': DynamicConfig.get('llm_model', LLM_CONFIG.get('model')), 'max_tokens': DynamicConfig.get('llm_max_tokens', LLM_CONFIG.get('max_tokens')), 'chunk_size': DynamicConfig.get('llm_chunk_size', LLM_CONFIG.get('chunk_size')), 'timeout': DynamicConfig.get('llm_timeout', LLM_CONFIG.get('timeout')), } return render_template('admin/llm_config.html', config=config) @admin_bp.route('/llm_config/save', methods=['POST']) @admin_required def save_llm_config(): """保存LLM配置""" data = request.json DynamicConfig.set('llm_api_base', data.get('api_base'), category='llm', user_id=session.get('user_id')) DynamicConfig.set('llm_api_key', data.get('api_key'), category='llm', user_id=session.get('user_id')) DynamicConfig.set('llm_model', data.get('model'), category='llm', user_id=session.get('user_id')) DynamicConfig.set('llm_max_tokens', data.get('max_tokens'), category='llm', value_type='int', user_id=session.get('user_id')) DynamicConfig.set('llm_chunk_size', data.get('chunk_size'), category='llm', value_type='int', user_id=session.get('user_id')) DynamicConfig.set('llm_timeout', data.get('timeout'), category='llm', value_type='int', user_id=session.get('user_id')) # 记录日志 log = OperationLog( user_id=session.get('user_id'), username='admin', action='update_llm_config', detail='更新大模型配置' ) db.session.add(log) db.session.commit() return jsonify({'success': True}) @admin_bp.route('/llm_config/test', methods=['POST']) @admin_required def test_llm_connection(): """测试LLM连接""" data = request.json try: from openai import OpenAI client = OpenAI( api_key=data.get('api_key', 'sk-test'), base_url=data.get('api_base'), ) # 发送简单测试请求 response = client.chat.completions.create( model=data.get('model'), messages=[{"role": "user", "content": "Hello"}], max_tokens=10, timeout=10, ) return jsonify({ 'success': True, 'model': data.get('model'), 'response': response.choices[0].message.content[:50] if response.choices else 'OK' }) except Exception as e: return jsonify({'success': False, 'error': str(e)}) @admin_bp.route('/llm_config/reset', methods=['POST']) @admin_required def reset_llm_config(): """恢复默认LLM配置""" from config import LLM_CONFIG # 删除数据库中的LLM配置 DynamicConfig.query.filter_by(category='llm').delete() db.session.commit() return jsonify({'success': True}) # ==================== 获取当前LLM配置(供其他模块使用) ==================== def get_llm_config(): """获取当前LLM配置""" from config import LLM_CONFIG return { 'api_base': DynamicConfig.get('llm_api_base', LLM_CONFIG.get('api_base')), 'api_key': DynamicConfig.get('llm_api_key', LLM_CONFIG.get('api_key')), 'model': DynamicConfig.get('llm_model', LLM_CONFIG.get('model')), 'max_tokens': DynamicConfig.get('llm_max_tokens', LLM_CONFIG.get('max_tokens')), 'chunk_size': DynamicConfig.get('llm_chunk_size', LLM_CONFIG.get('chunk_size')), 'timeout': DynamicConfig.get('llm_timeout', LLM_CONFIG.get('timeout')), }