Files
pdf-translate-web/admin.py
coder 2ef5e6da87 V2.0.0: 新增用户权限动态配置、会员套餐配置、数据包购买功能
新功能:
- 用户权限动态配置(翻译次数、页数限制)
- 会员套餐动态配置(名称、价格、周期)
- 数据包购买套餐管理
- 收入统计功能
- 数据包销售排行

技术更新:
- 新增 DynamicConfig 模型支持动态配置
- 新增 DataPackage 和 UserPackage 模型
- 后台管理增加数据包管理模块
2026-04-07 23:26:53 +08:00

664 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
后台管理模块 V2
"""
from datetime import datetime, date, timedelta
from functools import wraps
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, session, flash
from sqlalchemy import func, desc
import json
from models import (db, User, Translation, TranslationCache, GuestTranslation,
SystemConfig, OperationLog, DataPackage, UserPackage, DynamicConfig)
from config import USER_LIMITS, MEMBERSHIP_PLANS
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
# ==================== 权限装饰器 ====================
def admin_required(f):
    """Decorator that restricts a view to logged-in administrators.

    Visitors without a ``user_id`` in the session are redirected to the
    ``login`` endpoint with ``next`` set to the requested URL. When the
    session's user cannot be loaded or is not an administrator, an error
    message is flashed and the visitor is redirected to ``index``.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        current_id = session.get('user_id')
        if current_id:
            account = User.query.get(current_id)
            if account and account.is_admin:
                return f(*args, **kwargs)
            flash('需要管理员权限', 'error')
            return redirect(url_for('index'))
        # Not logged in: go to the login page and remember the requested URL.
        return redirect(url_for('login', next=request.url))

    return decorated_function
# ==================== 后台首页 ====================
@admin_bp.route('/')
@admin_required
def dashboard():
    """Render the admin landing page with site-wide summary figures.

    Collects user, translation, cache and paid-package totals, the ten most
    recently created translations and users, and per-day translation counts
    for the last seven days (oldest first), then renders
    ``admin/dashboard.html``.
    """
    def created_on(model, day):
        # Filter expression: the model's ``created_at`` falls on ``day``.
        return func.date(model.created_at) == day

    def newest(model, limit=10):
        # The ``limit`` most recently created rows of ``model``.
        return model.query.order_by(desc(model.created_at)).limit(limit).all()

    context = {}

    # User figures
    context['total_users'] = User.query.count()
    context['new_users_today'] = User.query.filter(created_on(User, date.today())).count()
    context['vip_users'] = User.query.filter(User.user_type.startswith('vip')).count()

    # Translation figures
    context['total_translations'] = Translation.query.count()
    context['today_translations'] = Translation.query.filter(
        created_on(Translation, date.today())
    ).count()

    # Cache figures (SUM over an empty table yields NULL, hence the ``or 0``)
    context['total_cache'] = TranslationCache.query.count()
    hit_sum = db.session.query(func.sum(TranslationCache.hit_count)).scalar()
    context['total_cache_hits'] = hit_sum or 0
    size_sum = db.session.query(func.sum(TranslationCache.file_size)).scalar()
    context['total_cache_size'] = size_sum or 0

    # Paid data-package figures
    context['total_packages_sold'] = UserPackage.query.filter_by(payment_status='paid').count()
    revenue_sum = (
        db.session.query(func.sum(UserPackage.price_paid))
        .filter(UserPackage.payment_status == 'paid')
        .scalar()
    )
    context['total_revenue'] = revenue_sum or 0

    # Latest activity
    context['recent_translations'] = newest(Translation)
    context['recent_users'] = newest(User)

    # Daily translation counts, six days ago through today
    trend = []
    for days_back in reversed(range(7)):
        day = date.today() - timedelta(days=days_back)
        trend.append({
            'date': day.strftime('%m-%d'),
            'count': Translation.query.filter(created_on(Translation, day)).count(),
        })
    context['daily_stats'] = trend

    return render_template('admin/dashboard.html', **context)
