Files
tech-forum/backend/app.py
hubian 7f8fbc9605 初始化技术论坛与技术分享网站
功能模块:
- 技术交流: 发帖、评论回复、点赞收藏、标签分类
- 工具分享: 创建主题、子主题分支、问题追问、关注功能
- 用户系统: 用户名+邮箱(必填)+手机(可选)+密码确认

页面:
- 首页: 帖子列表、热门标签、工具分享主题
- 登录/注册页
- 发帖页
- 帖子详情页
- 主题详情页
- 用户主页

技术栈:
- Flask + Tailwind CSS
- JSON文件存储
- JWT认证
- 响应式设计

端口: 19004
2026-04-08 12:35:09 +08:00

819 lines
24 KiB
Python

"""
技术论坛与技术分享网站 - 后端API
"""
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
from werkzeug.security import generate_password_hash, check_password_hash
import jwt
import datetime
import json
import uuid
import os
from pathlib import Path
app = Flask(__name__, static_folder='../frontend', static_url_path='')
CORS(app)

# Configuration.
# NOTE(review): the literal defaults below are secrets committed to source
# control. They are kept only so existing deployments keep working; override
# them through the environment and rotate the committed values.
SECRET_KEY = os.environ.get('TECH_FORUM_SECRET_KEY', 'tech-forum-secret-2026')
LLM_BASE_URL = os.environ.get('TECH_FORUM_LLM_BASE_URL', 'http://192.168.2.5:1234/v1')
LLM_API_KEY = os.environ.get('TECH_FORUM_LLM_API_KEY', 'sk-lm-fuP5tGU8:Hi7YU87jHyDP6Ay8Tl2j')
LLM_MODEL = os.environ.get('TECH_FORUM_LLM_MODEL', 'qwen3.5-4b')

# Data locations, resolved two levels up from this file.
DATA_DIR = Path(__file__).parent.parent / 'data'
USERS_FILE = DATA_DIR / 'users.json'
POSTS_FILE = DATA_DIR / 'posts.json'
TOPICS_FILE = DATA_DIR / 'topics.json'
UPLOAD_DIR = Path(__file__).parent.parent / 'uploads'
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# 初始化数据文件
def init_data():
    """Create the data directory and any missing JSON store files.

    Each store (users, posts, topics) is initialised to an empty JSON
    object. Files that already exist are left untouched.
    """
    # The data directory may not exist on a fresh checkout; writing into a
    # missing directory would raise FileNotFoundError.
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    for store in (USERS_FILE, POSTS_FILE, TOPICS_FILE):
        if not store.exists():
            # Same encoding the load_*/save_* helpers read and write with.
            store.write_text(json.dumps({}, ensure_ascii=False), encoding='utf-8')


init_data()
# 辅助函数
def load_users():
    """Return the parsed contents of the users JSON store."""
    raw = USERS_FILE.read_text(encoding='utf-8')
    return json.loads(raw)
def save_users(users):
    """Overwrite the users store with ``users`` as indented UTF-8 JSON."""
    serialized = json.dumps(users, ensure_ascii=False, indent=2)
    USERS_FILE.write_text(serialized, encoding='utf-8')
def load_posts():
    """Return the parsed contents of the posts JSON store."""
    raw = POSTS_FILE.read_text(encoding='utf-8')
    return json.loads(raw)
def save_posts(posts):
    """Overwrite the posts store with ``posts`` as indented UTF-8 JSON."""
    serialized = json.dumps(posts, ensure_ascii=False, indent=2)
    POSTS_FILE.write_text(serialized, encoding='utf-8')
def load_topics():
    """Return the parsed contents of the topics JSON store."""
    raw = TOPICS_FILE.read_text(encoding='utf-8')
    return json.loads(raw)
def save_topics(topics):
    """Overwrite the topics store with ``topics`` as indented UTF-8 JSON."""
    serialized = json.dumps(topics, ensure_ascii=False, indent=2)
    TOPICS_FILE.write_text(serialized, encoding='utf-8')
