"""Main application module for the PDF translation website."""
|
||
|
||
import os
|
||
import json
|
||
import uuid
|
||
import hashlib
|
||
from datetime import datetime, date
|
||
from functools import wraps
|
||
from flask import Flask, request, jsonify, render_template, send_file, session, redirect, url_for
|
||
from flask_sqlalchemy import SQLAlchemy
|
||
from werkzeug.utils import secure_filename
|
||
|
||
from config import *
|
||
from models import (db, User, Translation, TranslationCache, GuestTranslation,
|
||
DataPackage, UserPackage, DynamicConfig)
|
||
from services import TranslationService, CacheService, TranslationTask
|
||
from admin import admin_bp
|
||
|
||
# ==================== Application setup ====================
# Build the Flask app and apply the settings imported from ``config``.
app = Flask(__name__)
app.config.update(
    SECRET_KEY=SECRET_KEY,
    SQLALCHEMY_DATABASE_URI=DATABASE_URL,
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    MAX_CONTENT_LENGTH=MAX_FILE_SIZE,
)

# Bind the shared SQLAlchemy instance to this app.
db.init_app(app)

# Mount the admin back-office blueprint.
app.register_blueprint(admin_bp)

# Translation cache service used by the upload endpoint.
cache_service = CacheService(CACHE_DIR, CACHE_EXPIRE_DAYS)
|
||
|
||
# ==================== 辅助函数 ====================
|
||
def get_current_user():
    """Return the ``User`` referenced by the session, or ``None``.

    A missing or falsy ``user_id`` in the session means nobody is logged in.
    """
    uid = session.get('user_id')
    return User.query.get(uid) if uid else None
|
||
|
||
|
||
def get_or_create_guest():
    """Return the ``GuestTranslation`` row for this browser session.

    A random ``guest_id`` is stored in the session on first use. When no row
    exists for that id, one is created with the caller's remote address and
    committed before being returned.
    """
    guest_key = session.get('guest_id')
    if not guest_key:
        guest_key = str(uuid.uuid4())
        session['guest_id'] = guest_key

    existing = GuestTranslation.query.filter_by(session_id=guest_key).first()
    if existing:
        return existing

    created = GuestTranslation(session_id=guest_key, ip_address=request.remote_addr)
    db.session.add(created)
    db.session.commit()
    return created
|
||
|
||
|
||
def check_guest_limit(guest):
    """Check whether a guest may start another translation today.

    The guest's daily counter is reset (and committed) when its
    ``last_translate_date`` is not today's date.

    Args:
        guest: Guest record exposing ``daily_count`` and ``last_translate_date``.

    Returns:
        ``(True, "OK")`` when the guest is under the daily limit, otherwise
        ``(False, message)`` with a user-facing explanation.
    """
    from datetime import timezone, timedelta

    # Use the same UTC+8 calendar day that index() uses for its reset. With a
    # server-local date.today(), the two code paths disagree about "today"
    # whenever the server is not on UTC+8 and keep resetting each other's counter.
    today = datetime.now(timezone(timedelta(hours=8))).date()
    if guest.last_translate_date != today:
        guest.daily_count = 0
        guest.last_translate_date = today
        db.session.commit()

    limit = USER_LIMITS['guest']['daily_translations']
    if guest.daily_count >= limit:
        return False, f"今日翻译次数已达上限({limit}次),请登录获取更多次数"

    return True, "OK"
|
||
|
||
|
||
def allowed_file(filename):
    """Return ``True`` when ``filename`` names a PDF (case-insensitive ``.pdf`` suffix).

    Non-string input, such as a ``None`` filename from an upload without a
    name, is rejected instead of raising ``TypeError``.
    """
    if not isinstance(filename, str):
        return False
    # A name ending in ".pdf" necessarily contains a dot, so no separate check is needed.
    return filename.lower().endswith('.pdf')
|
||
|
||
|
||
def compute_file_hash(file_content):
    """Return the hexadecimal MD5 digest of ``file_content`` (a bytes-like object)."""
    digest = hashlib.md5()
    digest.update(file_content)
    return digest.hexdigest()
