Files
pdf-translate-web/models.py
coder 8c35a8741a feat: 系统配置支持动态增删用户类型和会员套餐
新增功能:
- UserTypeConfig 模型:用户类型配置支持动态增删
- MembershipPlanConfig 模型:会员套餐配置支持动态增删
- 用户类型管理页面:添加、编辑、删除、启用/禁用用户类型
- 会员套餐管理页面:添加、编辑、删除、上架/下架、推荐套餐
- 功能权限配置:支持选择功能列表
- 初始化默认配置功能

改进:
- settings.html 页面重构,提供配置入口链接
- 新增API接口支持增删改查操作
2026-04-11 10:25:03 +08:00

523 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据库模型定义
"""
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
import hashlib
db = SQLAlchemy()
# ==================== 用户模型 ====================
class User(db.Model):
    """User account table.

    Stores credentials, the user's type and membership expiry, and the
    per-day / lifetime translation counters used for quota checks.
    """
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)

    # User type: guest, free, vip_basic, vip_pro, vip_enterprise, admin
    # NOTE(review): UserTypeConfig.type_key allows 50 chars while this column
    # allows 20 -- confirm dynamically added type keys always fit.
    user_type = db.Column(db.String(20), default='free')

    # Membership info
    membership_expire = db.Column(db.DateTime, nullable=True)  # membership expiry time

    # Usage statistics
    daily_count = db.Column(db.Integer, default=0)  # translations today
    total_count = db.Column(db.Integer, default=0)  # translations in total
    last_translate_date = db.Column(db.Date, nullable=True)  # date of last translation

    # Timestamps
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Status
    is_active = db.Column(db.Boolean, default=True)  # whether the account is enabled
    is_admin = db.Column(db.Boolean, default=False)  # whether the user is an administrator

    # Relationships
    translations = db.relationship('Translation', backref='user', lazy=True)

    def set_password(self, password):
        """Hash ``password`` and store the hash in ``password_hash``."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)

    def _reset_daily_count_if_new_day(self):
        """Zero ``daily_count`` when the UTC+8 calendar day has changed.

        Sets ``last_translate_date`` to today's date in UTC+8. Does not
        commit; the caller decides when to persist.
        """
        from datetime import timezone, timedelta
        shanghai_tz = timezone(timedelta(hours=8))
        today = datetime.now(shanghai_tz).date()
        if self.last_translate_date != today:
            self.daily_count = 0
            self.last_translate_date = today

    def is_vip(self):
        """Return True when the user is a paying member whose membership is current.

        Side effect: a user whose type starts with ``'vip'`` but whose
        membership is missing or expired is downgraded to ``'free'`` and the
        session is committed.
        """
        # user_type is nullable; a NULL type is simply "not VIP".
        if not (self.user_type or '').startswith('vip'):
            return False
        if self.membership_expire and self.membership_expire > datetime.utcnow():
            return True
        # Expired: downgrade to a free user.
        self.user_type = 'free'
        self.membership_expire = None
        db.session.commit()
        return False

    def can_translate(self, pages, config):
        """Check the page and daily-count limits for this user.

        Args:
            pages: Page count of the PDF to translate.
            config: Mapping with a ``'USER_LIMITS'`` entry keyed by user
                type; each entry provides ``'max_pages'`` and
                ``'daily_translations'``. Unknown user types fall back to
                the ``'free'`` entry. A limit that is not positive is not
                enforced.

        Returns:
            ``(allowed, message)`` -- ``(True, "OK")`` when allowed,
            otherwise ``False`` and a user-facing reason.
        """
        limits = config['USER_LIMITS'].get(self.user_type, config['USER_LIMITS']['free'])

        # Page limit
        max_pages = limits['max_pages']
        if max_pages > 0 and pages > max_pages:
            return False, f"PDF页数超出限制(最大{max_pages}页)"

        # Daily limit, counted per UTC+8 calendar day
        self._reset_daily_count_if_new_day()
        daily_limit = limits['daily_translations']
        # daily_count may still be None on a row that was never flushed.
        if daily_limit > 0 and (self.daily_count or 0) >= daily_limit:
            return False, f"今日翻译次数已达上限({daily_limit}次)"
        return True, "OK"

    def increment_count(self):
        """Increase the daily and total translation counters and commit."""
        self._reset_daily_count_if_new_day()
        # Column defaults are only applied on INSERT, so either counter can
        # be None here; treat None as zero instead of raising TypeError.
        self.daily_count = (self.daily_count or 0) + 1
        self.total_count = (self.total_count or 0) + 1
        db.session.commit()

    def to_dict(self):
        """Return a JSON-serialisable summary of the user.

        Calls ``is_vip()``, so it may downgrade an expired member and commit.
        """
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'user_type': self.user_type,
            'is_vip': self.is_vip(),
            'is_admin': self.is_admin,
            'is_active': self.is_active,
            'daily_count': self.daily_count,
            'total_count': self.total_count,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'membership_expire': self.membership_expire.isoformat() if self.membership_expire else None,
        }
