V2.0.0: 新增用户权限动态配置、会员套餐配置、数据包购买功能

新功能:
- 用户权限动态配置(翻译次数、页数限制)
- 会员套餐动态配置(名称、价格、周期)
- 数据包购买套餐管理
- 收入统计功能
- 数据包销售排行

技术更新:
- 新增 DynamicConfig 模型支持动态配置
- 新增 DataPackage 和 UserPackage 模型
- 后台管理增加数据包管理模块
This commit is contained in:
2026-04-07 23:26:53 +08:00
commit 2ef5e6da87
37 changed files with 6507 additions and 0 deletions

664
admin.py Normal file
View File

@@ -0,0 +1,664 @@
"""
后台管理模块 V2
"""
from datetime import datetime, date, timedelta
from functools import wraps
from flask import Blueprint, render_template, request, jsonify, redirect, url_for, session, flash
from sqlalchemy import func, desc
import json
from models import (db, User, Translation, TranslationCache, GuestTranslation,
SystemConfig, OperationLog, DataPackage, UserPackage, DynamicConfig)
from config import USER_LIMITS, MEMBERSHIP_PLANS
admin_bp = Blueprint('admin', __name__, url_prefix='/admin')
# ==================== 权限装饰器 ====================
def admin_required(f):
"""管理员权限装饰器"""
@wraps(f)
def decorated_function(*args, **kwargs):
user_id = session.get('user_id')
if not user_id:
return redirect(url_for('login', next=request.url))
user = User.query.get(user_id)
if not user or not user.is_admin:
flash('需要管理员权限', 'error')
return redirect(url_for('index'))
return f(*args, **kwargs)
return decorated_function
# ==================== 后台首页 ====================
@admin_bp.route('/')
@admin_required
def dashboard():
"""后台首页 - 数据概览"""
# 用户统计
total_users = User.query.count()
new_users_today = User.query.filter(
func.date(User.created_at) == date.today()
).count()
vip_users = User.query.filter(User.user_type.startswith('vip')).count()
# 翻译统计
total_translations = Translation.query.count()
today_translations = Translation.query.filter(
func.date(Translation.created_at) == date.today()
).count()
# 缓存统计
total_cache = TranslationCache.query.count()
total_cache_hits = db.session.query(func.sum(TranslationCache.hit_count)).scalar() or 0
total_cache_size = db.session.query(func.sum(TranslationCache.file_size)).scalar() or 0
# 数据包销售统计
total_packages_sold = UserPackage.query.filter_by(payment_status='paid').count()
total_revenue = db.session.query(func.sum(UserPackage.price_paid)).filter(
UserPackage.payment_status == 'paid'
).scalar() or 0
# 最近翻译
recent_translations = Translation.query.order_by(desc(Translation.created_at)).limit(10).all()
# 最近用户
recent_users = User.query.order_by(desc(User.created_at)).limit(10).all()
# 每日翻译趋势最近7天
daily_stats = []
for i in range(6, -1, -1):
d = date.today() - timedelta(days=i)
count = Translation.query.filter(func.date(Translation.created_at) == d).count()
daily_stats.append({'date': d.strftime('%m-%d'), 'count': count})
return render_template('admin/dashboard.html',
total_users=total_users,
new_users_today=new_users_today,
vip_users=vip_users,
total_translations=total_translations,
today_translations=today_translations,
total_cache=total_cache,
total_cache_hits=total_cache_hits,
total_cache_size=total_cache_size,
total_packages_sold=total_packages_sold,
total_revenue=total_revenue,
recent_translations=recent_translations,
recent_users=recent_users,
daily_stats=daily_stats,
)
# ==================== 用户管理 ====================
@admin_bp.route('/users')
@admin_required
def users():
"""用户列表"""
page = request.args.get('page', 1, type=int)
search = request.args.get('search', '')
user_type = request.args.get('type', '')
query = User.query
if search:
query = query.filter(
(User.username.contains(search)) | (User.email.contains(search))
)
if user_type:
query = query.filter_by(user_type=user_type)
users = query.order_by(desc(User.created_at)).paginate(page=page, per_page=20)
return render_template('admin/users.html',
users=users,
search=search,
user_type=user_type,
user_types=['free', 'vip_basic', 'vip_pro', 'vip_enterprise', 'admin']
)
@admin_bp.route('/user/<int:user_id>', methods=['GET', 'POST'])
@admin_required
def user_detail(user_id):
"""用户详情/编辑"""
user = User.query.get_or_404(user_id)
if request.method == 'POST':
data = request.json if request.is_json else request.form
user.username = data.get('username', user.username)
user.email = data.get('email', user.email)
user.user_type = data.get('user_type', user.user_type)
user.is_active = data.get('is_active', user.is_active) == 'true' or data.get('is_active') == True
user.is_admin = data.get('is_admin', user.is_admin) == 'true' or data.get('is_admin') == True
# 会员到期时间
expire_str = data.get('membership_expire')
if expire_str:
try:
user.membership_expire = datetime.fromisoformat(expire_str)
except:
pass
# 密码
new_password = data.get('new_password')
if new_password:
user.set_password(new_password)
db.session.commit()
# 记录日志
log = OperationLog(
user_id=session.get('user_id'),
username='admin',
action='edit_user',
target=user.username,
detail=json.dumps(data, ensure_ascii=False) if isinstance(data, dict) else str(data)
)
db.session.add(log)
db.session.commit()
if request.is_json:
return jsonify({'success': True, 'user': user.to_dict()})
flash('用户信息已更新', 'success')
return redirect(url_for('admin.user_detail', user_id=user_id))
translations = Translation.query.filter_by(user_id=user_id).order_by(desc(Translation.created_at)).limit(20).all()
packages = UserPackage.query.filter_by(user_id=user_id).order_by(desc(UserPackage.purchased_at)).all()
return render_template('admin/user_detail.html', user=user, translations=translations, packages=packages)
@admin_bp.route('/user/<int:user_id>/delete', methods=['POST'])
@admin_required
def delete_user(user_id):
"""删除用户"""
user = User.query.get_or_404(user_id)
if user.is_admin and User.query.filter_by(is_admin=True).count() <= 1:
return jsonify({'error': '不能删除最后一个管理员'}), 400
username = user.username
db.session.delete(user)
db.session.commit()
log = OperationLog(
user_id=session.get('user_id'),
username='admin',
action='delete_user',
target=username
)
db.session.add(log)
db.session.commit()
return jsonify({'success': True})
# ==================== 翻译记录管理 ====================
@admin_bp.route('/translations')
@admin_required
def translations():
"""翻译记录列表"""
page = request.args.get('page', 1, type=int)
status = request.args.get('status', '')
search = request.args.get('search', '')
query = Translation.query
if status:
query = query.filter_by(status=status)
if search:
query = query.filter(Translation.original_filename.contains(search))
translations = query.order_by(desc(Translation.created_at)).paginate(page=page, per_page=20)
return render_template('admin/translations.html',
translations=translations,
status=status,
search=search
)
@admin_bp.route('/translation/<int:trans_id>')
@admin_required
def translation_detail(trans_id):
"""翻译详情"""
translation = Translation.query.get_or_404(trans_id)
user = User.query.get(translation.user_id) if translation.user_id else None
return render_template('admin/translation_detail.html',
translation=translation,
user=user
)
@admin_bp.route('/translation/<int:trans_id>/delete', methods=['POST'])
@admin_required
def delete_translation(trans_id):
"""删除翻译记录"""
translation = Translation.query.get_or_404(trans_id)
if translation.output_path:
import os
if os.path.exists(translation.output_path):
os.remove(translation.output_path)
db.session.delete(translation)
db.session.commit()
return jsonify({'success': True})
# ==================== 缓存管理 ====================
@admin_bp.route('/cache')
@admin_required
def cache_list():
"""缓存列表"""
page = request.args.get('page', 1, type=int)
caches = TranslationCache.query.order_by(desc(TranslationCache.hit_count)).paginate(page=page, per_page=20)
total_size = db.session.query(func.sum(TranslationCache.file_size)).scalar() or 0
total_hits = db.session.query(func.sum(TranslationCache.hit_count)).scalar() or 0
return render_template('admin/cache.html',
caches=caches,
total_size=total_size,
total_hits=total_hits
)
@admin_bp.route('/cache/<int:cache_id>/delete', methods=['POST'])
@admin_required
def delete_cache(cache_id):
"""删除缓存"""
cache = TranslationCache.query.get_or_404(cache_id)
if cache.cache_path:
import os
if os.path.exists(cache.cache_path):
os.remove(cache.cache_path)
db.session.delete(cache)
db.session.commit()
return jsonify({'success': True})
@admin_bp.route('/cache/clear', methods=['POST'])
@admin_required
def clear_cache():
"""清空缓存"""
import os
caches = TranslationCache.query.all()
for cache in caches:
if cache.cache_path and os.path.exists(cache.cache_path):
os.remove(cache.cache_path)
TranslationCache.query.delete()
db.session.commit()
return jsonify({'success': True, 'deleted': len(caches)})
# ==================== 系统配置 V2 ====================
@admin_bp.route('/settings', methods=['GET', 'POST'])
@admin_required
def settings():
"""系统配置"""
if request.method == 'POST':
data = request.json if request.is_json else request.form
for key, value in data.items():
SystemConfig.set(key, value)
if request.is_json:
return jsonify({'success': True})
flash('配置已保存', 'success')
# 获取所有配置
configs = SystemConfig.query.all()
config_dict = {c.key: c.value for c in configs}
# 获取动态配置
dynamic_configs = DynamicConfig.query.all()
return render_template('admin/settings.html',
configs=config_dict,
dynamic_configs=dynamic_configs,
user_limits=USER_LIMITS,
membership_plans=MEMBERSHIP_PLANS
)
# ==================== 用户权限配置 ====================
@admin_bp.route('/settings/user-limits', methods=['GET', 'POST'])
@admin_required
def user_limits_settings():
"""用户权限配置"""
if request.method == 'POST':
data = request.json
# 保存每个用户类型的配置
for user_type, limits in data.items():
for key, value in limits.items():
config_key = f"user_limit_{user_type}_{key}"
DynamicConfig.set(
config_key,
value,
category='user_limits',
value_type='int' if isinstance(value, int) else 'string',
user_id=session.get('user_id')
)
return jsonify({'success': True})
# 获取当前配置
limits_config = {}
for user_type in ['guest', 'free', 'vip_basic', 'vip_pro', 'vip_enterprise']:
limits_config[user_type] = {
'daily_translations': DynamicConfig.get(f'user_limit_{user_type}_daily_translations',
USER_LIMITS.get(user_type, {}).get('daily_translations', 10)),
'max_pages': DynamicConfig.get(f'user_limit_{user_type}_max_pages',
USER_LIMITS.get(user_type, {}).get('max_pages', 50)),
'max_file_size': DynamicConfig.get(f'user_limit_{user_type}_max_file_size',
USER_LIMITS.get(user_type, {}).get('max_file_size', 30*1024*1024)),
}
return render_template('admin/user_limits.html',
limits_config=limits_config,
default_limits=USER_LIMITS
)
# ==================== 会员套餐配置 ====================
@admin_bp.route('/settings/membership', methods=['GET', 'POST'])
@admin_required
def membership_settings():
"""会员套餐配置"""
if request.method == 'POST':
data = request.json
for plan_key, plan_data in data.items():
for key, value in plan_data.items():
config_key = f"membership_{plan_key}_{key}"
value_type = 'int' if isinstance(value, int) else 'float' if isinstance(value, float) else 'string'
DynamicConfig.set(
config_key,
value,
category='membership',
value_type=value_type,
user_id=session.get('user_id')
)
return jsonify({'success': True})
# 获取当前配置
plans_config = {}
for plan_key in ['vip_basic', 'vip_pro', 'vip_enterprise']:
default = MEMBERSHIP_PLANS.get(plan_key, {})
plans_config[plan_key] = {
'name': DynamicConfig.get(f'membership_{plan_key}_name', default.get('name', plan_key)),
'price': DynamicConfig.get(f'membership_{plan_key}_price', default.get('price', 0)),
'period': DynamicConfig.get(f'membership_{plan_key}_period', default.get('period', 'month')),
'description': DynamicConfig.get(f'membership_{plan_key}_description', default.get('description', '')),
}
return render_template('admin/membership.html',
plans_config=plans_config,
default_plans=MEMBERSHIP_PLANS
)
# ==================== 数据包套餐管理 ====================
@admin_bp.route('/packages')
@admin_required
def packages():
"""数据包套餐列表"""
packages = DataPackage.query.order_by(DataPackage.sort_order).all()
return render_template('admin/packages.html', packages=packages)
@admin_bp.route('/package/add', methods=['GET', 'POST'])
@admin_required
def add_package():
"""添加数据包套餐"""
if request.method == 'POST':
data = request.json if request.is_json else request.form
package = DataPackage(
name=data.get('name'),
description=data.get('description'),
translation_count=int(data.get('translation_count', 0)),
price=float(data.get('price', 0)),
original_price=float(data.get('original_price')) if data.get('original_price') else None,
valid_days=int(data.get('valid_days', 30)),
sort_order=int(data.get('sort_order', 0)),
is_active=data.get('is_active', 'true') == 'true' if isinstance(data.get('is_active'), str) else data.get('is_active', True),
is_recommended=data.get('is_recommended', 'true') == 'true' if isinstance(data.get('is_recommended'), str) else data.get('is_recommended', False),
)
db.session.add(package)
db.session.commit()
if request.is_json:
return jsonify({'success': True, 'package': package.to_dict()})
flash('数据包套餐已添加', 'success')
return redirect(url_for('admin.packages'))
return render_template('admin/package_form.html', package=None)
@admin_bp.route('/package/<int:package_id>/edit', methods=['GET', 'POST'])
@admin_required
def edit_package(package_id):
"""编辑数据包套餐"""
package = DataPackage.query.get_or_404(package_id)
if request.method == 'POST':
data = request.json if request.is_json else request.form
package.name = data.get('name', package.name)
package.description = data.get('description', package.description)
package.translation_count = int(data.get('translation_count', package.translation_count))
package.price = float(data.get('price', package.price))
package.original_price = float(data.get('original_price')) if data.get('original_price') else None
package.valid_days = int(data.get('valid_days', package.valid_days))
package.sort_order = int(data.get('sort_order', package.sort_order))
package.is_active = data.get('is_active', 'true') == 'true' if isinstance(data.get('is_active'), str) else data.get('is_active', True)
package.is_recommended = data.get('is_recommended', 'true') == 'true' if isinstance(data.get('is_recommended'), str) else data.get('is_recommended', False)
db.session.commit()
if request.is_json:
return jsonify({'success': True, 'package': package.to_dict()})
flash('数据包套餐已更新', 'success')
return redirect(url_for('admin.packages'))
return render_template('admin/package_form.html', package=package)
@admin_bp.route('/package/<int:package_id>/delete', methods=['POST'])
@admin_required
def delete_package(package_id):
"""删除数据包套餐"""
package = DataPackage.query.get_or_404(package_id)
db.session.delete(package)
db.session.commit()
return jsonify({'success': True})
@admin_bp.route('/package/<int:package_id>/toggle', methods=['POST'])
@admin_required
def toggle_package(package_id):
"""切换数据包套餐状态"""
package = DataPackage.query.get_or_404(package_id)
package.is_active = not package.is_active
db.session.commit()
return jsonify({'success': True, 'is_active': package.is_active})
# ==================== 操作日志 ====================
@admin_bp.route('/logs')
@admin_required
def logs():
"""操作日志"""
page = request.args.get('page', 1, type=int)
action = request.args.get('action', '')
query = OperationLog.query
if action:
query = query.filter_by(action=action)
logs = query.order_by(desc(OperationLog.created_at)).paginate(page=page, per_page=50)
return render_template('admin/logs.html', logs=logs, action=action)
# ==================== 统计报表 ====================
@admin_bp.route('/stats')
@admin_required
def stats():
"""统计报表"""
# 用户增长趋势最近30天
user_growth = []
for i in range(29, -1, -1):
d = date.today() - timedelta(days=i)
count = User.query.filter(func.date(User.created_at) == d).count()
user_growth.append({'date': d.strftime('%Y-%m-%d'), 'count': count})
# 翻译量趋势最近30天
translation_growth = []
for i in range(29, -1, -1):
d = date.today() - timedelta(days=i)
count = Translation.query.filter(func.date(Translation.created_at) == d).count()
translation_growth.append({'date': d.strftime('%Y-%m-%d'), 'count': count})
# 收入趋势最近30天
revenue_growth = []
for i in range(29, -1, -1):
d = date.today() - timedelta(days=i)
revenue = db.session.query(func.sum(UserPackage.price_paid)).filter(
UserPackage.payment_status == 'paid',
func.date(UserPackage.purchased_at) == d
).scalar() or 0
revenue_growth.append({'date': d.strftime('%Y-%m-%d'), 'revenue': float(revenue)})
# 用户类型分布
user_distribution = db.session.query(
User.user_type, func.count(User.id)
).group_by(User.user_type).all()
# 翻译状态分布
status_distribution = db.session.query(
Translation.status, func.count(Translation.id)
).group_by(Translation.status).all()
# Top用户
top_users = db.session.query(
User.username, func.count(Translation.id).label('count')
).join(Translation).group_by(User.id).order_by(desc('count')).limit(10).all()
# 数据包销售排行
top_packages = db.session.query(
DataPackage.name, func.count(UserPackage.id).label('count'),
func.sum(UserPackage.price_paid).label('revenue')
).join(UserPackage).filter(
UserPackage.payment_status == 'paid'
).group_by(DataPackage.id).order_by(desc('count')).limit(10).all()
return render_template('admin/stats.html',
user_growth=user_growth,
translation_growth=translation_growth,
revenue_growth=revenue_growth,
user_distribution=user_distribution,
status_distribution=status_distribution,
top_users=top_users,
top_packages=top_packages
)
# ==================== API接口 ====================
@admin_bp.route('/api/stats/summary')
@admin_required
def api_stats_summary():
"""API: 统计摘要"""
return jsonify({
'total_users': User.query.count(),
'today_users': User.query.filter(func.date(User.created_at) == date.today()).count(),
'total_translations': Translation.query.count(),
'today_translations': Translation.query.filter(func.date(Translation.created_at) == date.today()).count(),
'total_cache': TranslationCache.query.count(),
'cache_hits': db.session.query(func.sum(TranslationCache.hit_count)).scalar() or 0,
'total_packages': DataPackage.query.filter_by(is_active=True).count(),
'packages_sold': UserPackage.query.filter_by(payment_status='paid').count(),
'total_revenue': float(db.session.query(func.sum(UserPackage.price_paid)).filter(
UserPackage.payment_status == 'paid'
).scalar() or 0),
})
@admin_bp.route('/api/user/<int:user_id>/upgrade', methods=['POST'])
@admin_required
def api_user_upgrade(user_id):
"""API: 升级用户会员"""
user = User.query.get_or_404(user_id)
data = request.json
user_type = data.get('user_type')
months = data.get('months', 1)
valid_types = ['free', 'vip_basic', 'vip_pro', 'vip_enterprise', 'admin']
if user_type not in valid_types:
return jsonify({'error': '无效的用户类型'}), 400
user.user_type = user_type
if user_type.startswith('vip'):
user.membership_expire = datetime.utcnow() + timedelta(days=30 * months)
db.session.commit()
return jsonify({'success': True, 'user': user.to_dict()})
@admin_bp.route('/api/user/<int:user_id>/add-package', methods=['POST'])
@admin_required
def api_user_add_package(user_id):
"""API: 为用户添加数据包"""
user = User.query.get_or_404(user_id)
data = request.json
package_id = data.get('package_id')
package = DataPackage.query.get(package_id)
if not package:
return jsonify({'error': '数据包不存在'}), 404
user_package = UserPackage(
user_id=user.id,
package_id=package.id,
package_name=package.name,
translation_count=package.translation_count,
remaining_count=package.translation_count,
expire_at=datetime.utcnow() + timedelta(days=package.valid_days) if package.valid_days > 0 else None,
price_paid=0, # 管理员赠送
payment_status='paid'
)
db.session.add(user_package)
db.session.commit()
return jsonify({'success': True, 'package': {
'id': user_package.id,
'name': user_package.package_name,
'remaining': user_package.remaining_count
}})