def generate_token(user_id):
    """Create a signed JWT for ``user_id`` that expires in 30 days.

    Args:
        user_id: Identifier stored in the token's ``user_id`` claim.

    Returns:
        The HS256-signed token produced by ``jwt.encode``.
    """
    # datetime.utcnow() is deprecated since Python 3.12; a timezone-aware
    # UTC timestamp encodes to the same ``exp`` claim.
    expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=30)
    payload = {'user_id': user_id, 'exp': expires_at}
    return jwt.encode(payload, SECRET_KEY, algorithm='HS256')
def verify_token(token):
    """Decode and validate a JWT signed with ``SECRET_KEY``.

    Args:
        token: The encoded token string.

    Returns:
        The decoded payload, or None when the token is rejected by
        ``jwt.decode`` (malformed, bad signature, expired, ...).
    """
    try:
        return jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
    except jwt.InvalidTokenError:
        # Only token-validation failures mean "not authenticated"; a bare
        # except would also hide programming errors and KeyboardInterrupt.
        return None
def get_current_user():
    """Resolve the user record for the request's bearer token.

    Returns the stored user dict, or None when the Authorization header is
    absent, the token does not verify, or the user id is unknown.
    """
    auth_header = request.headers.get('Authorization', '')
    token = auth_header.replace('Bearer ', '')
    payload = verify_token(token) if token else None
    if not payload:
        return None
    return load_users().get(payload['user_id'])
# ============ 页面路由 ============
@app.route('/')
def index():
    """Serve the home page."""
    page_path = '../frontend/index.html'
    return send_file(page_path)
@app.route('/login')
def login_page():
    """Serve the login page."""
    page_path = '../frontend/login.html'
    return send_file(page_path)
@app.route('/register')
def register_page():
    """Serve the registration page."""
    page_path = '../frontend/register.html'
    return send_file(page_path)
@app.route('/post/<post_id>')
def post_page(post_id):
    """Serve the post detail page; ``post_id`` is not used server-side here."""
    page_path = '../frontend/post.html'
    return send_file(page_path)
@app.route('/topic/<topic_id>')
def topic_page(topic_id):
    """Serve the topic detail page; ``topic_id`` is not used server-side here."""
    page_path = '../frontend/topic.html'
    return send_file(page_path)
@app.route('/user/<user_id>')
def user_page(user_id):
    """Serve the user profile page; ``user_id`` is not used server-side here."""
    page_path = '../frontend/user.html'
    return send_file(page_path)
@app.route('/create')
def create_page():
    """Serve the post creation page."""
    page_path = '../frontend/create.html'
    return send_file(page_path)
# ============ API: 用户认证 ============
@app.route('/api/register', methods=['POST'])
def api_register():
    """Register a new account and return a login token.

    Expects a JSON object with ``username``, ``email``, ``password``,
    ``confirm_password`` and optionally ``phone``.

    Returns:
        On success, JSON with ``success``, ``token`` and a public ``user``
        summary. On validation failure, ``{'error': ...}`` with HTTP 400.
    """
    # A missing, malformed or non-object JSON body is treated as an empty
    # form so it fails validation with a 400 instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def text(key):
        # Non-string values (null, numbers, lists) are treated as missing.
        value = data.get(key)
        return value if isinstance(value, str) else ''

    username = text('username').strip()
    email = text('email').strip().lower()
    phone = text('phone').strip()
    password = text('password')
    confirm_password = text('confirm_password')

    # Validation.
    if len(username) < 2:
        return jsonify({'error': '用户名至少2个字符'}), 400
    if '@' not in email:
        return jsonify({'error': '请输入有效的邮箱地址'}), 400
    if len(password) < 6:
        return jsonify({'error': '密码至少6个字符'}), 400
    if password != confirm_password:
        return jsonify({'error': '两次密码不一致'}), 400

    users = load_users()

    # Reject duplicates of an existing username or email.
    for existing in users.values():
        if existing['username'] == username:
            return jsonify({'error': '用户名已存在'}), 400
        if existing['email'] == email:
            return jsonify({'error': '邮箱已注册'}), 400

    # Create the user record; only the password hash is stored.
    user_id = str(uuid.uuid4())
    users[user_id] = {
        'id': user_id,
        'username': username,
        'email': email,
        'phone': phone,
        'password': generate_password_hash(password),
        'avatar': f'https://api.dicebear.com/7.x/avataaars/svg?seed={username}',
        'bio': '',
        'created_at': datetime.datetime.now().isoformat(),
        'posts': [],
        'replies': [],
        'likes': []
    }
    save_users(users)

    token = generate_token(user_id)
    return jsonify({
        'success': True,
        'token': token,
        'user': {
            'id': user_id,
            'username': username,
            'email': email,
            'avatar': users[user_id]['avatar']
        }
    })