# ==================== 用户管理 ====================
@admin_bp.route('/users')
@admin_required
def users():
    """List users, newest first, 20 per page.

    Query-string parameters: ``page`` (int, default 1), ``search`` (substring
    matched against username or email) and ``type`` (exact ``user_type``).
    """
    args = request.args
    page = args.get('page', 1, type=int)
    search = args.get('search', '')
    user_type = args.get('type', '')

    listing = User.query
    if search:
        name_matches = User.username.contains(search)
        email_matches = User.email.contains(search)
        listing = listing.filter(name_matches | email_matches)
    if user_type:
        listing = listing.filter(User.user_type == user_type)

    page_of_users = listing.order_by(desc(User.created_at)).paginate(page=page, per_page=20)

    selectable_types = ['free', 'vip_basic', 'vip_pro', 'vip_enterprise', 'admin']
    return render_template(
        'admin/users.html',
        users=page_of_users,
        search=search,
        user_type=user_type,
        user_types=selectable_types,
    )
@admin_bp.route('/user/<int:user_id>', methods=['GET', 'POST'])
@admin_required
def user_detail(user_id):
    """Show a user's detail page (GET) or update the user (POST).

    POST accepts a JSON body or form data with the optional fields
    ``username``, ``email``, ``user_type``, ``is_active``, ``is_admin``,
    ``membership_expire`` (ISO 8601 string) and ``new_password``. The change
    is recorded in the operation log with the password masked.

    Returns JSON ``{'success': True, 'user': ...}`` for JSON requests (400 if
    the JSON body is not an object), otherwise redirects back to this page.
    GET renders ``admin/user_detail.html`` with the user's 20 latest
    translations and all purchased packages.
    """
    user = User.query.get_or_404(user_id)

    if request.method == 'POST':
        data = request.json if request.is_json else request.form
        if request.is_json and not isinstance(data, dict):
            return jsonify({'error': '请求数据格式错误'}), 400

        def read_flag(key, current):
            """Read a boolean field sent as the string 'true' or as JSON true."""
            # A JSON client may send only the fields it wants to change, so a
            # missing key keeps the stored value. Form posts keep their
            # previous semantics: a missing field means False.
            if request.is_json and key not in data:
                return current
            return data.get(key) in ('true', True)

        user.username = data.get('username', user.username)
        user.email = data.get('email', user.email)
        user.user_type = data.get('user_type', user.user_type)
        user.is_active = read_flag('is_active', user.is_active)
        user.is_admin = read_flag('is_admin', user.is_admin)

        # Membership expiry
        expire_str = data.get('membership_expire')
        if expire_str:
            try:
                user.membership_expire = datetime.fromisoformat(expire_str)
            except (TypeError, ValueError):
                # Best effort: an unparsable value leaves the expiry unchanged.
                pass

        # Password
        new_password = data.get('new_password')
        if new_password:
            user.set_password(new_password)

        db.session.commit()

        # Audit log. The submitted password must never be stored in clear text.
        logged_fields = {
            key: ('***' if key == 'new_password' else value)
            for key, value in data.items()
        }
        log = OperationLog(
            user_id=session.get('user_id'),
            username='admin',
            action='edit_user',
            target=user.username,
            detail=json.dumps(logged_fields, ensure_ascii=False),
        )
        db.session.add(log)
        db.session.commit()

        if request.is_json:
            return jsonify({'success': True, 'user': user.to_dict()})
        flash('用户信息已更新', 'success')
        return redirect(url_for('admin.user_detail', user_id=user_id))

    translations = (
        Translation.query.filter_by(user_id=user_id)
        .order_by(desc(Translation.created_at))
        .limit(20)
        .all()
    )
    packages = (
        UserPackage.query.filter_by(user_id=user_id)
        .order_by(desc(UserPackage.purchased_at))
        .all()
    )
    return render_template(
        'admin/user_detail.html', user=user, translations=translations, packages=packages
    )
