Files
pdf-translate-web/app.py
coder 44077796f8 feat: 翻译记录添加不共享开关功能
- Translation 模型新增 no_share 字段
- 管理后台翻译记录页面添加共享状态列和切换按钮
- 不共享的翻译不会被其他用户使用缓存
- 缓存匹配时检查是否有 no_share 标记
2026-04-16 19:06:43 +08:00

989 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
PDF翻译网站主应用
"""
import os
import json
import uuid
import hashlib
from datetime import datetime, date
from functools import wraps
from flask import Flask, request, jsonify, render_template, send_file, session, redirect, url_for
from flask_sqlalchemy import SQLAlchemy
from werkzeug.utils import secure_filename
from config import *
from models import (db, User, Translation, TranslationCache, GuestTranslation,
DataPackage, UserPackage, DynamicConfig, UserRecharge, UserRefund,
MembershipPurchase, AccountTransaction, MembershipPlanConfig, UserTypeConfig,
UserInvitation, InviteRewardConfig, EmailNotification, EmailTemplateConfig)
from email_service import email_service
from services import TranslationService, CacheService, TranslationTask
from admin import admin_bp
# ==================== 创建应用 ====================
app = Flask(__name__)
# Core Flask / SQLAlchemy settings, all taken from the config module.
app.config.update(
    SECRET_KEY=SECRET_KEY,
    SQLALCHEMY_DATABASE_URI=DATABASE_URL,
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    MAX_CONTENT_LENGTH=MAX_FILE_SIZE,
)
# Bind the shared SQLAlchemy instance to this application.
db.init_app(app)
# Mount the admin back-office blueprint.
app.register_blueprint(admin_bp)
# Context processor - 所有模板自动获得 site_config
@app.context_processor
def inject_site_config():
    """Make ``site_config`` available to every rendered template."""
    # Imported at call time rather than at module import time
    # (presumably to avoid an import cycle with admin -- TODO confirm).
    from admin import get_site_config

    return {'site_config': get_site_config()}
# Initialise services: the module-level cache service used by upload_pdf().
cache_service = CacheService(CACHE_DIR, CACHE_EXPIRE_DAYS)
# ==================== 辅助函数 ====================
def get_current_user():
    """Return the ``User`` referenced by the session, or ``None``.

    ``None`` is returned when the session carries no ``user_id``; otherwise
    the result of the primary-key lookup is returned as-is.
    """
    uid = session.get('user_id')
    if not uid:
        return None
    return User.query.get(uid)
def get_or_create_guest():
    """Return the ``GuestTranslation`` row for the current browser session.

    A new UUID is stored in the session under ``guest_id`` when none is
    present. If no row exists for that id, one is inserted (recording the
    client IP address) and committed before being returned.
    """
    guest_key = session.get('guest_id')
    if not guest_key:
        guest_key = str(uuid.uuid4())
        session['guest_id'] = guest_key

    record = GuestTranslation.query.filter_by(session_id=guest_key).first()
    if record:
        return record

    record = GuestTranslation(session_id=guest_key, ip_address=request.remote_addr)
    db.session.add(record)
    db.session.commit()
    return record
def check_guest_limit(guest):
    """Check whether a guest may still translate today.

    When the guest's stored date differs from today's, the daily counter is
    reset to zero and the change is committed. The counter is then compared
    with the guest quota from ``USER_LIMITS``.

    Returns:
        ``(True, "OK")`` when the guest is under the quota, otherwise
        ``(False, message)`` with a user-facing explanation.
    """
    # NOTE(review): this uses the server-local date, whereas index() resets
    # the same counter using the UTC+8 date -- confirm which is intended.
    current_day = date.today()
    if guest.last_translate_date != current_day:
        guest.daily_count = 0
        guest.last_translate_date = current_day
        db.session.commit()

    quota = USER_LIMITS['guest']['daily_translations']
    if guest.daily_count < quota:
        return True, "OK"
    return False, f"今日翻译次数已达上限({quota}次),请登录获取更多次数"
def allowed_file(filename):
    """Return ``True`` when *filename* ends with ``.pdf`` (case-insensitive)."""
    # A name ending in ".pdf" necessarily contains a dot, so the suffix test
    # alone gives the same answer as an explicit "has a dot" pre-check.
    lowered = filename.lower()
    return lowered.endswith('.pdf')
def compute_file_hash(file_content):
    """Return the hexadecimal MD5 digest of *file_content* (bytes)."""
    digest = hashlib.md5()
    digest.update(file_content)
    return digest.hexdigest()
# ==================== 路由: 页面 ====================
@app.route('/')
def index():
    """Render the home page with the visitor's quota and feature list.

    For both logged-in users and guests the daily counter is reset (and
    committed) when the stored date is not today's date in UTC+8. The
    template receives the applicable limits, the remaining daily quota, the
    page limit, the membership plans and a list of non-base features with a
    flag saying whether the visitor has each one.
    """
    from datetime import timezone, timedelta

    user = get_current_user()
    # The user row or the guest row: both carry the same counter fields.
    account = user if user else get_or_create_guest()

    # Roll the daily counter over using the Shanghai (UTC+8) calendar date.
    today = datetime.now(timezone(timedelta(hours=8))).date()
    if account.last_translate_date != today:
        account.daily_count = 0
        account.last_translate_date = today
        db.session.commit()

    if user:
        limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free'])
        # A non-positive limit means "unlimited".
        if limits['daily_translations'] > 0:
            daily_remaining = limits['daily_translations'] - user.daily_count
        else:
            daily_remaining = '无限'
        if limits['max_pages'] > 0:
            max_pages = limits['max_pages']
        else:
            max_pages = '无限'
        user_features = limits.get('features', [])
    else:
        limits = USER_LIMITS['guest']
        daily_remaining = limits['daily_translations'] - account.daily_count
        max_pages = limits['max_pages']
        user_features = USER_LIMITS['guest']['features']

    # Every known feature with its display name; 'base' marks features that
    # all visitors have.
    all_features = {
        'basic_translate': {'name': '自动翻译缓存,相同文件秒出结果', 'base': True},
        'custom_instruction': {'name': '自定义翻译要求', 'base': False},
        'compare_view': {'name': '原文译文对比查看', 'base': False},
        'history': {'name': '翻译历史记录', 'base': False},
        'retranslate': {'name': '不满意重新翻译', 'base': False},
        'export_pdf': {'name': '导出PDF格式', 'base': False},
        'batch_translate': {'name': '批量翻译', 'base': False},
        'custom_terms': {'name': '自定义术语库', 'base': False},
        'priority_queue': {'name': '优先队列处理', 'base': False},
    }

    # features == ["all"] means every feature is available.
    if user_features == ['all']:
        user_features = list(all_features.keys())

    # Base features are not displayed; only permission-gated ones are listed.
    feature_display = [
        {'name': info['name'], 'has': key in user_features}
        for key, info in all_features.items()
        if not info['base']
    ]

    return render_template(
        'index.html',
        user=user,
        limits=limits,
        daily_remaining=daily_remaining,
        max_pages=max_pages,
        plans=MEMBERSHIP_PLANS,
        features=feature_display,
    )
@app.route('/translate/<int:translation_id>')
def translation_detail(translation_id):
    """Render the detail page for a single translation.

    Returns:
        The rendered page, ``404`` when the record does not exist, or
        ``403`` when the caller is not allowed to view it.
    """
    user = get_current_user()
    translation = Translation.query.get(translation_id)
    if not translation:
        return "翻译记录不存在", 404

    if user:
        # A logged-in user may only open their own records.
        if translation.user_id != user.id:
            return "无权访问", 403
    elif translation.user_id:
        # Anonymous visitors must never see a record owned by an account
        # (previously this case fell through and rendered the page).
        return "无权访问", 403
    # Remaining case: anonymous visitor opening a guest record (no owner).
    # NOTE(review): guest records are not matched against the visitor's
    # session here, so any anonymous visitor can open any guest record.

    return render_template('translation.html', translation=translation, user=user)
@app.route('/history')
def history():
    """Render the logged-in user's 50 most recent translations.

    Visitors who are not logged in are redirected to the login page.
    """
    user = get_current_user()
    if not user:
        return redirect(url_for('login'))

    recent = (
        Translation.query
        .filter_by(user_id=user.id)
        .order_by(Translation.created_at.desc())
        .limit(50)
        .all()
    )
    return render_template('history.html', user=user, translations=recent)
@app.route('/pricing')
def pricing():
    """Render the membership pricing page.

    Active plans and user-type configurations are read from the database.
    For every plan whose user type is configured, a feature list is built
    that starts with the daily-translation and page limits. The free tier
    gets its own list, including a few features marked as not available.
    """
    user = get_current_user()

    # Display names for feature keys; unknown keys are shown verbatim.
    feature_names = {
        'basic_translate': '基础翻译功能',
        'history': '翻译历史记录',
        'retranslate': '不满意重新翻译',
        'export_pdf': '导出PDF格式',
        'compare_view': '原文译文对比查看',
        'batch_translate': '批量翻译',
        'custom_terms': '自定义术语库',
        'priority_queue': '优先处理队列',
        'custom_instruction': '自定义翻译要求',
        'api_access': 'API接口调用',
        'email_notify': '邮件通知',
        'email_attachment': '邮件附件发送',
    }

    def _rows(keys, has):
        """Build display rows for the given feature keys."""
        return [
            {'key': key, 'name': feature_names.get(key, key), 'has': has}
            for key in keys
        ]

    # Active plans, in their configured display order.
    db_plans = (
        MembershipPlanConfig.query
        .filter_by(is_active=True)
        .order_by(MembershipPlanConfig.sort_order)
        .all()
    )
    # Active user-type configurations, keyed by type key.
    user_type_map = {
        cfg.type_key: cfg
        for cfg in UserTypeConfig.query.filter_by(is_active=True).all()
    }

    plan_features = {}
    for plan in db_plans:
        type_cfg = user_type_map.get(plan.user_type_key)
        if not type_cfg:
            # Plans without a matching user type get no feature list.
            continue
        rows = _rows(type_cfg.get_features(), True)
        # A non-positive limit is displayed as "unlimited".
        daily = type_cfg.daily_translations if type_cfg.daily_translations > 0 else '无限'
        pages = type_cfg.max_pages if type_cfg.max_pages > 0 else '无限'
        plan_features[plan.plan_key] = [
            {'key': 'daily_translations', 'name': f"每日翻译{daily}", 'has': True},
            {'key': 'max_pages', 'name': f"单文件最大{pages}", 'has': True},
        ] + rows

    # Free tier: limits first, then owned features, then selected missing ones.
    free_features = []
    free_cfg = user_type_map.get('free')
    if free_cfg:
        owned = free_cfg.get_features()
        missing = [
            key for key in ('compare_view', 'batch_translate', 'custom_terms')
            if key not in owned
        ]
        free_features = [
            {'key': 'daily_translations', 'name': f"每日翻译{free_cfg.daily_translations}", 'has': True},
            {'key': 'max_pages', 'name': f"单文件最大{free_cfg.max_pages}", 'has': True},
        ] + _rows(owned, True) + _rows(missing, False)

    return render_template(
        'pricing.html',
        plans=db_plans,
        plan_features=plan_features,
        free_features=free_features,
        user=user,
    )
@app.route('/profile')
def profile():
    """Render the personal-centre page for the logged-in user.

    Visitors who are not logged in are redirected to the login page.
    """
    user = get_current_user()
    if not user:
        return redirect(url_for('login'))

    limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free'])

    # A non-positive limit means "unlimited".
    if limits['daily_translations'] > 0:
        daily_remaining = limits['daily_translations'] - user.daily_count
    else:
        daily_remaining = '无限'
    if limits['max_pages'] > 0:
        max_pages = limits['max_pages']
    else:
        max_pages = '无限'

    # The e-mail attachment option is granted explicitly or via ["all"].
    user_features = limits.get('features', [])
    has_email_attachment = (
        'email_attachment' in user_features or user_features == ['all']
    )

    return render_template(
        'profile.html',
        user=user,
        daily_remaining=daily_remaining,
        max_pages=max_pages,
        has_email_attachment=has_email_attachment,
    )
# ==================== 路由: API ====================
@app.route('/api/upload', methods=['POST'])
def upload_pdf():
    """Accept a PDF upload and either serve it from cache or queue a translation.

    Form fields:
        file: the PDF to translate (required).
        instruction: optional free-text translation requirements; when
            present the shared cache is bypassed.

    Returns:
        JSON with ``translation_id``, ``file_hash``, ``page_count``,
        ``from_cache``, ``task_id`` (``None`` for cache hits) and a message;
        ``400`` for a missing/invalid file and ``403`` when a quota or page
        limit is exceeded.
    """
    user = get_current_user()

    # --- validate the uploaded file ---
    if 'file' not in request.files:
        return jsonify({'error': '未上传文件'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': '未选择文件'}), 400
    if not allowed_file(file.filename):
        return jsonify({'error': '只支持PDF文件'}), 400

    # Optional user-supplied translation requirements.
    instruction = request.form.get('instruction', None)

    file_content = file.read()
    file_hash = compute_file_hash(file_content)
    filename = secure_filename(file.filename)

    # --- count pages (also rejects files that cannot be parsed as PDF) ---
    try:
        from pypdf import PdfReader
        import io
        reader = PdfReader(io.BytesIO(file_content))
        page_count = len(reader.pages)
    except Exception as e:
        return jsonify({'error': f'PDF解析失败: {e}'}), 400

    # --- quota / permission checks ---
    guest = None
    if user:
        can_translate, msg = user.can_translate(page_count, {'USER_LIMITS': USER_LIMITS})
        if not can_translate:
            return jsonify({'error': msg}), 403
    else:
        guest = get_or_create_guest()
        can_translate, msg = check_guest_limit(guest)
        if not can_translate:
            return jsonify({'error': msg}), 403
        # Guests are additionally limited by page count.
        max_pages = USER_LIMITS['guest']['max_pages']
        if page_count > max_pages:
            return jsonify({'error': f'PDF页数超出限制最大{max_pages}页)'}), 403

    # --- cache decision ---
    cache_path = cache_service.get_cache(file_hash)
    from_cache = False
    if cache_path and ENABLE_CACHE and not instruction:
        # Only look for a no_share marker when a cache hit is otherwise
        # possible; a marked file must not be served from the shared cache.
        no_share_check = Translation.query.filter_by(
            file_hash=file_hash, no_share=True
        ).first()
        from_cache = not no_share_check

    task_id = None
    upload_path = None
    if from_cache:
        output_path = cache_path
    else:
        # Persist the upload in its own directory.
        upload_dir = os.path.join(UPLOAD_DIR, str(uuid.uuid4()))
        os.makedirs(upload_dir, exist_ok=True)
        upload_path = os.path.join(upload_dir, filename)
        with open(upload_path, 'wb') as f:
            f.write(file_content)
        # Reserve the output location for the translated Markdown.
        output_dir = os.path.join(OUTPUT_DIR, str(uuid.uuid4()))
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, f"{filename}_translated.md")
        # The task itself is created below, once the record has an id.
        task_id = str(uuid.uuid4())

    # --- translation record ---
    translation = Translation(
        user_id=user.id if user else None,
        file_hash=file_hash,
        original_filename=filename,
        file_size=len(file_content),
        page_count=page_count,
        translate_params=json.dumps({'instruction': instruction}) if instruction else None,
        status='completed' if from_cache else 'processing',
        progress=100 if from_cache else 0,
        output_path=output_path,
        from_cache=from_cache
    )
    db.session.add(translation)

    if not from_cache:
        # Flush to obtain translation.id without committing yet.
        db.session.flush()
        # NOTE(review): the task is created before the commit below; if the
        # worker reads this row from another session immediately it may not
        # see it yet -- confirm against TranslationTask.
        TranslationTask.create_task(
            task_id, upload_path, output_path,
            {'LLM_CONFIG': LLM_CONFIG},
            instruction,
            translation_id=translation.id,
            app=app
        )

    # --- usage counters ---
    if user:
        user.increment_count()
    else:
        guest.daily_count += 1
        guest.last_translate_date = date.today()

    # --- cache statistics ---
    if from_cache:
        cache_record = TranslationCache.query.filter_by(file_hash=file_hash).first()
        if cache_record:
            cache_record.increment_hit()

    db.session.commit()

    return jsonify({
        'success': True,
        'translation_id': translation.id,
        'file_hash': file_hash,
        'page_count': page_count,
        'from_cache': from_cache,
        'task_id': task_id,
        'message': '使用缓存结果' if from_cache else '翻译任务已创建'
    })
@app.route('/api/status/<int:translation_id>')
def translation_status(translation_id):
    """Return the status, progress and error message of a translation.

    Responds with ``404`` when the record does not exist.
    """
    record = Translation.query.get(translation_id)
    if not record:
        return jsonify({'error': '翻译记录不存在'}), 404

    # The live task state is not consulted here; only the stored values on
    # the record are reported.
    payload = {
        'id': record.id,
        'status': record.status,
        'progress': record.progress,
        'from_cache': record.from_cache,
        'error': record.error_message,
    }
    return jsonify(payload)
@app.route('/api/task/<task_id>')
def task_status(task_id):
    """Return the state of an asynchronous translation task as JSON.

    Responds with ``404`` when ``TranslationTask.get_task`` finds nothing.
    """
    task = TranslationTask.get_task(task_id)
    if task:
        return jsonify(task)
    return jsonify({'error': '任务不存在'}), 404
@app.route('/api/result/<int:translation_id>')
def get_result(translation_id):
    """Return the translated text of a completed translation as JSON.

    Returns:
        JSON with ``id``, ``filename``, ``content`` and ``output_path``;
        ``404`` when the record or its output file is missing, ``403`` when
        the record belongs to another account, ``400`` when the translation
        has not completed.
    """
    user = get_current_user()
    translation = Translation.query.get(translation_id)
    if not translation:
        return jsonify({'error': '翻译记录不存在'}), 404

    # Records owned by an account are readable only by that account.
    # Guest records (no owner) remain readable as before.
    if translation.user_id and (not user or translation.user_id != user.id):
        return jsonify({'error': '无权访问'}), 403

    if translation.status != 'completed':
        return jsonify({'error': '翻译未完成'}), 400

    # Make sure the output file is still on disk.
    if not translation.output_path or not os.path.exists(translation.output_path):
        return jsonify({'error': '翻译结果文件不存在'}), 404

    with open(translation.output_path, 'r', encoding='utf-8') as f:
        content = f.read()

    # NOTE(review): 'output_path' exposes a server-side filesystem path; it
    # is kept for backward compatibility with existing clients.
    return jsonify({
        'id': translation.id,
        'filename': translation.original_filename,
        'content': content,
        'output_path': translation.output_path
    })
@app.route('/api/download/<int:translation_id>')
def download_result(translation_id):
    """Send the translated Markdown file as a download.

    Returns:
        The file as an attachment; ``404`` when the record, its output path
        or the file itself is missing; ``403`` when the record belongs to
        another account.
    """
    user = get_current_user()
    translation = Translation.query.get(translation_id)
    if not translation or not translation.output_path:
        return jsonify({'error': '翻译记录不存在'}), 404

    # Records owned by an account are downloadable only by that account.
    # Guest records (no owner) remain downloadable as before.
    if translation.user_id and (not user or translation.user_id != user.id):
        return jsonify({'error': '无权访问'}), 403

    if not os.path.exists(translation.output_path):
        return jsonify({'error': '文件不存在'}), 404

    filename = f"{translation.original_filename}_translated.md"
    return send_file(translation.output_path, as_attachment=True, download_name=filename)
@app.route('/api/retranslate/<int:translation_id>', methods=['POST'])
def retranslate(translation_id):
    """Create a re-translation record derived from an existing translation.

    JSON body:
        instruction: optional text describing what should change.

    Returns:
        JSON with the new ``translation_id``; ``401`` when not logged in,
        ``403`` when the record is not the caller's or the caller's plan
        lacks the ``retranslate`` feature.
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请登录后使用此功能'}), 401

    translation = Translation.query.get(translation_id)
    if not translation or translation.user_id != user.id:
        return jsonify({'error': '无权操作'}), 403

    # Feature gate. features == ["all"] grants every feature, matching how
    # the home page, profile page and settings endpoint interpret it.
    limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free'])
    features = limits.get('features', [])
    if features != ['all'] and 'retranslate' not in features:
        return jsonify({'error': '会员功能,请升级'}), 403

    # Tolerate a missing, malformed or non-object JSON body.
    payload = request.get_json(silent=True)
    instruction = payload.get('instruction', '') if isinstance(payload, dict) else ''

    # The new record copies the file metadata and links back to the parent.
    new_translation = Translation(
        user_id=user.id,
        file_hash=translation.file_hash,
        original_filename=translation.original_filename,
        file_size=translation.file_size,
        page_count=translation.page_count,
        translate_params=json.dumps({'instruction': instruction}),
        status='processing',
        parent_id=translation_id,
        retranslate_request=instruction
    )
    db.session.add(new_translation)
    db.session.commit()

    # TODO: start the actual translation; only the record is created here.
    return jsonify({
        'success': True,
        'translation_id': new_translation.id,
        'message': '重译任务已创建'
    })
