Files
pdf-translate-web/app.py
coder f55f2027e5 feat: 首页功能特点根据用户类型动态显示
- 访客/免费/VIP各等级显示不同可用功能
- 有权限显示,无权限显示
- 添加.gitignore排除uploads/outputs/cache目录
2026-04-14 18:00:06 +08:00

597 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
PDF翻译网站主应用
"""
import os
import json
import uuid
import hashlib
from datetime import datetime, date
from functools import wraps
from flask import Flask, request, jsonify, render_template, send_file, session, redirect, url_for
from flask_sqlalchemy import SQLAlchemy
from werkzeug.utils import secure_filename
from config import *
from models import (db, User, Translation, TranslationCache, GuestTranslation,
DataPackage, UserPackage, DynamicConfig)
from services import TranslationService, CacheService, TranslationTask
from admin import admin_bp
# ==================== 创建应用 ====================
app = Flask(__name__)
app.config['SECRET_KEY'] = SECRET_KEY
app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['MAX_CONTENT_LENGTH'] = MAX_FILE_SIZE
# 初始化数据库
db.init_app(app)
# 注册后台管理蓝图
app.register_blueprint(admin_bp)
# 初始化服务
cache_service = CacheService(CACHE_DIR, CACHE_EXPIRE_DAYS)
# ==================== 辅助函数 ====================
def get_current_user():
"""获取当前用户"""
user_id = session.get('user_id')
if user_id:
return User.query.get(user_id)
return None
def get_or_create_guest():
"""获取或创建访客记录"""
session_id = session.get('guest_id')
if not session_id:
session_id = str(uuid.uuid4())
session['guest_id'] = session_id
guest = GuestTranslation.query.filter_by(session_id=session_id).first()
if not guest:
guest = GuestTranslation(
session_id=session_id,
ip_address=request.remote_addr
)
db.session.add(guest)
db.session.commit()
return guest
def check_guest_limit(guest):
"""检查访客翻译限制"""
today = date.today()
if guest.last_translate_date != today:
guest.daily_count = 0
guest.last_translate_date = today
db.session.commit()
limit = USER_LIMITS['guest']['daily_translations']
if guest.daily_count >= limit:
return False, f"今日翻译次数已达上限({limit}次),请登录获取更多次数"
return True, "OK"
def allowed_file(filename):
"""检查文件类型"""
return '.' in filename and filename.lower().endswith('.pdf')
def compute_file_hash(file_content):
"""计算文件哈希"""
return hashlib.md5(file_content).hexdigest()
# ==================== 路由: 页面 ====================
@app.route('/')
def index():
"""首页"""
user = get_current_user()
if user:
# 检查日期并重置计数(使用上海时区)
from datetime import timezone, timedelta
shanghai_tz = timezone(timedelta(hours=8))
today = datetime.now(shanghai_tz).date()
if user.last_translate_date != today:
user.daily_count = 0
user.last_translate_date = today
db.session.commit()
limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free'])
daily_remaining = limits['daily_translations'] - user.daily_count if limits['daily_translations'] > 0 else '无限'
max_pages = limits['max_pages'] if limits['max_pages'] > 0 else '无限'
else:
guest = get_or_create_guest()
# 检查日期并重置访客计数
from datetime import timezone, timedelta
shanghai_tz = timezone(timedelta(hours=8))
today = datetime.now(shanghai_tz).date()
if guest.last_translate_date != today:
guest.daily_count = 0
guest.last_translate_date = today
db.session.commit()
limits = USER_LIMITS['guest']
daily_remaining = limits['daily_translations'] - guest.daily_count
max_pages = limits['max_pages']
# 获取用户的功能列表
if user:
user_features = limits.get('features', [])
else:
user_features = USER_LIMITS['guest']['features']
# 定义所有功能及其描述
all_features = {
'basic_translate': {'name': '自动翻译缓存,相同文件秒出结果', 'base': True},
'custom_instruction': {'name': '自定义翻译要求', 'base': False},
'compare_view': {'name': '原文译文对比查看', 'base': False},
'history': {'name': '翻译历史记录', 'base': False},
'retranslate': {'name': '不满意重新翻译', 'base': False},
'export_pdf': {'name': '导出PDF格式', 'base': False},
'batch_translate': {'name': '批量翻译', 'base': False},
'custom_terms': {'name': '自定义术语库', 'base': False},
'priority_queue': {'name': '优先队列处理', 'base': False},
}
# 判断功能是否可用vip_enterprise 的 features=["all"] 表示全部可用)
if user_features == ['all']:
user_features = list(all_features.keys())
# 构建功能展示列表
feature_display = []
for feat_key, feat_info in all_features.items():
has_feature = feat_key in user_features
# 基础功能(所有用户都有)不显示,只显示需要权限的功能
if not feat_info['base']:
feature_display.append({
'name': feat_info['name'],
'has': has_feature
})
return render_template('index.html',
user=user,
limits=limits,
daily_remaining=daily_remaining,
max_pages=max_pages,
plans=MEMBERSHIP_PLANS,
features=feature_display
)
@app.route('/translate/<int:translation_id>')
def translation_detail(translation_id):
"""翻译详情页"""
user = get_current_user()
translation = Translation.query.get(translation_id)
if not translation:
return "翻译记录不存在", 404
# 权限检查
if user and translation.user_id != user.id:
return "无权访问", 403
if not user and not translation.user_id:
# guest记录检查session
pass
return render_template('translation.html',
translation=translation,
user=user
)
@app.route('/history')
def history():
"""翻译历史"""
user = get_current_user()
if not user:
return redirect(url_for('login'))
translations = Translation.query.filter_by(user_id=user.id)\
.order_by(Translation.created_at.desc()).limit(50).all()
return render_template('history.html',
user=user,
translations=translations
)
@app.route('/pricing')
def pricing():
"""会员定价页"""
return render_template('pricing.html', plans=MEMBERSHIP_PLANS)
# ==================== 路由: API ====================
@app.route('/api/upload', methods=['POST'])
def upload_pdf():
"""上传PDF文件"""
user = get_current_user()
# 检查文件
if 'file' not in request.files:
return jsonify({'error': '未上传文件'}), 400
file = request.files['file']
if file.filename == '':
return jsonify({'error': '未选择文件'}), 400
if not allowed_file(file.filename):
return jsonify({'error': '只支持PDF文件'}), 400
# 获取翻译参数
instruction = request.form.get('instruction', None) # 用户翻译要求
# 读取文件内容
file_content = file.read()
file_hash = compute_file_hash(file_content)
filename = secure_filename(file.filename)
# 获取页数
try:
from pypdf import PdfReader
import io
reader = PdfReader(io.BytesIO(file_content))
page_count = len(reader.pages)
except Exception as e:
return jsonify({'error': f'PDF解析失败: {e}'}), 400
# 权限检查
if user:
can_translate, msg = user.can_translate(page_count, {'USER_LIMITS': USER_LIMITS})
if not can_translate:
return jsonify({'error': msg}), 403
else:
guest = get_or_create_guest()
can_translate, msg = check_guest_limit(guest)
if not can_translate:
return jsonify({'error': msg}), 403
# 检查页数限制
max_pages = USER_LIMITS['guest']['max_pages']
if page_count > max_pages:
return jsonify({'error': f'PDF页数超出限制最大{max_pages}页)'}), 403
# 检查缓存
cache_path = cache_service.get_cache(file_hash)
from_cache = False
if cache_path and ENABLE_CACHE and not instruction:
# 有缓存且无特殊翻译要求,直接使用缓存
from_cache = True
output_path = cache_path
else:
# 需要翻译
# 保存上传文件
upload_dir = os.path.join(UPLOAD_DIR, str(uuid.uuid4()))
os.makedirs(upload_dir, exist_ok=True)
upload_path = os.path.join(upload_dir, filename)
with open(upload_path, 'wb') as f:
f.write(file_content)
# 创建输出路径
output_dir = os.path.join(OUTPUT_DIR, str(uuid.uuid4()))
os.makedirs(output_dir, exist_ok=True)
output_path = os.path.join(output_dir, f"{filename}_translated.md")
# 创建异步翻译任务先不创建等translation_id生成后
task_id = str(uuid.uuid4())
# 创建翻译记录
translation = Translation(
user_id=user.id if user else None,
file_hash=file_hash,
original_filename=filename,
file_size=len(file_content),
page_count=page_count,
translate_params=json.dumps({'instruction': instruction}) if instruction else None,
status='processing' if not from_cache else 'completed',
progress=0 if not from_cache else 100,
output_path=output_path,
from_cache=from_cache
)
db.session.add(translation)
# 预先提交获取 translation_id
if not from_cache:
db.session.flush() # 获取 ID 但不提交完整事务
# 创建异步翻译任务(需要翻译时)
if not from_cache:
TranslationTask.create_task(
task_id, upload_path, output_path,
{'LLM_CONFIG': LLM_CONFIG},
instruction,
translation_id=translation.id,
app=app
)
# 更新用户/访客计数
if user:
user.increment_count()
else:
guest.daily_count += 1
guest.last_translate_date = date.today()
# 更新缓存记录
if from_cache:
cache_record = TranslationCache.query.filter_by(file_hash=file_hash).first()
if cache_record:
cache_record.increment_hit()
db.session.commit()
return jsonify({
'success': True,
'translation_id': translation.id,
'file_hash': file_hash,
'page_count': page_count,
'from_cache': from_cache,
'task_id': task_id if not from_cache else None,
'message': '使用缓存结果' if from_cache else '翻译任务已创建'
})
@app.route('/api/status/<int:translation_id>')
def translation_status(translation_id):
"""获取翻译状态"""
translation = Translation.query.get(translation_id)
if not translation:
return jsonify({'error': '翻译记录不存在'}), 404
# 如果有task_id检查任务状态
if translation.status == 'processing':
# 这里可以查询TranslationTask
pass
return jsonify({
'id': translation.id,
'status': translation.status,
'progress': translation.progress,
'from_cache': translation.from_cache,
'error': translation.error_message
})
@app.route('/api/task/<task_id>')
def task_status(task_id):
"""获取任务状态"""
task = TranslationTask.get_task(task_id)
if not task:
return jsonify({'error': '任务不存在'}), 404
return jsonify(task)
@app.route('/api/result/<int:translation_id>')
def get_result(translation_id):
"""获取翻译结果"""
user = get_current_user()
translation = Translation.query.get(translation_id)
if not translation:
return jsonify({'error': '翻译记录不存在'}), 404
if translation.status != 'completed':
return jsonify({'error': '翻译未完成'}), 400
# 检查输出文件
if not translation.output_path or not os.path.exists(translation.output_path):
return jsonify({'error': '翻译结果文件不存在'}), 404
# 读取结果
with open(translation.output_path, 'r', encoding='utf-8') as f:
content = f.read()
return jsonify({
'id': translation.id,
'filename': translation.original_filename,
'content': content,
'output_path': translation.output_path
})
@app.route('/api/download/<int:translation_id>')
def download_result(translation_id):
"""下载翻译结果"""
user = get_current_user()
translation = Translation.query.get(translation_id)
if not translation or not translation.output_path:
return jsonify({'error': '翻译记录不存在'}), 404
if not os.path.exists(translation.output_path):
return jsonify({'error': '文件不存在'}), 404
filename = f"{translation.original_filename}_translated.md"
return send_file(translation.output_path, as_attachment=True, download_name=filename)
@app.route('/api/retranslate/<int:translation_id>', methods=['POST'])
def retranslate(translation_id):
"""重新翻译"""
user = get_current_user()
if not user:
return jsonify({'error': '请登录后使用此功能'}), 401
translation = Translation.query.get(translation_id)
if not translation or translation.user_id != user.id:
return jsonify({'error': '无权操作'}), 403
# 检查功能权限
limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free'])
if 'retranslate' not in limits['features']:
return jsonify({'error': '会员功能,请升级'}), 403
instruction = request.json.get('instruction', '')
# 查找原文件
# 这里需要从原始上传路径恢复,简化处理
# 创建新翻译记录
new_translation = Translation(
user_id=user.id,
file_hash=translation.file_hash,
original_filename=translation.original_filename,
file_size=translation.file_size,
page_count=translation.page_count,
translate_params=json.dumps({'instruction': instruction}),
status='processing',
parent_id=translation_id,
retranslate_request=instruction
)
db.session.add(new_translation)
db.session.commit()
# TODO: 实际翻译逻辑
return jsonify({
'success': True,
'translation_id': new_translation.id,
'message': '重译任务已创建'
})
@app.route('/api/compare/<int:translation_id>')
def compare_view(translation_id):
"""对比查看"""
user = get_current_user()
if not user:
return jsonify({'error': '请登录后使用此功能'}), 401
translation = Translation.query.get(translation_id)
if not translation or translation.user_id != user.id:
return jsonify({'error': '无权访问'}), 403
# 生成对比文件
# TODO: 实现对比功能
return jsonify({
'id': translation.id,
'original': '原文内容',
'translated': '译文内容'
})
# ==================== 路由: 用户系统 ====================
@app.route('/login', methods=['GET', 'POST'])
def login():
"""登录"""
if request.method == 'GET':
return render_template('login.html')
data = request.json
username = data.get('username')
password = data.get('password')
user = User.query.filter_by(username=username).first()
if not user or not user.check_password(password):
return jsonify({'error': '用户名或密码错误'}), 401
session['user_id'] = user.id
return jsonify({'success': True, 'user': user.to_dict()})
@app.route('/register', methods=['GET', 'POST'])
def register():
"""注册"""
if request.method == 'GET':
return render_template('register.html')
data = request.json
username = data.get('username')
email = data.get('email')
password = data.get('password')
# 检查用户是否存在
if User.query.filter_by(username=username).first():
return jsonify({'error': '用户名已存在'}), 400
if User.query.filter_by(email=email).first():
return jsonify({'error': '邮箱已注册'}), 400
# 创建用户
user = User(username=username, email=email, user_type='free')
user.set_password(password)
db.session.add(user)
db.session.commit()
session['user_id'] = user.id
return jsonify({'success': True, 'user': user.to_dict()})
@app.route('/logout')
def logout():
"""退出登录"""
session.pop('user_id', None)
return redirect(url_for('index'))
@app.route('/api/user/info')
def user_info():
"""用户信息"""
user = get_current_user()
if not user:
return jsonify({'user': None})
limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free'])
return jsonify({
'user': user.to_dict(),
'limits': limits
})
# ==================== 初始化 ====================
def init_app():
"""初始化应用"""
# 创建目录
for dir_name in [UPLOAD_DIR, CACHE_DIR, OUTPUT_DIR]:
if not os.path.exists(dir_name):
os.makedirs(dir_name)
# 创建数据库表
with app.app_context():
db.create_all()
# 创建默认管理员账号
admin = User.query.filter_by(username='admin').first()
if not admin:
admin = User(
username='admin',
email='admin@tphai.com',
user_type='admin',
is_admin=True,
is_active=True
)
admin.set_password('admin123')
db.session.add(admin)
db.session.commit()
print("✅ 默认管理员账号已创建: admin / admin123")
# 创建示例数据包套餐
if DataPackage.query.count() == 0:
packages = [
DataPackage(name='入门包', description='适合轻度使用', translation_count=100, price=9.9, original_price=19.9, valid_days=30, sort_order=1),
DataPackage(name='标准包', description='日常使用首选', translation_count=500, price=39.9, original_price=59.9, valid_days=30, sort_order=2, is_recommended=True),
DataPackage(name='专业包', description='高频使用更划算', translation_count=2000, price=99.9, original_price=199.9, valid_days=30, sort_order=3),
DataPackage(name='无限包', description='畅享无限翻译', translation_count=0, price=199.9, original_price=399.9, valid_days=30, sort_order=4),
]
for pkg in packages:
db.session.add(pkg)
db.session.commit()
print("✅ 示例数据包套餐已创建")
if __name__ == '__main__':
init_app()
app.run(host='0.0.0.0', port=19000, debug=True)