@admin_bp.route('/user/<int:user_id>/delete', methods=['POST'])
@admin_required
def delete_user(user_id):
    """Delete a user and record the deletion in the operation log.

    Responds with a 400 JSON error, without deleting, when the target is an
    administrator and no other administrator exists.
    """
    target = User.query.get_or_404(user_id)

    if target.is_admin:
        admin_total = User.query.filter_by(is_admin=True).count()
        if admin_total <= 1:
            return jsonify({'error': '不能删除最后一个管理员'}), 400

    # Keep the name: it is needed for the log entry after the row is gone.
    removed_name = target.username
    db.session.delete(target)
    db.session.commit()

    entry = OperationLog(
        user_id=session.get('user_id'),
        username='admin',
        action='delete_user',
        target=removed_name,
    )
    db.session.add(entry)
    db.session.commit()

    return jsonify({'success': True})
# ==================== 翻译记录管理 ====================
@admin_bp.route('/translations')
@admin_required
def translations():
    """List translation records, newest first, 20 per page.

    Query-string parameters: ``page`` (int, default 1), ``status`` (exact
    match) and ``search`` (substring of the original filename).
    """
    args = request.args
    page = args.get('page', 1, type=int)
    status = args.get('status', '')
    search = args.get('search', '')

    listing = Translation.query
    if status:
        listing = listing.filter(Translation.status == status)
    if search:
        listing = listing.filter(Translation.original_filename.contains(search))

    ordered = listing.order_by(desc(Translation.created_at))
    page_of_records = ordered.paginate(page=page, per_page=20)

    return render_template(
        'admin/translations.html',
        translations=page_of_records,
        status=status,
        search=search,
    )
@admin_bp.route('/translation/<int:trans_id>')
@admin_required
def translation_detail(trans_id):
    """Show one translation record together with its owning user, if any."""
    record = Translation.query.get_or_404(trans_id)

    # Records without a user_id have no owner to look up.
    owner = None
    if record.user_id:
        owner = User.query.get(record.user_id)

    return render_template(
        'admin/translation_detail.html',
        translation=record,
        user=owner,
    )
@admin_bp.route('/translation/<int:trans_id>/delete', methods=['POST'])
@admin_required
def delete_translation(trans_id):
    """Delete a translation record and its output file, if one is recorded.

    A missing output file is not an error: the record is still removed.
    Returns JSON ``{'success': True}``; responds 404 when the record does not
    exist.
    """
    import os

    translation = Translation.query.get_or_404(trans_id)
    if translation.output_path:
        # Remove directly instead of checking existence first: the file can
        # disappear between the check and the removal (e.g. two concurrent
        # delete requests), which previously surfaced as a server error.
        try:
            os.remove(translation.output_path)
        except FileNotFoundError:
            pass
    db.session.delete(translation)
    db.session.commit()
    return jsonify({'success': True})
# ==================== 缓存管理 ====================
@admin_bp.route('/cache')
@admin_required
def cache_list():
    """List cache entries ordered by hit count (highest first), 20 per page.

    Also passes the summed file size and summed hit count of all cache
    entries to the template; both are 0 when the sum is empty.
    """
    page = request.args.get('page', 1, type=int)

    by_popularity = TranslationCache.query.order_by(desc(TranslationCache.hit_count))
    page_of_entries = by_popularity.paginate(page=page, per_page=20)

    size_sum = db.session.query(func.sum(TranslationCache.file_size)).scalar()
    hit_sum = db.session.query(func.sum(TranslationCache.hit_count)).scalar()

    return render_template(
        'admin/cache.html',
        caches=page_of_entries,
        total_size=size_sum or 0,
        total_hits=hit_sum or 0,
    )
@admin_bp.route('/cache/<int:cache_id>/delete', methods=['POST'])
@admin_required
def delete_cache(cache_id):
    """Delete one cache entry and its cached file, if one is recorded.

    A missing cache file is not an error: the entry is still removed.
    Returns JSON ``{'success': True}``; responds 404 when the entry does not
    exist.
    """
    import os

    cache = TranslationCache.query.get_or_404(cache_id)
    if cache.cache_path:
        # Remove directly instead of checking existence first, so a file that
        # vanishes between check and removal does not cause a server error.
        try:
            os.remove(cache.cache_path)
        except FileNotFoundError:
            pass
    db.session.delete(cache)
    db.session.commit()
    return jsonify({'success': True})