# ==================== 翻译记录模型 ====================
class Translation(db.Model):
    """Translation record table: one row per translation job."""
    __tablename__ = 'translations'

    id = db.Column(db.Integer, primary_key=True)

    # Owning user
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=True)  # may be NULL for guests

    # File info
    file_hash = db.Column(db.String(64), nullable=False)  # MD5 hash of the file
    original_filename = db.Column(db.String(255), nullable=False)
    file_size = db.Column(db.Integer, nullable=False)
    page_count = db.Column(db.Integer, nullable=False)

    # Translation info
    source_language = db.Column(db.String(10), default='en')
    target_language = db.Column(db.String(10), default='zh')
    translate_params = db.Column(db.Text, nullable=True)  # translation parameters as JSON

    # Status
    status = db.Column(db.String(20), default='pending')  # pending, processing, completed, failed
    progress = db.Column(db.Integer, default=0)  # translation progress 0-100
    error_message = db.Column(db.Text, nullable=True)

    # Output
    output_path = db.Column(db.String(255), nullable=True)  # path of the translated file

    # Timestamps
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    completed_at = db.Column(db.DateTime, nullable=True)

    # Whether the result came from the cache
    from_cache = db.Column(db.Boolean, default=False)

    # Re-translation info
    retranslate_request = db.Column(db.Text, nullable=True)  # re-translation request text
    parent_id = db.Column(db.Integer, db.ForeignKey('translations.id'), nullable=True)  # id of the original translation

    def to_dict(self):
        """Return a JSON-serialisable summary of this record.

        Timestamps are rendered with ``isoformat()``; unset ones become None.
        """
        started = self.created_at
        finished = self.completed_at
        return dict(
            id=self.id,
            filename=self.original_filename,
            pages=self.page_count,
            status=self.status,
            progress=self.progress,
            from_cache=self.from_cache,
            file_size=self.file_size,
            created_at=started.isoformat() if started else None,
            completed_at=finished.isoformat() if finished else None,
            user_id=self.user_id,
        )
# ==================== 翻译缓存模型 ====================
class TranslationCache(db.Model):
    """Translation cache table, keyed by file hash."""
    __tablename__ = 'translation_cache'

    id = db.Column(db.Integer, primary_key=True)

    # File hash
    file_hash = db.Column(db.String(64), unique=True, nullable=False)

    # Cache info
    cache_path = db.Column(db.String(255), nullable=False)  # path of the cached file
    page_count = db.Column(db.Integer, nullable=False)
    file_size = db.Column(db.Integer, default=0)

    # Statistics
    hit_count = db.Column(db.Integer, default=0)  # number of cache hits

    # Timestamps
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    expires_at = db.Column(db.DateTime, nullable=True)

    def increment_hit(self):
        """Add one to ``hit_count`` and commit the session."""
        self.hit_count = self.hit_count + 1
        db.session.commit()

    @staticmethod
    def compute_hash(file_content):
        """Return the hexadecimal MD5 digest of ``file_content`` (bytes)."""
        hasher = hashlib.md5()
        hasher.update(file_content)
        return hasher.hexdigest()