@app.route('/api/compare/<int:translation_id>')
def compare_view(translation_id):
    """Return a side-by-side comparison payload for a translation.

    Responds with ``401`` when not logged in and ``403`` when the record is
    missing or owned by someone else. The payload currently contains
    placeholder text only.
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请登录后使用此功能'}), 401

    record = Translation.query.get(translation_id)
    owned_by_caller = bool(record) and record.user_id == user.id
    if not owned_by_caller:
        return jsonify({'error': '无权访问'}), 403

    # TODO: generate the real comparison content.
    placeholder = {
        'id': record.id,
        'original': '原文内容',
        'translated': '译文内容',
    }
    return jsonify(placeholder)
# ==================== 路由: 用户系统 ====================
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login page (GET) or authenticate a user (POST).

    JSON body (POST):
        username, password: the credentials to check.

    Returns:
        On success, JSON with the user's data, after storing ``user_id`` in
        the session; ``401`` for unknown users, wrong passwords or missing
        credentials.
    """
    if request.method == 'GET':
        return render_template('login.html')

    # Tolerate a missing, malformed or non-object JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')

    # Only query / verify when both credentials are actual strings, so that
    # missing or wrongly-typed fields give a 401 instead of a server error.
    user = None
    if isinstance(username, str) and isinstance(password, str):
        user = User.query.filter_by(username=username).first()
    if not user or not user.check_password(password):
        return jsonify({'error': '用户名或密码错误'}), 401

    session['user_id'] = user.id
    return jsonify({'success': True, 'user': user.to_dict()})
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration page (GET) or create an account (POST).

    JSON body (POST):
        username, email, password: required.
        invite_code: optional invite code of an existing user; when valid,
            the inviter is credited 5.0 and the new user 2.0.

    Returns:
        JSON with the new user's data and an ``invite_reward`` flag;
        ``400`` when a field is missing or the username/e-mail is taken.
    """
    if request.method == 'GET':
        # Pre-fill the invite code from the query string, if any.
        invite_code = request.args.get('invite', None)
        return render_template('register.html', invite_code=invite_code)

    import secrets
    import string

    # Tolerate a missing, malformed or non-object JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    email = data.get('email')
    password = data.get('password')
    invite_code = data.get('invite_code', None)

    # All three fields must be non-empty strings.
    if not all(isinstance(v, str) and v for v in (username, email, password)):
        return jsonify({'error': '用户名、邮箱和密码不能为空'}), 400

    if User.query.filter_by(username=username).first():
        return jsonify({'error': '用户名已存在'}), 400
    if User.query.filter_by(email=email).first():
        return jsonify({'error': '邮箱已注册'}), 400

    user = User(username=username, email=email, user_type='free')
    user.set_password(password)

    # Six-character invite code from a CSPRNG, retried until unused.
    alphabet = string.ascii_uppercase + string.digits
    while True:
        candidate = ''.join(secrets.choice(alphabet) for _ in range(6))
        if not User.query.filter_by(invite_code=candidate).first():
            break
    user.invite_code = candidate

    # Flush first so user.id exists before the invitation and transaction
    # rows below reference it.
    db.session.add(user)
    db.session.flush()

    inviter = None
    if invite_code:
        inviter = User.query.filter_by(invite_code=invite_code).first()

    if inviter:
        inviter_before = inviter.balance or 0
        user_before = user.balance or 0

        user.invited_by = inviter.id
        invitation = UserInvitation(
            inviter_id=inviter.id,
            invite_code=invite_code,
            invitee_id=user.id,
            invitee_email=email,
            reward_amount=5.0,
            status='registered',
            registered_at=datetime.utcnow()
        )
        db.session.add(invitation)

        # Credit the inviter and the new user.
        inviter.invite_count = (inviter.invite_count or 0) + 1
        inviter.invite_rewards = (inviter.invite_rewards or 0) + 5.0
        inviter.balance = inviter_before + 5.0
        user.balance = user_before + 2.0

        # Ledger entries for both credits.
        db.session.add(AccountTransaction(
            user_id=inviter.id,
            transaction_type='invite_reward',
            amount=5.0,
            balance_before=inviter_before,
            balance_after=inviter.balance,
            description=f'邀请用户{username}注册奖励'
        ))
        db.session.add(AccountTransaction(
            user_id=user.id,
            transaction_type='invite_bonus',
            amount=2.0,
            balance_before=user_before,
            balance_after=user.balance,
            description='新用户注册奖励'
        ))

    db.session.commit()
    session['user_id'] = user.id

    # E-mails go out only after the commit succeeded, so nobody is notified
    # about a reward that was rolled back.
    if inviter and inviter.email_notify:
        email_service.send_invite_reward(
            inviter.email, inviter.username, 5.0, inviter.invite_count
        )
    email_service.send_welcome_email(user.email, user.username, user.invite_code)

    return jsonify({
        'success': True,
        'user': user.to_dict(),
        # True when the new user received an invite bonus.
        'invite_reward': (user.balance or 0) > 0
    })
@app.route('/logout')
def logout():
    """Drop ``user_id`` from the session and redirect to the home page."""
    session.pop('user_id', None)
    home_url = url_for('index')
    return redirect(home_url)
@app.route('/api/user/info')
def user_info():
    """Return the current user's data and plan limits as JSON.

    When nobody is logged in the response is ``{"user": null}``.
    """
    user = get_current_user()
    if user:
        plan_limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free'])
        return jsonify({'user': user.to_dict(), 'limits': plan_limits})
    return jsonify({'user': None})
# ==================== 个人中心API ====================
@app.route('/api/profile/transactions')
def get_transactions():
    """Return the caller's 50 most recent account transactions.

    Query string:
        type: transaction type to filter by; ``all`` (the default) disables
            the filter.

    Responds with ``401`` when not logged in.
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请登录'}), 401

    wanted_type = request.args.get('type', 'all')
    criteria = {'user_id': user.id}
    if wanted_type != 'all':
        criteria['transaction_type'] = wanted_type

    rows = (
        AccountTransaction.query
        .filter_by(**criteria)
        .order_by(AccountTransaction.created_at.desc())
        .limit(50)
        .all()
    )
    return jsonify({
        'success': True,
        'transactions': [row.to_dict() for row in rows]
    })