@app.route('/api/login', methods=['POST'])
def api_login():
    """Authenticate by username or email and return a login token.

    Expects a JSON object with ``username`` (a username or an email address)
    and ``password``.

    Returns:
        On success, JSON with ``success``, ``token`` and a public ``user``
        summary. Otherwise ``{'error': ...}`` with HTTP 400.
    """
    # A missing, malformed or non-object JSON body is treated as an empty
    # form so it fails validation with a 400 instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def text(key):
        # Non-string values (null, numbers, lists) are treated as missing.
        value = data.get(key)
        return value if isinstance(value, str) else ''

    login_name = text('username').strip()
    password = text('password')
    if not login_name or not password:
        return jsonify({'error': '请输入用户名和密码'}), 400

    users = load_users()
    for user_id, user in users.items():
        if login_name not in (user['username'], user['email']):
            continue
        # The first record matching the login name decides the outcome.
        if not check_password_hash(user['password'], password):
            return jsonify({'error': '密码错误'}), 400
        return jsonify({
            'success': True,
            'token': generate_token(user_id),
            'user': {
                'id': user_id,
                'username': user['username'],
                'email': user['email'],
                'avatar': user['avatar'],
                'bio': user.get('bio', '')
            }
        })
    return jsonify({'error': '用户不存在'}), 400
@app.route('/api/user')
def api_current_user():
    """Return the authenticated user's own profile, or 401 when not logged in."""
    me = get_current_user()
    if not me:
        return jsonify({'error': '未登录'}), 401

    authored = me.get('posts', [])
    replied = me.get('replies', [])
    profile = {
        'id': me['id'],
        'username': me['username'],
        'email': me['email'],
        'phone': me.get('phone', ''),
        'avatar': me['avatar'],
        'bio': me.get('bio', ''),
        'posts_count': len(authored),
        'replies_count': len(replied),
        'created_at': me['created_at']
    }
    return jsonify(profile)
@app.route('/api/user/<user_id>')
def api_user_profile(user_id):
    """Return a user's public profile and summaries of their posts.

    Args:
        user_id: Key of the user in the users store.

    Returns:
        JSON with ``user`` (public fields only) and ``posts``, or
        ``{'error': ...}`` with HTTP 404 when the user is unknown.
    """
    users = load_users()
    user = users.get(user_id)
    if not user:
        return jsonify({'error': '用户不存在'}), 404

    # The topics store is not needed here, so it is not loaded.
    posts = load_posts()
    authored_ids = user.get('posts', [])

    user_posts = []
    for post_id in authored_ids:
        post = posts.get(post_id)
        if post is None:
            # The user's list may reference a post no longer in the store.
            continue
        user_posts.append({
            'id': post_id,
            'title': post['title'],
            'type': post['type'],
            'likes': len(post.get('likes', [])),
            'replies': len(post.get('replies', [])),
            'created_at': post['created_at']
        })

    return jsonify({
        'user': {
            'id': user_id,
            'username': user['username'],
            'avatar': user['avatar'],
            'bio': user.get('bio', ''),
            'posts_count': len(authored_ids),
            'replies_count': len(user.get('replies', [])),
            'created_at': user['created_at']
        },
        'posts': user_posts
    })
