Files
tech-forum/backend/app.py
hubian cb4b7d5363 feat: v1.1.0 安全重构
- 后台添加登录验证(Session + JWT双重验证)
- JSON存储改为SQLite数据库,解决并发问题
- API密钥移至config.py,支持环境变量覆盖
- SECRET_KEY改为随机生成
- 新增管理员登录页面
- 修复README.md乱码
- 更新.gitignore忽略敏感配置
2026-04-12 16:56:35 +08:00

626 lines
19 KiB
Python

"""
技术论坛与技术分享网站 - 后端API (重构版)
"""
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import jwt
import datetime
import os
from pathlib import Path
# 导入配置和模型
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import SECRET_KEY, LLM_BASE_URL, LLM_API_KEY, LLM_MODEL, DATABASE_PATH, BACKEND_PORT
from models import Database, UserModel, PostModel, ReplyModel, TopicModel
app = Flask(__name__, static_folder='../frontend', static_url_path='')
CORS(app)
# 初始化数据库
db = Database(DATABASE_PATH)
user_model = UserModel(db)
post_model = PostModel(db)
reply_model = ReplyModel(db)
topic_model = TopicModel(db)
# ============ 辅助函数 ============
def generate_token(user_id):
return jwt.encode({
'user_id': user_id,
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=30)
}, SECRET_KEY, algorithm='HS256')
def verify_token(token):
try:
return jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
except:
return None
def get_current_user():
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not token:
return None
data = verify_token(token)
if not data:
return None
return user_model.get_by_id(data['user_id'])
# ============ 页面路由 ============
@app.route('/')
def index():
return send_file('../frontend/index.html')
@app.route('/login')
def login_page():
return send_file('../frontend/login.html')
@app.route('/register')
def register_page():
return send_file('../frontend/register.html')
@app.route('/post/<post_id>')
def post_page(post_id):
return send_file('../frontend/post.html')
@app.route('/topic/<topic_id>')
def topic_page(topic_id):
return send_file('../frontend/topic.html')
@app.route('/user/<user_id>')
def user_page(user_id):
return send_file('../frontend/user.html')
@app.route('/create')
def create_page():
return send_file('../frontend/create.html')
# ============ API: 用户认证 ============
@app.route('/api/register', methods=['POST'])
def api_register():
data = request.json
username = data.get('username', '').strip()
email = data.get('email', '').strip().lower()
phone = data.get('phone', '').strip()
password = data.get('password', '')
confirm_password = data.get('confirm_password', '')
# 验证
if not username or len(username) < 2:
return jsonify({'error': '用户名至少2个字符'}), 400
if not email or '@' not in email:
return jsonify({'error': '请输入有效的邮箱地址'}), 400
if not password or len(password) < 6:
return jsonify({'error': '密码至少6个字符'}), 400
if password != confirm_password:
return jsonify({'error': '两次密码不一致'}), 400
# 检查是否已存在
if user_model.get_by_username(username):
return jsonify({'error': '用户名已存在'}), 400
if user_model.get_by_email(email):
return jsonify({'error': '邮箱已注册'}), 400
# 创建用户
user_id = user_model.create(username, email, phone, password)
user = user_model.get_by_id(user_id)
token = generate_token(user_id)
return jsonify({
'success': True,
'token': token,
'user': {
'id': user_id,
'username': username,
'email': email,
'avatar': user['avatar']
}
})
@app.route('/api/login', methods=['POST'])
def api_login():
data = request.json
login_name = data.get('username', '').strip()
password = data.get('password', '')
if not login_name or not password:
return jsonify({'error': '请输入用户名和密码'}), 400
# 查找用户
user = user_model.get_by_username(login_name)
if not user:
user = user_model.get_by_email(login_name)
if not user:
return jsonify({'error': '用户不存在'}), 400
if not user_model.verify_password(user, password):
return jsonify({'error': '密码错误'}), 400
token = generate_token(user['id'])
return jsonify({
'success': True,
'token': token,
'user': {
'id': user['id'],
'username': user['username'],
'email': user['email'],
'avatar': user['avatar'],
'bio': user.get('bio', '')
}
})
@app.route('/api/user')
def api_current_user():
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
return jsonify({
'id': user['id'],
'username': user['username'],
'email': user['email'],
'phone': user.get('phone', ''),
'avatar': user['avatar'],
'bio': user.get('bio', ''),
'posts_count': user_model.get_posts_count(user['id']),
'replies_count': user_model.get_replies_count(user['id']),
'created_at': user['created_at']
})
@app.route('/api/user/<user_id>')
def api_user_profile(user_id):
user = user_model.get_by_id(user_id)
if not user:
return jsonify({'error': '用户不存在'}), 404
# 获取用户的帖子
posts, total = post_model.get_all()
user_posts = []
for post in posts:
if post['author_id'] == user_id:
user_posts.append({
'id': post['id'],
'title': post['title'],
'type': post['type'],
'likes': len(post['likes']),
'replies': len(reply_model.get_by_post(post['id'])),
'created_at': post['created_at']
})
return jsonify({
'user': {
'id': user_id,
'username': user['username'],
'avatar': user['avatar'],
'bio': user.get('bio', ''),
'posts_count': user_model.get_posts_count(user['id']),
'replies_count': user_model.get_replies_count(user['id']),
'created_at': user['created_at']
},
'posts': user_posts
})
# ============ API: 技术交流帖子 ============
@app.route('/api/posts')
def api_posts():
post_type = request.args.get('type')
tag = request.args.get('tag')
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 20))
posts, total = post_model.get_all(post_type, tag, page, per_page)
post_list = []
for post in posts:
author = user_model.get_by_id(post['author_id']) or {}
post_list.append({
'id': post['id'],
'title': post['title'],
'type': post['type'],
'content_preview': post['content'][:150] + '...' if len(post['content']) > 150 else post['content'],
'author': {
'id': post['author_id'],
'username': author.get('username', '未知'),
'avatar': author.get('avatar', '')
},
'tags': post['tags'],
'likes': len(post['likes']),
'replies': len(reply_model.get_by_post(post['id'])),
'views': post['views'],
'created_at': post['created_at'],
'is_pinned': post['is_pinned']
})
return jsonify({
'posts': post_list,
'total': total,
'page': page,
'per_page': per_page
})
@app.route('/api/posts', methods=['POST'])
def api_create_post():
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
data = request.json
title = data.get('title', '').strip()
content = data.get('content', '').strip()
post_type = data.get('type', 'discussion')
tags = data.get('tags', [])
if not title or len(title) < 5:
return jsonify({'error': '标题至少5个字符'}), 400
if not content or len(content) < 10:
return jsonify({'error': '内容至少10个字符'}), 400
post_id = post_model.create(title, content, post_type, user['id'], tags)
return jsonify({
'success': True,
'post_id': post_id
})
@app.route('/api/posts/<post_id>')
def api_post_detail(post_id):
post = post_model.get_by_id(post_id)
if not post:
return jsonify({'error': '帖子不存在'}), 404
# 增加浏览量
post_model.increment_views(post_id)
post['views'] += 1
author = user_model.get_by_id(post['author_id']) or {}
# 获取回复
replies = reply_model.get_by_post(post_id)
reply_list = []
for reply in replies:
reply_author = user_model.get_by_id(reply['author_id']) or {}
reply_list.append({
'id': reply['id'],
'content': reply['content'],
'author': {
'id': reply['author_id'],
'username': reply_author.get('username', '未知'),
'avatar': reply_author.get('avatar', '')
},
'likes': len(reply['likes']),
'created_at': reply['created_at'],
'reply_to': reply.get('reply_to')
})
return jsonify({
'id': post_id,
'title': post['title'],
'content': post['content'],
'type': post['type'],
'author': {
'id': post['author_id'],
'username': author.get('username', '未知'),
'avatar': author.get('avatar', ''),
'bio': author.get('bio', '')
},
'tags': post['tags'],
'likes': len(post['likes']),
'views': post['views'],
'replies': reply_list,
'created_at': post['created_at'],
'updated_at': post.get('updated_at', post['created_at'])
})
@app.route('/api/posts/<post_id>/reply', methods=['POST'])
def api_reply_post(post_id):
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
data = request.json
content = data.get('content', '').strip()
reply_to = data.get('reply_to')
if not content:
return jsonify({'error': '回复内容不能为空'}), 400
post = post_model.get_by_id(post_id)
if not post:
return jsonify({'error': '帖子不存在'}), 404
reply_id = reply_model.create(post_id, content, user['id'], reply_to)
return jsonify({
'success': True,
'reply_id': reply_id
})
@app.route('/api/posts/<post_id>/like', methods=['POST'])
def api_like_post(post_id):
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
post = post_model.get_by_id(post_id)
if not post:
return jsonify({'error': '帖子不存在'}), 404
liked, likes_count = post_model.add_like(post_id, user['id'])
return jsonify({
'success': True,
'liked': liked,
'likes_count': likes_count
})
# ============ API: 工具分享主题 ============
@app.route('/api/topics')
def api_topics():
topics = topic_model.get_all()
topic_list = []
for topic in topics:
author = user_model.get_by_id(topic['author_id']) or {}
topic_list.append({
'id': topic['id'],
'name': topic['name'],
'description': topic['description'][:100] + '...' if len(topic.get('description', '')) > 100 else topic.get('description', ''),
'icon': topic.get('icon', '🔧'),
'author': {
'id': topic['author_id'],
'username': author.get('username', '未知')
},
'sub_topics_count': len(topic_model.get_sub_topics(topic['id'])),
'questions_count': len(topic_model.get_questions(topic['id'])),
'followers': len(topic['followers']),
'created_at': topic['created_at']
})
topic_list.sort(key=lambda x: x['followers'], reverse=True)
return jsonify(topic_list)
@app.route('/api/topics', methods=['POST'])
def api_create_topic():
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
data = request.json
name = data.get('name', '').strip()
description = data.get('description', '').strip()
icon = data.get('icon', '🔧')
if not name:
return jsonify({'error': '主题名称不能为空'}), 400
topic_id = topic_model.create(name, description, icon, user['id'])
return jsonify({
'success': True,
'topic_id': topic_id
})
@app.route('/api/topics/<topic_id>')
def api_topic_detail(topic_id):
topic = topic_model.get_by_id(topic_id)
if not topic:
return jsonify({'error': '主题不存在'}), 404
author = user_model.get_by_id(topic['author_id']) or {}
# 获取子主题
sub_topics = topic_model.get_sub_topics(topic_id)
sub_list = []
for st in sub_topics:
st_author = user_model.get_by_id(st['author_id']) or {}
sub_list.append({
'id': st['id'],
'title': st['title'],
'content': st['content'],
'author': st_author,
'created_at': st['created_at']
})
# 获取问题
questions = topic_model.get_questions(topic_id)
q_list = []
for q in questions:
q_author = user_model.get_by_id(q['author_id']) or {}
answers = []
for a in q['answers']:
a_author = user_model.get_by_id(a['author_id']) or {}
answers.append({
'id': a['id'],
'content': a['content'],
'author': {
'id': a['author_id'],
'username': a_author.get('username', '未知'),
'avatar': a_author.get('avatar', '')
},
'likes': len(a['likes']),
'created_at': a['created_at']
})
q_list.append({
'id': q['id'],
'title': q['title'],
'content': q.get('content', ''),
'author': {
'id': q['author_id'],
'username': q_author.get('username', '未知'),
'avatar': q_author.get('avatar', '')
},
'answers': answers,
'views': q.get('views', 0),
'created_at': q['created_at']
})
return jsonify({
'id': topic_id,
'name': topic['name'],
'description': topic.get('description', ''),
'icon': topic.get('icon', '🔧'),
'author': {
'id': topic['author_id'],
'username': author.get('username', '未知'),
'avatar': author.get('avatar', '')
},
'sub_topics': sub_list,
'questions': q_list,
'followers': len(topic['followers']),
'created_at': topic['created_at']
})
@app.route('/api/topics/<topic_id>/subtopic', methods=['POST'])
def api_add_subtopic(topic_id):
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
data = request.json
title = data.get('title', '').strip()
content = data.get('content', '').strip()
if not title:
return jsonify({'error': '标题不能为空'}), 400
topic = topic_model.get_by_id(topic_id)
if not topic:
return jsonify({'error': '主题不存在'}), 404
subtopic_id = topic_model.add_sub_topic(topic_id, title, content, user['id'])
return jsonify({'success': True, 'subtopic_id': subtopic_id})
@app.route('/api/topics/<topic_id>/question', methods=['POST'])
def api_add_question(topic_id):
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
data = request.json
title = data.get('title', '').strip()
content = data.get('content', '').strip()
if not title:
return jsonify({'error': '问题标题不能为空'}), 400
topic = topic_model.get_by_id(topic_id)
if not topic:
return jsonify({'error': '主题不存在'}), 404
question_id = topic_model.add_question(topic_id, title, content, user['id'])
return jsonify({'success': True, 'question_id': question_id})
@app.route('/api/topics/<topic_id>/question/<question_id>/answer', methods=['POST'])
def api_answer_question(topic_id, question_id):
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
data = request.json
content = data.get('content', '').strip()
if not content:
return jsonify({'error': '回答内容不能为空'}), 400
topic = topic_model.get_by_id(topic_id)
if not topic:
return jsonify({'error': '主题不存在'}), 404
answer_id = topic_model.add_answer(question_id, content, user['id'])
return jsonify({'success': True, 'answer_id': answer_id})
@app.route('/api/topics/<topic_id>/follow', methods=['POST'])
def api_follow_topic(topic_id):
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
topic = topic_model.get_by_id(topic_id)
if not topic:
return jsonify({'error': '主题不存在'}), 404
followed, followers_count = topic_model.add_follower(topic_id, user['id'])
return jsonify({
'success': True,
'followed': followed,
'followers_count': followers_count
})
# ============ API: 标签 ============
@app.route('/api/tags')
def api_tags():
tags = post_model.get_tags_stats()
return jsonify([{'name': t[0], 'count': t[1]} for t in tags])
# ============ API: 搜索 ============
@app.route('/api/search')
def api_search():
query = request.args.get('q', '').strip().lower()
if not query:
return jsonify({'posts': [], 'topics': []})
# 搜索帖子
posts, _ = post_model.get_all()
matched_posts = []
for post in posts:
if query in post['title'].lower() or query in post['content'].lower():
author = user_model.get_by_id(post['author_id']) or {}
matched_posts.append({
'id': post['id'],
'title': post['title'],
'type': post['type'],
'author': author.get('username', '未知'),
'created_at': post['created_at']
})
# 搜索主题
topics = topic_model.get_all()
matched_topics = []
for topic in topics:
if query in topic['name'].lower() or query in topic.get('description', '').lower():
matched_topics.append({
'id': topic['id'],
'name': topic['name'],
'icon': topic.get('icon', '🔧')
})
return jsonify({
'posts': matched_posts[:20],
'topics': matched_topics[:20]
})
# ============ API: 健康检查 ============
@app.route('/api/health')
def api_health():
return jsonify({'status': 'ok', 'service': 'tech-forum', 'port': BACKEND_PORT})
if __name__ == '__main__':
print("=" * 50)
print("技术论坛与技术分享网站")
print("=" * 50)
print(f"访问地址: http://localhost:{BACKEND_PORT}")
print("=" * 50)
app.run(host='0.0.0.0', port=BACKEND_PORT, debug=True)