|
||
|
||
|
||
# ==================== 路由: 页面 ====================
|
||
def _reset_daily_count_if_new_day(record):
    """Zero ``record.daily_count`` when its last translation day is not today (UTC+8)."""
    from datetime import timezone, timedelta

    today = datetime.now(timezone(timedelta(hours=8))).date()
    if record.last_translate_date != today:
        record.daily_count = 0
        record.last_translate_date = today
        db.session.commit()


@app.route('/')
def index():
    """Render the home page with the caller's quota and feature availability.

    Logged-in users get the limits of their ``user_type`` (falling back to the
    ``free`` tier); everyone else is treated as a guest. The daily counter is
    reset first when the stored date is not today's UTC+8 date.
    """
    user = get_current_user()
    if user:
        _reset_daily_count_if_new_day(user)
        limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free'])
        daily_limit = limits['daily_translations']
        # A non-positive limit is displayed as "unlimited" for registered tiers.
        if daily_limit > 0:
            # Clamp at zero so a count above the limit never shows as negative.
            daily_remaining = max(0, daily_limit - user.daily_count)
        else:
            daily_remaining = '无限'
        max_pages = limits['max_pages'] if limits['max_pages'] > 0 else '无限'
    else:
        guest = get_or_create_guest()
        _reset_daily_count_if_new_day(guest)
        limits = USER_LIMITS['guest']
        daily_remaining = max(0, limits['daily_translations'] - guest.daily_count)
        max_pages = limits['max_pages']

    # Every known feature; ``base`` marks features all users have.
    all_features = {
        'basic_translate': {'name': '自动翻译缓存,相同文件秒出结果', 'base': True},
        'custom_instruction': {'name': '自定义翻译要求', 'base': False},
        'compare_view': {'name': '原文译文对比查看', 'base': False},
        'history': {'name': '翻译历史记录', 'base': False},
        'retranslate': {'name': '不满意重新翻译', 'base': False},
        'export_pdf': {'name': '导出PDF格式', 'base': False},
        'batch_translate': {'name': '批量翻译', 'base': False},
        'custom_terms': {'name': '自定义术语库', 'base': False},
        'priority_queue': {'name': '优先队列处理', 'base': False},
    }

    # ``limits`` is the guest entry for anonymous visitors, so one lookup serves both cases.
    user_features = limits.get('features', [])
    # The "all" marker (e.g. features=["all"]) grants every known feature.
    if 'all' in user_features:
        user_features = list(all_features)

    # Base features are not listed; only permission-gated ones are displayed.
    feature_display = [
        {'name': info['name'], 'has': key in user_features}
        for key, info in all_features.items()
        if not info['base']
    ]

    return render_template(
        'index.html',
        user=user,
        limits=limits,
        daily_remaining=daily_remaining,
        max_pages=max_pages,
        plans=MEMBERSHIP_PLANS,
        features=feature_display,
    )
|
||
|
||
|
||
@app.route('/translate/<int:translation_id>')
def translation_detail(translation_id):
    """Render the detail page for one translation.

    Returns 404 when the record does not exist and 403 when the caller is not
    allowed to see it.
    """
    user = get_current_user()
    translation = Translation.query.get(translation_id)

    if not translation:
        return "翻译记录不存在", 404

    # Access rule: a logged-in user may only open their own records, and an
    # anonymous visitor may only open records that have no owner. Previously an
    # anonymous visitor could open any user's translation by guessing its id.
    if user:
        allowed = translation.user_id == user.id
    else:
        allowed = not translation.user_id
    if not allowed:
        return "无权访问", 403

    # NOTE(review): ownerless (guest) records are not tied to a session here, so
    # any anonymous visitor can still open them; confirm whether the model
    # stores a guest/session id that could be checked.

    return render_template('translation.html', translation=translation, user=user)
|
||
|
||
|
||
@app.route('/history')
def history():
    """Render the logged-in user's 50 most recent translations.

    Anonymous visitors are redirected to the login page.
    """
    user = get_current_user()
    if not user:
        login_url = url_for('login')
        return redirect(login_url)

    # Newest first, capped at 50 rows.
    query = Translation.query.filter_by(user_id=user.id)
    query = query.order_by(Translation.created_at.desc())
    recent = query.limit(50).all()

    return render_template('history.html', user=user, translations=recent)
