Files
llm-index-rag/app.py
coder cdaadef10c V1.0.0: 基于索引的知识检索系统
核心功能:
- 文档索引:使用LLM分析提取关键词/摘要/主题/实体
- 查询处理:LLM分析查询意图并扩展关键词
- BM25检索:基于倒排索引的相关性排序
- RAG问答:检索增强生成

技术栈:
- Flask + SQLAlchemy
- OpenAI API兼容LLM
- BM25算法

特点: 不依赖向量模型和向量库
2026-04-07 23:48:06 +08:00

321 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
LLM Index RAG 主应用
"""
import os
import json
from datetime import datetime
from flask import Flask, request, jsonify, render_template, send_file
from flask_sqlalchemy import SQLAlchemy
from werkzeug.utils import secure_filename
from config import *
from models import db, Document, DocumentChunk, InvertedIndex, QueryLog, IndexStats
from services import DocumentIndexer, SearchEngine, RAGGenerator
# ==================== Application setup ====================
app = Flask(__name__)
app.config.update(
    SECRET_KEY=SECRET_KEY,
    SQLALCHEMY_DATABASE_URI=DATABASE_URL,
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    MAX_CONTENT_LENGTH=50 * 1024 * 1024,  # 50MB
)

# Bind the shared SQLAlchemy handle to this application.
db.init_app(app)

# Service objects shared by the API routes in this module.
indexer = DocumentIndexer()
search_engine = SearchEngine()
rag_generator = RAGGenerator()
# ==================== 初始化函数 ====================
def init_app():
    """Prepare directories and the database before the server starts.

    Creates ``DOCUMENT_DIR``, ``INDEX_DIR`` and ``LOG_DIR`` when missing,
    creates all database tables, and calls ``IndexStats.get_stats()`` once.
    """
    for dir_name in (DOCUMENT_DIR, INDEX_DIR, LOG_DIR):
        # exist_ok avoids the check-then-create race of a separate
        # os.path.exists() test when two processes start at the same time.
        os.makedirs(dir_name, exist_ok=True)

    with app.app_context():
        db.create_all()
        # Called for its side effect only; presumably this creates the
        # statistics row on first run -- confirm against models.IndexStats.
        # Kept inside the app context because it is assumed to touch the DB.
        IndexStats.get_stats()
# ==================== 页面路由 ====================
@app.route('/')
def index():
    """Render the home page, passing the current index statistics."""
    return render_template('index.html', stats=IndexStats.get_stats())
@app.route('/documents')
def documents():
    """Render the document list page, newest first, 20 documents per page.

    The page number is read from the ``page`` query parameter (default 1).
    """
    page_number = request.args.get('page', 1, type=int)
    newest_first = Document.query.order_by(Document.created_at.desc())
    docs = newest_first.paginate(page=page_number, per_page=20)
    return render_template('documents.html', docs=docs)
@app.route('/search')
def search_page():
    """Render the search page template."""
    return render_template('search.html')
# ==================== API路由 ====================
# === 文档管理 ===
@app.route('/api/documents', methods=['GET'])
def api_list_documents():
    """Return one page of documents as JSON, newest first.

    Query parameters:
        page: page number (default 1); 20 documents per page.
        status: optional; when non-empty, only documents with this status
            are returned.
    """
    page_number = request.args.get('page', 1, type=int)
    status_filter = request.args.get('status', '')

    doc_query = Document.query
    if status_filter:
        doc_query = doc_query.filter_by(status=status_filter)

    ordered = doc_query.order_by(Document.created_at.desc())
    result_page = ordered.paginate(page=page_number, per_page=20)

    payload = {
        'documents': [item.to_dict() for item in result_page.items],
        'total': result_page.total,
        'pages': result_page.pages,
        'current_page': result_page.page
    }
    return jsonify(payload)
@app.route('/api/documents', methods=['POST'])
def api_upload_document():
    """Store an uploaded file and create its ``Document`` record.

    Expects a multipart form with a ``file`` part and optional ``title`` and
    ``source`` fields.

    Returns:
        JSON ``{'success': True, 'document': ...}`` on success, or a JSON
        error with status 400 when no file was sent, no file was selected,
        or the extension is not in ``SUPPORTED_FORMATS``.
    """
    if 'file' not in request.files:
        return jsonify({'error': '未上传文件'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': '未选择文件'}), 400

    # Validate the extension taken from the client-supplied name.
    raw_stem, raw_ext = os.path.splitext(file.filename)
    ext = raw_ext.lower()
    if ext not in SUPPORTED_FORMATS:
        return jsonify({'error': f'不支持的文件格式: {ext}'}), 400

    # Sanitise only the stem and re-attach the validated extension.
    # secure_filename() drops non-ASCII characters, so running it on the
    # whole name would turn e.g. "文档.pdf" into "pdf" and lose the extension.
    timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
    stem = secure_filename(raw_stem) or f'upload_{timestamp}'
    filename = f'{stem}{ext}'
    filepath = os.path.join(DOCUMENT_DIR, filename)

    # Never overwrite an existing file: append a timestamp, then a counter
    # in case two uploads with the same name arrive within the same second.
    attempt = 0
    while os.path.exists(filepath):
        suffix = timestamp if attempt == 0 else f'{timestamp}_{attempt}'
        filename = f'{stem}_{suffix}{ext}'
        filepath = os.path.join(DOCUMENT_DIR, filename)
        attempt += 1
    file.save(filepath)

    doc = Document(
        filename=filename,
        filepath=filepath,
        file_type=ext,
        file_size=os.path.getsize(filepath),
        title=request.form.get('title', filename),
        source=request.form.get('source', ''),
    )
    db.session.add(doc)
    try:
        db.session.commit()
    except Exception:
        # Do not leave an orphaned file on disk when the record cannot be
        # stored; clean up and let the original error propagate.
        db.session.rollback()
        try:
            os.remove(filepath)
        except FileNotFoundError:
            pass
        raise

    return jsonify({
        'success': True,
        'document': doc.to_dict()
    })
@app.route('/api/documents/<int:doc_id>', methods=['GET'])
def api_get_document(doc_id):
    """Return a document and its chunks as JSON.

    Responds with 404 when no document has the given id.
    """
    document = Document.query.get_or_404(doc_id)
    chunk_rows = DocumentChunk.query.filter_by(document_id=doc_id).all()
    payload = {
        'document': document.to_dict(),
        'chunks': [row.to_dict() for row in chunk_rows]
    }
    return jsonify(payload)
@app.route('/api/documents/<int:doc_id>', methods=['DELETE'])
def api_delete_document(doc_id):
    """Delete a document record and its file on disk.

    Responds with 404 when no document has the given id; otherwise returns
    JSON ``{'success': True}`` after refreshing the index statistics.
    """
    doc = Document.query.get_or_404(doc_id)
    # Read the path before the commit; the instance should not be touched
    # once its row has been deleted.
    filepath = doc.filepath

    # Remove the database record first so a failed commit cannot leave a
    # record that points at an already-deleted file. The original comment
    # states that chunks are removed by cascade -- confirm in models.
    db.session.delete(doc)
    db.session.commit()

    if filepath:
        try:
            os.remove(filepath)
        except FileNotFoundError:
            # Already gone: nothing left to clean up.
            pass

    IndexStats.get_stats().update_stats()
    return jsonify({'success': True})
# === 索引管理 ===
@app.route('/api/index/<int:doc_id>', methods=['POST'])
def api_index_document(doc_id):
    """Index a single document.

    Responds with 404 when no document has the given id, with JSON
    ``{'success': True, ...}`` when ``indexer.index_document`` reports
    success, and with a 500 JSON error otherwise.
    """
    # Distinguish "no such document" from a genuine indexing failure,
    # matching the other /api/documents/<id> routes.
    Document.query.get_or_404(doc_id)

    if indexer.index_document(doc_id):
        return jsonify({'success': True, 'message': '索引完成'})
    return jsonify({'error': '索引失败'}), 500
@app.route('/api/index/batch', methods=['POST'])
def api_batch_index():
    """Index every document whose status is ``'pending'``.

    Returns JSON counts of successful and failed documents plus the total
    number of pending documents that were processed.
    """
    pending_docs = Document.query.filter_by(status='pending').all()
    succeeded = 0
    failed = 0
    for pending in pending_docs:
        indexed = indexer.index_document(pending.id)
        if not indexed:
            failed += 1
            continue
        succeeded += 1
    return jsonify({'success': succeeded, 'failed': failed, 'total': len(pending_docs)})
@app.route('/api/index/rebuild', methods=['POST'])
def api_rebuild_index():
    """Discard the existing index and re-index every document.

    Deletes all inverted-index rows and chunks, resets every document to
    ``'pending'``, then returns the response of ``api_batch_index()``.
    """
    # Wipe the old index data.
    InvertedIndex.query.delete()
    DocumentChunk.query.delete()

    # Mark every document as awaiting indexing again.
    for document in Document.query.all():
        document.status = 'pending'
        document.indexed_at = None
    db.session.commit()

    # Re-run indexing over the now-pending documents.
    return api_batch_index()
@app.route('/api/stats', methods=['GET'])
def api_get_stats():
    """Return the index statistics as JSON.

    ``last_indexed_at`` is an ISO-8601 string, or ``None`` when unset.
    """
    stats = IndexStats.get_stats()

    if stats.last_indexed_at:
        last_indexed = stats.last_indexed_at.isoformat()
    else:
        last_indexed = None

    payload = {
        'total_documents': stats.total_documents,
        'total_chunks': stats.total_chunks,
        'total_terms': stats.total_terms,
        'total_words': stats.total_words,
        'last_indexed_at': last_indexed
    }
    return jsonify(payload)
# === 搜索 ===
@app.route('/api/search', methods=['POST'])
def api_search():
    """Search the index for a query.

    Expects a JSON object body with:
        query: non-empty string to search for.
        top_k: optional positive integer, number of results (default 10).

    Returns:
        JSON with the query, the result list and its length, or a 400 JSON
        error when the body or one of its fields is invalid.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so every bad request gets the same JSON error shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求体必须是JSON对象'}), 400

    query = data.get('query', '')
    if not isinstance(query, str):
        return jsonify({'error': '查询必须是字符串'}), 400
    if not query.strip():
        return jsonify({'error': '查询不能为空'}), 400

    top_k = data.get('top_k', 10)
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(top_k, bool) or not isinstance(top_k, int) or top_k < 1:
        return jsonify({'error': 'top_k必须是正整数'}), 400

    results = search_engine.search(query, top_k)
    return jsonify({
        'query': query,
        'results': results,
        'total': len(results)
    })
