Files
tech-forum/backend/app.py
hubian cd087e9931 feat: 后台管理功能增强
1. 前端用户下拉菜单优化 - 改用CSS动画,解决鼠标移动问题
2. 帖子管理增加显示/隐藏开关
3. 帖子详情页可直接删除回复
4. 新增回复管理页面(列表、删除)
5. 用户管理增加编辑功能(用户名、邮箱、手机、简介)
6. 用户管理增加查看帖子按钮
7. 所有页面侧边栏添加回复管理入口
8. 数据库增加 is_hidden 字段
2026-04-12 18:16:36 +08:00

1005 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
技术论坛与技术分享网站 - 后端API (整合版)
主服务 + 后台管理 统一端口 19004
"""
from flask import Flask, request, jsonify, send_file, render_template, redirect, session
from flask_cors import CORS
from functools import wraps
import jwt
import datetime
import os
from pathlib import Path
# 导入配置和模型
import sys
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import SECRET_KEY, ADMIN_USERNAME, ADMIN_PASSWORD, DATABASE_PATH, BACKEND_PORT
from models import Database, UserModel, PostModel, ReplyModel, TopicModel
app = Flask(__name__,
static_folder='../frontend',
static_url_path='',
template_folder='../admin/templates')
CORS(app)
app.secret_key = SECRET_KEY
# 初始化数据库
db = Database(DATABASE_PATH)
user_model = UserModel(db)
post_model = PostModel(db)
reply_model = ReplyModel(db)
topic_model = TopicModel(db)
# ============ 辅助函数 ============
def generate_token(user_id):
return jwt.encode({
'user_id': user_id,
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=30)
}, SECRET_KEY, algorithm='HS256')
def verify_token(token):
try:
return jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
except:
return None
def get_current_user():
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not token:
return None
data = verify_token(token)
if not data:
return None
return user_model.get_by_id(data['user_id'])
# 后台管理员验证装饰器
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# 检查 session
if not session.get('admin_logged_in'):
# 检查 Authorization header
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if token:
try:
data = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
if data.get('admin'):
return f(*args, **kwargs)
except:
pass
# API请求返回401页面请求跳转登录
if request.path.startswith('/admin/api/'):
return jsonify({'error': '请先登录', 'code': 401}), 401
return redirect('/admin/login')
return f(*args, **kwargs)
return decorated_function
# ============ 前台页面路由 ============
@app.route('/')
def index():
return send_file('../frontend/index.html')
@app.route('/login')
def login_page():
return send_file('../frontend/login.html')
@app.route('/register')
def register_page():
return send_file('../frontend/register.html')
@app.route('/post/<post_id>')
def post_page(post_id):
return send_file('../frontend/post.html')
@app.route('/topic/<topic_id>')
def topic_page(topic_id):
return send_file('../frontend/topic.html')
@app.route('/user/<user_id>')
def user_page(user_id):
return send_file('../frontend/user.html')
@app.route('/create')
def create_page():
return send_file('../frontend/create.html')
# ============ 后台管理页面路由 ============
@app.route('/admin/login')
def admin_login_page():
return render_template('login.html')
@app.route('/admin')
@admin_required
def admin_index():
return render_template('index.html')
@app.route('/admin/users')
@admin_required
def admin_users_page():
return render_template('users.html')
@app.route('/admin/posts')
@admin_required
def admin_posts_page():
return render_template('posts.html')
@app.route('/admin/replies')
@admin_required
def admin_replies_page():
return render_template('replies.html')
@app.route('/admin/topics')
@admin_required
def admin_topics_page():
return render_template('topics.html')
# ============ 后台管理 API ============
@app.route('/admin/api/login', methods=['POST'])
def admin_api_login():
data = request.json
username = data.get('username', '').strip()
password = data.get('password', '')
if not username or not password:
return jsonify({'error': '请输入用户名和密码'}), 400
if username != ADMIN_USERNAME or password != ADMIN_PASSWORD:
return jsonify({'error': '用户名或密码错误'}), 400
# 设置session
session['admin_logged_in'] = True
session['admin_username'] = username
# 生成token
token = jwt.encode({
'admin': True,
'username': username,
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=24)
}, SECRET_KEY, algorithm='HS256')
return jsonify({
'success': True,
'token': token,
'message': '登录成功'
})
@app.route('/admin/api/logout', methods=['POST'])
def admin_api_logout():
session.pop('admin_logged_in', None)
session.pop('admin_username', None)
return jsonify({'success': True, 'message': '已退出登录'})
@app.route('/admin/api/check-auth')
def admin_api_check_auth():
if session.get('admin_logged_in'):
return jsonify({
'logged_in': True,
'username': session.get('admin_username')
})
return jsonify({'logged_in': False})
@app.route('/admin/api/stats')
@admin_required
def admin_api_stats():
users = user_model.get_all()
posts, posts_total = post_model.get_all()
topics = topic_model.get_all()
# 统计回复数
total_replies = 0
for post in posts:
total_replies += len(reply_model.get_by_post(post['id']))
today = datetime.datetime.now().strftime('%Y-%m-%d')
today_posts = sum(1 for p in posts if p.get('created_at', '').startswith(today))
today_users = sum(1 for u in users if u.get('created_at', '').startswith(today))
# 帖子类型统计
discussion_count = sum(1 for p in posts if p.get('type') == 'discussion')
share_count = sum(1 for p in posts if p.get('type') == 'share')
return jsonify({
'users_count': len(users),
'posts_count': posts_total,
'topics_count': len(topics),
'messages_count': total_replies,
'today_posts': today_posts,
'today_users': today_users,
'discussion_count': discussion_count,
'share_count': share_count,
})
@app.route('/admin/api/users')
@admin_required
def admin_api_users():
users = user_model.get_all()
user_list = []
for user in users:
user_list.append({
'id': user['id'],
'username': user.get('username', ''),
'email': user.get('email', ''),
'phone': user.get('phone', ''),
'posts_count': user_model.get_posts_count(user['id']),
'replies_count': user_model.get_replies_count(user['id']),
'created_at': user.get('created_at', ''),
})
return jsonify(user_list)
@app.route('/admin/api/users/<user_id>', methods=['DELETE'])
@admin_required
def admin_api_delete_user(user_id):
user_model.delete(user_id)
return jsonify({'success': True})
@app.route('/admin/api/posts')
@admin_required
def admin_api_posts():
post_type = request.args.get('type')
posts, total = post_model.get_all(post_type)
post_list = []
for post in posts:
author = user_model.get_by_id(post['author_id']) or {}
post_list.append({
'id': post['id'],
'title': post['title'],
'type': post['type'],
'author': author.get('username', '未知'),
'author_id': post['author_id'],
'likes': len(post['likes']),
'replies': len(reply_model.get_by_post(post['id'])),
'views': post['views'],
'is_pinned': post['is_pinned'],
'created_at': post['created_at'],
})
return jsonify(post_list)
@app.route('/admin/api/posts/<post_id>')
@admin_required
def admin_api_post_detail(post_id):
post = post_model.get_by_id(post_id)
if not post:
return jsonify({'error': '帖子不存在'}), 404
author = user_model.get_by_id(post['author_id']) or {}
# 获取回复
replies = reply_model.get_by_post(post_id)
reply_list = []
for reply in replies:
reply_author = user_model.get_by_id(reply['author_id']) or {}
reply_list.append({
'id': reply['id'],
'content': reply['content'][:100] + '...' if len(reply['content']) > 100 else reply['content'],
'author': reply_author.get('username', '未知'),
'likes': len(reply['likes']),
'created_at': reply['created_at'],
})
return jsonify({
'id': post_id,
'title': post['title'],
'content': post['content'],
'type': post['type'],
'author': author.get('username', '未知'),
'tags': post['tags'],
'likes': len(post['likes']),
'replies': reply_list,
'views': post['views'],
'is_pinned': post['is_pinned'],
'created_at': post['created_at'],
})
@app.route('/admin/api/posts/<post_id>', methods=['DELETE'])
@admin_required
def admin_api_delete_post(post_id):
post_model.delete(post_id)
return jsonify({'success': True})
@app.route('/admin/api/posts/<post_id>/pin', methods=['POST'])
@admin_required
def admin_api_pin_post(post_id):
new_pin = post_model.toggle_pin(post_id)
return jsonify({
'success': True,
'is_pinned': new_pin
})
@app.route('/admin/api/posts/<post_id>/hide', methods=['POST'])
@admin_required
def admin_api_hide_post(post_id):
"""切换帖子显示/隐藏"""
new_hidden = post_model.toggle_hidden(post_id)
return jsonify({
'success': True,
'is_hidden': new_hidden
})
# ============ 回复管理 API ============
@app.route('/admin/api/replies')
@admin_required
def admin_api_replies():
"""获取所有回复列表"""
replies = reply_model.get_all(limit=100)
reply_list = []
for reply in replies:
author = user_model.get_by_id(reply['author_id']) or {}
post = post_model.get_by_id(reply['post_id']) or {}
reply_list.append({
'id': reply['id'],
'content': reply['content'][:100] + '...' if len(reply['content']) > 100 else reply['content'],
'author': author.get('username', '未知'),
'post_title': post.get('title', '未知'),
'post_id': reply['post_id'],
'likes': len(reply['likes']),
'created_at': reply['created_at'],
})
return jsonify(reply_list)
@app.route('/admin/api/replies/<reply_id>', methods=['DELETE'])
@admin_required
def admin_api_delete_reply(reply_id):
"""删除回复"""
reply_model.delete(reply_id)
return jsonify({'success': True})
# ============ 用户管理 API ============
@app.route('/admin/api/users/<user_id>', methods=['PUT'])
@admin_required
def admin_api_update_user(user_id):
"""编辑用户信息"""
data = request.json
update_fields = {}
if data.get('username'):
update_fields['username'] = data['username'].strip()
if data.get('email'):
update_fields['email'] = data['email'].strip().lower()
if data.get('phone'):
update_fields['phone'] = data['phone'].strip()
if data.get('bio'):
update_fields['bio'] = data['bio'].strip()
if not update_fields:
return jsonify({'error': '没有要更新的字段'}), 400
# 更新用户
user = user_model.get_by_id(user_id)
if not user:
return jsonify({'error': '用户不存在'}), 404
# 检查用户名/邮箱是否重复
if 'username' in update_fields:
existing = user_model.get_by_username(update_fields['username'])
if existing and existing['id'] != user_id:
return jsonify({'error': '用户名已存在'}), 400
if 'email' in update_fields:
existing = user_model.get_by_email(update_fields['email'])
if existing and existing['id'] != user_id:
return jsonify({'error': '邮箱已存在'}), 400
with db.get_conn() as conn:
set_clause = ', '.join([f"{k} = ?" for k in update_fields.keys()])
conn.execute(f"UPDATE users SET {set_clause}, updated_at = ? WHERE id = ?",
list(update_fields.values()) + [datetime.datetime.now().isoformat(), user_id])
conn.commit()
return jsonify({'success': True})
@app.route('/admin/api/topics')
@admin_required
def admin_api_topics():
topics = topic_model.get_all()
topic_list = []
for topic in topics:
author = user_model.get_by_id(topic['author_id']) or {}
topic_list.append({
'id': topic['id'],
'name': topic['name'],
'icon': topic.get('icon', '🔧'),
'author': author.get('username', '未知'),
'sub_topics_count': len(topic_model.get_sub_topics(topic['id'])),
'questions_count': len(topic_model.get_questions(topic['id'])),
'followers_count': len(topic['followers']),
'created_at': topic['created_at'],
})
return jsonify(topic_list)
@app.route('/admin/api/topics/<topic_id>')
@admin_required
def admin_api_topic_detail(topic_id):
topic = topic_model.get_by_id(topic_id)
if not topic:
return jsonify({'error': '主题不存在'}), 404
author = user_model.get_by_id(topic['author_id']) or {}
return jsonify({
'id': topic_id,
'name': topic['name'],
'description': topic.get('description', ''),
'icon': topic.get('icon', '🔧'),
'author': author.get('username', '未知'),
'sub_topics': topic_model.get_sub_topics(topic_id),
'questions': topic_model.get_questions(topic_id),
'followers_count': len(topic['followers']),
'created_at': topic['created_at'],
})
@app.route('/admin/api/topics/<topic_id>', methods=['DELETE'])
@admin_required
def admin_api_delete_topic(topic_id):
topic_model.delete(topic_id)
return jsonify({'success': True})
@app.route('/admin/api/tags')
@admin_required
def admin_api_tags():
tags = post_model.get_tags_stats()
return jsonify([{'name': t[0], 'count': t[1]} for t in tags[:20]])
# ============ 前台用户 API ============
@app.route('/api/register', methods=['POST'])
def api_register():
data = request.json
username = data.get('username', '').strip()
email = data.get('email', '').strip().lower()
phone = data.get('phone', '').strip()
password = data.get('password', '')
confirm_password = data.get('confirm_password', '')
# 验证
if not username or len(username) < 2:
return jsonify({'error': '用户名至少2个字符'}), 400
if not email or '@' not in email:
return jsonify({'error': '请输入有效的邮箱地址'}), 400
if not password or len(password) < 6:
return jsonify({'error': '密码至少6个字符'}), 400
if password != confirm_password:
return jsonify({'error': '两次密码不一致'}), 400
# 检查是否已存在
if user_model.get_by_username(username):
return jsonify({'error': '用户名已存在'}), 400
if user_model.get_by_email(email):
return jsonify({'error': '邮箱已注册'}), 400
# 创建用户
user_id = user_model.create(username, email, phone, password)
user = user_model.get_by_id(user_id)
token = generate_token(user_id)
return jsonify({
'success': True,
'token': token,
'user': {
'id': user_id,
'username': username,
'email': email,
'avatar': user['avatar']
}
})
@app.route('/api/login', methods=['POST'])
def api_login():
data = request.json
login_name = data.get('username', '').strip()
password = data.get('password', '')
if not login_name or not password:
return jsonify({'error': '请输入用户名和密码'}), 400
# 查找用户
user = user_model.get_by_username(login_name)
if not user:
user = user_model.get_by_email(login_name)
if not user:
return jsonify({'error': '用户不存在'}), 400
if not user_model.verify_password(user, password):
return jsonify({'error': '密码错误'}), 400
token = generate_token(user['id'])
return jsonify({
'success': True,
'token': token,
'user': {
'id': user['id'],
'username': user['username'],
'email': user['email'],
'avatar': user['avatar'],
'bio': user.get('bio', '')
}
})
@app.route('/api/user')
def api_current_user():
user = get_current_user()
if not user:
return jsonify({'error': '未登录'}), 401
return jsonify({
'id': user['id'],
'username': user['username'],
'email': user['email'],
'phone': user.get('phone', ''),
'avatar': user['avatar'],
'bio': user.get('bio', ''),
'posts_count': user_model.get_posts_count(user['id']),
'replies_count': user_model.get_replies_count(user['id']),
'created_at': user['created_at']
})
@app.route('/api/user/<user_id>')
def api_user_profile(user_id):
user = user_model.get_by_id(user_id)
if not user:
return jsonify({'error': '用户不存在'}), 404
# 获取用户的帖子
posts, total = post_model.get_all()
user_posts = []
for post in posts:
if post['author_id'] == user_id:
user_posts.append({
'id': post['id'],
'title': post['title'],
'type': post['type'],
'likes': len(post['likes']),
'replies': len(reply_model.get_by_post(post['id'])),
'created_at': post['created_at']
})
return jsonify({
'user': {
'id': user_id,
'username': user['username'],
'avatar': user['avatar'],
'bio': user.get('bio', ''),
'posts_count': user_model.get_posts_count(user['id']),
'replies_count': user_model.get_replies_count(user['id']),
'created_at': user['created_at']
},
'posts': user_posts
})
# ============ API: 技术交流帖子 ============
@app.route('/api/posts')
def api_posts():
post_type = request.args.get('type')
tag = request.args.get('tag')
page = int(request.args.get('page', 1))
per_page = int(request.args.get('per_page', 20))
posts, total = post_model.get_all(post_type, tag, page, per_page)
post_list = []
for post in posts:
author = user_model.get_by_id(post['author_id']) or {}
post_list.append({
'id': post['id'],
'title': post['title'],
'type': post['type'],
'content_preview': post['content'][:150] + '...' if len(post['content']) > 150 else post['content'],
'author': {
'id': post['author_id'],
'username': author.get('username', '未知'),
'avatar': author.get('avatar', '')
},
'tags': post['tags'],
'likes': len(post['likes']),
'replies': len(reply_model.get_by_post(post['id'])),
'views': post['views'],
'created_at': post['created_at'],
'is_pinned': post['is_pinned']
})
return jsonify({
'posts': post_list,
'total': total,
'page': page,
'per_page': per_page
})
@app.route('/api/posts', methods=['POST'])
def api_create_post():
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
data = request.json
title = data.get('title', '').strip()
content = data.get('content', '').strip()
post_type = data.get('type', 'discussion')
tags = data.get('tags', [])
if not title or len(title) < 5:
return jsonify({'error': '标题至少5个字符'}), 400
if not content or len(content) < 10:
return jsonify({'error': '内容至少10个字符'}), 400
post_id = post_model.create(title, content, post_type, user['id'], tags)
return jsonify({
'success': True,
'post_id': post_id
})
@app.route('/api/posts/<post_id>')
def api_post_detail(post_id):
post = post_model.get_by_id(post_id)
if not post:
return jsonify({'error': '帖子不存在'}), 404
# 增加浏览量
post_model.increment_views(post_id)
post['views'] += 1
author = user_model.get_by_id(post['author_id']) or {}
# 获取回复
replies = reply_model.get_by_post(post_id)
reply_list = []
for reply in replies:
reply_author = user_model.get_by_id(reply['author_id']) or {}
reply_list.append({
'id': reply['id'],
'content': reply['content'],
'author': {
'id': reply['author_id'],
'username': reply_author.get('username', '未知'),
'avatar': reply_author.get('avatar', '')
},
'likes': len(reply['likes']),
'created_at': reply['created_at'],
'reply_to': reply.get('reply_to')
})
return jsonify({
'id': post_id,
'title': post['title'],
'content': post['content'],
'type': post['type'],
'author': {
'id': post['author_id'],
'username': author.get('username', '未知'),
'avatar': author.get('avatar', ''),
'bio': author.get('bio', '')
},
'tags': post['tags'],
'likes': len(post['likes']),
'views': post['views'],
'replies': reply_list,
'created_at': post['created_at'],
'updated_at': post.get('updated_at', post['created_at'])
})
@app.route('/api/posts/<post_id>/reply', methods=['POST'])
def api_reply_post(post_id):
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
data = request.json
content = data.get('content', '').strip()
reply_to = data.get('reply_to')
if not content:
return jsonify({'error': '回复内容不能为空'}), 400
post = post_model.get_by_id(post_id)
if not post:
return jsonify({'error': '帖子不存在'}), 404
reply_id = reply_model.create(post_id, content, user['id'], reply_to)
return jsonify({
'success': True,
'reply_id': reply_id
})
@app.route('/api/posts/<post_id>/like', methods=['POST'])
def api_like_post(post_id):
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
post = post_model.get_by_id(post_id)
if not post:
return jsonify({'error': '帖子不存在'}), 404
liked, likes_count = post_model.add_like(post_id, user['id'])
return jsonify({
'success': True,
'liked': liked,
'likes_count': likes_count
})
# ============ API: 工具分享主题 ============
@app.route('/api/topics')
def api_topics():
topics = topic_model.get_all()
topic_list = []
for topic in topics:
author = user_model.get_by_id(topic['author_id']) or {}
topic_list.append({
'id': topic['id'],
'name': topic['name'],
'description': topic['description'][:100] + '...' if len(topic.get('description', '')) > 100 else topic.get('description', ''),
'icon': topic.get('icon', '🔧'),
'author': {
'id': topic['author_id'],
'username': author.get('username', '未知')
},
'sub_topics_count': len(topic_model.get_sub_topics(topic['id'])),
'questions_count': len(topic_model.get_questions(topic['id'])),
'followers': len(topic['followers']),
'created_at': topic['created_at']
})
topic_list.sort(key=lambda x: x['followers'], reverse=True)
return jsonify(topic_list)
@app.route('/api/topics', methods=['POST'])
def api_create_topic():
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
data = request.json
name = data.get('name', '').strip()
description = data.get('description', '').strip()
icon = data.get('icon', '🔧')
if not name:
return jsonify({'error': '主题名称不能为空'}), 400
topic_id = topic_model.create(name, description, icon, user['id'])
return jsonify({
'success': True,
'topic_id': topic_id
})
@app.route('/api/topics/<topic_id>')
def api_topic_detail(topic_id):
topic = topic_model.get_by_id(topic_id)
if not topic:
return jsonify({'error': '主题不存在'}), 404
author = user_model.get_by_id(topic['author_id']) or {}
# 获取子主题
sub_topics = topic_model.get_sub_topics(topic_id)
sub_list = []
for st in sub_topics:
st_author = user_model.get_by_id(st['author_id']) or {}
sub_list.append({
'id': st['id'],
'title': st['title'],
'content': st['content'],
'author': st_author,
'created_at': st['created_at']
})
# 获取问题
questions = topic_model.get_questions(topic_id)
q_list = []
for q in questions:
q_author = user_model.get_by_id(q['author_id']) or {}
answers = []
for a in q['answers']:
a_author = user_model.get_by_id(a['author_id']) or {}
answers.append({
'id': a['id'],
'content': a['content'],
'author': {
'id': a['author_id'],
'username': a_author.get('username', '未知'),
'avatar': a_author.get('avatar', '')
},
'likes': len(a['likes']),
'created_at': a['created_at']
})
q_list.append({
'id': q['id'],
'title': q['title'],
'content': q.get('content', ''),
'author': {
'id': q['author_id'],
'username': q_author.get('username', '未知'),
'avatar': q_author.get('avatar', '')
},
'answers': answers,
'views': q.get('views', 0),
'created_at': q['created_at']
})
return jsonify({
'id': topic_id,
'name': topic['name'],
'description': topic.get('description', ''),
'icon': topic.get('icon', '🔧'),
'author': {
'id': topic['author_id'],
'username': author.get('username', '未知'),
'avatar': author.get('avatar', '')
},
'sub_topics': sub_list,
'questions': q_list,
'followers': len(topic['followers']),
'created_at': topic['created_at']
})
@app.route('/api/topics/<topic_id>/subtopic', methods=['POST'])
def api_add_subtopic(topic_id):
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
data = request.json
title = data.get('title', '').strip()
content = data.get('content', '').strip()
if not title:
return jsonify({'error': '标题不能为空'}), 400
topic = topic_model.get_by_id(topic_id)
if not topic:
return jsonify({'error': '主题不存在'}), 404
subtopic_id = topic_model.add_sub_topic(topic_id, title, content, user['id'])
return jsonify({'success': True, 'subtopic_id': subtopic_id})
@app.route('/api/topics/<topic_id>/question', methods=['POST'])
def api_add_question(topic_id):
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
data = request.json
title = data.get('title', '').strip()
content = data.get('content', '').strip()
if not title:
return jsonify({'error': '问题标题不能为空'}), 400
topic = topic_model.get_by_id(topic_id)
if not topic:
return jsonify({'error': '主题不存在'}), 404
question_id = topic_model.add_question(topic_id, title, content, user['id'])
return jsonify({'success': True, 'question_id': question_id})
@app.route('/api/topics/<topic_id>/question/<question_id>/answer', methods=['POST'])
def api_answer_question(topic_id, question_id):
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
data = request.json
content = data.get('content', '').strip()
if not content:
return jsonify({'error': '回答内容不能为空'}), 400
topic = topic_model.get_by_id(topic_id)
if not topic:
return jsonify({'error': '主题不存在'}), 404
answer_id = topic_model.add_answer(question_id, content, user['id'])
return jsonify({'success': True, 'answer_id': answer_id})
@app.route('/api/topics/<topic_id>/follow', methods=['POST'])
def api_follow_topic(topic_id):
user = get_current_user()
if not user:
return jsonify({'error': '请先登录'}), 401
topic = topic_model.get_by_id(topic_id)
if not topic:
return jsonify({'error': '主题不存在'}), 404
followed, followers_count = topic_model.add_follower(topic_id, user['id'])
return jsonify({
'success': True,
'followed': followed,
'followers_count': followers_count
})
# ============ API: 标签和搜索 ============
@app.route('/api/tags')
def api_tags():
tags = post_model.get_tags_stats()
return jsonify([{'name': t[0], 'count': t[1]} for t in tags])
@app.route('/api/search')
def api_search():
query = request.args.get('q', '').strip().lower()
if not query:
return jsonify({'posts': [], 'topics': []})
# 搜索帖子
posts, _ = post_model.get_all()
matched_posts = []
for post in posts:
if query in post['title'].lower() or query in post['content'].lower():
author = user_model.get_by_id(post['author_id']) or {}
matched_posts.append({
'id': post['id'],
'title': post['title'],
'type': post['type'],
'author': author.get('username', '未知'),
'created_at': post['created_at']
})
# 搜索主题
topics = topic_model.get_all()
matched_topics = []
for topic in topics:
if query in topic['name'].lower() or query in topic.get('description', '').lower():
matched_topics.append({
'id': topic['id'],
'name': topic['name'],
'icon': topic.get('icon', '🔧')
})
return jsonify({
'posts': matched_posts[:20],
'topics': matched_topics[:20]
})
# ============ 健康检查 ============
@app.route('/api/health')
def api_health():
return jsonify({'status': 'ok', 'service': 'tech-forum', 'port': BACKEND_PORT})
if __name__ == '__main__':
print("=" * 50)
print("技术论坛与技术分享网站")
print("=" * 50)
print(f"前台地址: http://localhost:{BACKEND_PORT}")
print(f"后台地址: http://localhost:{BACKEND_PORT}/admin")
print(f"后台账号: {ADMIN_USERNAME} / {ADMIN_PASSWORD}")
print("=" * 50)
app.run(host='0.0.0.0', port=BACKEND_PORT, debug=True)