|
||
|
||
|
||
@app.route('/pricing')
def pricing():
    """Render the membership pricing page with the configured plans."""
    context = {'plans': MEMBERSHIP_PLANS}
    return render_template('pricing.html', **context)
|
||
|
||
|
||
# ==================== 路由: API ====================
|
||
@app.route('/api/upload', methods=['POST'])
def upload_pdf():
    """Accept a PDF upload and either reuse a cached result or queue a translation.

    Steps: validate the upload, count pages, enforce the caller's quota, reuse
    a cached result when one exists and no custom instruction was given,
    otherwise save the file and create a translation task. A ``Translation``
    row is recorded and the caller's daily counter is incremented either way.

    Returns:
        JSON with ``translation_id``, ``file_hash``, ``page_count``,
        ``from_cache``, ``task_id`` (``None`` on a cache hit) and ``message``;
        or ``{'error': ...}`` with status 400/403.
    """
    import io
    from datetime import timezone, timedelta

    user = get_current_user()

    # --- Validate the uploaded file ---
    if 'file' not in request.files:
        return jsonify({'error': '未上传文件'}), 400

    file = request.files['file']
    # ``not`` also covers a missing (None) filename, not just an empty string.
    if not file.filename:
        return jsonify({'error': '未选择文件'}), 400

    if not allowed_file(file.filename):
        return jsonify({'error': '只支持PDF文件'}), 400

    # Optional free-text translation instruction from the user.
    instruction = request.form.get('instruction', None)

    file_content = file.read()
    file_hash = compute_file_hash(file_content)
    filename = secure_filename(file.filename)
    if not filename.lower().endswith('.pdf'):
        # secure_filename() drops non-ASCII characters, so a name such as
        # "文档.pdf" collapses to "pdf"; fall back to a hash-based name.
        filename = f"{file_hash}.pdf"

    # --- Count pages ---
    # Imported outside the try block so a missing pypdf install is not
    # reported to the client as a malformed PDF.
    from pypdf import PdfReader
    try:
        reader = PdfReader(io.BytesIO(file_content))
        page_count = len(reader.pages)
    except Exception as e:
        return jsonify({'error': f'PDF解析失败: {e}'}), 400

    # --- Quota checks ---
    guest = None
    if user:
        can_translate, msg = user.can_translate(page_count, {'USER_LIMITS': USER_LIMITS})
        if not can_translate:
            return jsonify({'error': msg}), 403
    else:
        guest = get_or_create_guest()
        can_translate, msg = check_guest_limit(guest)
        if not can_translate:
            return jsonify({'error': msg}), 403

        max_pages = USER_LIMITS['guest']['max_pages']
        if page_count > max_pages:
            return jsonify({'error': f'PDF页数超出限制(最大{max_pages}页)'}), 403

    # --- Cache lookup ---
    cache_path = cache_service.get_cache(file_hash)
    # The cache is only reused when no custom instruction was supplied.
    from_cache = bool(cache_path and ENABLE_CACHE and not instruction)

    task_id = None
    upload_path = None
    if from_cache:
        output_path = cache_path
    else:
        # Save the upload in its own directory.
        upload_dir = os.path.join(UPLOAD_DIR, str(uuid.uuid4()))
        os.makedirs(upload_dir, exist_ok=True)
        upload_path = os.path.join(upload_dir, filename)
        with open(upload_path, 'wb') as f:
            f.write(file_content)

        # Reserve a separate output directory for the translated markdown.
        output_dir = os.path.join(OUTPUT_DIR, str(uuid.uuid4()))
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, f"{filename}_translated.md")

        task_id = str(uuid.uuid4())

    # --- Record the translation ---
    translation = Translation(
        user_id=user.id if user else None,
        file_hash=file_hash,
        original_filename=filename,
        file_size=len(file_content),
        page_count=page_count,
        translate_params=json.dumps({'instruction': instruction}) if instruction else None,
        status='completed' if from_cache else 'processing',
        progress=100 if from_cache else 0,
        output_path=output_path,
        from_cache=from_cache,
    )
    db.session.add(translation)

    if not from_cache:
        # Flush to obtain translation.id without committing the transaction.
        db.session.flush()
        # NOTE(review): the task is created before the commit below; if it runs
        # in another thread with its own DB session it may not see this row
        # yet. Confirm against TranslationTask.create_task.
        TranslationTask.create_task(
            task_id, upload_path, output_path,
            {'LLM_CONFIG': LLM_CONFIG},
            instruction,
            translation_id=translation.id,
            app=app,
        )

    # --- Update usage counters ---
    if user:
        user.increment_count()
    else:
        guest.daily_count += 1
        # Same UTC+8 calendar day that index() uses for its daily reset; a
        # server-local date.today() can disagree and trigger spurious resets.
        guest.last_translate_date = datetime.now(timezone(timedelta(hours=8))).date()

    if from_cache:
        cache_record = TranslationCache.query.filter_by(file_hash=file_hash).first()
        if cache_record:
            cache_record.increment_hit()

    db.session.commit()

    return jsonify({
        'success': True,
        'translation_id': translation.id,
        'file_hash': file_hash,
        'page_count': page_count,
        'from_cache': from_cache,
        'task_id': task_id,
        'message': '使用缓存结果' if from_cache else '翻译任务已创建',
    })