# ============ API: 技术交流帖子 ============
@app.route('/api/posts')
def api_posts():
    """List posts, optionally filtered by type and tag, with pagination.

    Query parameters:
        type: Keep only posts whose ``type`` equals this value.
        tag: Keep only posts carrying this tag.
        page: 1-based page number (default 1).
        per_page: Page size (default 20).

    Returns:
        JSON with ``posts`` (the requested page), ``total`` (matches before
        paging), ``page`` and ``per_page``. Pinned posts come first, then
        newest first.
    """
    posts = load_posts()
    users = load_users()
    post_type = request.args.get('type')  # discussion, share
    tag = request.args.get('tag')
    # ``type=int`` falls back to the default on non-numeric input instead of
    # raising; values below 1 are clamped so slicing stays well-defined.
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = max(request.args.get('per_page', 20, type=int), 1)

    post_list = []
    for pid, post in posts.items():
        if post_type and post['type'] != post_type:
            continue
        if tag and tag not in post.get('tags', []):
            continue
        author = users.get(post['author_id'], {})
        content = post['content']
        post_list.append({
            'id': pid,
            'title': post['title'],
            'type': post['type'],
            'content_preview': content[:150] + '...' if len(content) > 150 else content,
            'author': {
                'id': post['author_id'],
                'username': author.get('username', '未知'),
                'avatar': author.get('avatar', '')
            },
            'tags': post.get('tags', []),
            'likes': len(post.get('likes', [])),
            'replies': len(post.get('replies', [])),
            'views': post.get('views', 0),
            'created_at': post['created_at'],
            'is_pinned': post.get('is_pinned', False)
        })

    # Pinned first, then newest first. Under reverse=True the larger key wins,
    # so the key must be the pinned flag itself (True > False), not its
    # negation, which would push pinned posts to the end.
    post_list.sort(key=lambda p: (bool(p['is_pinned']), p['created_at']), reverse=True)

    # Pagination.
    start = (page - 1) * per_page
    end = start + per_page
    return jsonify({
        'posts': post_list[start:end],
        'total': len(post_list),
        'page': page,
        'per_page': per_page
    })