@app.route('/api/profile/purchases')
def get_purchases():
    """Return the caller's 20 most recent membership purchases.

    Responds with ``401`` when not logged in.
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请登录'}), 401

    rows = (
        MembershipPurchase.query
        .filter_by(user_id=user.id)
        .order_by(MembershipPurchase.created_at.desc())
        .limit(20)
        .all()
    )
    return jsonify({
        'success': True,
        'purchases': [row.to_dict() for row in rows]
    })
@app.route('/api/profile/recharge', methods=['POST'])
def recharge_balance():
    """Credit the caller's balance and record the recharge.

    JSON body:
        amount: amount to add, between 10 and 10000 inclusive.
        payment_method: free-form label, defaults to ``balance``.

    Returns:
        JSON with the new ``balance`` and the ``recharge_id``; ``401`` when
        not logged in, ``400`` when the amount is invalid or out of range.
    """
    import math

    user = get_current_user()
    if not user:
        return jsonify({'error': '请登录'}), 401

    # Tolerate a missing, malformed or non-object JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Reject non-numeric input and NaN/inf. NaN would otherwise slip
    # through both range checks below and corrupt the stored balance.
    try:
        amount = float(data.get('amount', 0))
    except (TypeError, ValueError, OverflowError):
        return jsonify({'error': '充值金额无效'}), 400
    if not math.isfinite(amount):
        return jsonify({'error': '充值金额无效'}), 400

    payment_method = data.get('payment_method', 'balance')
    if amount < 10:
        return jsonify({'error': '充值金额最少10元'}), 400
    if amount > 10000:
        return jsonify({'error': '充值金额最多10000元'}), 400

    balance_before = user.balance

    # NOTE(review): the recharge is marked completed and credited without
    # any payment confirmation -- confirm this endpoint is meant to do so.
    order_no = f"RC{datetime.now().strftime('%Y%m%d%H%M%S')}{user.id}"
    recharge = UserRecharge(
        user_id=user.id,
        amount=amount,
        balance_before=balance_before,
        payment_method=payment_method,
        status='completed',
        order_no=order_no,
        completed_at=datetime.utcnow()
    )

    user.balance += amount
    recharge.balance_after = user.balance
    db.session.add(recharge)
    # Flush so recharge.id is assigned before the ledger row references it.
    db.session.flush()

    transaction = AccountTransaction(
        user_id=user.id,
        transaction_type='recharge',
        amount=amount,
        balance_before=balance_before,
        balance_after=user.balance,
        related_id=recharge.id,
        related_type='recharge',
        description=f'充值¥{amount}'
    )
    db.session.add(transaction)
    db.session.commit()

    return jsonify({
        'success': True,
        'balance': user.balance,
        'recharge_id': recharge.id
    })
@app.route('/api/profile/refund', methods=['POST'])
def request_refund():
    """Submit a refund request for later review.

    JSON body:
        amount: amount to refund; must be positive and not exceed the
            caller's balance.
        reason: required explanation.

    Returns:
        JSON with the ``refund_id``; ``401`` when not logged in, ``400``
        when the amount or reason is invalid. The balance is not changed
        here; the request is stored with status ``pending``.
    """
    import math

    user = get_current_user()
    if not user:
        return jsonify({'error': '请登录'}), 401

    # Tolerate a missing, malformed or non-object JSON body.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Reject non-numeric input and NaN/inf. NaN would otherwise pass both
    # comparisons below.
    try:
        amount = float(data.get('amount', 0))
    except (TypeError, ValueError, OverflowError):
        return jsonify({'error': '退款金额无效'}), 400
    if not math.isfinite(amount):
        return jsonify({'error': '退款金额无效'}), 400

    reason = data.get('reason', '')
    if amount <= 0:
        return jsonify({'error': '退款金额必须大于0'}), 400
    if amount > user.balance:
        return jsonify({'error': '退款金额不能超过余额'}), 400
    if not reason:
        return jsonify({'error': '请填写退款原因'}), 400

    refund = UserRefund(
        user_id=user.id,
        amount=amount,
        balance_before=user.balance,
        reason=reason,
        reason_type='user_request',
        status='pending'
    )
    db.session.add(refund)
    db.session.commit()

    return jsonify({
        'success': True,
        'refund_id': refund.id,
        'message': '退款申请已提交,等待管理员审核'
    })
@app.route('/api/profile/settings', methods=['POST'])
def update_settings():
    """Update the caller's contact details and notification preferences.

    JSON body (all keys optional):
        phone: stored when it is a string of at least 10 characters; also
            clears ``phone_verified``.
        email: stored when it is a string containing ``@`` that no other
            user has.
        notify_on_complete, notify_on_expire, email_notify: stored as given.
        notify_with_attachment: stored only when the caller's plan includes
            ``email_attachment`` (or ``["all"]``).

    Returns:
        JSON with the updated user; ``401`` when not logged in, ``400`` for
        a malformed body or an e-mail already in use, ``403`` when the
        attachment option is not included in the plan.
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请登录'}), 401

    # The body must be a JSON object; anything else is a client error.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求格式错误'}), 400

    # Phone number: values that are not plausible are silently ignored.
    if 'phone' in data:
        phone = data.get('phone', '')
        if isinstance(phone, str) and len(phone) >= 10:
            user.phone = phone
            user.phone_verified = False

    # Notification e-mail address.
    if 'email' in data:
        email = data.get('email', '')
        if isinstance(email, str) and '@' in email:
            # The address must not belong to a different user.
            existing = User.query.filter(User.email == email, User.id != user.id).first()
            if existing:
                return jsonify({'error': '该邮箱已被其他用户使用'}), 400
            user.email = email

    # Notification switches.
    if 'notify_on_complete' in data:
        user.notify_on_complete = data.get('notify_on_complete', True)
    if 'notify_on_expire' in data:
        user.notify_on_expire = data.get('notify_on_expire', True)
    if 'notify_with_attachment' in data:
        # Gated by the email_attachment feature.
        limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free'])
        user_features = limits.get('features', [])
        if 'email_attachment' in user_features or user_features == ['all']:
            user.notify_with_attachment = data.get('notify_with_attachment', False)
        else:
            return jsonify({'error': '邮件附件功能需VIP会员', 'feature': 'email_attachment'}), 403
    if 'email_notify' in data:
        user.email_notify = data.get('email_notify', True)

    db.session.commit()
    return jsonify({
        'success': True,
        'user': user.to_dict()
    })