|
||
|
||
|
||
@app.route('/api/status/<int:translation_id>')
def translation_status(translation_id):
    """Return status, progress and error text of a translation as JSON (404 if unknown)."""
    record = Translation.query.get(translation_id)
    if not record:
        return jsonify({'error': '翻译记录不存在'}), 404

    # Only the stored row is reported; the in-memory TranslationTask is not consulted here.
    payload = {
        'id': record.id,
        'status': record.status,
        'progress': record.progress,
        'from_cache': record.from_cache,
        'error': record.error_message,
    }
    return jsonify(payload)
|
||
|
||
|
||
@app.route('/api/task/<task_id>')
def task_status(task_id):
    """Return the task state that ``TranslationTask.get_task`` reports, or 404."""
    task_info = TranslationTask.get_task(task_id)
    if task_info:
        return jsonify(task_info)
    return jsonify({'error': '任务不存在'}), 404
|
||
|
||
|
||
@app.route('/api/result/<int:translation_id>')
def get_result(translation_id):
    """Return the translated text of a completed translation as JSON.

    Status codes: 404 for an unknown record or missing output file, 403 when
    the caller does not own the record, 400 when the translation has not
    completed yet.
    """
    user = get_current_user()
    translation = Translation.query.get(translation_id)

    if not translation:
        return jsonify({'error': '翻译记录不存在'}), 404

    # Ownership check. ``user`` was fetched but never used before, so anyone
    # could read any user's translation by id. Logged-in users may read only
    # their own records; anonymous callers only ownerless (guest) records.
    if user:
        allowed = translation.user_id == user.id
    else:
        allowed = not translation.user_id
    if not allowed:
        return jsonify({'error': '无权访问'}), 403

    if translation.status != 'completed':
        return jsonify({'error': '翻译未完成'}), 400

    if not translation.output_path or not os.path.exists(translation.output_path):
        return jsonify({'error': '翻译结果文件不存在'}), 404

    with open(translation.output_path, 'r', encoding='utf-8') as f:
        content = f.read()

    return jsonify({
        'id': translation.id,
        'filename': translation.original_filename,
        'content': content,
        # NOTE(review): this exposes a server filesystem path to the client;
        # kept for backward compatibility, consider removing it.
        'output_path': translation.output_path,
    })
|
||
|
||
|
||
@app.route('/api/download/<int:translation_id>')
def download_result(translation_id):
    """Send the translated markdown file as an attachment.

    Status codes: 404 for an unknown record or missing file, 403 when the
    caller does not own the record.
    """
    user = get_current_user()
    translation = Translation.query.get(translation_id)

    if not translation or not translation.output_path:
        return jsonify({'error': '翻译记录不存在'}), 404

    # Ownership check. ``user`` was fetched but never used before, so any
    # caller could download any user's file by id.
    if user:
        allowed = translation.user_id == user.id
    else:
        allowed = not translation.user_id
    if not allowed:
        return jsonify({'error': '无权访问'}), 403

    if not os.path.exists(translation.output_path):
        return jsonify({'error': '文件不存在'}), 404

    filename = f"{translation.original_filename}_translated.md"
    return send_file(translation.output_path, as_attachment=True, download_name=filename)