@app.route('/api/posts', methods=['POST'])
def api_create_post():
    """Create a post authored by the authenticated user.

    Expects a JSON object with ``title`` (at least 5 characters), ``content``
    (at least 10 characters), optional ``type`` (defaults to ``discussion``)
    and optional ``tags`` (list of strings).

    Returns:
        JSON with ``success`` and ``post_id``; ``{'error': ...}`` with HTTP
        401 when not logged in or HTTP 400 on validation failure.
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请先登录'}), 401

    # A missing, malformed or non-object JSON body is treated as an empty
    # form so it fails validation with a 400 instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def text(key):
        # Non-string values (null, numbers, lists) are treated as missing.
        value = data.get(key)
        return value if isinstance(value, str) else ''

    title = text('title').strip()
    content = text('content').strip()
    post_type = text('type') or 'discussion'  # discussion, share
    # Keep only string tags: the tag listing endpoint uses tags as dict keys,
    # so an unhashable tag stored here would break it for everyone.
    raw_tags = data.get('tags')
    tags = [t for t in raw_tags if isinstance(t, str)] if isinstance(raw_tags, list) else []

    if len(title) < 5:
        return jsonify({'error': '标题至少5个字符'}), 400
    if len(content) < 10:
        return jsonify({'error': '内容至少10个字符'}), 400

    posts = load_posts()
    users = load_users()
    post_id = str(uuid.uuid4())
    # One timestamp so created_at and updated_at start out identical.
    now = datetime.datetime.now().isoformat()
    posts[post_id] = {
        'id': post_id,
        'title': title,
        'content': content,
        'type': post_type,
        'author_id': user['id'],
        'tags': tags,
        'likes': [],
        'replies': [],
        'views': 0,
        'is_pinned': False,
        'created_at': now,
        'updated_at': now
    }
    save_posts(posts)

    # Record the post on its author; tolerate records lacking a 'posts' list.
    authored = users[user['id']].setdefault('posts', [])
    if post_id not in authored:
        authored.append(post_id)
    save_users(users)

    return jsonify({
        'success': True,
        'post_id': post_id
    })
@app.route('/api/posts/<post_id>')
def api_post_detail(post_id):
    """Return one post with author info and replies, counting the request as a view."""
    all_posts = load_posts()
    all_users = load_users()
    post = all_posts.get(post_id)
    if not post:
        return jsonify({'error': '帖子不存在'}), 404

    # Every successful fetch bumps and persists the view counter.
    post['views'] = post.get('views', 0) + 1
    save_posts(all_posts)

    def reply_view(item):
        # Shape a stored reply for the response, resolving its author.
        writer = all_users.get(item['author_id'], {})
        return {
            'id': item['id'],
            'content': item['content'],
            'author': {
                'id': item['author_id'],
                'username': writer.get('username', '未知'),
                'avatar': writer.get('avatar', '')
            },
            'likes': len(item.get('likes', [])),
            'created_at': item['created_at'],
            'reply_to': item.get('reply_to')
        }

    post_author = all_users.get(post['author_id'], {})
    reply_views = [reply_view(item) for item in post.get('replies', [])]

    payload = {
        'id': post_id,
        'title': post['title'],
        'content': post['content'],
        'type': post['type'],
        'author': {
            'id': post['author_id'],
            'username': post_author.get('username', '未知'),
            'avatar': post_author.get('avatar', ''),
            'bio': post_author.get('bio', '')
        },
        'tags': post.get('tags', []),
        'likes': len(post.get('likes', [])),
        'views': post['views'],
        'replies': reply_views,
        'created_at': post['created_at'],
        'updated_at': post.get('updated_at', post['created_at'])
    }
    return jsonify(payload)
@app.route('/api/posts/<post_id>/reply', methods=['POST'])
def api_reply_post(post_id):
    """Append a reply by the authenticated user to a post.

    Expects a JSON object with ``content`` and an optional ``reply_to``
    (id of the comment being answered).

    Returns:
        JSON with ``success`` and ``reply_id``; ``{'error': ...}`` with HTTP
        401 (not logged in), 400 (empty content) or 404 (unknown post).
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请先登录'}), 401

    # A missing, malformed or non-object JSON body is treated as an empty
    # form so it fails validation with a 400 instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    raw_content = data.get('content')
    content = raw_content.strip() if isinstance(raw_content, str) else ''
    reply_to = data.get('reply_to')  # id of the comment being replied to
    if not content:
        return jsonify({'error': '回复内容不能为空'}), 400

    posts = load_posts()
    users = load_users()
    post = posts.get(post_id)
    if not post:
        return jsonify({'error': '帖子不存在'}), 404

    reply_id = str(uuid.uuid4())
    # One timestamp for both the reply and the post's updated_at.
    now = datetime.datetime.now().isoformat()
    # setdefault tolerates stored records that lack the list.
    post.setdefault('replies', []).append({
        'id': reply_id,
        'content': content,
        'author_id': user['id'],
        'likes': [],
        'reply_to': reply_to,
        'created_at': now
    })
    post['updated_at'] = now
    save_posts(posts)

    # The user's 'replies' list holds each replied-to post id once.
    replied_posts = users[user['id']].setdefault('replies', [])
    if post_id not in replied_posts:
        replied_posts.append(post_id)
    save_users(users)

    return jsonify({
        'success': True,
        'reply_id': reply_id
    })
@app.route('/api/posts/<post_id>/like', methods=['POST'])
def api_like_post(post_id):
    """Toggle the authenticated user's like on a post and report the new state."""
    user = get_current_user()
    if not user:
        return jsonify({'error': '请先登录'}), 401

    all_posts = load_posts()
    post = all_posts.get(post_id)
    if not post:
        return jsonify({'error': '帖子不存在'}), 404

    uid = user['id']
    likers = post['likes']
    # The like is added when absent and removed when present.
    liked = uid not in likers
    if liked:
        likers.append(uid)
    else:
        likers.remove(uid)
    save_posts(all_posts)

    return jsonify({
        'success': True,
        'liked': liked,
        'likes_count': len(post['likes'])
    })
