Files
pdf-translate-web/admin.py
coder 44077796f8 feat: 翻译记录添加不共享开关功能
- Translation 模型新增 no_share 字段
- 管理后台翻译记录页面添加共享状态列和切换按钮
- 不共享的翻译不会被其他用户使用缓存
- 缓存匹配时检查是否有 no_share 标记
2026-04-16 19:06:43 +08:00

1512 lines
54 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
后台管理模块 V2
"""
from datetime import datetime, date, timedelta
from functools import wraps
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, session, flash
from sqlalchemy import func, desc
import json
from models import (db, User, Translation, TranslationCache, GuestTranslation,
SystemConfig, OperationLog, DataPackage, UserPackage, DynamicConfig,
UserTypeConfig, MembershipPlanConfig, BackupLLMConfig)
from config import USER_LIMITS, MEMBERSHIP_PLANS
# Blueprint named 'admin'; every route registered on it in this module is
# served under the '/admin' URL prefix.
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
# ==================== 权限装饰器 ====================
def admin_required(f):
    """Restrict a view function to logged-in administrators.

    The wrapped view runs only when the session holds a ``user_id`` that
    resolves to a user whose ``is_admin`` flag is truthy.  Requests
    without a session user are redirected to the ``login`` endpoint (with
    the current URL as ``next``); any other failure flashes an error and
    redirects to ``index``.

    Args:
        f: The view function to protect.

    Returns:
        The guarded view, carrying the metadata of ``f`` via ``wraps``.
    """
    @wraps(f)
    def guarded_view(*args, **kwargs):
        current_id = session.get('user_id')
        if current_id:
            account = User.query.get(current_id)
            if account and account.is_admin:
                return f(*args, **kwargs)
            # Logged in, but unknown account or not an administrator.
            flash('需要管理员权限', 'error')
            return redirect(url_for('index'))
        # Nobody is logged in: send them to the login page first.
        return redirect(url_for('login', next=request.url))
    return guarded_view
# ==================== 后台首页 ====================
@admin_bp.route('/')
@admin_required
def dashboard():
    """Render the admin landing page with headline statistics.

    Gathers user, translation, cache and paid-package totals, the ten
    newest translations and users, and a per-day translation count for
    the last seven days (oldest first), then hands everything to the
    ``admin/dashboard.html`` template.
    """
    context = {}

    # User figures
    context['total_users'] = User.query.count()
    context['new_users_today'] = User.query.filter(
        func.date(User.created_at) == date.today()
    ).count()
    context['vip_users'] = User.query.filter(
        User.user_type.startswith('vip')
    ).count()

    # Translation figures
    context['total_translations'] = Translation.query.count()
    context['today_translations'] = Translation.query.filter(
        func.date(Translation.created_at) == date.today()
    ).count()

    # Cache figures; SUM() yields None on an empty table, hence the `or 0`.
    context['total_cache'] = TranslationCache.query.count()
    hit_sum = db.session.query(func.sum(TranslationCache.hit_count)).scalar()
    context['total_cache_hits'] = hit_sum or 0
    size_sum = db.session.query(func.sum(TranslationCache.file_size)).scalar()
    context['total_cache_size'] = size_sum or 0

    # Package sales: only rows whose payment_status is 'paid' are counted.
    context['total_packages_sold'] = UserPackage.query.filter_by(
        payment_status='paid'
    ).count()
    revenue_sum = db.session.query(func.sum(UserPackage.price_paid)).filter(
        UserPackage.payment_status == 'paid'
    ).scalar()
    context['total_revenue'] = revenue_sum or 0

    # Most recent activity
    context['recent_translations'] = (
        Translation.query.order_by(desc(Translation.created_at)).limit(10).all()
    )
    context['recent_users'] = (
        User.query.order_by(desc(User.created_at)).limit(10).all()
    )

    # Daily translation trend for the last 7 days, oldest day first.
    trend = []
    for days_back in reversed(range(7)):
        day = date.today() - timedelta(days=days_back)
        per_day = Translation.query.filter(
            func.date(Translation.created_at) == day
        ).count()
        trend.append({'date': day.strftime('%m-%d'), 'count': per_day})
    context['daily_stats'] = trend

    return render_template('admin/dashboard.html', **context)
# ==================== 用户管理 ====================
@admin_bp.route('/users')
@admin_required
def users():
    """List users, 20 per page, newest first.

    Query-string parameters:
        page: 1-based page number (defaults to 1).
        search: Substring matched against username or email.
        type: Exact ``user_type`` to filter on.
    """
    args = request.args
    page_number = args.get('page', 1, type=int)
    search = args.get('search', '')
    user_type = args.get('type', '')

    listing = User.query
    if search:
        name_match = User.username.contains(search)
        email_match = User.email.contains(search)
        listing = listing.filter(name_match | email_match)
    if user_type:
        listing = listing.filter_by(user_type=user_type)

    page_of_users = listing.order_by(desc(User.created_at)).paginate(
        page=page_number, per_page=20
    )
    selectable_types = ['free', 'vip_basic', 'vip_pro', 'vip_enterprise', 'admin']
    return render_template(
        'admin/users.html',
        users=page_of_users,
        search=search,
        user_type=user_type,
        user_types=selectable_types,
    )
@admin_bp.route('/user/<int:user_id>', methods=['GET', 'POST'])
@admin_required
def user_detail(user_id):
    """Show one user's detail page, or apply an edit to that user.

    GET renders ``admin/user_detail.html`` with the user, their 20 most
    recent translations and all of their packages.

    POST accepts a JSON object or form data and updates username, email,
    user type, the ``is_active`` / ``is_admin`` flags, the membership
    expiry (ISO 8601 string) and, when ``new_password`` is non-empty, the
    password.  The edit is recorded in the operation log with the
    password redacted.

    Args:
        user_id: Primary key of the user; an unknown id yields a 404.

    Returns:
        The rendered page (GET), a JSON payload (JSON POST), a redirect
        back to this page (form POST), or a 400 JSON error when the JSON
        body is not an object.
    """
    user = User.query.get_or_404(user_id)
    if request.method == 'POST':
        is_json = request.is_json
        data = request.json if is_json else request.form
        if not isinstance(data, dict):
            # e.g. a JSON `null` or list: there is nothing to read fields from.
            return jsonify({'error': '无效的请求数据'}), 400

        def read_flag(key, current):
            """Parse a boolean field, accepting 'true' or a value equal to True."""
            if key not in data:
                # A JSON client that omits the key leaves the flag untouched.
                # HTML forms omit unchecked checkboxes, so absence there
                # still means "off".
                return current if is_json else False
            return data.get(key) in ('true', True)

        user.username = data.get('username', user.username)
        user.email = data.get('email', user.email)
        user.user_type = data.get('user_type', user.user_type)
        user.is_active = read_flag('is_active', user.is_active)
        user.is_admin = read_flag('is_admin', user.is_admin)

        # Membership expiry: an unparsable value keeps the stored expiry.
        expire_str = data.get('membership_expire')
        if expire_str:
            try:
                user.membership_expire = datetime.fromisoformat(expire_str)
            except (TypeError, ValueError):
                pass

        # Password is only changed when a non-empty value was supplied.
        new_password = data.get('new_password')
        if new_password:
            user.set_password(new_password)
        db.session.commit()

        # Audit log.  The submitted password must never be persisted in
        # clear text, so it is masked before serialising the payload.
        logged_fields = {
            key: ('***' if key == 'new_password' else value)
            for key, value in data.items()
        }
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='edit_user',
            target=user.username,
            detail=json.dumps(logged_fields, ensure_ascii=False)
        )
        db.session.add(log)
        db.session.commit()

        if is_json:
            return jsonify({'success': True, 'user': user.to_dict()})
        flash('用户信息已更新', 'success')
        return redirect(url_for('admin.user_detail', user_id=user_id))

    translations = Translation.query.filter_by(user_id=user_id).order_by(
        desc(Translation.created_at)
    ).limit(20).all()
    packages = UserPackage.query.filter_by(user_id=user_id).order_by(
        desc(UserPackage.purchased_at)
    ).all()
    return render_template(
        'admin/user_detail.html',
        user=user,
        translations=translations,
        packages=packages,
    )
@admin_bp.route('/user/<int:user_id>/delete', methods=['POST'])
@admin_required
def delete_user(user_id):
    """Delete a user and record the deletion in the operation log.

    Refuses with a 400 JSON error when the user is an administrator and
    no more than one administrator exists.

    Args:
        user_id: Primary key of the user; an unknown id yields a 404.
    """
    doomed = User.query.get_or_404(user_id)
    if doomed.is_admin:
        admin_total = User.query.filter_by(is_admin=True).count()
        if admin_total <= 1:
            return jsonify({'error': '不能删除最后一个管理员'}), 400

    # Remember the name before the row disappears; the log entry needs it.
    removed_name = doomed.username
    db.session.delete(doomed)
    db.session.commit()

    entry = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='delete_user',
        target=removed_name,
    )
    db.session.add(entry)
    db.session.commit()
    return jsonify({'success': True})
# ==================== 翻译记录管理 ====================
@admin_bp.route('/translations')
@admin_required
def translations():
    """List translation records, 20 per page, newest first.

    Query-string parameters:
        page: 1-based page number (defaults to 1).
        status: Exact translation status to filter on.
        search: Substring matched against the original filename.
    """
    page_number = request.args.get('page', 1, type=int)
    status = request.args.get('status', '')
    search = request.args.get('search', '')

    records = Translation.query
    if status:
        records = records.filter_by(status=status)
    if search:
        records = records.filter(Translation.original_filename.contains(search))

    ordered = records.order_by(desc(Translation.created_at))
    page_of_records = ordered.paginate(page=page_number, per_page=20)
    return render_template(
        'admin/translations.html',
        translations=page_of_records,
        status=status,
        search=search,
    )
@admin_bp.route('/translation/<int:trans_id>')
@admin_required
def translation_detail(trans_id):
    """Render the detail page of a single translation record.

    Args:
        trans_id: Primary key of the translation; an unknown id yields a 404.
    """
    record = Translation.query.get_or_404(trans_id)

    # A record without a user_id has no owner to look up.
    owner = None
    if record.user_id:
        owner = User.query.get(record.user_id)

    return render_template(
        'admin/translation_detail.html',
        translation=record,
        user=owner,
    )
@admin_bp.route('/translation/<int:trans_id>/delete', methods=['POST'])
@admin_required
def delete_translation(trans_id):
    """Delete a translation record together with its output file.

    Args:
        trans_id: Primary key of the translation; an unknown id yields a 404.

    Returns:
        ``{"success": true}`` as JSON once the row has been deleted.
    """
    translation = Translation.query.get_or_404(trans_id)
    if translation.output_path:
        import os
        # Try the removal and tolerate a missing file, instead of checking
        # for existence first: the file may vanish between check and remove.
        try:
            os.remove(translation.output_path)
        except FileNotFoundError:
            pass
    db.session.delete(translation)
    db.session.commit()
    return jsonify({'success': True})
@admin_bp.route('/translation/<int:trans_id>/toggle-share', methods=['POST'])
@admin_required
def toggle_translation_share(trans_id):
    """Flip the ``no_share`` flag of a translation and log the change.

    Args:
        trans_id: Primary key of the translation; an unknown id yields a 404.

    Returns:
        JSON with ``success`` and the new ``no_share`` value.
    """
    translation = Translation.query.get_or_404(trans_id)
    flipped = not translation.no_share
    translation.no_share = flipped
    db.session.commit()

    # The log states the resulting *shared* state, i.e. the inverse of no_share.
    audit = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='toggle_translation_share',
        target=f'翻译#{trans_id}',
        detail=f'设置共享状态为: {not translation.no_share}',
    )
    db.session.add(audit)
    db.session.commit()

    response = {'success': True, 'no_share': translation.no_share}
    return jsonify(response)
# ==================== 缓存管理 ====================
@admin_bp.route('/cache')
@admin_required
def cache_list():
    """List cache entries, 20 per page, ordered by hit count (highest first).

    Also passes the summed file size and summed hit count of all cache
    entries to the ``admin/cache.html`` template.
    """
    page_number = request.args.get('page', 1, type=int)
    by_popularity = TranslationCache.query.order_by(desc(TranslationCache.hit_count))
    page_of_caches = by_popularity.paginate(page=page_number, per_page=20)

    # SUM() yields None on an empty table, so fall back to 0.
    size_sum = db.session.query(func.sum(TranslationCache.file_size)).scalar()
    hit_sum = db.session.query(func.sum(TranslationCache.hit_count)).scalar()

    return render_template(
        'admin/cache.html',
        caches=page_of_caches,
        total_size=size_sum or 0,
        total_hits=hit_sum or 0,
    )
@admin_bp.route('/cache/<int:cache_id>/delete', methods=['POST'])
@admin_required
def delete_cache(cache_id):
    """Delete one cache entry together with its cached file.

    Args:
        cache_id: Primary key of the cache entry; an unknown id yields a 404.

    Returns:
        ``{"success": true}`` as JSON once the row has been deleted.
    """
    cache = TranslationCache.query.get_or_404(cache_id)
    if cache.cache_path:
        import os
        # Try the removal and tolerate a missing file, instead of checking
        # for existence first: the file may vanish between check and remove.
        try:
            os.remove(cache.cache_path)
        except FileNotFoundError:
            pass
    db.session.delete(cache)
    db.session.commit()
    return jsonify({'success': True})
@admin_bp.route('/cache/clear', methods=['POST'])
@admin_required
def clear_cache():
    """Delete every cache entry and the file each one points to.

    Returns:
        JSON with ``success`` and ``deleted``, the number of cache rows
        that existed when the request started.
    """
    import os
    caches = TranslationCache.query.all()
    for cache in caches:
        if not cache.cache_path:
            continue
        # A file that is already gone must not abort the whole clear, so
        # attempt the removal and ignore only the "missing" case.
        try:
            os.remove(cache.cache_path)
        except FileNotFoundError:
            pass
    TranslationCache.query.delete()
    db.session.commit()
    return jsonify({'success': True, 'deleted': len(caches)})
# ==================== 系统配置 V2 ====================
@admin_bp.route('/settings', methods=['GET', 'POST'])
@admin_required
def settings():
    """Render the system settings page.

    Passes the current LLM configuration (from ``get_llm_config``) and the
    basic site settings, each read from ``DynamicConfig`` with a built-in
    fallback, to ``admin/settings.html``.
    """
    # (key, fallback) pairs for the basic site settings, in display order.
    site_defaults = (
        ('site_name', 'PDF翻译助手'),
        ('site_footer', '© 2026 PDF翻译助手'),
        ('max_file_size', 50),
        ('cache_expire_days', 30),
        ('enable_email_notify', True),
        ('enable_cache', True),
        ('enable_guest', True),
        ('default_source_lang', 'en'),
        ('default_target_lang', 'zh'),
    )

    current_llm = get_llm_config()
    site_config = {}
    for key, fallback in site_defaults:
        site_config[key] = DynamicConfig.get(key, fallback)

    return render_template(
        'admin/settings.html',
        llm_config=current_llm,
        site_config=site_config,
    )
@admin_bp.route('/settings/site', methods=['POST'])
@admin_required
def save_site_settings():
    """Persist the basic site settings sent as a JSON object.

    Keys are stored as-is (no ``site_`` prefix) in category ``site``.
    ``max_file_size`` and ``cache_expire_days`` are stored as integers;
    the ``enable_*`` switches are stored as booleans; every other key is
    stored unchanged.

    All values are validated before anything is written, so a bad value
    cannot leave the settings half-updated.

    Returns:
        ``{"success": true}`` on success, or a 400 JSON error when the
        body is not a JSON object or an integer setting is not numeric.
    """
    data = request.json
    if not isinstance(data, dict):
        return jsonify({'error': '无效的请求数据'}), 400

    int_keys = ('max_file_size', 'cache_expire_days')
    bool_keys = ('enable_email_notify', 'enable_cache', 'enable_guest')
    # bool("false") is True, so textual "off" values need explicit handling.
    falsy_strings = ('', 'false', '0', 'no', 'off')

    pending = []
    for key, value in data.items():
        if key in int_keys:
            try:
                number = int(value)
            except (TypeError, ValueError):
                return jsonify({'error': f'配置项 {key} 必须是整数'}), 400
            pending.append((key, number, {'value_type': 'int'}))
        elif key in bool_keys:
            if isinstance(value, str):
                flag = value.strip().lower() not in falsy_strings
            else:
                flag = bool(value)
            pending.append((key, flag, {'value_type': 'bool'}))
        else:
            pending.append((key, value, {}))

    operator_id = session.get('user_id')
    for key, value, extra in pending:
        DynamicConfig.set(key, value, category='site', user_id=operator_id, **extra)

    # Audit log
    log = OperationLog(
        user_id=operator_id,
        username='admin',
        action='save_site_settings',
        detail='保存网站基础配置'
    )
    db.session.add(log)
    db.session.commit()
    return jsonify({'success': True})
# ==================== 用户权限配置 ====================
@admin_bp.route('/settings/user-limits', methods=['GET', 'POST'])
@admin_required
def user_limits_settings():
    """Show or save the per-user-type limits.

    POST expects JSON of the form ``{user_type: {limit_name: value}}`` and
    stores each value under ``user_limit_<user_type>_<limit_name>`` in
    category ``user_limits``.

    GET renders ``admin/user_limits.html`` with the effective limits,
    falling back to ``USER_LIMITS`` and then to built-in defaults.
    """
    if request.method != 'POST':
        # (limit name, last-resort default) pairs shown on the page.
        limit_fields = (
            ('daily_translations', 10),
            ('max_pages', 50),
            ('max_file_size', 30*1024*1024),
        )
        limits_config = {}
        for user_type in ['guest', 'free', 'vip_basic', 'vip_pro', 'vip_enterprise']:
            static_limits = USER_LIMITS.get(user_type, {})
            limits_config[user_type] = {
                field: DynamicConfig.get(
                    f'user_limit_{user_type}_{field}',
                    static_limits.get(field, last_resort),
                )
                for field, last_resort in limit_fields
            }
        return render_template(
            'admin/user_limits.html',
            limits_config=limits_config,
            default_limits=USER_LIMITS,
        )

    submitted = request.json
    for user_type, limits in submitted.items():
        for key, value in limits.items():
            if isinstance(value, int):
                kind = 'int'
            else:
                kind = 'string'
            DynamicConfig.set(
                f"user_limit_{user_type}_{key}",
                value,
                category='user_limits',
                value_type=kind,
                user_id=session.get('user_id'),
            )
    return jsonify({'success': True})
# ==================== 会员套餐配置 ====================
@admin_bp.route('/settings/membership', methods=['GET', 'POST'])
@admin_required
def membership_settings():
    """Show or save the membership plan settings.

    POST expects JSON of the form ``{plan_key: {field: value}}`` and
    stores each value under ``membership_<plan_key>_<field>`` in category
    ``membership``.

    GET renders ``admin/membership.html`` with the effective name, price,
    period and description of each plan, falling back to
    ``MEMBERSHIP_PLANS``.
    """
    if request.method != 'POST':
        plans_config = {}
        for plan_key in ['vip_basic', 'vip_pro', 'vip_enterprise']:
            static_plan = MEMBERSHIP_PLANS.get(plan_key, {})
            # (field, last-resort default) pairs; the name falls back to the key.
            field_defaults = (
                ('name', plan_key),
                ('price', 0),
                ('period', 'month'),
                ('description', ''),
            )
            plans_config[plan_key] = {
                field: DynamicConfig.get(
                    f'membership_{plan_key}_{field}',
                    static_plan.get(field, last_resort),
                )
                for field, last_resort in field_defaults
            }
        return render_template(
            'admin/membership.html',
            plans_config=plans_config,
            default_plans=MEMBERSHIP_PLANS,
        )

    submitted = request.json
    for plan_key, plan_data in submitted.items():
        for key, value in plan_data.items():
            # int is tested before float, matching the stored type labels.
            if isinstance(value, int):
                value_type = 'int'
            elif isinstance(value, float):
                value_type = 'float'
            else:
                value_type = 'string'
            DynamicConfig.set(
                f"membership_{plan_key}_{key}",
                value,
                category='membership',
                value_type=value_type,
                user_id=session.get('user_id'),
            )
    return jsonify({'success': True})
# ==================== 数据包套餐管理 ====================
@admin_bp.route('/packages')
@admin_required
def packages():
    """Render the list of data packages ordered by ``sort_order``."""
    ordered_packages = DataPackage.query.order_by(DataPackage.sort_order).all()
    return render_template('admin/packages.html', packages=ordered_packages)
@admin_bp.route('/package/add', methods=['GET', 'POST'])
@admin_required
def add_package():
    """Create a data package, or render the empty package form.

    POST accepts a JSON body or form data.  String booleans are compared
    against ``'true'``; non-string values are stored as given.  When
    omitted, ``is_active`` defaults to True and ``is_recommended`` to False.

    Returns:
        The form page (GET), a JSON payload (JSON POST), or a redirect to
        the package list (form POST).
    """
    if request.method != 'POST':
        return render_template('admin/package_form.html', package=None)

    data = request.json if request.is_json else request.form

    fields = {}
    fields['name'] = data.get('name')
    fields['description'] = data.get('description')
    fields['translation_count'] = int(data.get('translation_count', 0))
    fields['price'] = float(data.get('price', 0))

    # An empty or missing original price means "no original price".
    raw_original = data.get('original_price')
    fields['original_price'] = float(raw_original) if raw_original else None

    fields['valid_days'] = int(data.get('valid_days', 30))
    fields['sort_order'] = int(data.get('sort_order', 0))

    raw_active = data.get('is_active', True)
    if isinstance(raw_active, str):
        raw_active = raw_active == 'true'
    fields['is_active'] = raw_active

    raw_recommended = data.get('is_recommended', False)
    if isinstance(raw_recommended, str):
        raw_recommended = raw_recommended == 'true'
    fields['is_recommended'] = raw_recommended

    package = DataPackage(**fields)
    db.session.add(package)
    db.session.commit()

    if request.is_json:
        return jsonify({'success': True, 'package': package.to_dict()})
    flash('数据包套餐已添加', 'success')
    return redirect(url_for('admin.packages'))
@admin_bp.route('/package/<int:package_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_package(package_id):
    """Update a data package, or render its edit form.

    POST accepts a JSON body or form data.  Text and numeric fields keep
    their current value when omitted; ``original_price`` is cleared when
    empty or omitted; ``is_active`` falls back to True and
    ``is_recommended`` to False when omitted.

    Args:
        package_id: Primary key of the package; an unknown id yields a 404.
    """
    package = DataPackage.query.get_or_404(package_id)
    if request.method != 'POST':
        return render_template('admin/package_form.html', package=package)

    data = request.json if request.is_json else request.form

    package.name = data.get('name', package.name)
    package.description = data.get('description', package.description)
    package.translation_count = int(
        data.get('translation_count', package.translation_count)
    )
    package.price = float(data.get('price', package.price))

    raw_original = data.get('original_price')
    if raw_original:
        package.original_price = float(raw_original)
    else:
        package.original_price = None

    package.valid_days = int(data.get('valid_days', package.valid_days))
    package.sort_order = int(data.get('sort_order', package.sort_order))

    # String flags are compared against 'true'; anything else is used as given.
    raw_active = data.get('is_active', True)
    package.is_active = (
        raw_active == 'true' if isinstance(raw_active, str) else raw_active
    )
    raw_recommended = data.get('is_recommended', False)
    package.is_recommended = (
        raw_recommended == 'true' if isinstance(raw_recommended, str) else raw_recommended
    )
    db.session.commit()

    if request.is_json:
        return jsonify({'success': True, 'package': package.to_dict()})
    flash('数据包套餐已更新', 'success')
    return redirect(url_for('admin.packages'))
@admin_bp.route('/package/<int:package_id>/delete', methods=['POST'])
@admin_required
def delete_package(package_id):
    """Delete a data package.

    Args:
        package_id: Primary key of the package; an unknown id yields a 404.
    """
    doomed = DataPackage.query.get_or_404(package_id)
    db.session.delete(doomed)
    db.session.commit()
    outcome = {'success': True}
    return jsonify(outcome)
@admin_bp.route('/package/<int:package_id>/toggle', methods=['POST'])
@admin_required
def toggle_package(package_id):
    """Flip the ``is_active`` flag of a data package.

    Args:
        package_id: Primary key of the package; an unknown id yields a 404.

    Returns:
        JSON with ``success`` and the new ``is_active`` value.
    """
    target = DataPackage.query.get_or_404(package_id)
    flipped = not target.is_active
    target.is_active = flipped
    db.session.commit()
    return jsonify({'success': True, 'is_active': target.is_active})
# ==================== 操作日志 ====================
@admin_bp.route('/logs')
@admin_required
def logs():
    """List operation log entries, 50 per page, newest first.

    Query-string parameters:
        page: 1-based page number (defaults to 1).
        action: Exact action name to filter on.
    """
    page_number = request.args.get('page', 1, type=int)
    action = request.args.get('action', '')

    entries = OperationLog.query
    if action:
        entries = entries.filter_by(action=action)

    newest_first = entries.order_by(desc(OperationLog.created_at))
    page_of_logs = newest_first.paginate(page=page_number, per_page=50)
    return render_template('admin/logs.html', logs=page_of_logs, action=action)
# ==================== 统计报表 ====================
@admin_bp.route('/stats')
@admin_required
def stats():
    """Render the statistics report page.

    Builds three 30-day series (new users, translations, paid revenue;
    oldest day first), the distribution of users by type and of
    translations by status, the ten users with the most translations and
    the ten best-selling paid packages, then renders ``admin/stats.html``.
    """
    def day_at(offset):
        """Return the calendar day `offset` days before today."""
        return date.today() - timedelta(days=offset)

    def label(day):
        """Format a day the way the charts expect it."""
        return day.strftime('%Y-%m-%d')

    # Offsets 29..0, so each series runs from the oldest day to today.
    offsets = range(29, -1, -1)

    # New users per day
    user_growth = []
    for offset in offsets:
        day = day_at(offset)
        registered = User.query.filter(func.date(User.created_at) == day).count()
        user_growth.append({'date': label(day), 'count': registered})

    # Translations per day
    translation_growth = []
    for offset in offsets:
        day = day_at(offset)
        translated = Translation.query.filter(
            func.date(Translation.created_at) == day
        ).count()
        translation_growth.append({'date': label(day), 'count': translated})

    # Paid revenue per day; SUM() yields None for days without sales.
    revenue_growth = []
    for offset in offsets:
        day = day_at(offset)
        day_total = db.session.query(func.sum(UserPackage.price_paid)).filter(
            UserPackage.payment_status == 'paid',
            func.date(UserPackage.purchased_at) == day
        ).scalar()
        revenue_growth.append({'date': label(day), 'revenue': float(day_total or 0)})

    # Users per user type
    user_distribution = (
        db.session.query(User.user_type, func.count(User.id))
        .group_by(User.user_type)
        .all()
    )

    # Translations per status
    status_distribution = (
        db.session.query(Translation.status, func.count(Translation.id))
        .group_by(Translation.status)
        .all()
    )

    # Ten users with the most translations
    top_users = (
        db.session.query(User.username, func.count(Translation.id).label('count'))
        .join(Translation)
        .group_by(User.id)
        .order_by(desc('count'))
        .limit(10)
        .all()
    )

    # Ten best-selling packages, counting paid purchases only
    top_packages = (
        db.session.query(
            DataPackage.name,
            func.count(UserPackage.id).label('count'),
            func.sum(UserPackage.price_paid).label('revenue'),
        )
        .join(UserPackage)
        .filter(UserPackage.payment_status == 'paid')
        .group_by(DataPackage.id)
        .order_by(desc('count'))
        .limit(10)
        .all()
    )

    return render_template(
        'admin/stats.html',
        user_growth=user_growth,
        translation_growth=translation_growth,
        revenue_growth=revenue_growth,
        user_distribution=user_distribution,
        status_distribution=status_distribution,
        top_users=top_users,
        top_packages=top_packages,
    )
# ==================== API接口 ====================
@admin_bp.route('/api/stats/summary')
@admin_required
def api_stats_summary():
    """API: return headline statistics as JSON.

    The payload holds user, translation, cache and package counts plus
    the total revenue of paid packages (as a float).
    """
    summary = {}
    summary['total_users'] = User.query.count()
    summary['today_users'] = User.query.filter(
        func.date(User.created_at) == date.today()
    ).count()
    summary['total_translations'] = Translation.query.count()
    summary['today_translations'] = Translation.query.filter(
        func.date(Translation.created_at) == date.today()
    ).count()
    summary['total_cache'] = TranslationCache.query.count()

    # SUM() yields None on an empty table, so fall back to 0.
    hit_sum = db.session.query(func.sum(TranslationCache.hit_count)).scalar()
    summary['cache_hits'] = hit_sum or 0

    summary['total_packages'] = DataPackage.query.filter_by(is_active=True).count()
    summary['packages_sold'] = UserPackage.query.filter_by(
        payment_status='paid'
    ).count()

    paid_sum = db.session.query(func.sum(UserPackage.price_paid)).filter(
        UserPackage.payment_status == 'paid'
    ).scalar()
    summary['total_revenue'] = float(paid_sum or 0)

    return jsonify(summary)
@admin_bp.route('/api/user/<int:user_id>/upgrade', methods=['POST'])
@admin_required
def api_user_upgrade(user_id):
    """API: change a user's type and, for VIP types, set the expiry.

    Expects a JSON object with ``user_type`` (one of the valid types) and
    an optional ``months`` (positive number, default 1).  For types that
    start with ``vip`` the membership expires ``30 * months`` days from
    the current UTC time; ``months`` is ignored for other types.

    Args:
        user_id: Primary key of the user; an unknown id yields a 404.

    Returns:
        JSON with ``success`` and the updated user, or a 400 JSON error
        for a malformed body, an unknown user type or an unusable
        ``months`` value.
    """
    user = User.query.get_or_404(user_id)
    data = request.json
    if not isinstance(data, dict):
        return jsonify({'error': '无效的请求数据'}), 400

    user_type = data.get('user_type')
    months = data.get('months', 1)
    valid_types = ['free', 'vip_basic', 'vip_pro', 'vip_enterprise', 'admin']
    if user_type not in valid_types:
        return jsonify({'error': '无效的用户类型'}), 400

    expire_at = None
    if user_type.startswith('vip'):
        # bool is a subclass of int, so it must be rejected explicitly.
        is_number = isinstance(months, (int, float)) and not isinstance(months, bool)
        if not is_number or months <= 0:
            return jsonify({'error': '无效的会员时长'}), 400
        try:
            expire_at = datetime.utcnow() + timedelta(days=30 * months)
        except (OverflowError, ValueError):
            # Durations too large for a date, or NaN.
            return jsonify({'error': '无效的会员时长'}), 400

    user.user_type = user_type
    if expire_at is not None:
        user.membership_expire = expire_at
    db.session.commit()
    return jsonify({'success': True, 'user': user.to_dict()})
@admin_bp.route('/api/user/<int:user_id>/add-package', methods=['POST'])
@admin_required
def api_user_add_package(user_id):
    """API: grant a data package to a user free of charge.

    Expects JSON with ``package_id``.  The granted package copies the
    name and translation count of the template package, is marked as paid
    with a price of 0, and expires ``valid_days`` days from the current
    UTC time (never, when ``valid_days`` is not positive).

    Args:
        user_id: Primary key of the user; an unknown id yields a 404.

    Returns:
        JSON describing the granted package, or a 404 JSON error when the
        package does not exist.
    """
    user = User.query.get_or_404(user_id)
    payload = request.json
    package = DataPackage.query.get(payload.get('package_id'))
    if not package:
        return jsonify({'error': '数据包不存在'}), 404

    if package.valid_days > 0:
        expires = datetime.utcnow() + timedelta(days=package.valid_days)
    else:
        expires = None

    grant = UserPackage(
        user_id=user.id,
        package_id=package.id,
        package_name=package.name,
        translation_count=package.translation_count,
        remaining_count=package.translation_count,
        expire_at=expires,
        price_paid=0,  # gifted by an administrator
        payment_status='paid',
    )
    db.session.add(grant)
    db.session.commit()

    granted = {
        'id': grant.id,
        'name': grant.package_name,
        'remaining': grant.remaining_count,
    }
    return jsonify({'success': True, 'package': granted})
# ==================== LLM大模型配置 ====================
@admin_bp.route('/llm_config')
@admin_required
def llm_config():
    """Render the LLM configuration page.

    Lists every ``BackupLLMConfig`` row ordered by ``sort_order``.  When
    the table is empty, ``init_default_backup_llm`` is called first and
    the rows are loaded again.
    """
    def load_configs():
        return BackupLLMConfig.query.order_by(BackupLLMConfig.sort_order).all()

    configs = load_configs()
    if not configs:
        # Nothing stored yet: seed the defaults, then read them back.
        init_default_backup_llm()
        configs = load_configs()

    serialised = [config.to_dict() for config in configs]
    return render_template('admin/llm_config.html', llm_configs=serialised)
@admin_bp.route('/llm_config/save', methods=['POST'])
@admin_required
def save_llm_config():
    """Store the LLM settings from the JSON body in ``DynamicConfig``.

    ``api_base``, ``api_key`` and ``model`` are stored without an explicit
    value type; ``max_tokens``, ``chunk_size`` and ``timeout`` are stored
    with value type ``int``.  All use category ``llm``.  The change is
    recorded in the operation log.
    """
    data = request.json

    # (config key, payload key) pairs stored without an explicit value type.
    plain_settings = (
        ('llm_api_base', 'api_base'),
        ('llm_api_key', 'api_key'),
        ('llm_model', 'model'),
    )
    # (config key, payload key) pairs stored with value_type='int'.
    int_settings = (
        ('llm_max_tokens', 'max_tokens'),
        ('llm_chunk_size', 'chunk_size'),
        ('llm_timeout', 'timeout'),
    )

    for config_key, payload_key in plain_settings:
        DynamicConfig.set(
            config_key,
            data.get(payload_key),
            category='llm',
            user_id=session.get('user_id'),
        )
    for config_key, payload_key in int_settings:
        DynamicConfig.set(
            config_key,
            data.get(payload_key),
            category='llm',
            value_type='int',
            user_id=session.get('user_id'),
        )

    # Audit log
    audit = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='update_llm_config',
        detail='更新大模型配置',
    )
    db.session.add(audit)
    db.session.commit()
    return jsonify({'success': True})
@admin_bp.route('/llm_config/test', methods=['POST'])
@admin_required
def test_llm_connection():
    """Send a tiny chat request to check the submitted LLM settings.

    Expects JSON with ``api_base``, ``model`` and optionally ``api_key``
    (``'sk-test'`` is used when the key is absent).

    Returns:
        JSON with ``success``; on success also the model name and up to
        50 characters of the reply (``'OK'`` when the response carries no
        choices), on failure the error text.
    """
    data = request.json
    try:
        from openai import OpenAI

        client = OpenAI(
            api_key=data.get('api_key', 'sk-test'),
            base_url=data.get('api_base'),
        )
        # Minimal probe request.
        probe = [{"role": "user", "content": "Hello"}]
        response = client.chat.completions.create(
            model=data.get('model'),
            messages=probe,
            max_tokens=10,
            timeout=10,
        )
        if response.choices:
            preview = response.choices[0].message.content[:50]
        else:
            preview = 'OK'
        return jsonify({
            'success': True,
            'model': data.get('model'),
            'response': preview,
        })
    except Exception as e:
        # Any failure (import, network, authentication) is reported to the caller.
        return jsonify({'success': False, 'error': str(e)})
@admin_bp.route('/llm_config/reset', methods=['POST'])
@admin_required
def reset_llm_config():
    """Reset the LLM settings by deleting the stored overrides.

    Removes every ``DynamicConfig`` row in category ``llm``.

    Returns:
        ``{"success": true}`` as JSON.
    """
    # The original imported LLM_CONFIG here without using it; the reset
    # only needs to drop the stored rows.
    DynamicConfig.query.filter_by(category='llm').delete()
    db.session.commit()
    return jsonify({'success': True})
# ==================== 获取当前LLM配置供其他模块使用 ====================
def get_llm_config():
    """Return the LLM configuration currently in effect.

    Resolution order:
        1. the active ``BackupLLMConfig`` row flagged as default;
        2. otherwise the first active row by ``sort_order``;
        3. otherwise ``LLM_CONFIG`` from the ``config`` module.

    Returns:
        For a database row, a dict with ``api_base``, ``api_key`` (empty
        string when unset), ``model``, ``max_tokens``, ``chunk_size``,
        ``timeout`` and ``provider_name``; otherwise ``LLM_CONFIG`` itself.
    """
    chosen = BackupLLMConfig.query.filter_by(is_default=True, is_active=True).first()
    if not chosen:
        # No default configured: fall back to the first enabled entry.
        chosen = (
            BackupLLMConfig.query.filter_by(is_active=True)
            .order_by(BackupLLMConfig.sort_order)
            .first()
        )

    if not chosen:
        # Nothing usable in the database: use the config file defaults.
        from config import LLM_CONFIG
        return LLM_CONFIG

    return {
        'api_base': chosen.api_base,
        'api_key': chosen.api_key or '',
        'model': chosen.model,
        'max_tokens': chosen.max_tokens,
        'chunk_size': chosen.chunk_size,
        'timeout': chosen.timeout,
        'provider_name': chosen.provider_name,
    }
# ==================== 用户类型配置管理(动态增删) ====================
@admin_bp.route('/user-types')
@admin_required
def user_types():
    """Render the list of user type configurations ordered by ``sort_order``.

    When the table is empty, the default user types are created first and
    the rows are loaded again.
    """
    def load_types():
        return UserTypeConfig.query.order_by(UserTypeConfig.sort_order).all()

    configured = load_types()
    if not configured:
        # Nothing stored yet: seed the defaults, then read them back.
        init_default_user_types()
        configured = load_types()
    return render_template('admin/user_types.html', user_types=configured)
@admin_bp.route('/user-types/add', methods=['GET', 'POST'])
@admin_required
def add_user_type():
    """Create a user type, or render the empty user type form.

    POST accepts a JSON body or form data.  ``max_file_size_mb`` is given
    in megabytes and stored in bytes.  ``is_active`` may be a real boolean
    or the string ``'true'``/``'false'`` and defaults to True.
    ``features`` is stored as a JSON string; a non-string value is
    serialised first.  The new type is never a system type and its
    creation is recorded in the operation log.

    Returns:
        The form page (GET), a JSON payload (JSON POST), a redirect to the
        user type list (form POST), or a 400 JSON error when ``type_key``
        is empty or already taken.
    """
    if request.method == 'POST':
        data = request.json if request.is_json else request.form

        type_key = data.get('type_key')
        if not type_key:
            return jsonify({'error': '类型标识不能为空'}), 400
        # Reject a type_key that is already in use.
        existing = UserTypeConfig.query.filter_by(type_key=type_key).first()
        if existing:
            return jsonify({'error': '类型标识已存在'}), 400

        # Forms deliver flags as strings; convert them so the model gets
        # a real boolean, the same way edit_user_type does.
        raw_active = data.get('is_active', True)
        is_active = raw_active == 'true' if isinstance(raw_active, str) else raw_active

        # Features are persisted as JSON text (see init_default_user_types).
        features = data.get('features', '[]')
        if not isinstance(features, str):
            features = json.dumps(features, ensure_ascii=False)

        user_type = UserTypeConfig(
            type_key=type_key,
            display_name=data.get('display_name'),
            daily_translations=int(data.get('daily_translations', 10)),
            max_pages=int(data.get('max_pages', 50)),
            max_file_size=int(data.get('max_file_size_mb', 30)) * 1024 * 1024,
            features=features,
            sort_order=int(data.get('sort_order', 0)),
            is_active=is_active,
            is_system=False,
        )
        db.session.add(user_type)
        db.session.commit()

        # Audit log
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='add_user_type',
            target=user_type.type_key,
            detail=json.dumps(user_type.to_dict(), ensure_ascii=False)
        )
        db.session.add(log)
        db.session.commit()

        if request.is_json:
            return jsonify({'success': True, 'user_type': user_type.to_dict()})
        flash('用户类型已添加', 'success')
        return redirect(url_for('admin.user_types'))

    # Every feature that can be granted to a user type.
    all_features = [
        {'key': 'basic_translate', 'name': '基础翻译'},
        {'key': 'compare_view', 'name': '对照查看'},
        {'key': 'retranslate', 'name': '重新翻译'},
        {'key': 'history', 'name': '历史记录'},
        {'key': 'priority_queue', 'name': '优先队列'},
        {'key': 'export_pdf', 'name': '导出PDF'},
        {'key': 'batch_translate', 'name': '批量翻译'},
        {'key': 'custom_terms', 'name': '自定义术语'},
    ]
    return render_template('admin/user_type_form.html', user_type=None, all_features=all_features)
@admin_bp.route('/user-types/<int:type_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_user_type(type_id):
    """Update a user type, or render its edit form.

    POST accepts a JSON body or form data.  Omitted fields keep their
    current value.  ``is_active`` accepts a real boolean or the string
    ``'true'``; when it is omitted, a JSON request leaves the flag alone
    while a form submission (where an unchecked box is simply absent)
    turns it off.  ``features`` is stored as a JSON string; a non-string
    value is serialised first.  The edit is recorded in the operation log.

    Args:
        type_id: Primary key of the user type; an unknown id yields a 404.
    """
    user_type = UserTypeConfig.query.get_or_404(type_id)
    if request.method == 'POST':
        data = request.json if request.is_json else request.form
        user_type.display_name = data.get('display_name', user_type.display_name)
        user_type.daily_translations = int(data.get('daily_translations', user_type.daily_translations))
        user_type.max_pages = int(data.get('max_pages', user_type.max_pages))
        # The form works in megabytes; the model stores bytes.
        current_mb = user_type.max_file_size // 1024 // 1024
        user_type.max_file_size = int(data.get('max_file_size_mb', current_mb)) * 1024 * 1024

        # Features are persisted as JSON text (see init_default_user_types).
        features = data.get('features', user_type.features)
        if features is not None and not isinstance(features, str):
            features = json.dumps(features, ensure_ascii=False)
        user_type.features = features

        user_type.sort_order = int(data.get('sort_order', user_type.sort_order))

        if 'is_active' in data:
            raw_active = data.get('is_active')
            if isinstance(raw_active, bool):
                user_type.is_active = raw_active
            else:
                user_type.is_active = raw_active == 'true'
        elif not request.is_json:
            # Unchecked checkboxes are not submitted by HTML forms.
            user_type.is_active = False
        # A JSON edit that omits is_active keeps the current state.
        db.session.commit()

        # Audit log
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='edit_user_type',
            target=user_type.type_key,
            detail=json.dumps(user_type.to_dict(), ensure_ascii=False)
        )
        db.session.add(log)
        db.session.commit()

        if request.is_json:
            return jsonify({'success': True, 'user_type': user_type.to_dict()})
        flash('用户类型已更新', 'success')
        return redirect(url_for('admin.user_types'))

    # Every feature that can be granted to a user type.
    all_features = [
        {'key': 'basic_translate', 'name': '基础翻译'},
        {'key': 'compare_view', 'name': '对照查看'},
        {'key': 'retranslate', 'name': '重新翻译'},
        {'key': 'history', 'name': '历史记录'},
        {'key': 'priority_queue', 'name': '优先队列'},
        {'key': 'export_pdf', 'name': '导出PDF'},
        {'key': 'batch_translate', 'name': '批量翻译'},
        {'key': 'custom_terms', 'name': '自定义术语'},
    ]
    return render_template('admin/user_type_form.html', user_type=user_type, all_features=all_features)
@admin_bp.route('/user-types/<int:type_id>/delete', methods=['POST'])
@admin_required
def delete_user_type(type_id):
    """Delete a user type and record the deletion in the operation log.

    Refuses with a 400 JSON error when the type is a system type or when
    any user still has this type.

    Args:
        type_id: Primary key of the user type; an unknown id yields a 404.
    """
    doomed = UserTypeConfig.query.get_or_404(type_id)

    # Built-in types can never be removed.
    if doomed.is_system:
        return jsonify({'error': '系统内置类型不可删除'}), 400

    # Types that are still assigned to users must be freed up first.
    users_count = User.query.filter_by(user_type=doomed.type_key).count()
    if users_count > 0:
        message = f'{users_count} 个用户使用此类型,请先修改用户类型'
        return jsonify({'error': message}), 400

    # Remember the key before the row disappears; the log entry needs it.
    removed_key = doomed.type_key
    db.session.delete(doomed)
    db.session.commit()

    audit = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='delete_user_type',
        target=removed_key,
    )
    db.session.add(audit)
    db.session.commit()
    return jsonify({'success': True})
@admin_bp.route('/user-types/<int:type_id>/toggle', methods=['POST'])
@admin_required
def toggle_user_type(type_id):
    """Flip the ``is_active`` flag of a user type.

    Args:
        type_id: Primary key of the user type; an unknown id yields a 404.

    Returns:
        JSON with ``success`` and the new ``is_active`` value.
    """
    target = UserTypeConfig.query.get_or_404(type_id)
    flipped = not target.is_active
    target.is_active = flipped
    db.session.commit()
    return jsonify({'success': True, 'is_active': target.is_active})
@admin_bp.route('/user-types/init', methods=['POST'])
@admin_required
def init_user_types():
    """Create any default user types that are missing, then report success."""
    init_default_user_types()
    outcome = {'success': True}
    return jsonify(outcome)
def init_default_user_types():
    """Insert the default user type configurations that do not exist yet.

    For each built-in type key, a ``UserTypeConfig`` row is created from
    the matching ``USER_LIMITS`` entry unless a row with that key already
    exists.  Existing rows are left untouched.
    """
    from config import USER_LIMITS

    # (type_key, display_name, sort_order); all of these are system types.
    builtin_types = (
        ('guest', '访客', 0),
        ('free', '免费用户', 1),
        ('vip_basic', '基础会员', 2),
        ('vip_pro', '专业会员', 3),
        ('vip_enterprise', '企业会员', 4),
    )

    for type_key, display_name, sort_order in builtin_types:
        if UserTypeConfig.query.filter_by(type_key=type_key).first():
            continue
        limits = USER_LIMITS.get(type_key, {})
        db.session.add(UserTypeConfig(
            type_key=type_key,
            display_name=display_name,
            daily_translations=limits.get('daily_translations', 10),
            max_pages=limits.get('max_pages', 50),
            max_file_size=limits.get('max_file_size', 30*1024*1024),
            features=json.dumps(limits.get('features', []), ensure_ascii=False),
            sort_order=sort_order,
            is_active=True,
            is_system=True,
        ))
    db.session.commit()
# ==================== 会员套餐配置管理(动态增删) ====================
@admin_bp.route('/membership-plans')
@admin_required
def membership_plans():
    """Render the list of membership plans ordered by ``sort_order``.

    When the table is empty, ``init_default_membership_plans`` is called
    first and the rows are loaded again.  The active user types are passed
    along so the page can offer them for selection.
    """
    def load_plans():
        return MembershipPlanConfig.query.order_by(MembershipPlanConfig.sort_order).all()

    configured = load_plans()
    if not configured:
        # Nothing stored yet: seed the defaults, then read them back.
        init_default_membership_plans()
        configured = load_plans()

    selectable_types = UserTypeConfig.query.filter_by(is_active=True).all()
    return render_template(
        'admin/membership_plans.html',
        plans=configured,
        user_types=selectable_types,
    )
@admin_bp.route('/membership-plans/add', methods=['GET', 'POST'])
@admin_required
def add_membership_plan():
    """Create a membership plan, or render the empty plan form.

    POST accepts a JSON body or form data.  ``is_active`` (default True)
    and ``is_recommended`` (default False) may be real booleans or the
    string ``'true'``/``'false'``.  An empty ``original_price`` is stored
    as None.  The new plan is never a system plan and its creation is
    recorded in the operation log.

    Returns:
        The form page (GET), a JSON payload (JSON POST), a redirect to the
        plan list (form POST), or a 400 JSON error when ``plan_key`` is
        empty or already taken.
    """
    if request.method == 'POST':
        data = request.json if request.is_json else request.form

        plan_key = data.get('plan_key')
        if not plan_key:
            return jsonify({'error': '套餐标识不能为空'}), 400
        # Reject a plan_key that is already in use.
        existing = MembershipPlanConfig.query.filter_by(plan_key=plan_key).first()
        if existing:
            return jsonify({'error': '套餐标识已存在'}), 400

        def read_flag(key, default):
            """Parse a boolean field; strings are compared against 'true'."""
            raw = data.get(key, default)
            # Forms deliver flags as strings, which must not reach the
            # model unparsed.
            return raw == 'true' if isinstance(raw, str) else raw

        raw_original = data.get('original_price')
        plan = MembershipPlanConfig(
            plan_key=plan_key,
            display_name=data.get('display_name'),
            price=float(data.get('price', 0)),
            original_price=float(raw_original) if raw_original else None,
            period=data.get('period', 'month'),
            period_days=int(data.get('period_days', 30)),
            description=data.get('description'),
            user_type_key=data.get('user_type_key'),
            sort_order=int(data.get('sort_order', 0)),
            is_active=read_flag('is_active', True),
            is_recommended=read_flag('is_recommended', False),
            is_system=False,
        )
        db.session.add(plan)
        db.session.commit()

        # Audit log
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='add_membership_plan',
            target=plan.plan_key,
            detail=json.dumps(plan.to_dict(), ensure_ascii=False)
        )
        db.session.add(log)
        db.session.commit()

        if request.is_json:
            return jsonify({'success': True, 'plan': plan.to_dict()})
        flash('会员套餐已添加', 'success')
        return redirect(url_for('admin.membership_plans'))

    # Active user types are offered for selection in the form.
    user_types = UserTypeConfig.query.filter_by(is_active=True).all()
    return render_template('admin/membership_plan_form.html', plan=None, user_types=user_types)
@admin_bp.route('/membership-plans/<int:plan_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_membership_plan(plan_id):
    """Update a membership plan (POST) or render its edit form (GET).

    Responds with 404 when the plan does not exist. POST accepts JSON or form
    data. For JSON, any field that is omitted keeps its stored value, including
    the boolean flags and ``original_price``. For form data, an omitted
    checkbox means "unchecked" and an empty ``original_price`` clears it.
    Malformed numeric input yields a JSON error with status 400 and leaves the
    plan unchanged. ``plan_key`` is never modified here.
    """
    plan = MembershipPlanConfig.query.get_or_404(plan_id)

    if request.method != 'POST':
        # Only active user types are offered in the form.
        user_types = UserTypeConfig.query.filter_by(is_active=True).all()
        return render_template('admin/membership_plan_form.html', plan=plan, user_types=user_types)

    data = request.json if request.is_json else request.form

    def read_flag(name, current):
        """Resolve a boolean flag from a JSON bool or a form string."""
        raw = data.get(name)
        if isinstance(raw, bool):
            return raw
        if raw is None:
            # JSON partial update keeps the stored value; an HTML form omits
            # unchecked checkboxes, so absence there means False.
            return current if request.is_json else False
        return str(raw).strip().lower() in ('true', '1', 'on', 'yes')

    # Parse numbers before touching the model so a bad value changes nothing.
    try:
        price = float(data.get('price', plan.price))
        period_days = int(data.get('period_days', plan.period_days))
        sort_order = int(data.get('sort_order', plan.sort_order))
        if request.is_json and 'original_price' not in data:
            original_price = plan.original_price
        else:
            raw_original = data.get('original_price')
            original_price = float(raw_original) if raw_original else None
    except (TypeError, ValueError):
        return jsonify({'error': '价格、周期天数或排序值格式不正确'}), 400

    plan.display_name = data.get('display_name', plan.display_name)
    plan.price = price
    plan.original_price = original_price
    plan.period = data.get('period', plan.period)
    plan.period_days = period_days
    plan.description = data.get('description', plan.description)
    plan.user_type_key = data.get('user_type_key', plan.user_type_key)
    plan.sort_order = sort_order
    plan.is_active = read_flag('is_active', plan.is_active)
    plan.is_recommended = read_flag('is_recommended', plan.is_recommended)
    db.session.commit()

    # Audit log entry, written after the update has been committed.
    log = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='edit_membership_plan',
        target=plan.plan_key,
        detail=json.dumps(plan.to_dict(), ensure_ascii=False)
    )
    db.session.add(log)
    db.session.commit()

    if request.is_json:
        return jsonify({'success': True, 'plan': plan.to_dict()})
    flash('会员套餐已更新', 'success')
    return redirect(url_for('admin.membership_plans'))
@admin_bp.route('/membership-plans/<int:plan_id>/delete', methods=['POST'])
@admin_required
def delete_membership_plan(plan_id):
    """Delete a non-system membership plan and record the action.

    Responds with 404 for an unknown id and with a 400 JSON error when the
    plan is flagged ``is_system``.
    """
    doomed = MembershipPlanConfig.query.get_or_404(plan_id)

    if doomed.is_system:
        return jsonify({'error': '系统内置套餐不可删除'}), 400

    # Capture the key first; it is needed for the log after the row is gone.
    removed_key = doomed.plan_key
    db.session.delete(doomed)
    db.session.commit()

    audit_entry = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='delete_membership_plan',
        target=removed_key,
    )
    db.session.add(audit_entry)
    db.session.commit()

    return jsonify({'success': True})
@admin_bp.route('/membership-plans/<int:plan_id>/toggle', methods=['POST'])
@admin_required
def toggle_membership_plan(plan_id):
    """Flip a plan's ``is_active`` flag and return the new value as JSON.

    Responds with 404 when the plan does not exist.
    """
    record = MembershipPlanConfig.query.get_or_404(plan_id)
    flipped = not record.is_active
    record.is_active = flipped
    db.session.commit()
    payload = {'success': True, 'is_active': record.is_active}
    return jsonify(payload)
@admin_bp.route('/membership-plans/<int:plan_id>/recommend', methods=['POST'])
@admin_required
def recommend_membership_plan(plan_id):
    """Flip a plan's ``is_recommended`` flag and return the new value as JSON.

    Only the addressed plan is changed. Responds with 404 when the plan does
    not exist.
    """
    record = MembershipPlanConfig.query.get_or_404(plan_id)
    flipped = not record.is_recommended
    record.is_recommended = flipped
    db.session.commit()
    payload = {'success': True, 'is_recommended': record.is_recommended}
    return jsonify(payload)
@admin_bp.route('/membership-plans/init', methods=['POST'])
@admin_required
def init_membership_plans():
    """Seed the default membership plans, then acknowledge with a JSON success flag."""
    init_default_membership_plans()
    ack = {'success': True}
    return jsonify(ack)
def init_default_membership_plans():
    """Insert every default membership plan that is not stored yet.

    A plan whose ``plan_key`` already has a row is left untouched. Values for
    new rows come from ``config.MEMBERSHIP_PLANS``. The billing period
    defaults to ``'month'`` and ``period_days`` is derived from that same
    resolved period, so the two fields always agree.
    """
    from config import MEMBERSHIP_PLANS

    # Any period other than month/year is given 90 days.
    days_by_period = {'month': 30, 'year': 365}

    # (plan_key, plan settings, user_type_key, sort_order, is_system)
    defaults = [
        ('vip_basic', MEMBERSHIP_PLANS.get('vip_basic', {}), 'vip_basic', 0, True),
        ('vip_pro', MEMBERSHIP_PLANS.get('vip_pro', {}), 'vip_pro', 1, True),
        ('vip_enterprise', MEMBERSHIP_PLANS.get('vip_enterprise', {}), 'vip_enterprise', 2, True),
    ]

    for plan_key, plan_data, user_type_key, sort_order, is_system in defaults:
        existing = MembershipPlanConfig.query.filter_by(plan_key=plan_key).first()
        if existing:
            continue  # never overwrite a plan an admin may have edited

        # Resolve the period once so period and period_days cannot disagree.
        period = plan_data.get('period', 'month')
        plan = MembershipPlanConfig(
            plan_key=plan_key,
            display_name=plan_data.get('name', plan_key),
            price=plan_data.get('price', 0),
            original_price=None,
            period=period,
            period_days=days_by_period.get(period, 90),
            description=plan_data.get('description', ''),
            user_type_key=user_type_key,
            sort_order=sort_order,
            is_active=True,
            is_recommended=False,
            is_system=is_system,
        )
        db.session.add(plan)

    db.session.commit()
# ==================== API: 获取用户限制配置(供其他模块使用) ====================
def get_user_limits(user_type_key):
    """Return the limit settings for one user type.

    An active ``UserTypeConfig`` row wins. When there is none (missing or
    inactive), the static ``config.USER_LIMITS`` entry is used, falling back
    to the ``'free'`` entry and finally to an empty dict.
    """
    row = UserTypeConfig.query.filter_by(type_key=user_type_key, is_active=True).first()

    if not row:
        # No usable database row: fall back to the static configuration.
        from config import USER_LIMITS
        free_limits = USER_LIMITS.get('free', {})
        return USER_LIMITS.get(user_type_key, free_limits)

    limits = {}
    limits['daily_translations'] = row.daily_translations
    limits['max_pages'] = row.max_pages
    limits['max_file_size'] = row.max_file_size
    limits['features'] = row.get_features()
    return limits
def get_all_user_types():
    """Return ``{type_key: limits}`` for every active user type.

    Entries are added in ascending ``sort_order``; each value is produced by
    ``get_user_limits``.
    """
    active_types = (
        UserTypeConfig.query
        .filter_by(is_active=True)
        .order_by(UserTypeConfig.sort_order)
        .all()
    )
    limits_by_key = {}
    for entry in active_types:
        limits_by_key[entry.type_key] = get_user_limits(entry.type_key)
    return limits_by_key
def get_membership_plan(plan_key):
    """Return the settings of one membership plan as a dict.

    An active ``MembershipPlanConfig`` row wins. When there is none (missing
    or inactive), the static ``config.MEMBERSHIP_PLANS`` entry is returned, or
    an empty dict if that is missing too.
    """
    row = MembershipPlanConfig.query.filter_by(plan_key=plan_key, is_active=True).first()
    if not row:
        # No usable database row: fall back to the static configuration.
        from config import MEMBERSHIP_PLANS
        return MEMBERSHIP_PLANS.get(plan_key, {})
    return row.to_dict()
def get_all_membership_plans():
    """Return every active membership plan as a dict, in ascending ``sort_order``."""
    active_plans = (
        MembershipPlanConfig.query
        .filter_by(is_active=True)
        .order_by(MembershipPlanConfig.sort_order)
        .all()
    )
    serialized = []
    for entry in active_plans:
        serialized.append(entry.to_dict())
    return serialized
# ==================== 备用大模型接口管理 ====================
@admin_bp.route('/backup-llm')
@admin_required
def backup_llm_list():
    """Redirect the backup LLM list URL to the LLM configuration page."""
    destination = url_for('admin.llm_config')
    return redirect(destination)
@admin_bp.route('/backup-llm/add', methods=['POST'])
@admin_required
def add_backup_llm():
    """Create an LLM endpoint configuration from the JSON request body.

    The new entry is never created as the default. The action is written to
    the operation log and the stored configuration is returned as JSON.
    """
    payload = request.json

    new_config = BackupLLMConfig(
        provider_name=payload.get('provider_name'),
        api_base=payload.get('api_base'),
        api_key=payload.get('api_key'),
        model=payload.get('model'),
        max_tokens=int(payload.get('max_tokens', 8000)),
        chunk_size=int(payload.get('chunk_size', 2000)),
        timeout=int(payload.get('timeout', 180)),
        is_active=payload.get('is_active', True),
        is_default=False,
        sort_order=int(payload.get('sort_order', 0)),
        description=payload.get('description'),
    )
    db.session.add(new_config)
    db.session.commit()

    # Audit log entry; its detail is the serialized configuration.
    snapshot = json.dumps(new_config.to_dict(), ensure_ascii=False)
    db.session.add(OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='add_llm_config',
        target=new_config.provider_name,
        detail=snapshot,
    ))
    db.session.commit()

    return jsonify({'success': True, 'config': new_config.to_dict()})
@admin_bp.route('/backup-llm/<int:config_id>/edit', methods=['POST'])
@admin_required
def edit_backup_llm(config_id):
    """Update an LLM endpoint configuration from the JSON request body.

    Responds with 404 when the configuration does not exist. Every field that
    is omitted from the body keeps its stored value, including ``is_active``.
    The action is written to the operation log and the updated configuration
    is returned as JSON.
    """
    config = BackupLLMConfig.query.get_or_404(config_id)
    data = request.json

    config.provider_name = data.get('provider_name', config.provider_name)
    config.api_base = data.get('api_base', config.api_base)
    config.api_key = data.get('api_key', config.api_key)
    config.model = data.get('model', config.model)
    config.max_tokens = int(data.get('max_tokens', config.max_tokens))
    config.chunk_size = int(data.get('chunk_size', config.chunk_size))
    config.timeout = int(data.get('timeout', config.timeout))
    # Default to the stored value: an edit that omits is_active must not
    # silently re-enable a configuration that was disabled.
    config.is_active = data.get('is_active', config.is_active)
    config.sort_order = int(data.get('sort_order', config.sort_order))
    config.description = data.get('description', config.description)
    db.session.commit()

    # Audit log entry; its detail is the serialized configuration.
    log = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='edit_llm_config',
        target=config.provider_name,
        detail=json.dumps(config.to_dict(), ensure_ascii=False)
    )
    db.session.add(log)
    db.session.commit()

    return jsonify({'success': True, 'config': config.to_dict()})
@admin_bp.route('/backup-llm/<int:config_id>/delete', methods=['POST'])
@admin_required
def delete_backup_llm(config_id):
    """Delete an LLM endpoint configuration and record the action.

    Responds with 404 when the configuration does not exist.
    """
    doomed = BackupLLMConfig.query.get_or_404(config_id)

    # Capture the name first; it is needed for the log after the row is gone.
    removed_name = doomed.provider_name
    db.session.delete(doomed)
    db.session.commit()

    audit_entry = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='delete_backup_llm',
        target=removed_name,
    )
    db.session.add(audit_entry)
    db.session.commit()

    return jsonify({'success': True})
@admin_bp.route('/backup-llm/<int:config_id>/toggle', methods=['POST'])
@admin_required
def toggle_backup_llm(config_id):
    """Flip an LLM configuration's ``is_active`` flag and return the new value.

    Responds with 404 when the configuration does not exist.
    """
    record = BackupLLMConfig.query.get_or_404(config_id)
    flipped = not record.is_active
    record.is_active = flipped
    db.session.commit()
    payload = {'success': True, 'is_active': record.is_active}
    return jsonify(payload)
@admin_bp.route('/backup-llm/<int:config_id>/set-default', methods=['POST'])
@admin_required
def set_default_llm(config_id):
    """Make one LLM configuration the default and record the action.

    The default flag is cleared on every configuration before it is set on the
    chosen one, which is also activated. Responds with 404 when the
    configuration does not exist.
    """
    chosen = BackupLLMConfig.query.get_or_404(config_id)

    # Clear the default flag everywhere before marking the chosen row.
    BackupLLMConfig.query.update({'is_default': False})
    chosen.is_default = True
    chosen.is_active = True  # the default entry must be enabled
    db.session.commit()

    provider = chosen.provider_name
    audit_entry = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='set_default_llm',
        target=provider,
        detail=f'设置 {provider} 为默认大模型',
    )
    db.session.add(audit_entry)
    db.session.commit()

    return jsonify({'success': True, 'config': chosen.to_dict()})
@admin_bp.route('/backup-llm/<int:config_id>/test', methods=['POST'])
@admin_required
def test_backup_llm(config_id):
    """Send a tiny chat request to an LLM endpoint to check connectivity.

    Responds with 404 when the configuration does not exist. Otherwise it
    always returns a JSON body: ``success`` True with the provider, model and
    up to 50 characters of the reply, or ``success`` False with the error text.
    """
    config = BackupLLMConfig.query.get_or_404(config_id)

    try:
        # Imported lazily so the module loads without the openai package.
        from openai import OpenAI

        client = OpenAI(
            api_key=config.api_key or 'sk-test',  # placeholder for keyless endpoints
            base_url=config.api_base,
        )
        model = config.model or 'default'

        # Minimal request: short prompt, tiny completion, short timeout.
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": "Hello"}],
            max_tokens=10,
            timeout=10,
        )

        reply = 'OK'
        if response.choices:
            # content can be None; treat that as an empty reply instead of
            # failing a call that actually succeeded.
            reply = (response.choices[0].message.content or '')[:50]

        return jsonify({
            'success': True,
            'provider': config.provider_name,
            'model': model,
            'response': reply
        })
    except Exception as e:
        # Diagnostic endpoint: any failure is reported to the admin as text.
        return jsonify({'success': False, 'error': str(e)})
@admin_bp.route('/backup-llm/init', methods=['POST'])
@admin_required
def init_backup_llm():
    """Seed the default LLM endpoints, then acknowledge with a JSON success flag."""
    init_default_backup_llm()
    ack = {'success': True}
    return jsonify(ack)
def init_default_backup_llm():
    """Insert every default LLM endpoint that is not stored yet.

    A provider whose ``provider_name`` already has a row is left untouched.
    New rows are active and carry no API key.
    """
    # (provider_name, api_base, model) in display order; the index becomes
    # sort_order.
    builtin_endpoints = (
        ('本地LM Studio', 'http://localhost:1234/v1', None),
        ('OpenAI', 'https://api.openai.com/v1', 'gpt-4'),
        ('DeepSeek', 'https://api.deepseek.com/v1', 'deepseek-chat'),
        ('阿里百炼', 'https://dashscope.aliyuncs.com/compatible-mode/v1', 'qwen-turbo'),
        ('SiliconFlow', 'https://api.siliconflow.cn/v1', 'Qwen/Qwen2.5-72B-Instruct'),
    )

    for position, (name, base_url, model_name) in enumerate(builtin_endpoints):
        if BackupLLMConfig.query.filter_by(provider_name=name).first():
            continue  # already present: never overwrite stored settings

        db.session.add(BackupLLMConfig(
            provider_name=name,
            api_base=base_url,
            api_key=None,
            model=model_name,
            is_active=True,
            sort_order=position,
            description=f'{name} 默认接口',
        ))

    db.session.commit()
def get_backup_llm_configs():
    """Return every active LLM configuration as a dict, in ascending ``sort_order``.

    Intended for use by other modules.
    """
    active_configs = (
        BackupLLMConfig.query
        .filter_by(is_active=True)
        .order_by(BackupLLMConfig.sort_order)
        .all()
    )
    serialized = []
    for entry in active_configs:
        serialized.append(entry.to_dict())
    return serialized
# ==================== 获取网站基础配置(供其他模块使用) ====================
def get_site_config():
    """Return the basic site settings as a dict.

    Each value is read through ``DynamicConfig.get`` with the fallback listed
    below.
    """
    # (setting key, fallback) pairs, in the order they appear in the result.
    fallbacks = (
        ('site_name', 'PDF翻译助手'),
        ('site_footer', '© 2026 PDF翻译助手'),
        ('max_file_size', 50),
        ('cache_expire_days', 30),
        ('enable_email_notify', True),
        ('enable_cache', True),
        ('enable_guest', True),
        ('default_source_lang', 'en'),
        ('default_target_lang', 'zh'),
    )
    return {key: DynamicConfig.get(key, fallback) for key, fallback in fallbacks}