@admin_bp.route('/cache/clear', methods=['POST'])
@admin_required
def clear_cache():
    """Delete every cache entry and the cached files they point to.

    Files that are already gone are skipped. Returns JSON
    ``{'success': True, 'deleted': <number of entries removed>}``.
    """
    import os

    caches = TranslationCache.query.all()
    for cache in caches:
        if not cache.cache_path:
            continue
        # Remove directly instead of checking existence first: a file that
        # disappears mid-loop would otherwise abort the whole clear after
        # some files were already deleted.
        try:
            os.remove(cache.cache_path)
        except FileNotFoundError:
            pass
    TranslationCache.query.delete()
    db.session.commit()
    return jsonify({'success': True, 'deleted': len(caches)})
# ==================== 系统配置 V2 ====================
@admin_bp.route('/settings', methods=['GET', 'POST'])
@admin_required
def settings():
    """Show the system settings page and, on POST, store submitted settings.

    Every key/value pair of the JSON body or form is passed to
    ``SystemConfig.set``. JSON requests get ``{'success': True}``; form posts
    flash a confirmation and then render the page like a GET.
    """
    if request.method == 'POST':
        submitted = request.json if request.is_json else request.form
        for name, raw_value in submitted.items():
            SystemConfig.set(name, raw_value)
        if request.is_json:
            return jsonify({'success': True})
        flash('配置已保存', 'success')

    # All stored key/value settings
    stored = dict((row.key, row.value) for row in SystemConfig.query.all())
    # All dynamic configuration rows
    dynamic_rows = DynamicConfig.query.all()

    return render_template(
        'admin/settings.html',
        configs=stored,
        dynamic_configs=dynamic_rows,
        user_limits=USER_LIMITS,
        membership_plans=MEMBERSHIP_PLANS,
    )
# ==================== 用户权限配置 ====================
@admin_bp.route('/settings/user-limits', methods=['GET', 'POST'])
@admin_required
def user_limits_settings():
    """Show (GET) or store (POST) the per-user-type limits.

    POST expects a JSON object of the form
    ``{user_type: {limit_name: value, ...}, ...}``. Each value is stored via
    ``DynamicConfig.set`` under ``user_limit_<user_type>_<limit_name>`` in the
    ``user_limits`` category. A body that is not such an object is rejected
    with a 400 JSON error before anything is written.

    GET renders ``admin/user_limits.html`` with the effective limits: the
    dynamic value if present, else the ``USER_LIMITS`` default, else a
    built-in fallback.
    """
    if request.method == 'POST':
        data = request.get_json(silent=True)
        # Validate the whole payload up front so a malformed entry cannot
        # leave the configuration half-updated.
        well_formed = isinstance(data, dict) and all(
            isinstance(limits, dict) for limits in data.values()
        )
        if not well_formed:
            return jsonify({'error': '请求数据格式错误'}), 400

        operator_id = session.get('user_id')
        # Store the settings of each user type
        for user_type, limits in data.items():
            for key, value in limits.items():
                DynamicConfig.set(
                    f"user_limit_{user_type}_{key}",
                    value,
                    category='user_limits',
                    value_type='int' if isinstance(value, int) else 'string',
                    user_id=operator_id,
                )
        return jsonify({'success': True})

    # Built-in fallbacks used when USER_LIMITS has no entry for a limit.
    fallbacks = {
        'daily_translations': 10,
        'max_pages': 50,
        'max_file_size': 30 * 1024 * 1024,
    }
    limits_config = {}
    for user_type in ['guest', 'free', 'vip_basic', 'vip_pro', 'vip_enterprise']:
        defaults = USER_LIMITS.get(user_type, {})
        limits_config[user_type] = {
            key: DynamicConfig.get(
                f'user_limit_{user_type}_{key}', defaults.get(key, fallback)
            )
            for key, fallback in fallbacks.items()
        }

    return render_template(
        'admin/user_limits.html',
        limits_config=limits_config,
        default_limits=USER_LIMITS,
    )