# ============ API: 工具分享主题 ============
@app.route('/api/topics')
def api_topics():
    """List all topics as summaries, most-followed first."""
    all_topics = load_topics()
    all_users = load_users()

    def summarize(tid, topic):
        # Build the list entry for one topic.
        creator = all_users.get(topic['author_id'], {})
        description = topic.get('description', '')
        if len(description) > 100:
            description = description[:100] + '...'
        return {
            'id': tid,
            'name': topic['name'],
            'description': description,
            'icon': topic.get('icon', '🔧'),
            'author': {
                'id': topic['author_id'],
                'username': creator.get('username', '未知')
            },
            'sub_topics_count': len(topic.get('sub_topics', [])),
            'questions_count': len(topic.get('questions', [])),
            'followers': len(topic.get('followers', [])),
            'created_at': topic['created_at']
        }

    summaries = [summarize(tid, topic) for tid, topic in all_topics.items()]
    ranked = sorted(summaries, key=lambda entry: entry['followers'], reverse=True)
    return jsonify(ranked)
@app.route('/api/topics', methods=['POST'])
def api_create_topic():
    """Create a tool-sharing topic owned by the authenticated user.

    Expects a JSON object with ``name`` and optional ``description`` and
    ``icon`` (defaults to the wrench emoji).

    Returns:
        JSON with ``success`` and ``topic_id``; ``{'error': ...}`` with HTTP
        401 when not logged in or HTTP 400 when the name is empty.
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请先登录'}), 401

    # A missing, malformed or non-object JSON body is treated as an empty
    # form so it fails validation with a 400 instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def text(key):
        # Non-string values (null, numbers, lists) are treated as missing.
        value = data.get(key)
        return value if isinstance(value, str) else ''

    name = text('name').strip()
    description = text('description').strip()
    icon = text('icon') or '🔧'
    if not name:
        return jsonify({'error': '主题名称不能为空'}), 400

    topics = load_topics()
    topic_id = str(uuid.uuid4())
    topics[topic_id] = {
        'id': topic_id,
        'name': name,
        'description': description,
        'icon': icon,
        'author_id': user['id'],
        'sub_topics': [],
        'questions': [],
        'followers': [],
        'created_at': datetime.datetime.now().isoformat()
    }
    save_topics(topics)

    return jsonify({
        'success': True,
        'topic_id': topic_id
    })
@app.route('/api/topics/<topic_id>')
def api_topic_detail(topic_id):
    """Return a topic with its sub-topics, questions and answers.

    Args:
        topic_id: Key of the topic in the topics store.

    Returns:
        JSON describing the topic, or ``{'error': ...}`` with HTTP 404 when
        the topic is unknown. Every ``author`` object carries only ``id``,
        ``username`` and ``avatar``.
    """
    topics = load_topics()
    users = load_users()
    topic = topics.get(topic_id)
    if not topic:
        return jsonify({'error': '主题不存在'}), 404

    def public_author(author_id):
        # Expose only public fields. The stored user record also holds the
        # password hash, email and phone number, which must never be sent
        # to clients.
        record = users.get(author_id, {})
        return {
            'id': author_id,
            'username': record.get('username', '未知'),
            'avatar': record.get('avatar', '')
        }

    # Sub-topics.
    sub_topics = [
        {
            'id': st['id'],
            'title': st['title'],
            'content': st['content'],
            'author': public_author(st['author_id']),
            'created_at': st['created_at']
        }
        for st in topic.get('sub_topics', [])
    ]

    # Questions with their answers.
    questions = []
    for q in topic.get('questions', []):
        answers = [
            {
                'id': a['id'],
                'content': a['content'],
                'author': public_author(a['author_id']),
                'likes': len(a.get('likes', [])),
                'created_at': a['created_at']
            }
            for a in q.get('answers', [])
        ]
        questions.append({
            'id': q['id'],
            'title': q['title'],
            'content': q.get('content', ''),
            'author': public_author(q['author_id']),
            'answers': answers,
            'views': q.get('views', 0),
            'created_at': q['created_at']
        })

    return jsonify({
        'id': topic_id,
        'name': topic['name'],
        'description': topic.get('description', ''),
        'icon': topic.get('icon', '🔧'),
        'author': public_author(topic['author_id']),
        'sub_topics': sub_topics,
        'questions': questions,
        'followers': len(topic.get('followers', [])),
        'created_at': topic['created_at']
    })
@app.route('/api/topics/<topic_id>/subtopic', methods=['POST'])
def api_add_subtopic(topic_id):
    """Add a sub-topic to a topic on behalf of the authenticated user.

    Expects a JSON object with ``title`` and optional ``content``.

    Returns:
        JSON with ``success`` and ``subtopic_id``; ``{'error': ...}`` with
        HTTP 401 (not logged in), 400 (empty title) or 404 (unknown topic).
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请先登录'}), 401

    # A missing, malformed or non-object JSON body is treated as an empty
    # form so it fails validation with a 400 instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def text(key):
        # Non-string values (null, numbers, lists) are treated as missing.
        value = data.get(key)
        return value if isinstance(value, str) else ''

    title = text('title').strip()
    content = text('content').strip()
    if not title:
        return jsonify({'error': '标题不能为空'}), 400

    topics = load_topics()
    topic = topics.get(topic_id)
    if not topic:
        return jsonify({'error': '主题不存在'}), 404

    subtopic = {
        'id': str(uuid.uuid4()),
        'title': title,
        'content': content,
        'author_id': user['id'],
        'created_at': datetime.datetime.now().isoformat()
    }
    # setdefault tolerates stored topics that lack the list.
    topic.setdefault('sub_topics', []).append(subtopic)
    save_topics(topics)

    return jsonify({'success': True, 'subtopic_id': subtopic['id']})