|
||
|
||
|
||
@app.route('/api/retranslate/<int:translation_id>', methods=['POST'])
def retranslate(translation_id):
    """Create a re-translation record derived from an existing translation.

    Requires a logged-in owner of the original record whose tier includes the
    ``retranslate`` feature. The JSON body may carry an ``instruction`` string.

    Returns:
        JSON with the new ``translation_id``; 401 when not logged in, 403 when
        the record is missing/not owned or the feature is unavailable.
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请登录后使用此功能'}), 401

    translation = Translation.query.get(translation_id)
    if not translation or translation.user_id != user.id:
        return jsonify({'error': '无权操作'}), 403

    # Feature gate. A tier whose feature list contains "all" has every feature,
    # so it must pass even though "retranslate" is not listed literally.
    limits = USER_LIMITS.get(user.user_type, USER_LIMITS['free'])
    features = limits.get('features', [])
    if 'all' not in features and 'retranslate' not in features:
        return jsonify({'error': '会员功能,请升级'}), 403

    # Tolerate a missing, empty or non-object JSON body instead of failing.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    instruction = payload.get('instruction', '')

    # The new record inherits the original file's metadata and links back via parent_id.
    new_translation = Translation(
        user_id=user.id,
        file_hash=translation.file_hash,
        original_filename=translation.original_filename,
        file_size=translation.file_size,
        page_count=translation.page_count,
        translate_params=json.dumps({'instruction': instruction}),
        status='processing',
        parent_id=translation_id,
        retranslate_request=instruction,
    )
    db.session.add(new_translation)
    db.session.commit()

    # TODO: actually run the translation. The original upload would have to be
    # located first; until then the new record stays in 'processing'.

    return jsonify({
        'success': True,
        'translation_id': new_translation.id,
        'message': '重译任务已创建',
    })
|
||
|
||
|
||
@app.route('/api/compare/<int:translation_id>')
def compare_view(translation_id):
    """Return a side-by-side comparison payload for one of the caller's translations.

    Placeholder implementation: fixed strings are returned in place of real content.
    """
    viewer = get_current_user()
    if not viewer:
        return jsonify({'error': '请登录后使用此功能'}), 401

    record = Translation.query.get(translation_id)
    owned_by_viewer = bool(record) and record.user_id == viewer.id
    if not owned_by_viewer:
        return jsonify({'error': '无权访问'}), 403

    # TODO: build the real comparison from the source and translated files.
    comparison = {
        'id': record.id,
        'original': '原文内容',
        'translated': '译文内容',
    }
    return jsonify(comparison)
|
||
|
||
|
||
# ==================== 路由: 用户系统 ====================
|
||
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Serve the login page (GET) or authenticate JSON credentials (POST).

    POST expects a JSON object with ``username`` and ``password``. On success
    the user id is stored in the session and the user is returned as JSON;
    otherwise 401 with a generic error is returned.
    """
    if request.method == 'GET':
        return render_template('login.html')

    # A missing, malformed or non-object body is treated as empty credentials
    # rather than raising AttributeError (HTTP 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')

    invalid = jsonify({'error': '用户名或密码错误'}), 401
    if not isinstance(username, str) or not isinstance(password, str) or not username or not password:
        return invalid

    user = User.query.filter_by(username=username).first()
    if not user or not user.check_password(password):
        return invalid

    # NOTE(review): User has an ``is_active`` flag (set in init_app) that is
    # not consulted here; confirm whether inactive accounts should be rejected.
    session['user_id'] = user.id
    return jsonify({'success': True, 'user': user.to_dict()})