# ==================== 会员套餐配置 ====================
@admin_bp.route('/settings/membership', methods=['GET', 'POST'])
@admin_required
def membership_settings():
    """Show (GET) or store (POST) the membership plan configuration.

    POST expects a JSON object of the form
    ``{plan_key: {field: value, ...}, ...}``. Each value is stored via
    ``DynamicConfig.set`` under ``membership_<plan_key>_<field>`` in the
    ``membership`` category, typed as int, float or string. A body that is
    not such an object is rejected with a 400 JSON error before anything is
    written.

    GET renders ``admin/membership.html`` with the effective name, price,
    period and description of each plan: the dynamic value if present, else
    the ``MEMBERSHIP_PLANS`` default, else a built-in fallback.
    """
    if request.method == 'POST':
        data = request.get_json(silent=True)
        # Validate the whole payload up front so a malformed entry cannot
        # leave the configuration half-updated.
        well_formed = isinstance(data, dict) and all(
            isinstance(plan_data, dict) for plan_data in data.values()
        )
        if not well_formed:
            return jsonify({'error': '请求数据格式错误'}), 400

        operator_id = session.get('user_id')
        for plan_key, plan_data in data.items():
            for key, value in plan_data.items():
                if isinstance(value, int):
                    value_type = 'int'
                elif isinstance(value, float):
                    value_type = 'float'
                else:
                    value_type = 'string'
                DynamicConfig.set(
                    f"membership_{plan_key}_{key}",
                    value,
                    category='membership',
                    value_type=value_type,
                    user_id=operator_id,
                )
        return jsonify({'success': True})

    # Effective configuration per plan
    plans_config = {}
    for plan_key in ['vip_basic', 'vip_pro', 'vip_enterprise']:
        default = MEMBERSHIP_PLANS.get(plan_key, {})
        # Built-in fallbacks used when MEMBERSHIP_PLANS lacks a field.
        fallbacks = {
            'name': plan_key,
            'price': 0,
            'period': 'month',
            'description': '',
        }
        plans_config[plan_key] = {
            field: DynamicConfig.get(
                f'membership_{plan_key}_{field}', default.get(field, fallback)
            )
            for field, fallback in fallbacks.items()
        }

    return render_template(
        'admin/membership.html',
        plans_config=plans_config,
        default_plans=MEMBERSHIP_PLANS,
    )
# ==================== 数据包套餐管理 ====================
@admin_bp.route('/packages')
@admin_required
def packages():
    """List all data packages ordered by their ``sort_order``."""
    catalog = DataPackage.query.order_by(DataPackage.sort_order).all()
    return render_template('admin/packages.html', packages=catalog)
@admin_bp.route('/package/add', methods=['GET', 'POST'])
@admin_required
def add_package():
    """Show the empty package form (GET) or create a data package (POST).

    POST accepts a JSON body or form data with ``name``, ``description``,
    ``translation_count``, ``price``, ``original_price``, ``valid_days``,
    ``sort_order``, ``is_active`` and ``is_recommended``. Boolean fields may
    be the strings 'true'/'false' or real booleans.

    Numeric fields that cannot be parsed produce a 400 response (JSON error,
    or the form re-rendered with a flashed message) instead of a server
    error. On success, JSON requests receive the new package; form posts are
    redirected to the package list.
    """
    if request.method != 'POST':
        return render_template('admin/package_form.html', package=None)

    data = request.json if request.is_json else request.form
    if request.is_json and not isinstance(data, dict):
        return jsonify({'error': '请求数据格式错误'}), 400

    def read_flag(key, default):
        """Read a boolean sent as 'true'/'false' or as a JSON boolean."""
        raw = data.get(key)
        if isinstance(raw, str):
            return raw == 'true'
        # Missing or null falls back to the default instead of storing None.
        return default if raw is None else raw

    try:
        raw_original = data.get('original_price')
        numbers = {
            'translation_count': int(data.get('translation_count', 0)),
            'price': float(data.get('price', 0)),
            # An empty or missing original price means "no original price".
            'original_price': float(raw_original) if raw_original else None,
            'valid_days': int(data.get('valid_days', 30)),
            'sort_order': int(data.get('sort_order', 0)),
        }
    except (TypeError, ValueError):
        message = '参数格式不正确'
        if request.is_json:
            return jsonify({'error': message}), 400
        flash(message, 'error')
        return render_template('admin/package_form.html', package=None), 400

    package = DataPackage(
        name=data.get('name'),
        description=data.get('description'),
        is_active=read_flag('is_active', True),
        is_recommended=read_flag('is_recommended', False),
        **numbers,
    )
    db.session.add(package)
    db.session.commit()

    if request.is_json:
        return jsonify({'success': True, 'package': package.to_dict()})
    flash('数据包套餐已添加', 'success')
    return redirect(url_for('admin.packages'))