# ==================== 访客翻译记录 ====================
class GuestTranslation(db.Model):
    """Guest translation record, based on IP or session."""
    __tablename__ = 'guest_translations'

    id = db.Column(db.Integer, primary_key=True)

    # Guest identity
    session_id = db.Column(db.String(64), nullable=False)  # Session ID
    ip_address = db.Column(db.String(45), nullable=True)

    # Statistics
    daily_count = db.Column(db.Integer, default=0)
    total_count = db.Column(db.Integer, default=0)
    last_translate_date = db.Column(db.Date, nullable=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
# ==================== 系统配置模型 ====================
class SystemConfig(db.Model):
    """System configuration table: unique string keys mapped to text values."""
    __tablename__ = 'system_config'

    id = db.Column(db.Integer, primary_key=True)
    key = db.Column(db.String(100), unique=True, nullable=False)
    value = db.Column(db.Text, nullable=True)
    description = db.Column(db.String(255), nullable=True)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    @staticmethod
    def get(key, default=None):
        """Return the stored value for ``key``, or ``default`` when no row exists."""
        row = SystemConfig.query.filter_by(key=key).first()
        if not row:
            return default
        return row.value

    @staticmethod
    def set(key, value, description=None):
        """Create or update the row for ``key`` and commit.

        ``description`` is only stored when a new row is created; an
        existing row only has its ``value`` replaced.
        """
        existing = SystemConfig.query.filter_by(key=key).first()
        if not existing:
            created = SystemConfig(key=key, value=value, description=description)
            db.session.add(created)
        else:
            existing.value = value
        db.session.commit()
# ==================== 操作日志模型 ====================
class OperationLog(db.Model):
    """Operation log table."""
    __tablename__ = 'operation_logs'

    id = db.Column(db.Integer, primary_key=True)

    # Operator
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=True)
    username = db.Column(db.String(80), nullable=True)

    # Operation info
    action = db.Column(db.String(50), nullable=False)  # login, translate, register, etc.
    target = db.Column(db.String(100), nullable=True)  # object of the operation
    detail = db.Column(db.Text, nullable=True)  # details (JSON)

    # IP address
    ip_address = db.Column(db.String(45), nullable=True)

    # Time
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Return a JSON-serialisable view of the entry.

        ``user_id`` and ``detail`` are not included in the output.
        """
        plain_fields = ('id', 'username', 'action', 'target', 'ip_address')
        payload = {name: getattr(self, name) for name in plain_fields}
        logged_at = self.created_at
        payload['created_at'] = logged_at.isoformat() if logged_at else None
        return payload
# ==================== 数据包套餐模型 ====================
class DataPackage(db.Model):
    """Purchasable data package (a bundle of translation credits)."""
    __tablename__ = 'data_packages'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)  # package name
    description = db.Column(db.String(255), nullable=True)  # description

    # Translation count
    translation_count = db.Column(db.Integer, default=0)  # number of translations (-1 means unlimited)

    # Price
    price = db.Column(db.Float, default=0)  # price
    original_price = db.Column(db.Float, nullable=True)  # original price (for showing a discount)

    # Validity
    valid_days = db.Column(db.Integer, default=30)  # days valid (0 means permanent)

    # Ordering and status
    sort_order = db.Column(db.Integer, default=0)  # sort order
    is_active = db.Column(db.Boolean, default=True)  # whether the package is on sale
    is_recommended = db.Column(db.Boolean, default=False)  # whether the package is recommended

    # Time
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def to_dict(self):
        """Return the package's public fields as a plain dict.

        ``sort_order`` and the timestamps are not included.
        """
        exported = (
            'id',
            'name',
            'description',
            'translation_count',
            'price',
            'original_price',
            'valid_days',
            'is_active',
            'is_recommended',
        )
        return {field: getattr(self, field) for field in exported}
# ==================== 用户数据包购买记录 ====================
class UserPackage(db.Model):
    """A data package purchased by a user."""
    __tablename__ = 'user_packages'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
    package_id = db.Column(db.Integer, db.ForeignKey('data_packages.id'), nullable=False)

    # Snapshot of the package info
    package_name = db.Column(db.String(100), nullable=False)
    translation_count = db.Column(db.Integer, default=0)  # total count
    remaining_count = db.Column(db.Integer, default=0)  # remaining count

    # Validity
    expire_at = db.Column(db.DateTime, nullable=True)  # expiry time (None means permanent)

    # Purchase info
    price_paid = db.Column(db.Float, default=0)  # amount actually paid
    payment_method = db.Column(db.String(20), nullable=True)  # payment method
    payment_status = db.Column(db.String(20), default='pending')  # pending, paid, refunded

    # Time
    purchased_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Relationships
    user = db.relationship('User', backref=db.backref('packages', lazy=True))
    package = db.relationship('DataPackage', backref=db.backref('purchases', lazy=True))

    def is_valid(self):
        """Return True when credits remain and the package has not expired.

        Only ``remaining_count`` and ``expire_at`` are examined.
        NOTE(review): ``payment_status`` is not consulted here -- confirm
        callers filter on it where that matters.
        """
        if self.remaining_count <= 0:
            return False
        expired = bool(self.expire_at) and self.expire_at < datetime.utcnow()
        return not expired
# ==================== 系统配置(动态) ====================
class DynamicConfig(db.Model):
    """Dynamic system configuration (editable from the admin pages).

    Values are stored as text together with a ``value_type`` tag that
    ``get`` uses to convert them back.
    """
    __tablename__ = 'dynamic_config'

    id = db.Column(db.Integer, primary_key=True)
    category = db.Column(db.String(50), default='general')  # category: general, user_limits, membership
    key = db.Column(db.String(100), unique=True, nullable=False)
    value = db.Column(db.Text, nullable=True)
    value_type = db.Column(db.String(20), default='string')  # string, int, float, bool, json
    description = db.Column(db.String(255), nullable=True)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    updated_by = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=True)

    @staticmethod
    def get(key, default=None):
        """Return the typed value stored under ``key``.

        Args:
            key: Configuration key to look up.
            default: Returned when no row exists for ``key``.

        Returns:
            The value converted according to the row's ``value_type``. An
            empty or NULL stored value yields ``0`` (int), ``0.0`` (float),
            ``False`` (bool) or ``{}`` (json); any other type returns the
            raw stored text.
        """
        config = DynamicConfig.query.filter_by(key=key).first()
        if not config:
            return default

        # Type conversion
        raw = config.value
        if config.value_type == 'int':
            return int(raw) if raw else 0
        elif config.value_type == 'float':
            return float(raw) if raw else 0.0
        elif config.value_type == 'bool':
            # `value` is nullable: guard before .lower() so a NULL value
            # reads as False instead of raising AttributeError.
            return bool(raw) and raw.lower() in ('true', '1', 'yes')
        elif config.value_type == 'json':
            import json
            return json.loads(raw) if raw else {}
        return raw

    @staticmethod
    def set(key, value, category='general', value_type='string', description=None, user_id=None):
        """Create or update the row for ``key``, commit, and return the row.

        Args:
            key: Configuration key.
            value: Value to store; serialised according to ``value_type``.
            category: Stored only when a new row is created.
            value_type: One of string, int, float, bool, json.
            description: Stored only when a new row is created.
            user_id: Recorded in ``updated_by``.

        Returns:
            The created or updated ``DynamicConfig`` row.
        """
        config = DynamicConfig.query.filter_by(key=key).first()

        # Serialise the value to text
        if value_type == 'json':
            import json
            value = json.dumps(value, ensure_ascii=False)
        elif value_type == 'bool':
            value = 'true' if value else 'false'
        else:
            value = str(value) if value is not None else None

        if config:
            config.value = value
            config.value_type = value_type
            config.updated_by = user_id
        else:
            config = DynamicConfig(
                category=category,
                key=key,
                value=value,
                value_type=value_type,
                description=description,
                updated_by=user_id
            )
            db.session.add(config)
        db.session.commit()
        return config
# ==================== 用户类型配置(动态增删) ====================
class UserTypeConfig(db.Model):
    """User type configuration table -- rows can be added and removed at runtime."""
    __tablename__ = 'user_type_config'

    id = db.Column(db.Integer, primary_key=True)
    type_key = db.Column(db.String(50), unique=True, nullable=False)  # type identifier: guest, free, vip_basic, etc.
    display_name = db.Column(db.String(100), nullable=False)  # display name, e.g. guest, free user, basic member

    # Permission settings
    daily_translations = db.Column(db.Integer, default=10)  # translations per day (-1 = unlimited)
    max_pages = db.Column(db.Integer, default=50)  # maximum pages (-1 = unlimited)
    max_file_size = db.Column(db.Integer, default=30*1024*1024)  # maximum file size (bytes)

    # Feature list (JSON)
    features = db.Column(db.Text, default='[]')  # JSON array: ["basic_translate", "history"]

    # Ordering and status
    sort_order = db.Column(db.Integer, default=0)  # sort weight
    is_active = db.Column(db.Boolean, default=True)  # whether the type is enabled
    is_system = db.Column(db.Boolean, default=False)  # built-in type (must not be deleted)

    # Time
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def get_features(self):
        """Return the decoded feature list.

        Returns ``[]`` when ``features`` is empty or is not valid JSON.
        """
        import json
        if not self.features:
            return []
        try:
            return json.loads(self.features)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError. Catching only decode
            # failures keeps KeyboardInterrupt/SystemExit propagating, which
            # the previous bare ``except:`` swallowed.
            return []

    def set_features(self, feature_list):
        """Serialise ``feature_list`` to JSON and store it in ``features``."""
        import json
        self.features = json.dumps(feature_list, ensure_ascii=False)

    def to_dict(self):
        """Return the configuration as a plain dict, with ``features`` decoded."""
        return {
            'id': self.id,
            'type_key': self.type_key,
            'display_name': self.display_name,
            'daily_translations': self.daily_translations,
            'max_pages': self.max_pages,
            'max_file_size': self.max_file_size,
            'features': self.get_features(),
            'sort_order': self.sort_order,
            'is_active': self.is_active,
            'is_system': self.is_system,
        }
# ==================== 会员套餐配置(动态增删) ====================
class MembershipPlanConfig(db.Model):
    """Membership plan configuration table -- rows can be added and removed at runtime."""
    __tablename__ = 'membership_plan_config'

    id = db.Column(db.Integer, primary_key=True)
    plan_key = db.Column(db.String(50), unique=True, nullable=False)  # plan identifier: vip_basic, vip_pro, etc.
    display_name = db.Column(db.String(100), nullable=False)  # display name, e.g. basic member

    # Pricing
    price = db.Column(db.Float, default=0)  # price
    original_price = db.Column(db.Float, nullable=True)  # original price (for showing a discount)
    period = db.Column(db.String(20), default='month')  # period: month, quarter, year
    period_days = db.Column(db.Integer, default=30)  # length of the period in days

    # Description
    description = db.Column(db.String(255), nullable=True)  # plan description

    # Associated user type
    user_type_key = db.Column(db.String(50), nullable=True)  # user type granted after purchase

    # Ordering and status
    sort_order = db.Column(db.Integer, default=0)
    is_active = db.Column(db.Boolean, default=True)
    is_recommended = db.Column(db.Boolean, default=False)  # whether the plan is recommended
    is_system = db.Column(db.Boolean, default=False)  # built-in plan (must not be deleted)

    # Time
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def to_dict(self):
        """Return the plan's fields as a plain dict (timestamps excluded)."""
        exported = (
            'id',
            'plan_key',
            'display_name',
            'price',
            'original_price',
            'period',
            'period_days',
            'description',
            'user_type_key',
            'sort_order',
            'is_active',
            'is_recommended',
            'is_system',
        )
        return {field: getattr(self, field) for field in exported}