@app.route('/api/profile/invitations')
def get_invitations():
    """Return the caller's 20 most recent invitations.

    Responds with ``401`` when not logged in.
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请登录'}), 401

    rows = (
        UserInvitation.query
        .filter_by(inviter_id=user.id)
        .order_by(UserInvitation.created_at.desc())
        .limit(20)
        .all()
    )
    return jsonify({
        'success': True,
        'invitations': [row.to_dict() for row in rows]
    })
@app.route('/api/invite/<invite_code>')
def check_invite(invite_code):
    """Report whether an invite code belongs to an existing user.

    Returns JSON with ``valid``; valid codes also carry the inviter's
    username and a reward amount, invalid ones an error message.
    """
    inviter = User.query.filter_by(invite_code=invite_code).first()
    if inviter:
        # NOTE(review): 5 is reported as the invitee's reward, but
        # register() credits the invitee 2.0 and the inviter 5.0 --
        # confirm which value should be shown here.
        return jsonify({
            'valid': True,
            'inviter': inviter.username,
            'reward': 5
        })
    return jsonify({'valid': False, 'error': '邀请码无效'})
# ==================== 初始化 ====================
def init_app():
    """Prepare directories, database tables and seed data.

    Creates the upload, cache and output directories, creates all database
    tables, adds a default ``admin`` account when none exists, and inserts
    the sample data packages when the table is empty.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    for dir_name in [UPLOAD_DIR, CACHE_DIR, OUTPUT_DIR]:
        os.makedirs(dir_name, exist_ok=True)

    with app.app_context():
        db.create_all()

        # Default administrator account.
        # NOTE(review): the password is hard-coded and printed to stdout;
        # it must be changed immediately on any real deployment.
        admin = User.query.filter_by(username='admin').first()
        if not admin:
            admin = User(
                username='admin',
                email='admin@tphai.com',
                user_type='admin',
                is_admin=True,
                is_active=True
            )
            admin.set_password('admin123')
            db.session.add(admin)
            db.session.commit()
            print("✅ 默认管理员账号已创建: admin / admin123")

        # Sample data packages, inserted only into an empty table.
        if DataPackage.query.count() == 0:
            packages = [
                DataPackage(name='入门包', description='适合轻度使用', translation_count=100, price=9.9, original_price=19.9, valid_days=30, sort_order=1),
                DataPackage(name='标准包', description='日常使用首选', translation_count=500, price=39.9, original_price=59.9, valid_days=30, sort_order=2, is_recommended=True),
                DataPackage(name='专业包', description='高频使用更划算', translation_count=2000, price=99.9, original_price=199.9, valid_days=30, sort_order=3),
                DataPackage(name='无限包', description='畅享无限翻译', translation_count=0, price=199.9, original_price=399.9, valid_days=30, sort_order=4),
            ]
            db.session.add_all(packages)
            db.session.commit()
            print("✅ 示例数据包套餐已创建")
if __name__ == '__main__':
    init_app()
    # The Werkzeug debugger allows arbitrary code execution, so it must not
    # be enabled unconditionally on a server bound to all interfaces.
    # Set FLASK_DEBUG=1 to opt in during local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=19000, debug=debug_enabled)