@app.route('/api/search/suggestions', methods=['GET'])
def api_search_suggestions():
    """Return up to 10 auto-complete suggestions for a term prefix.

    The prefix comes from the ``prefix`` query parameter. Prefixes shorter
    than two characters yield an empty list. Matching is case-insensitive
    and results are ordered by ``total_freq``, highest first.
    """
    prefix = request.args.get('prefix', '')
    if len(prefix) < 2:
        return jsonify({'suggestions': []})

    # Escape LIKE wildcards so user input such as "%" or "_" is matched
    # literally instead of acting as a pattern.
    escaped = (
        prefix.replace('\\', '\\\\')
        .replace('%', '\\%')
        .replace('_', '\\_')
    )
    terms = InvertedIndex.query.filter(
        InvertedIndex.term.ilike(f'{escaped}%', escape='\\')
    ).order_by(InvertedIndex.total_freq.desc()).limit(10).all()

    return jsonify({
        'suggestions': [t.term for t in terms]
    })
# === RAG ===
@app.route('/api/rag/answer', methods=['POST'])
def api_rag_answer():
    """Answer a question with retrieval-augmented generation.

    Expects a JSON object body with:
        query: non-empty question string.
        top_k: optional positive integer passed to the generator
            (default 5).

    Returns:
        The generator's result as JSON, or a 400 JSON error when the body
        or one of its fields is invalid.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so every bad request gets the same JSON error shape.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求体必须是JSON对象'}), 400

    query = data.get('query', '')
    if not isinstance(query, str):
        return jsonify({'error': '查询必须是字符串'}), 400
    if not query.strip():
        return jsonify({'error': '查询不能为空'}), 400

    top_k = data.get('top_k', 5)
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(top_k, bool) or not isinstance(top_k, int) or top_k < 1:
        return jsonify({'error': 'top_k必须是正整数'}), 400

    result = rag_generator.answer(query, top_k)
    return jsonify(result)
# === 查询日志 ===
@app.route('/api/logs', methods=['GET'])
def api_get_logs():
    """Return one page of query logs as JSON, newest first, 50 per page.

    The page number is read from the ``page`` query parameter (default 1).
    """
    page_number = request.args.get('page', 1, type=int)
    newest_first = QueryLog.query.order_by(QueryLog.created_at.desc())
    log_page = newest_first.paginate(page=page_number, per_page=50)
    payload = {
        'logs': [entry.to_dict() for entry in log_page.items],
        'total': log_page.total,
        'pages': log_page.pages
    }
    return jsonify(payload)
@app.route('/api/logs/<int:log_id>/feedback', methods=['POST'])
def api_log_feedback(log_id):
    """Store user feedback on a logged query.

    Expects a JSON object body; its ``rating`` and ``feedback`` values are
    written to the log entry (a missing key stores ``None``).

    Responds with 404 when no log entry has the given id and with a 400
    JSON error when the body is not a JSON object.
    """
    log = QueryLog.query.get_or_404(log_id)

    # silent=True yields None instead of raising on a missing or malformed
    # body; without this check a non-object body crashed on ``.get``.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求体必须是JSON对象'}), 400

    log.rating = data.get('rating')
    log.feedback = data.get('feedback')
    db.session.commit()
    return jsonify({'success': True})
# ==================== 启动 ====================
if __name__ == '__main__':
    init_app()
    # The interactive debugger lets anyone who can reach the server execute
    # arbitrary code, so it is opt-in through FLASK_DEBUG rather than
    # hard-coded on for whatever host API_HOST binds to.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on'
    )
    app.run(host=API_HOST, port=API_PORT, debug=debug_enabled)