""" PDF翻译网站主应用 """ import os import json import uuid import hashlib from datetime import datetime, date from functools import wraps from flask import Flask, request, jsonify, render_template, send_file, session, redirect, url_for from flask_sqlalchemy import SQLAlchemy from werkzeug.utils import secure_filename from config import * from models import (db, User, Translation, TranslationCache, GuestTranslation, DataPackage, UserPackage, DynamicConfig, UserRecharge, UserRefund, MembershipPurchase, AccountTransaction, MembershipPlanConfig, UserTypeConfig, UserInvitation, InviteRewardConfig, EmailNotification, EmailTemplateConfig) from email_service import email_service from services import TranslationService, CacheService, TranslationTask from admin import admin_bp # ==================== 创建应用 ==================== app = Flask(__name__) app.config['SECRET_KEY'] = SECRET_KEY app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE # 初始化数据库 db.init_app(app) # 注册后台管理蓝图 app.register_blueprint(admin_bp) # Context processor - 所有模板自动获得 site_config @app.context_processor def inject_site_config(): def get_config(): from admin import get_site_config return get_site_config() return {'site_config': get_config()} # 初始化服务 cache_service = CacheService(CACHE_DIR, CACHE_EXPIRE_DAYS) # ==================== 辅助函数 ==================== def get_current_user(): """获取当前用户""" user_id = session.get('user_id') if user_id: return User.query.get(user_id) return None def get_or_create_guest(): """获取或创建访客记录""" session_id = session.get('guest_id') if not session_id: session_id = str(uuid.uuid4()) session['guest_id'] = session_id guest = GuestTranslation.query.filter_by(session_id=session_id).first() if not guest: guest = GuestTranslation( session_id=session_id, ip_address=request.remote_addr ) db.session.add(guest) db.session.commit() return guest def check_guest_limit(guest): """检查访客翻译限制""" today = date.today() if guest.last_translate_date != today: guest.daily_count = 0 guest.last_translate_date = today db.session.commit() limit = USER_LIMITS['guest']['daily_translations'] if guest.daily_count >= limit: return False, f"今日翻译次数已达上限({limit}次),请登录获取更多次数" return True, "OK" def allowed_file(filename): """检查文件类型""" return '.' in filename and filename.lower().endswith('.pdf') def compute_file_hash(file_content): """计算文件哈希""" return hashlib.md5(file_content).hexdigest() # ==================== 路由: 页面 ==================== @app.route('/') def index(): """首页""" user = get_current_user() if user: # 检查日期并重置计数(使用上海时区) from datetime import timezone, timedelta shanghai_tz = timezone(timedelta(hours=8)) today = datetime.now(shanghai_tz).date() if user.last_translate_date != today: user.daily_count = 0 user.last_translate_date = today db.session.commit() limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free']) daily_remaining = limits['daily_translations'] - user.daily_count if limits['daily_translations'] > 0 else '无限' max_pages = limits['max_pages'] if limits['max_pages'] > 0 else '无限' else: guest = get_or_create_guest() # 检查日期并重置访客计数 from datetime import timezone, timedelta shanghai_tz = timezone(timedelta(hours=8)) today = datetime.now(shanghai_tz).date() if guest.last_translate_date != today: guest.daily_count = 0 guest.last_translate_date = today db.session.commit() limits = USER_LIMITS['guest'] daily_remaining = limits['daily_translations'] - guest.daily_count max_pages = limits['max_pages'] # 获取用户的功能列表 if user: user_features = limits.get('features', []) else: user_features = USER_LIMITS['guest']['features'] # 定义所有功能及其描述 all_features = { 'basic_translate': {'name': '自动翻译缓存,相同文件秒出结果', 'base': True}, 'custom_instruction': {'name': '自定义翻译要求', 'base': False}, 'compare_view': {'name': '原文译文对比查看', 'base': False}, 'history': {'name': '翻译历史记录', 'base': False}, 'retranslate': {'name': '不满意重新翻译', 'base': False}, 'export_pdf': {'name': '导出PDF格式', 'base': False}, 'batch_translate': {'name': '批量翻译', 'base': False}, 'custom_terms': {'name': '自定义术语库', 'base': False}, 'priority_queue': {'name': '优先队列处理', 'base': False}, } # 判断功能是否可用(vip_enterprise 的 features=["all"] 表示全部可用) if user_features == ['all']: user_features = list(all_features.keys()) # 构建功能展示列表 feature_display = [] for feat_key, feat_info in all_features.items(): has_feature = feat_key in user_features # 基础功能(所有用户都有)不显示,只显示需要权限的功能 if not feat_info['base']: feature_display.append({ 'name': feat_info['name'], 'has': has_feature }) return render_template('index.html', user=user, limits=limits, daily_remaining=daily_remaining, max_pages=max_pages, plans=MEMBERSHIP_PLANS, features=feature_display ) @app.route('/translate/') def translation_detail(translation_id): """翻译详情页""" user = get_current_user() translation = Translation.query.get(translation_id) if not translation: return "翻译记录不存在", 404 # 权限检查 if user and translation.user_id != user.id: return "无权访问", 403 if not user and not translation.user_id: # guest记录,检查session pass return render_template('translation.html', translation=translation, user=user ) @app.route('/history') def history(): """翻译历史""" user = get_current_user() if not user: return redirect(url_for('login')) translations = Translation.query.filter_by(user_id=user.id)\ .order_by(Translation.created_at.desc()).limit(50).all() return render_template('history.html', user=user, translations=translations ) @app.route('/pricing') def pricing(): """会员定价页""" user = get_current_user() # 权益名称映射 feature_names = { 'basic_translate': '基础翻译功能', 'history': '翻译历史记录', 'retranslate': '不满意重新翻译', 'export_pdf': '导出PDF格式', 'compare_view': '原文译文对比查看', 'batch_translate': '批量翻译', 'custom_terms': '自定义术语库', 'priority_queue': '优先处理队列', 'custom_instruction': '自定义翻译要求', 'api_access': 'API接口调用', 'email_notify': '邮件通知', 'email_attachment': '邮件附件发送', } # 从数据库读取动态配置的会员套餐 db_plans = MembershipPlanConfig.query.filter_by(is_active=True)\ .order_by(MembershipPlanConfig.sort_order).all() # 读取用户类型配置获取权益 user_types = UserTypeConfig.query.filter_by(is_active=True).all() user_type_map = {ut.type_key: ut for ut in user_types} # 为每个套餐添加权益列表 plan_features = {} for plan in db_plans: ut = user_type_map.get(plan.user_type_key) if ut: features = ut.get_features() plan_features[plan.plan_key] = [ {'key': f, 'name': feature_names.get(f, f), 'has': True} for f in features ] # 添加限制信息 plan_features[plan.plan_key].insert(0, { 'key': 'daily_translations', 'name': f"每日翻译{ut.daily_translations if ut.daily_translations > 0 else '无限'}次", 'has': True }) plan_features[plan.plan_key].insert(1, { 'key': 'max_pages', 'name': f"单文件最大{ut.max_pages if ut.max_pages > 0 else '无限'}页", 'has': True }) # 免费用户权益 free_ut = user_type_map.get('free') free_features = [] if free_ut: free_features = [ {'key': 'daily_translations', 'name': f"每日翻译{free_ut.daily_translations}次", 'has': True}, {'key': 'max_pages', 'name': f"单文件最大{free_ut.max_pages}页", 'has': True}, ] for f in free_ut.get_features(): free_features.append({'key': f, 'name': feature_names.get(f, f), 'has': True}) # 添加没有的功能 all_features = ['compare_view', 'batch_translate', 'custom_terms'] for f in all_features: if f not in free_ut.get_features(): free_features.append({'key': f, 'name': feature_names.get(f, f), 'has': False}) return render_template('pricing.html', plans=db_plans, plan_features=plan_features, free_features=free_features, user=user) @app.route('/profile') def profile(): """个人中心""" user = get_current_user() if not user: return redirect(url_for('login')) limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free']) daily_remaining = limits['daily_translations'] - user.daily_count if limits['daily_translations'] > 0 else '无限' max_pages = limits['max_pages'] if limits['max_pages'] > 0 else '无限' # 检查是否有邮件附件权限 user_features = limits.get('features', []) has_email_attachment = 'email_attachment' in user_features or user_features == ['all'] return render_template('profile.html', user=user, daily_remaining=daily_remaining, max_pages=max_pages, has_email_attachment=has_email_attachment ) # ==================== 路由: API ==================== @app.route('/api/upload', methods=['POST']) def upload_pdf(): """上传PDF文件""" user = get_current_user() # 检查文件 if 'file' not in request.files: return jsonify({'error': '未上传文件'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': '未选择文件'}), 400 if not allowed_file(file.filename): return jsonify({'error': '只支持PDF文件'}), 400 # 获取翻译参数 instruction = request.form.get('instruction', None) # 用户翻译要求 # 读取文件内容 file_content = file.read() file_hash = compute_file_hash(file_content) filename = secure_filename(file.filename) # 获取页数 try: from pypdf import PdfReader import io reader = PdfReader(io.BytesIO(file_content)) page_count = len(reader.pages) except Exception as e: return jsonify({'error': f'PDF解析失败: {e}'}), 400 # 权限检查 if user: can_translate, msg = user.can_translate(page_count, {'USER_LIMITS': USER_LIMITS}) if not can_translate: return jsonify({'error': msg}), 403 else: guest = get_or_create_guest() can_translate, msg = check_guest_limit(guest) if not can_translate: return jsonify({'error': msg}), 403 # 检查页数限制 max_pages = USER_LIMITS['guest']['max_pages'] if page_count > max_pages: return jsonify({'error': f'PDF页数超出限制(最大{max_pages}页)'}), 403 # 检查缓存 cache_path = cache_service.get_cache(file_hash) from_cache = False # 检查是否有用户设置了不共享此文件 no_share_check = Translation.query.filter_by(file_hash=file_hash, no_share=True).first() if cache_path and ENABLE_CACHE and not instruction and not no_share_check: # 有缓存且无特殊翻译要求且无不共享标记,直接使用缓存 from_cache = True output_path = cache_path else: # 需要翻译 # 保存上传文件 - 使用同一个UUID确保uploads和outputs目录关联 session_uuid = str(uuid.uuid4()) upload_dir = os.path.join(UPLOAD_DIR, session_uuid) os.makedirs(upload_dir, exist_ok=True) upload_path = os.path.join(upload_dir, filename) with open(upload_path, 'wb') as f: f.write(file_content) # 创建输出路径 - 使用相同的UUID output_dir = os.path.join(OUTPUT_DIR, session_uuid) os.makedirs(output_dir, exist_ok=True) output_path = os.path.join(output_dir, f"{filename}_translated.md") # 创建异步翻译任务(先不创建,等translation_id生成后) task_id = str(uuid.uuid4()) # 创建翻译记录 translation = Translation( user_id=user.id if user else None, file_hash=file_hash, original_filename=filename, file_size=len(file_content), page_count=page_count, translate_params=json.dumps({'instruction': instruction}) if instruction else None, status='processing' if not from_cache else 'completed', progress=0 if not from_cache else 100, upload_path=upload_path if not from_cache else None, # 保存上传路径 output_path=output_path, from_cache=from_cache ) db.session.add(translation) # 预先提交获取 translation_id if not from_cache: db.session.flush() # 获取 ID 但不提交完整事务 # 创建异步翻译任务(需要翻译时) if not from_cache: TranslationTask.create_task( task_id, upload_path, output_path, {'LLM_CONFIG': LLM_CONFIG}, instruction, translation_id=translation.id, app=app ) # 更新用户/访客计数 if user: user.increment_count() else: guest.daily_count += 1 guest.last_translate_date = date.today() # 更新缓存记录 if from_cache: cache_record = TranslationCache.query.filter_by(file_hash=file_hash).first() if cache_record: cache_record.increment_hit() db.session.commit() return jsonify({ 'success': True, 'translation_id': translation.id, 'file_hash': file_hash, 'page_count': page_count, 'from_cache': from_cache, 'task_id': task_id if not from_cache else None, 'message': '使用缓存结果' if from_cache else '翻译任务已创建' }) @app.route('/api/config') def api_config(): """获取系统配置""" from admin import get_llm_config, get_site_config return jsonify({ 'site_name': get_site_config().get('site_name'), 'max_file_size': get_site_config().get('max_file_size'), 'cache_expire_days': get_site_config().get('cache_expire_days'), 'llm_config': get_llm_config() }) @app.route('/api/translations') def api_translations_list(): """获取翻译记录列表""" user = get_current_user() if user: translations = Translation.query.filter_by(user_id=user.id)\ .order_by(Translation.created_at.desc()).limit(20).all() else: # 访客返回空列表 translations = [] return jsonify({ 'translations': [t.to_dict() for t in translations] }) @app.route('/api/status/') def translation_status(translation_id): """获取翻译状态""" translation = Translation.query.get(translation_id) if not translation: return jsonify({'error': '翻译记录不存在'}), 404 # 如果有task_id,检查任务状态 if translation.status == 'processing': # 这里可以查询TranslationTask pass return jsonify({ 'id': translation.id, 'status': translation.status, 'progress': translation.progress, 'filename': translation.original_filename, 'pages': translation.page_count, 'from_cache': translation.from_cache, 'error': translation.error_message, 'created_at': translation.created_at.isoformat() if translation.created_at else None, 'completed_at': translation.completed_at.isoformat() if translation.completed_at else None, }) @app.route('/api/task/') def task_status(task_id): """获取任务状态""" task = TranslationTask.get_task(task_id) if not task: return jsonify({'error': '任务不存在'}), 404 return jsonify(task) @app.route('/api/result/') def get_result(translation_id): """获取翻译结果""" user = get_current_user() translation = Translation.query.get(translation_id) if not translation: return jsonify({'error': '翻译记录不存在'}), 404 if translation.status != 'completed': return jsonify({'error': '翻译未完成'}), 400 # 检查输出文件 if not translation.output_path or not os.path.exists(translation.output_path): return jsonify({'error': '翻译结果文件不存在'}), 404 # 读取结果 with open(translation.output_path, 'r', encoding='utf-8') as f: content = f.read() return jsonify({ 'id': translation.id, 'filename': translation.original_filename, 'content': content, 'output_path': translation.output_path }) @app.route('/api/download/') def download_result(translation_id): """下载翻译结果""" user = get_current_user() translation = Translation.query.get(translation_id) if not translation or not translation.output_path: return jsonify({'error': '翻译记录不存在'}), 404 if not os.path.exists(translation.output_path): return jsonify({'error': '文件不存在'}), 404 filename = f"{translation.original_filename}_translated.md" return send_file(translation.output_path, as_attachment=True, download_name=filename) @app.route('/api/retranslate/', methods=['POST']) def retranslate(translation_id): """重新翻译""" user = get_current_user() if not user: return jsonify({'error': '请登录后使用此功能'}), 401 translation = Translation.query.get(translation_id) if not translation or translation.user_id != user.id: return jsonify({'error': '无权操作'}), 403 # 检查功能权限 limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free']) if 'retranslate' not in limits['features']: return jsonify({'error': '会员功能,请升级'}), 403 instruction = request.json.get('instruction', '') # 查找原文件 # 这里需要从原始上传路径恢复,简化处理 # 创建新翻译记录 new_translation = Translation( user_id=user.id, file_hash=translation.file_hash, original_filename=translation.original_filename, file_size=translation.file_size, page_count=translation.page_count, translate_params=json.dumps({'instruction': instruction}), status='processing', parent_id=translation_id, retranslate_request=instruction ) db.session.add(new_translation) db.session.commit() # TODO: 实际翻译逻辑 return jsonify({ 'success': True, 'translation_id': new_translation.id, 'message': '重译任务已创建' }) @app.route('/api/compare/') def compare_view(translation_id): """对比查看""" user = get_current_user() if not user: return jsonify({'error': '请登录后使用此功能'}), 401 translation = Translation.query.get(translation_id) if not translation or (translation.user_id != user.id and user.user_type != 'admin'): return jsonify({'error': '无权访问'}), 403 # 读取翻译结果文件 translated_content = '' if translation.output_path and os.path.exists(translation.output_path): try: with open(translation.output_path, 'r', encoding='utf-8') as f: translated_content = f.read() except Exception as e: translated_content = f'读取失败: {str(e)}' # 从翻译结果中提取各页内容 # 翻译结果格式是Markdown,包含"## 第 X 页"分隔 original_pages = [] translated_pages = [] if translated_content: # 解析翻译结果的页面结构 import re page_pattern = r'## 第 (\d+) 页\n\n(.*?)\n\n---' matches = re.findall(page_pattern, translated_content, re.DOTALL) for page_num, content in matches: translated_pages.append({ 'page': int(page_num), 'content': content.strip() }) # 如果有原文内容存储,获取原文 original_content = '' # 优先从数据库存储的upload_path获取原PDF possible_paths = [] if translation.upload_path: possible_paths.append(translation.upload_path) # 备用方案:尝试从路径推断(兼容旧数据) upload_dir = os.path.dirname(translation.output_path.replace('outputs', 'uploads').replace('_translated.md', '')) if translation.output_path else '' if upload_dir: possible_paths.append( translation.output_path.replace('outputs', 'uploads').replace('_translated.md', '') if translation.output_path else '' ) possible_paths.append(os.path.join(upload_dir, translation.original_filename)) for pdf_path in possible_paths: if pdf_path and os.path.exists(pdf_path) and pdf_path.endswith('.pdf'): try: from pypdf import PdfReader reader = PdfReader(pdf_path) for page in reader.pages: text = page.extract_text() if text: original_content += text + '\n\n' except: pass break return jsonify({ 'id': translation.id, 'filename': translation.original_filename, 'original': original_content or '原文内容未找到(可能PDF已被删除或为扫描版)', 'translated': translated_content, 'pages': translated_pages }) # ==================== 路由: 用户系统 ==================== @app.route('/login', methods=['GET', 'POST']) def login(): """登录""" if request.method == 'GET': return render_template('login.html') data = request.json username = data.get('username') password = data.get('password') user = User.query.filter_by(username=username).first() if not user or not user.check_password(password): return jsonify({'error': '用户名或密码错误'}), 401 session['user_id'] = user.id return jsonify({'success': True, 'user': user.to_dict()}) @app.route('/register', methods=['GET', 'POST']) def register(): """注册""" if request.method == 'GET': # 检查邀请码参数 invite_code = request.args.get('invite', None) return render_template('register.html', invite_code=invite_code) data = request.json username = data.get('username') email = data.get('email') password = data.get('password') invite_code = data.get('invite_code', None) # 邀请码 # 检查用户是否存在 if User.query.filter_by(username=username).first(): return jsonify({'error': '用户名已存在'}), 400 if User.query.filter_by(email=email).first(): return jsonify({'error': '邮箱已注册'}), 400 # 创建用户 user = User(username=username, email=email, user_type='free') user.set_password(password) # 生成邀请码(6位随机字符) import random import string user.invite_code = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6)) # 处理邀请 if invite_code: inviter = User.query.filter_by(invite_code=invite_code).first() if inviter: user.invited_by = inviter.id # 创建邀请记录 invitation = UserInvitation( inviter_id=inviter.id, invite_code=invite_code, invitee_id=user.id, invitee_email=email, reward_amount=5.0, status='registered', registered_at=datetime.utcnow() ) db.session.add(invitation) # 给邀请人奖励 inviter.invite_count += 1 inviter.invite_rewards += 5.0 inviter.balance += 5.0 # 给被邀请人奖励 user.balance += 2.0 # 创建流水 inviter_tx = AccountTransaction( user_id=inviter.id, transaction_type='invite_reward', amount=5.0, balance_before=inviter.balance - 5.0, balance_after=inviter.balance, description=f'邀请用户{username}注册奖励' ) db.session.add(inviter_tx) user_tx = AccountTransaction( user_id=user.id, transaction_type='invite_bonus', amount=2.0, balance_before=0, balance_after=2.0, description='新用户注册奖励' ) db.session.add(user_tx) # 发送邀请奖励邮件 if inviter.email_notify: email_service.send_invite_reward(inviter.email, inviter.username, 5.0, inviter.invite_count) db.session.add(user) db.session.commit() session['user_id'] = user.id # 发送欢迎邮件 email_service.send_welcome_email(user.email, user.username, user.invite_code) return jsonify({ 'success': True, 'user': user.to_dict(), 'invite_reward': user.balance > 0 # 是否获得邀请奖励 }) @app.route('/logout') def logout(): """退出登录""" session.pop('user_id', None) return redirect(url_for('index')) @app.route('/api/user/info') def user_info(): """用户信息""" user = get_current_user() if not user: return jsonify({'user': None}) limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free']) return jsonify({ 'user': user.to_dict(), 'limits': limits }) # ==================== 个人中心API ==================== @app.route('/api/profile/transactions') def get_transactions(): """获取账户流水""" user = get_current_user() if not user: return jsonify({'error': '请登录'}), 401 filter_type = request.args.get('type', 'all') query = AccountTransaction.query.filter_by(user_id=user.id) if filter_type != 'all': query = query.filter_by(transaction_type=filter_type) transactions = query.order_by(AccountTransaction.created_at.desc()).limit(50).all() return jsonify({ 'success': True, 'transactions': [t.to_dict() for t in transactions] }) @app.route('/api/profile/purchases') def get_purchases(): """获取会员购买记录""" user = get_current_user() if not user: return jsonify({'error': '请登录'}), 401 purchases = MembershipPurchase.query.filter_by(user_id=user.id)\ .order_by(MembershipPurchase.created_at.desc()).limit(20).all() return jsonify({ 'success': True, 'purchases': [p.to_dict() for p in purchases] }) @app.route('/api/profile/recharge', methods=['POST']) def recharge_balance(): """充值""" user = get_current_user() if not user: return jsonify({'error': '请登录'}), 401 data = request.json amount = float(data.get('amount', 0)) payment_method = data.get('payment_method', 'balance') if amount < 10: return jsonify({'error': '充值金额最少10元'}), 400 if amount > 10000: return jsonify({'error': '充值金额最多10000元'}), 400 balance_before = user.balance # 创建充值记录 order_no = f"RC{datetime.now().strftime('%Y%m%d%H%M%S')}{user.id}" recharge = UserRecharge( user_id=user.id, amount=amount, balance_before=balance_before, payment_method=payment_method, status='completed', order_no=order_no, completed_at=datetime.utcnow() ) # 更新余额 user.balance += amount recharge.balance_after = user.balance db.session.add(recharge) # 创建流水记录 transaction = AccountTransaction( user_id=user.id, transaction_type='recharge', amount=amount, balance_before=balance_before, balance_after=user.balance, related_id=recharge.id, related_type='recharge', description=f'充值¥{amount}' ) db.session.add(transaction) db.session.commit() return jsonify({ 'success': True, 'balance': user.balance, 'recharge_id': recharge.id }) @app.route('/api/profile/refund', methods=['POST']) def request_refund(): """申请退款""" user = get_current_user() if not user: return jsonify({'error': '请登录'}), 401 data = request.json amount = float(data.get('amount', 0)) reason = data.get('reason', '') if amount <= 0: return jsonify({'error': '退款金额必须大于0'}), 400 if amount > user.balance: return jsonify({'error': '退款金额不能超过余额'}), 400 if not reason: return jsonify({'error': '请填写退款原因'}), 400 balance_before = user.balance # 创建退款申请 refund = UserRefund( user_id=user.id, amount=amount, balance_before=balance_before, reason=reason, reason_type='user_request', status='pending' ) db.session.add(refund) db.session.commit() return jsonify({ 'success': True, 'refund_id': refund.id, 'message': '退款申请已提交,等待管理员审核' }) @app.route('/api/profile/settings', methods=['POST']) def update_settings(): """更新账户设置""" user = get_current_user() if not user: return jsonify({'error': '请登录'}), 401 data = request.json # 手机号 if 'phone' in data: phone = data.get('phone', '') if phone and len(phone) >= 10: user.phone = phone user.phone_verified = False # 通知邮箱(更换后邮件通知发送到此邮箱) if 'email' in data: email = data.get('email', '') if email and '@' in email: # 检查邮箱是否已被其他用户使用 existing = User.query.filter(User.email == email, User.id != user.id).first() if existing: return jsonify({'error': '该邮箱已被其他用户使用'}), 400 user.email = email # 通知设置 if 'notify_on_complete' in data: user.notify_on_complete = data.get('notify_on_complete', True) if 'notify_on_expire' in data: user.notify_on_expire = data.get('notify_on_expire', True) if 'notify_with_attachment' in data: # 检查是否有附件权限 limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free']) user_features = limits.get('features', []) if 'email_attachment' in user_features or user_features == ['all']: user.notify_with_attachment = data.get('notify_with_attachment', False) else: return jsonify({'error': '邮件附件功能需VIP会员', 'feature': 'email_attachment'}), 403 if 'email_notify' in data: user.email_notify = data.get('email_notify', True) db.session.commit() return jsonify({ 'success': True, 'user': user.to_dict() }) @app.route('/api/profile/invitations') def get_invitations(): """获取邀请记录""" user = get_current_user() if not user: return jsonify({'error': '请登录'}), 401 invitations = UserInvitation.query.filter_by(inviter_id=user.id)\ .order_by(UserInvitation.created_at.desc()).limit(20).all() return jsonify({ 'success': True, 'invitations': [inv.to_dict() for inv in invitations] }) @app.route('/api/invite/') def check_invite(invite_code): """检查邀请码""" inviter = User.query.filter_by(invite_code=invite_code).first() if not inviter: return jsonify({'valid': False, 'error': '邀请码无效'}) return jsonify({ 'valid': True, 'inviter': inviter.username, 'reward': 5 # 被邀请人奖励 }) # ==================== 初始化 ==================== def init_app(): """初始化应用""" # 创建目录 for dir_name in [UPLOAD_DIR, CACHE_DIR, OUTPUT_DIR]: if not os.path.exists(dir_name): os.makedirs(dir_name) # 创建数据库表 with app.app_context(): db.create_all() # 创建默认管理员账号 admin = User.query.filter_by(username='admin').first() if not admin: admin = User( username='admin', email='admin@tphai.com', user_type='admin', is_admin=True, is_active=True ) admin.set_password('admin123') db.session.add(admin) db.session.commit() print("✅ 默认管理员账号已创建: admin / admin123") # 创建示例数据包套餐 if DataPackage.query.count() == 0: packages = [ DataPackage(name='入门包', description='适合轻度使用', translation_count=100, price=9.9, original_price=19.9, valid_days=30, sort_order=1), DataPackage(name='标准包', description='日常使用首选', translation_count=500, price=39.9, original_price=59.9, valid_days=30, sort_order=2, is_recommended=True), DataPackage(name='专业包', description='高频使用更划算', translation_count=2000, price=99.9, original_price=199.9, valid_days=30, sort_order=3), DataPackage(name='无限包', description='畅享无限翻译', translation_count=0, price=199.9, original_price=399.9, valid_days=30, sort_order=4), ] for pkg in packages: db.session.add(pkg) db.session.commit() print("✅ 示例数据包套餐已创建") if __name__ == '__main__': init_app() app.run(host='0.0.0.0', port=19000, debug=True)