""" PDF翻译网站主应用 """ import os import json import uuid import hashlib from datetime import datetime, date from functools import wraps from flask import Flask, request, jsonify, render_template, send_file, session, redirect, url_for from flask_sqlalchemy import SQLAlchemy from werkzeug.utils import secure_filename from config import * from models import (db, User, Translation, TranslationCache, GuestTranslation, DataPackage, UserPackage, DynamicConfig) from services import TranslationService, CacheService, TranslationTask from admin import admin_bp # ==================== 创建应用 ==================== app = Flask(__name__) app.config['SECRET_KEY'] = SECRET_KEY app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE # 初始化数据库 db.init_app(app) # 注册后台管理蓝图 app.register_blueprint(admin_bp) # 初始化服务 cache_service = CacheService(CACHE_DIR, CACHE_EXPIRE_DAYS) # ==================== 辅助函数 ==================== def get_current_user(): """获取当前用户""" user_id = session.get('user_id') if user_id: return User.query.get(user_id) return None def get_or_create_guest(): """获取或创建访客记录""" session_id = session.get('guest_id') if not session_id: session_id = str(uuid.uuid4()) session['guest_id'] = session_id guest = GuestTranslation.query.filter_by(session_id=session_id).first() if not guest: guest = GuestTranslation( session_id=session_id, ip_address=request.remote_addr ) db.session.add(guest) db.session.commit() return guest def check_guest_limit(guest): """检查访客翻译限制""" today = date.today() if guest.last_translate_date != today: guest.daily_count = 0 guest.last_translate_date = today db.session.commit() limit = USER_LIMITS['guest']['daily_translations'] if guest.daily_count >= limit: return False, f"今日翻译次数已达上限({limit}次),请登录获取更多次数" return True, "OK" def allowed_file(filename): """检查文件类型""" return '.' in filename and filename.lower().endswith('.pdf') def compute_file_hash(file_content): """计算文件哈希""" return hashlib.md5(file_content).hexdigest() # ==================== 路由: 页面 ==================== @app.route('/') def index(): """首页""" user = get_current_user() if user: # 检查日期并重置计数(使用上海时区) from datetime import timezone, timedelta shanghai_tz = timezone(timedelta(hours=8)) today = datetime.now(shanghai_tz).date() if user.last_translate_date != today: user.daily_count = 0 user.last_translate_date = today db.session.commit() limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free']) daily_remaining = limits['daily_translations'] - user.daily_count if limits['daily_translations'] > 0 else '无限' max_pages = limits['max_pages'] if limits['max_pages'] > 0 else '无限' else: guest = get_or_create_guest() # 检查日期并重置访客计数 from datetime import timezone, timedelta shanghai_tz = timezone(timedelta(hours=8)) today = datetime.now(shanghai_tz).date() if guest.last_translate_date != today: guest.daily_count = 0 guest.last_translate_date = today db.session.commit() limits = USER_LIMITS['guest'] daily_remaining = limits['daily_translations'] - guest.daily_count max_pages = limits['max_pages'] # 获取用户的功能列表 if user: user_features = limits.get('features', []) else: user_features = USER_LIMITS['guest']['features'] # 定义所有功能及其描述 all_features = { 'basic_translate': {'name': '自动翻译缓存,相同文件秒出结果', 'base': True}, 'custom_instruction': {'name': '自定义翻译要求', 'base': False}, 'compare_view': {'name': '原文译文对比查看', 'base': False}, 'history': {'name': '翻译历史记录', 'base': False}, 'retranslate': {'name': '不满意重新翻译', 'base': False}, 'export_pdf': {'name': '导出PDF格式', 'base': False}, 'batch_translate': {'name': '批量翻译', 'base': False}, 'custom_terms': {'name': '自定义术语库', 'base': False}, 'priority_queue': {'name': '优先队列处理', 'base': False}, } # 判断功能是否可用(vip_enterprise 的 features=["all"] 表示全部可用) if user_features == ['all']: user_features = list(all_features.keys()) # 构建功能展示列表 feature_display = [] for feat_key, feat_info in all_features.items(): has_feature = feat_key in user_features # 基础功能(所有用户都有)不显示,只显示需要权限的功能 if not feat_info['base']: feature_display.append({ 'name': feat_info['name'], 'has': has_feature }) return render_template('index.html', user=user, limits=limits, daily_remaining=daily_remaining, max_pages=max_pages, plans=MEMBERSHIP_PLANS, features=feature_display ) @app.route('/translate/') def translation_detail(translation_id): """翻译详情页""" user = get_current_user() translation = Translation.query.get(translation_id) if not translation: return "翻译记录不存在", 404 # 权限检查 if user and translation.user_id != user.id: return "无权访问", 403 if not user and not translation.user_id: # guest记录,检查session pass return render_template('translation.html', translation=translation, user=user ) @app.route('/history') def history(): """翻译历史""" user = get_current_user() if not user: return redirect(url_for('login')) translations = Translation.query.filter_by(user_id=user.id)\ .order_by(Translation.created_at.desc()).limit(50).all() return render_template('history.html', user=user, translations=translations ) @app.route('/pricing') def pricing(): """会员定价页""" return render_template('pricing.html', plans=MEMBERSHIP_PLANS) # ==================== 路由: API ==================== @app.route('/api/upload', methods=['POST']) def upload_pdf(): """上传PDF文件""" user = get_current_user() # 检查文件 if 'file' not in request.files: return jsonify({'error': '未上传文件'}), 400 file = request.files['file'] if file.filename == '': return jsonify({'error': '未选择文件'}), 400 if not allowed_file(file.filename): return jsonify({'error': '只支持PDF文件'}), 400 # 获取翻译参数 instruction = request.form.get('instruction', None) # 用户翻译要求 # 读取文件内容 file_content = file.read() file_hash = compute_file_hash(file_content) filename = secure_filename(file.filename) # 获取页数 try: from pypdf import PdfReader import io reader = PdfReader(io.BytesIO(file_content)) page_count = len(reader.pages) except Exception as e: return jsonify({'error': f'PDF解析失败: {e}'}), 400 # 权限检查 if user: can_translate, msg = user.can_translate(page_count, {'USER_LIMITS': USER_LIMITS}) if not can_translate: return jsonify({'error': msg}), 403 else: guest = get_or_create_guest() can_translate, msg = check_guest_limit(guest) if not can_translate: return jsonify({'error': msg}), 403 # 检查页数限制 max_pages = USER_LIMITS['guest']['max_pages'] if page_count > max_pages: return jsonify({'error': f'PDF页数超出限制(最大{max_pages}页)'}), 403 # 检查缓存 cache_path = cache_service.get_cache(file_hash) from_cache = False if cache_path and ENABLE_CACHE and not instruction: # 有缓存且无特殊翻译要求,直接使用缓存 from_cache = True output_path = cache_path else: # 需要翻译 # 保存上传文件 upload_dir = os.path.join(UPLOAD_DIR, str(uuid.uuid4())) os.makedirs(upload_dir, exist_ok=True) upload_path = os.path.join(upload_dir, filename) with open(upload_path, 'wb') as f: f.write(file_content) # 创建输出路径 output_dir = os.path.join(OUTPUT_DIR, str(uuid.uuid4())) os.makedirs(output_dir, exist_ok=True) output_path = os.path.join(output_dir, f"{filename}_translated.md") # 创建异步翻译任务(先不创建,等translation_id生成后) task_id = str(uuid.uuid4()) # 创建翻译记录 translation = Translation( user_id=user.id if user else None, file_hash=file_hash, original_filename=filename, file_size=len(file_content), page_count=page_count, translate_params=json.dumps({'instruction': instruction}) if instruction else None, status='processing' if not from_cache else 'completed', progress=0 if not from_cache else 100, output_path=output_path, from_cache=from_cache ) db.session.add(translation) # 预先提交获取 translation_id if not from_cache: db.session.flush() # 获取 ID 但不提交完整事务 # 创建异步翻译任务(需要翻译时) if not from_cache: TranslationTask.create_task( task_id, upload_path, output_path, {'LLM_CONFIG': LLM_CONFIG}, instruction, translation_id=translation.id, app=app ) # 更新用户/访客计数 if user: user.increment_count() else: guest.daily_count += 1 guest.last_translate_date = date.today() # 更新缓存记录 if from_cache: cache_record = TranslationCache.query.filter_by(file_hash=file_hash).first() if cache_record: cache_record.increment_hit() db.session.commit() return jsonify({ 'success': True, 'translation_id': translation.id, 'file_hash': file_hash, 'page_count': page_count, 'from_cache': from_cache, 'task_id': task_id if not from_cache else None, 'message': '使用缓存结果' if from_cache else '翻译任务已创建' }) @app.route('/api/status/') def translation_status(translation_id): """获取翻译状态""" translation = Translation.query.get(translation_id) if not translation: return jsonify({'error': '翻译记录不存在'}), 404 # 如果有task_id,检查任务状态 if translation.status == 'processing': # 这里可以查询TranslationTask pass return jsonify({ 'id': translation.id, 'status': translation.status, 'progress': translation.progress, 'from_cache': translation.from_cache, 'error': translation.error_message }) @app.route('/api/task/') def task_status(task_id): """获取任务状态""" task = TranslationTask.get_task(task_id) if not task: return jsonify({'error': '任务不存在'}), 404 return jsonify(task) @app.route('/api/result/') def get_result(translation_id): """获取翻译结果""" user = get_current_user() translation = Translation.query.get(translation_id) if not translation: return jsonify({'error': '翻译记录不存在'}), 404 if translation.status != 'completed': return jsonify({'error': '翻译未完成'}), 400 # 检查输出文件 if not translation.output_path or not os.path.exists(translation.output_path): return jsonify({'error': '翻译结果文件不存在'}), 404 # 读取结果 with open(translation.output_path, 'r', encoding='utf-8') as f: content = f.read() return jsonify({ 'id': translation.id, 'filename': translation.original_filename, 'content': content, 'output_path': translation.output_path }) @app.route('/api/download/') def download_result(translation_id): """下载翻译结果""" user = get_current_user() translation = Translation.query.get(translation_id) if not translation or not translation.output_path: return jsonify({'error': '翻译记录不存在'}), 404 if not os.path.exists(translation.output_path): return jsonify({'error': '文件不存在'}), 404 filename = f"{translation.original_filename}_translated.md" return send_file(translation.output_path, as_attachment=True, download_name=filename) @app.route('/api/retranslate/', methods=['POST']) def retranslate(translation_id): """重新翻译""" user = get_current_user() if not user: return jsonify({'error': '请登录后使用此功能'}), 401 translation = Translation.query.get(translation_id) if not translation or translation.user_id != user.id: return jsonify({'error': '无权操作'}), 403 # 检查功能权限 limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free']) if 'retranslate' not in limits['features']: return jsonify({'error': '会员功能,请升级'}), 403 instruction = request.json.get('instruction', '') # 查找原文件 # 这里需要从原始上传路径恢复,简化处理 # 创建新翻译记录 new_translation = Translation( user_id=user.id, file_hash=translation.file_hash, original_filename=translation.original_filename, file_size=translation.file_size, page_count=translation.page_count, translate_params=json.dumps({'instruction': instruction}), status='processing', parent_id=translation_id, retranslate_request=instruction ) db.session.add(new_translation) db.session.commit() # TODO: 实际翻译逻辑 return jsonify({ 'success': True, 'translation_id': new_translation.id, 'message': '重译任务已创建' }) @app.route('/api/compare/') def compare_view(translation_id): """对比查看""" user = get_current_user() if not user: return jsonify({'error': '请登录后使用此功能'}), 401 translation = Translation.query.get(translation_id) if not translation or translation.user_id != user.id: return jsonify({'error': '无权访问'}), 403 # 生成对比文件 # TODO: 实现对比功能 return jsonify({ 'id': translation.id, 'original': '原文内容', 'translated': '译文内容' }) # ==================== 路由: 用户系统 ==================== @app.route('/login', methods=['GET', 'POST']) def login(): """登录""" if request.method == 'GET': return render_template('login.html') data = request.json username = data.get('username') password = data.get('password') user = User.query.filter_by(username=username).first() if not user or not user.check_password(password): return jsonify({'error': '用户名或密码错误'}), 401 session['user_id'] = user.id return jsonify({'success': True, 'user': user.to_dict()}) @app.route('/register', methods=['GET', 'POST']) def register(): """注册""" if request.method == 'GET': return render_template('register.html') data = request.json username = data.get('username') email = data.get('email') password = data.get('password') # 检查用户是否存在 if User.query.filter_by(username=username).first(): return jsonify({'error': '用户名已存在'}), 400 if User.query.filter_by(email=email).first(): return jsonify({'error': '邮箱已注册'}), 400 # 创建用户 user = User(username=username, email=email, user_type='free') user.set_password(password) db.session.add(user) db.session.commit() session['user_id'] = user.id return jsonify({'success': True, 'user': user.to_dict()}) @app.route('/logout') def logout(): """退出登录""" session.pop('user_id', None) return redirect(url_for('index')) @app.route('/api/user/info') def user_info(): """用户信息""" user = get_current_user() if not user: return jsonify({'user': None}) limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free']) return jsonify({ 'user': user.to_dict(), 'limits': limits }) # ==================== 初始化 ==================== def init_app(): """初始化应用""" # 创建目录 for dir_name in [UPLOAD_DIR, CACHE_DIR, OUTPUT_DIR]: if not os.path.exists(dir_name): os.makedirs(dir_name) # 创建数据库表 with app.app_context(): db.create_all() # 创建默认管理员账号 admin = User.query.filter_by(username='admin').first() if not admin: admin = User( username='admin', email='admin@tphai.com', user_type='admin', is_admin=True, is_active=True ) admin.set_password('admin123') db.session.add(admin) db.session.commit() print("✅ 默认管理员账号已创建: admin / admin123") # 创建示例数据包套餐 if DataPackage.query.count() == 0: packages = [ DataPackage(name='入门包', description='适合轻度使用', translation_count=100, price=9.9, original_price=19.9, valid_days=30, sort_order=1), DataPackage(name='标准包', description='日常使用首选', translation_count=500, price=39.9, original_price=59.9, valid_days=30, sort_order=2, is_recommended=True), DataPackage(name='专业包', description='高频使用更划算', translation_count=2000, price=99.9, original_price=199.9, valid_days=30, sort_order=3), DataPackage(name='无限包', description='畅享无限翻译', translation_count=0, price=199.9, original_price=399.9, valid_days=30, sort_order=4), ] for pkg in packages: db.session.add(pkg) db.session.commit() print("✅ 示例数据包套餐已创建") if __name__ == '__main__': init_app() app.run(host='0.0.0.0', port=19000, debug=True)