|
||
|
||
|
||
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Serve the registration page (GET) or create a free-tier account (POST).

    POST expects a JSON object with ``username``, ``email`` and ``password``.
    Returns 400 when a field is missing or the username/email is already
    taken; on success the new user is logged in and returned as JSON.
    """
    if request.method == 'GET':
        return render_template('register.html')

    # A missing, malformed or non-object body is treated as empty input
    # rather than raising AttributeError (HTTP 500).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    email = data.get('email')
    password = data.get('password')

    # Reject absent/empty fields before they reach set_password() or the database.
    if not all(isinstance(value, str) and value for value in (username, email, password)):
        return jsonify({'error': '用户名、邮箱和密码不能为空'}), 400

    if User.query.filter_by(username=username).first():
        return jsonify({'error': '用户名已存在'}), 400

    if User.query.filter_by(email=email).first():
        return jsonify({'error': '邮箱已注册'}), 400

    user = User(username=username, email=email, user_type='free')
    user.set_password(password)
    db.session.add(user)
    db.session.commit()

    session['user_id'] = user.id
    return jsonify({'success': True, 'user': user.to_dict()})
|
||
|
||
|
||
@app.route('/logout')
def logout():
    """Drop the logged-in user id from the session and redirect to the home page."""
    session.pop('user_id', None)
    home_url = url_for('index')
    return redirect(home_url)
|
||
|
||
|
||
@app.route('/api/user/info')
def user_info():
    """Return the current user and their tier limits as JSON.

    Anonymous callers receive ``{'user': None}``. Unknown user types fall back
    to the ``free`` tier limits.
    """
    current = get_current_user()
    if current:
        tier_limits = USER_LIMITS.get(current.user_type, USER_LIMITS['free'])
        return jsonify({'user': current.to_dict(), 'limits': tier_limits})
    return jsonify({'user': None})
|
||
|
||
|
||
# ==================== 初始化 ====================
|
||
def init_app():
    """Prepare the filesystem and database for first use.

    Creates the upload/cache/output directories and the database tables,
    seeds an ``admin`` account when none exists, and inserts the sample data
    packages when the package table is empty.

    The admin password is read from the ``ADMIN_PASSWORD`` environment
    variable. When that is unset, a random password is generated and printed
    once, replacing the former hard-coded default credential.
    """
    import secrets

    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    for dir_name in [UPLOAD_DIR, CACHE_DIR, OUTPUT_DIR]:
        os.makedirs(dir_name, exist_ok=True)

    with app.app_context():
        db.create_all()

        # Seed the default administrator account.
        if not User.query.filter_by(username='admin').first():
            admin_password = os.environ.get('ADMIN_PASSWORD')
            generated = not admin_password
            if generated:
                admin_password = secrets.token_urlsafe(12)

            admin = User(
                username='admin',
                email='admin@tphai.com',
                user_type='admin',
                is_admin=True,
                is_active=True,
            )
            admin.set_password(admin_password)
            db.session.add(admin)
            db.session.commit()
            if generated:
                # Shown only once; the operator must record it now.
                print(f"✅ 默认管理员账号已创建: admin / {admin_password}")
            else:
                print("✅ 默认管理员账号已创建: admin (密码来自环境变量 ADMIN_PASSWORD)")

        # Seed the sample data packages.
        if DataPackage.query.count() == 0:
            db.session.add_all([
                DataPackage(name='入门包', description='适合轻度使用', translation_count=100, price=9.9, original_price=19.9, valid_days=30, sort_order=1),
                DataPackage(name='标准包', description='日常使用首选', translation_count=500, price=39.9, original_price=59.9, valid_days=30, sort_order=2, is_recommended=True),
                DataPackage(name='专业包', description='高频使用更划算', translation_count=2000, price=99.9, original_price=199.9, valid_days=30, sort_order=3),
                DataPackage(name='无限包', description='畅享无限翻译', translation_count=0, price=199.9, original_price=399.9, valid_days=30, sort_order=4),
            ])
            db.session.commit()
            print("✅ 示例数据包套餐已创建")
|
||
|
||
|
||
if __name__ == '__main__':
    init_app()
    # The Werkzeug debugger lets a client execute arbitrary Python, so it must
    # not be enabled unconditionally while listening on all interfaces. Set
    # FLASK_DEBUG=1 to opt in during local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(host='0.0.0.0', port=19000, debug=debug_enabled)