@app.route('/api/topics/<topic_id>/question', methods=['POST'])
def api_add_question(topic_id):
    """Post a question under a topic on behalf of the authenticated user.

    Expects a JSON object with ``title`` and optional ``content``.

    Returns:
        JSON with ``success`` and ``question_id``; ``{'error': ...}`` with
        HTTP 401 (not logged in), 400 (empty title) or 404 (unknown topic).
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请先登录'}), 401

    # A missing, malformed or non-object JSON body is treated as an empty
    # form so it fails validation with a 400 instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def text(key):
        # Non-string values (null, numbers, lists) are treated as missing.
        value = data.get(key)
        return value if isinstance(value, str) else ''

    title = text('title').strip()
    content = text('content').strip()
    if not title:
        return jsonify({'error': '问题标题不能为空'}), 400

    topics = load_topics()
    topic = topics.get(topic_id)
    if not topic:
        return jsonify({'error': '主题不存在'}), 404

    question = {
        'id': str(uuid.uuid4()),
        'title': title,
        'content': content,
        'author_id': user['id'],
        'answers': [],
        'views': 0,
        'created_at': datetime.datetime.now().isoformat()
    }
    # setdefault tolerates stored topics that lack the list.
    topic.setdefault('questions', []).append(question)
    save_topics(topics)

    return jsonify({'success': True, 'question_id': question['id']})
@app.route('/api/topics/<topic_id>/question/<question_id>/answer', methods=['POST'])
def api_answer_question(topic_id, question_id):
    """Add an answer to a question within a topic.

    Expects a JSON object with ``content``.

    Returns:
        JSON with ``success`` and ``answer_id``; ``{'error': ...}`` with HTTP
        401 (not logged in), 400 (empty content) or 404 (unknown topic or
        question).
    """
    user = get_current_user()
    if not user:
        return jsonify({'error': '请先登录'}), 401

    # A missing, malformed or non-object JSON body is treated as an empty
    # form so it fails validation with a 400 instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    raw_content = data.get('content')
    content = raw_content.strip() if isinstance(raw_content, str) else ''
    if not content:
        return jsonify({'error': '回答内容不能为空'}), 400

    topics = load_topics()
    topic = topics.get(topic_id)
    if not topic:
        return jsonify({'error': '主题不存在'}), 404

    # Locate the question by id; .get tolerates topics lacking the list.
    question = next(
        (q for q in topic.get('questions', []) if q['id'] == question_id),
        None,
    )
    if not question:
        return jsonify({'error': '问题不存在'}), 404

    answer = {
        'id': str(uuid.uuid4()),
        'content': content,
        'author_id': user['id'],
        'likes': [],
        'created_at': datetime.datetime.now().isoformat()
    }
    question.setdefault('answers', []).append(answer)
    save_topics(topics)

    return jsonify({'success': True, 'answer_id': answer['id']})
@app.route('/api/topics/<topic_id>/follow', methods=['POST'])
def api_follow_topic(topic_id):
    """Toggle the authenticated user's follow on a topic and report the new state."""
    user = get_current_user()
    if not user:
        return jsonify({'error': '请先登录'}), 401

    all_topics = load_topics()
    topic = all_topics.get(topic_id)
    if not topic:
        return jsonify({'error': '主题不存在'}), 404

    uid = user['id']
    follower_ids = topic['followers']
    # The follow is added when absent and removed when present.
    followed = uid not in follower_ids
    if followed:
        follower_ids.append(uid)
    else:
        follower_ids.remove(uid)
    save_topics(all_topics)

    return jsonify({
        'success': True,
        'followed': followed,
        'followers_count': len(topic['followers'])
    })