@admin_bp.route('/package/<int:package_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_package(package_id):
    """Show the package form (GET) or update a data package (POST).

    POST accepts a JSON body or form data with the same fields as package
    creation. For JSON requests, fields that are not sent keep their stored
    value, including ``original_price``, ``is_active`` and
    ``is_recommended``. Form posts keep their previous semantics: a missing
    ``original_price`` clears it, a missing ``is_active`` means True and a
    missing ``is_recommended`` means False.

    Numeric fields that cannot be parsed produce a 400 response (JSON error,
    or the form re-rendered with a flashed message) and leave the package
    unchanged. Responds 404 when the package does not exist.
    """
    package = DataPackage.query.get_or_404(package_id)
    if request.method != 'POST':
        return render_template('admin/package_form.html', package=package)

    data = request.json if request.is_json else request.form
    # Only JSON clients are treated as sending partial updates.
    partial = request.is_json
    if partial and not isinstance(data, dict):
        return jsonify({'error': '请求数据格式错误'}), 400

    def read_flag(key, form_default):
        """Read a boolean sent as 'true'/'false' or as a JSON boolean."""
        raw = data.get(key)
        if isinstance(raw, str):
            return raw == 'true'
        if raw is None:
            return getattr(package, key) if partial else form_default
        return raw

    # Parse everything before touching the model so a bad value cannot leave
    # the package half-updated.
    try:
        translation_count = int(data.get('translation_count', package.translation_count))
        price = float(data.get('price', package.price))
        if partial and 'original_price' not in data:
            original_price = package.original_price
        else:
            raw_original = data.get('original_price')
            original_price = float(raw_original) if raw_original else None
        valid_days = int(data.get('valid_days', package.valid_days))
        sort_order = int(data.get('sort_order', package.sort_order))
    except (TypeError, ValueError):
        message = '参数格式不正确'
        if request.is_json:
            return jsonify({'error': message}), 400
        flash(message, 'error')
        return render_template('admin/package_form.html', package=package), 400

    package.name = data.get('name', package.name)
    package.description = data.get('description', package.description)
    package.translation_count = translation_count
    package.price = price
    package.original_price = original_price
    package.valid_days = valid_days
    package.sort_order = sort_order
    package.is_active = read_flag('is_active', True)
    package.is_recommended = read_flag('is_recommended', False)
    db.session.commit()

    if request.is_json:
        return jsonify({'success': True, 'package': package.to_dict()})
    flash('数据包套餐已更新', 'success')
    return redirect(url_for('admin.packages'))
@admin_bp.route('/package/<int:package_id>/delete', methods=['POST'])
@admin_required
def delete_package(package_id):
    """Delete a data package; responds 404 when it does not exist."""
    doomed = DataPackage.query.get_or_404(package_id)
    session_ = db.session
    session_.delete(doomed)
    session_.commit()
    return jsonify({'success': True})
@admin_bp.route('/package/<int:package_id>/toggle', methods=['POST'])
@admin_required
def toggle_package(package_id):
    """Flip a data package's ``is_active`` flag and return the new state."""
    target = DataPackage.query.get_or_404(package_id)
    now_active = not target.is_active
    target.is_active = now_active
    db.session.commit()
    return jsonify({'success': True, 'is_active': target.is_active})
# ==================== 操作日志 ====================
@admin_bp.route('/logs')
@admin_required
def logs():
    """List operation-log entries, newest first, 50 per page.

    Query-string parameters: ``page`` (int, default 1) and ``action`` (exact
    match on the logged action).
    """
    page = request.args.get('page', 1, type=int)
    action = request.args.get('action', '')

    listing = OperationLog.query
    if action:
        listing = listing.filter(OperationLog.action == action)

    newest_first = listing.order_by(desc(OperationLog.created_at))
    entries = newest_first.paginate(page=page, per_page=50)
    return render_template('admin/logs.html', logs=entries, action=action)
# ==================== 统计报表 ====================
@admin_bp.route('/stats')
@admin_required
def stats():
    """Render the statistics report.

    Provides 30-day daily series (oldest first) for new users, translations
    and paid-package revenue, the distribution of users by type and of
    translations by status, the ten users with the most translations, and
    the ten best-selling paid packages with their revenue.
    """
    def day_at(days_back):
        # Calendar day ``days_back`` days before today.
        return date.today() - timedelta(days=days_back)

    def label(day):
        return day.strftime('%Y-%m-%d')

    def created_per_day(model):
        # One {'date', 'count'} row per day counting rows created that day.
        rows = []
        for days_back in range(29, -1, -1):
            day = day_at(days_back)
            created = model.query.filter(func.date(model.created_at) == day).count()
            rows.append({'date': label(day), 'count': created})
        return rows

    # New users per day, last 30 days
    user_growth = created_per_day(User)

    # Translations per day, last 30 days
    translation_growth = created_per_day(Translation)

    # Paid revenue per day, last 30 days
    revenue_growth = []
    for days_back in range(29, -1, -1):
        day = day_at(days_back)
        day_total = (
            db.session.query(func.sum(UserPackage.price_paid))
            .filter(
                UserPackage.payment_status == 'paid',
                func.date(UserPackage.purchased_at) == day,
            )
            .scalar()
        )
        # SUM over no rows is NULL; report it as 0.
        revenue_growth.append({'date': label(day), 'revenue': float(day_total or 0)})

    # Users grouped by type
    users_by_type = db.session.query(User.user_type, func.count(User.id))
    user_distribution = users_by_type.group_by(User.user_type).all()

    # Translations grouped by status
    translations_by_status = db.session.query(Translation.status, func.count(Translation.id))
    status_distribution = translations_by_status.group_by(Translation.status).all()

    # Ten users with the most translations
    per_user = db.session.query(User.username, func.count(Translation.id).label('count'))
    per_user = per_user.join(Translation).group_by(User.id)
    top_users = per_user.order_by(desc('count')).limit(10).all()

    # Ten best-selling paid packages
    per_package = db.session.query(
        DataPackage.name,
        func.count(UserPackage.id).label('count'),
        func.sum(UserPackage.price_paid).label('revenue'),
    )
    per_package = per_package.join(UserPackage).filter(UserPackage.payment_status == 'paid')
    per_package = per_package.group_by(DataPackage.id)
    top_packages = per_package.order_by(desc('count')).limit(10).all()

    return render_template(
        'admin/stats.html',
        user_growth=user_growth,
        translation_growth=translation_growth,
        revenue_growth=revenue_growth,
        user_distribution=user_distribution,
        status_distribution=status_distribution,
        top_users=top_users,
        top_packages=top_packages,
    )
# ==================== API接口 ====================
@admin_bp.route('/api/stats/summary')
@admin_required
def api_stats_summary():
    """API: return the headline statistics as a JSON object.

    Includes total and today's users and translations, cache entry and hit
    totals, the number of active packages, and the count and revenue of paid
    package purchases.
    """
    summary = {}

    summary['total_users'] = User.query.count()
    summary['today_users'] = User.query.filter(
        func.date(User.created_at) == date.today()
    ).count()

    summary['total_translations'] = Translation.query.count()
    summary['today_translations'] = Translation.query.filter(
        func.date(Translation.created_at) == date.today()
    ).count()

    summary['total_cache'] = TranslationCache.query.count()
    hit_sum = db.session.query(func.sum(TranslationCache.hit_count)).scalar()
    summary['cache_hits'] = hit_sum or 0

    summary['total_packages'] = DataPackage.query.filter_by(is_active=True).count()
    summary['packages_sold'] = UserPackage.query.filter_by(payment_status='paid').count()

    paid_sum = (
        db.session.query(func.sum(UserPackage.price_paid))
        .filter(UserPackage.payment_status == 'paid')
        .scalar()
    )
    # SUM over no rows is NULL; report it as 0.0.
    summary['total_revenue'] = float(paid_sum or 0)

    return jsonify(summary)
@admin_bp.route('/api/user/<int:user_id>/upgrade', methods=['POST'])
@admin_required
def api_user_upgrade(user_id):
    """API: change a user's type and, for VIP types, set the expiry.

    Expects a JSON object with ``user_type`` (one of free, vip_basic,
    vip_pro, vip_enterprise, admin) and optional ``months`` (number,
    default 1). For VIP types the membership expires ``30 * months`` days
    from the current UTC time.

    Responds 400 when the body is not a JSON object, the user type is
    unknown, or ``months`` is not a usable number; 404 when the user does not
    exist. Nothing is changed on a 400.
    """
    user = User.query.get_or_404(user_id)
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求数据格式错误'}), 400

    user_type = data.get('user_type')
    months = data.get('months', 1)

    valid_types = ['free', 'vip_basic', 'vip_pro', 'vip_enterprise', 'admin']
    if user_type not in valid_types:
        return jsonify({'error': '无效的用户类型'}), 400

    new_expiry = None
    if user_type.startswith('vip'):
        # ``30 * "3"`` would silently build a repeated string, so insist on a
        # real number before doing arithmetic with it.
        if isinstance(months, bool) or not isinstance(months, (int, float)):
            return jsonify({'error': '无效的月数'}), 400
        try:
            new_expiry = datetime.utcnow() + timedelta(days=30 * months)
        except (OverflowError, ValueError):
            # Out-of-range or NaN durations cannot be represented as a date.
            return jsonify({'error': '无效的月数'}), 400

    user.user_type = user_type
    if new_expiry is not None:
        user.membership_expire = new_expiry
    db.session.commit()
    return jsonify({'success': True, 'user': user.to_dict()})
@admin_bp.route('/api/user/<int:user_id>/add-package', methods=['POST'])
@admin_required
def api_user_add_package(user_id):
    """API: grant a data package to a user free of charge.

    Expects a JSON object with ``package_id``. Creates a ``UserPackage`` with
    the package's translation count, price 0 and status 'paid'. The grant
    expires ``valid_days`` days from the current UTC time, or never when the
    package has no positive ``valid_days``.

    Responds 400 when the body is not a JSON object, and 404 when the user or
    the package does not exist.
    """
    user = User.query.get_or_404(user_id)
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求数据格式错误'}), 400

    package_id = data.get('package_id')
    # Only look up ids that can be a primary key; anything else is "not found".
    package = None
    if isinstance(package_id, (int, str)):
        package = DataPackage.query.get(package_id)
    if not package:
        return jsonify({'error': '数据包不存在'}), 404

    # A missing (None) or non-positive validity means the grant never expires.
    expire_at = None
    if package.valid_days and package.valid_days > 0:
        expire_at = datetime.utcnow() + timedelta(days=package.valid_days)

    user_package = UserPackage(
        user_id=user.id,
        package_id=package.id,
        package_name=package.name,
        translation_count=package.translation_count,
        remaining_count=package.translation_count,
        expire_at=expire_at,
        price_paid=0,  # granted by an administrator
        payment_status='paid',
    )
    db.session.add(user_package)
    db.session.commit()

    return jsonify({'success': True, 'package': {
        'id': user_package.id,
        'name': user_package.package_name,
        'remaining': user_package.remaining_count,
    }})