# ============ API: 标签 ============
@app.route('/api/tags')
def api_tags():
    """Return every tag used on posts with its usage count, most used first."""
    tally = {}
    for post in load_posts().values():
        for label in post.get('tags', []):
            tally[label] = tally.get(label, 0) + 1

    ranked = sorted(tally.items(), key=lambda pair: pair[1], reverse=True)
    return jsonify([{'name': name, 'count': count} for name, count in ranked])
# ============ API: 搜索 ============
@app.route('/api/search')
def api_search():
    """Case-insensitive substring search over posts and topics.

    Reads the ``q`` query parameter and returns up to 20 matching posts
    (title or content) and up to 20 matching topics (name or description).
    An empty query yields empty lists.
    """
    needle = request.args.get('q', '').strip().lower()
    if not needle:
        return jsonify({'posts': [], 'topics': []})

    posts = load_posts()
    topics = load_topics()
    users = load_users()

    # Posts whose title or content contains the query.
    post_hits = [
        {
            'id': pid,
            'title': post['title'],
            'type': post['type'],
            'author': users.get(post['author_id'], {}).get('username', '未知'),
            'created_at': post['created_at']
        }
        for pid, post in posts.items()
        if needle in post['title'].lower() or needle in post['content'].lower()
    ]

    # Topics whose name or description contains the query.
    topic_hits = [
        {
            'id': tid,
            'name': topic['name'],
            'icon': topic.get('icon', '🔧')
        }
        for tid, topic in topics.items()
        if needle in topic['name'].lower() or needle in topic.get('description', '').lower()
    ]

    return jsonify({
        'posts': post_hits[:20],
        'topics': topic_hits[:20]
    })
if __name__ == '__main__':
    # Script entry point: print a banner, then start the built-in server.
    banner = "=" * 50
    print(banner)
    print("技术论坛与技术分享网站")
    print(banner)
    print("访问地址: http://localhost:19004")
    print(banner)
    # The server listens on all interfaces, and the Werkzeug debugger lets
    # anyone who can reach it execute arbitrary code. Debug mode is therefore
    # opt-in (FLASK_DEBUG=1) instead of always on.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=19004, debug=debug_enabled)