commit cdaadef10ce20f61673af7808acd75c0645470cf Author: coder Date: Tue Apr 7 23:48:06 2026 +0800 V1.0.0: 基于索引的知识检索系统 核心功能: - 文档索引:使用LLM分析提取关键词/摘要/主题/实体 - 查询处理:LLM分析查询意图并扩展关键词 - BM25检索:基于倒排索引的相关性排序 - RAG问答:检索增强生成 技术栈: - Flask + SQLAlchemy - OpenAI API兼容LLM - BM25算法 特点: 不依赖向量模型和向量库 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0b298dd --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +instance/ +*.db +__pycache__/ +*.pyc +.env \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..79d88f4 --- /dev/null +++ b/README.md @@ -0,0 +1,179 @@ +# LLM Index RAG + +基于索引和搜索的知识检索系统(不使用向量模型和向量库) + +## 特点 + +- **不依赖向量模型** - 使用传统信息检索技术 +- **LLM增强索引** - 使用大语言模型提取关键词、摘要、实体 +- **BM25排序** - 经典相关性算法 +- **智能查询处理** - LLM分析查询意图并扩展关键词 + +## 工作原理 + +### 1. 文档索引阶段 + +``` +文档 → LLM分析 → 提取关键词/摘要/主题/实体 → 构建倒排索引 +``` + +- 对每个文档使用LLM进行分析 +- 提取:关键词、摘要、主题分类、命名实体 +- 分块处理,计算词频 +- 构建倒排索引(term → [doc_ids]) + +### 2. 查询处理阶段 + +``` +Query → LLM分析 → 提取意图/关键词 → 查询扩展 → 检索 +``` + +- LLM分析查询意图(查找/比较/解释等) +- 提取主要关键词 +- 生成同义词/相关词扩展 +- 使用BM25计算相关性得分 + +### 3. 检索与生成 + +- **文档检索模式**:返回相关文档列表 +- **智能问答模式**:RAG生成回答 + +## 快速开始 + +```bash +# 安装依赖 +pip install -r requirements.txt + +# 启动服务 +python app.py +``` + +访问 http://localhost:19001 + +## API接口 + +### 文档管理 + +| 接口 | 方法 | 说明 | +|------|------|------| +| `/api/documents` | GET | 获取文档列表 | +| `/api/documents` | POST | 上传文档 | +| `/api/documents/` | GET | 获取文档详情 | +| `/api/documents/` | DELETE | 删除文档 | + +### 索引管理 + +| 接口 | 方法 | 说明 | +|------|------|------| +| `/api/index/` | POST | 索引单个文档 | +| `/api/index/batch` | POST | 批量索引 | +| `/api/index/rebuild` | POST | 重建索引 | +| `/api/stats` | GET | 获取统计信息 | + +### 搜索 + +| 接口 | 方法 | 说明 | +|------|------|------| +| `/api/search` | POST | 文档检索 | +| `/api/rag/answer` | POST | 智能问答 | + +## 使用示例 + +### 上传并索引文档 + +```python +import requests + +# 上传文档 +files = {'file': open('document.pdf', 'rb')} +r = requests.post('http://localhost:19001/api/documents', files=files) +doc_id = r.json()['document']['id'] + +# 索引文档 +requests.post(f'http://localhost:19001/api/index/{doc_id}') +``` + +### 搜索 + +```python +import requests + +# 文档检索 +r = requests.post('http://localhost:19001/api/search', json={ + 'query': '机器学习的基本原理', + 'top_k': 10 +}) +results = r.json()['results'] + +# 智能问答 +r = requests.post('http://localhost:19001/api/rag/answer', json={ + 'query': '什么是深度学习?' +}) +answer = r.json()['answer'] +``` + +## 配置说明 + +修改 `config.py`: + +```python +# LLM配置 +LLM_CONFIG = { + "api_base": "http://192.168.2.5:1234/v1", + "model": "qwen/qwen3.5-35b-a3b", +} + +# 索引配置 +INDEX_CONFIG = { + "bm25_k1": 1.5, # BM25参数 + "bm25_b": 0.75, + "max_results": 20, +} + +# 文档处理 +DOC_CONFIG = { + "chunk_size": 2000, # 分块大小 + "max_keywords": 20, # 最大关键词数 +} +``` + +## 目录结构 + +``` +llm-index-rag/ +├── app.py # Flask应用 +├── config.py # 配置文件 +├── models.py # 数据模型 +├── services.py # 核心服务 +├── requirements.txt # 依赖 +├── documents/ # 文档存储 +├── indexes/ # 索引文件 +├── templates/ # HTML模板 +│ ├── index.html +│ ├── documents.html +│ └── search.html +└── static/ # 静态资源 +``` + +## 对比向量RAG + +| 维度 | 向量RAG | 索引RAG(本系统) | +|------|---------|------------------| +| 依赖 | 向量模型、向量库 | 仅LLM | +| 精确匹配 | 较弱 | 强 | +| 语义理解 | 强 | 中等 | +| 可解释性 | 弱 | 强 | +| 部署复杂度 | 高 | 低 | +| 资源消耗 | 高 | 中等 | + +## 扩展建议 + +1. **中文分词** - 集成jieba等专业分词 +2. **同义词库** - 添加领域同义词扩展 +3. **混合检索** - 结合向量检索 +4. **增量索引** - 支持实时更新 +5. **分布式** - 支持大规模文档 + +## License + +MIT \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..a94eb09 --- /dev/null +++ b/app.py @@ -0,0 +1,321 @@ +""" +LLM Index RAG 主应用 +""" + +import os +import json +from datetime import datetime +from flask import Flask, request, jsonify, render_template, send_file +from flask_sqlalchemy import SQLAlchemy +from werkzeug.utils import secure_filename + +from config import * +from models import db, Document, DocumentChunk, InvertedIndex, QueryLog, IndexStats +from services import DocumentIndexer, SearchEngine, RAGGenerator + +# ==================== 创建应用 ==================== +app = Flask(__name__) +app.config['SECRET_KEY'] = SECRET_KEY +app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL +app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False +app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # 50MB + +# 初始化数据库 +db.init_app(app) + +# 初始化服务 +indexer = DocumentIndexer() +search_engine = SearchEngine() +rag_generator = RAGGenerator() + + +# ==================== 初始化函数 ==================== +def init_app(): + """初始化应用""" + # 创建目录 + for dir_name in [DOCUMENT_DIR, INDEX_DIR, LOG_DIR]: + if not os.path.exists(dir_name): + os.makedirs(dir_name) + + # 创建数据库表 + with app.app_context(): + db.create_all() + # 初始化统计 + IndexStats.get_stats() + + +# ==================== 页面路由 ==================== +@app.route('/') +def index(): + """首页""" + stats = IndexStats.get_stats() + return render_template('index.html', stats=stats) + + +@app.route('/documents') +def documents(): + """文档列表页""" + page = request.args.get('page', 1, type=int) + docs = Document.query.order_by(Document.created_at.desc()).paginate(page=page, per_page=20) + return render_template('documents.html', docs=docs) + + +@app.route('/search') +def search_page(): + """搜索页""" + return render_template('search.html') + + +# ==================== API路由 ==================== + +# === 文档管理 === + +@app.route('/api/documents', methods=['GET']) +def api_list_documents(): + """获取文档列表""" + page = request.args.get('page', 1, type=int) + status = request.args.get('status', '') + + query = Document.query + + if status: + query = query.filter_by(status=status) + + docs = query.order_by(Document.created_at.desc()).paginate(page=page, per_page=20) + + return jsonify({ + 'documents': [d.to_dict() for d in docs.items], + 'total': docs.total, + 'pages': docs.pages, + 'current_page': docs.page + }) + + +@app.route('/api/documents', methods=['POST']) +def api_upload_document(): + """上传文档""" + if 'file' not in request.files: + return jsonify({'error': '未上传文件'}), 400 + + file = request.files['file'] + if file.filename == '': + return jsonify({'error': '未选择文件'}), 400 + + # 检查文件类型 + ext = os.path.splitext(file.filename)[1].lower() + if ext not in SUPPORTED_FORMATS: + return jsonify({'error': f'不支持的文件格式: {ext}'}), 400 + + # 保存文件 + filename = secure_filename(file.filename) + filepath = os.path.join(DOCUMENT_DIR, filename) + + # 避免重名 + if os.path.exists(filepath): + base, ext = os.path.splitext(filename) + filename = f"{base}_{datetime.now().strftime('%Y%m%d%H%M%S')}{ext}" + filepath = os.path.join(DOCUMENT_DIR, filename) + + file.save(filepath) + + # 创建文档记录 + doc = Document( + filename=filename, + filepath=filepath, + file_type=ext, + file_size=os.path.getsize(filepath), + title=request.form.get('title', filename), + source=request.form.get('source', ''), + ) + db.session.add(doc) + db.session.commit() + + return jsonify({ + 'success': True, + 'document': doc.to_dict() + }) + + +@app.route('/api/documents/', methods=['GET']) +def api_get_document(doc_id): + """获取文档详情""" + doc = Document.query.get_or_404(doc_id) + + chunks = DocumentChunk.query.filter_by(document_id=doc_id).all() + + return jsonify({ + 'document': doc.to_dict(), + 'chunks': [c.to_dict() for c in chunks] + }) + + +@app.route('/api/documents/', methods=['DELETE']) +def api_delete_document(doc_id): + """删除文档""" + doc = Document.query.get_or_404(doc_id) + + # 删除文件 + if os.path.exists(doc.filepath): + os.remove(doc.filepath) + + # 删除数据库记录(级联删除chunks) + db.session.delete(doc) + db.session.commit() + + # 更新统计 + IndexStats.get_stats().update_stats() + + return jsonify({'success': True}) + + +# === 索引管理 === + +@app.route('/api/index/', methods=['POST']) +def api_index_document(doc_id): + """索引单个文档""" + success = indexer.index_document(doc_id) + + if success: + return jsonify({'success': True, 'message': '索引完成'}) + else: + return jsonify({'error': '索引失败'}), 500 + + +@app.route('/api/index/batch', methods=['POST']) +def api_batch_index(): + """批量索引所有待索引文档""" + pending_docs = Document.query.filter_by(status='pending').all() + + results = {'success': 0, 'failed': 0, 'total': len(pending_docs)} + + for doc in pending_docs: + if indexer.index_document(doc.id): + results['success'] += 1 + else: + results['failed'] += 1 + + return jsonify(results) + + +@app.route('/api/index/rebuild', methods=['POST']) +def api_rebuild_index(): + """重建所有索引""" + # 清除旧索引 + InvertedIndex.query.delete() + DocumentChunk.query.delete() + + docs = Document.query.all() + for doc in docs: + doc.status = 'pending' + doc.indexed_at = None + + db.session.commit() + + # 批量索引 + return api_batch_index() + + +@app.route('/api/stats', methods=['GET']) +def api_get_stats(): + """获取索引统计""" + stats = IndexStats.get_stats() + + return jsonify({ + 'total_documents': stats.total_documents, + 'total_chunks': stats.total_chunks, + 'total_terms': stats.total_terms, + 'total_words': stats.total_words, + 'last_indexed_at': stats.last_indexed_at.isoformat() if stats.last_indexed_at else None + }) + + +# === 搜索 === + +@app.route('/api/search', methods=['POST']) +def api_search(): + """搜索文档""" + data = request.json + query = data.get('query', '') + top_k = data.get('top_k', 10) + + if not query: + return jsonify({'error': '查询不能为空'}), 400 + + results = search_engine.search(query, top_k) + + return jsonify({ + 'query': query, + 'results': results, + 'total': len(results) + }) + + +@app.route('/api/search/suggestions', methods=['GET']) +def api_search_suggestions(): + """获取搜索建议(自动补全)""" + prefix = request.args.get('prefix', '') + + if len(prefix) < 2: + return jsonify({'suggestions': []}) + + # 从倒排索引中查找匹配的词 + terms = InvertedIndex.query.filter( + InvertedIndex.term.ilike(f'{prefix}%') + ).order_by(InvertedIndex.total_freq.desc()).limit(10).all() + + return jsonify({ + 'suggestions': [t.term for t in terms] + }) + + +# === RAG === + +@app.route('/api/rag/answer', methods=['POST']) +def api_rag_answer(): + """RAG问答""" + data = request.json + query = data.get('query', '') + top_k = data.get('top_k', 5) + + if not query: + return jsonify({'error': '查询不能为空'}), 400 + + result = rag_generator.answer(query, top_k) + + return jsonify(result) + + +# === 查询日志 === + +@app.route('/api/logs', methods=['GET']) +def api_get_logs(): + """获取查询日志""" + page = request.args.get('page', 1, type=int) + + logs = QueryLog.query.order_by(QueryLog.created_at.desc()).paginate(page=page, per_page=50) + + return jsonify({ + 'logs': [l.to_dict() for l in logs.items], + 'total': logs.total, + 'pages': logs.pages + }) + + +@app.route('/api/logs//feedback', methods=['POST']) +def api_log_feedback(log_id): + """提交查询反馈""" + log = QueryLog.query.get_or_404(log_id) + + data = request.json + log.rating = data.get('rating') + log.feedback = data.get('feedback') + + db.session.commit() + + return jsonify({'success': True}) + + +# ==================== 启动 ==================== +if __name__ == '__main__': + init_app() + app.run(host=API_HOST, port=API_PORT, debug=True) \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..c4fbe78 --- /dev/null +++ b/config.py @@ -0,0 +1,65 @@ +""" +LLM Index RAG 配置文件 +基于索引和搜索的RAG系统(不使用向量模型) +""" + +# ==================== 应用配置 ==================== +APP_NAME = "LLM Index RAG" +APP_VERSION = "1.0.0" +SECRET_KEY = "llm-index-rag-secret-key" + +# ==================== LLM配置 ==================== +LLM_CONFIG = { + "api_base": "http://192.168.2.5:1234/v1", + "api_key": "sk-lm-fuP5tGU8:Hi7YU87jHyDP6Ay8Tl2j", + "model": "qwen/qwen3.5-35b-a3b", + "max_tokens": 4000, + "temperature": 0.3, + "timeout": 120, +} + +# ==================== 文档配置 ==================== +DOCUMENT_DIR = "documents" # 文档存储目录 +INDEX_DIR = "indexes" # 索引存储目录 +LOG_DIR = "logs" # 日志目录 + +SUPPORTED_FORMATS = ['.txt', '.md', '.pdf', '.docx', '.html', '.json'] + +# 文档处理配置 +DOC_CONFIG = { + "chunk_size": 2000, # 文档分块大小 + "chunk_overlap": 200, # 分块重叠 + "max_keywords": 20, # 每个文档块最大关键词数 + "max_summary_length": 500, # 摘要最大长度 +} + +# ==================== 索引配置 ==================== +INDEX_CONFIG = { + # BM25参数 + "bm25_k1": 1.5, # 词频饱和参数 + "bm25_b": 0.75, # 文档长度归一化参数 + + # 检索配置 + "max_results": 20, # 最大返回结果数 + "min_score": 0.1, # 最低相关性分数 + + # 关键词权重 + "title_weight": 3.0, # 标题关键词权重 + "keyword_weight": 2.0, # 显式关键词权重 + "content_weight": 1.0, # 内容关键词权重 + "summary_weight": 1.5, # 摘要关键词权重 +} + +# ==================== 查询配置 ==================== +QUERY_CONFIG = { + "max_expansion_terms": 5, # 查询扩展最大词数 + "use_query_expansion": True, # 是否启用查询扩展 + "use_rerank": True, # 是否使用重排 +} + +# ==================== 数据库配置 ==================== +DATABASE_URL = "sqlite:///llm_index_rag.db" + +# ==================== API配置 ==================== +API_HOST = "0.0.0.0" +API_PORT = 19001 \ No newline at end of file diff --git a/models.py b/models.py new file mode 100644 index 0000000..44ad38c --- /dev/null +++ b/models.py @@ -0,0 +1,275 @@ +""" +数据库模型定义 +""" + +from datetime import datetime +from flask_sqlalchemy import SQLAlchemy +import json + +db = SQLAlchemy() + + +class Document(db.Model): + """文档表""" + __tablename__ = 'documents' + + id = db.Column(db.Integer, primary_key=True) + + # 文件信息 + filename = db.Column(db.String(255), nullable=False) + filepath = db.Column(db.String(512), nullable=False) + file_type = db.Column(db.String(20), nullable=False) + file_size = db.Column(db.Integer, default=0) + + # 文档元数据 + title = db.Column(db.String(500), nullable=True) + author = db.Column(db.String(100), nullable=True) + source = db.Column(db.String(255), nullable=True) + + # 处理状态 + status = db.Column(db.String(20), default='pending') # pending, processing, indexed, failed + error_message = db.Column(db.Text, nullable=True) + + # 文档内容(可选存储原文) + content = db.Column(db.Text, nullable=True) + + # 文档摘要(LLM生成) + summary = db.Column(db.Text, nullable=True) + + # 主要关键词(JSON数组) + keywords = db.Column(db.Text, nullable=True) + + # 文档分类/主题 + category = db.Column(db.String(100), nullable=True) + topics = db.Column(db.Text, nullable=True) # JSON数组 + + # 文档实体(人物、地点、组织等) + entities = db.Column(db.Text, nullable=True) # JSON对象 + + # 统计信息 + chunk_count = db.Column(db.Integer, default=0) # 分块数量 + word_count = db.Column(db.Integer, default=0) # 字数 + + # 时间戳 + created_at = db.Column(db.DateTime, default=datetime.utcnow) + indexed_at = db.Column(db.DateTime, nullable=True) + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + # 关系 + chunks = db.relationship('DocumentChunk', backref='document', lazy=True, cascade='all, delete-orphan') + + def get_keywords(self): + """获取关键词列表""" + if self.keywords: + return json.loads(self.keywords) + return [] + + def set_keywords(self, keywords): + """设置关键词""" + self.keywords = json.dumps(keywords, ensure_ascii=False) + + def get_topics(self): + """获取主题列表""" + if self.topics: + return json.loads(self.topics) + return [] + + def set_topics(self, topics): + """设置主题""" + self.topics = json.dumps(topics, ensure_ascii=False) + + def get_entities(self): + """获取实体""" + if self.entities: + return json.loads(self.entities) + return {} + + def set_entities(self, entities): + """设置实体""" + self.entities = json.dumps(entities, ensure_ascii=False) + + def to_dict(self): + return { + 'id': self.id, + 'filename': self.filename, + 'title': self.title, + 'status': self.status, + 'summary': self.summary, + 'keywords': self.get_keywords(), + 'category': self.category, + 'chunk_count': self.chunk_count, + 'word_count': self.word_count, + 'created_at': self.created_at.isoformat() if self.created_at else None, + 'indexed_at': self.indexed_at.isoformat() if self.indexed_at else None, + } + + +class DocumentChunk(db.Model): + """文档分块表""" + __tablename__ = 'document_chunks' + + id = db.Column(db.Integer, primary_key=True) + document_id = db.Column(db.Integer, db.ForeignKey('documents.id'), nullable=False) + + # 分块信息 + chunk_index = db.Column(db.Integer, default=0) # 块序号 + content = db.Column(db.Text, nullable=False) # 块内容 + + # LLM生成的索引信息 + summary = db.Column(db.Text, nullable=True) # 块摘要 + keywords = db.Column(db.Text, nullable=True) # 关键词JSON数组 + topics = db.Column(db.Text, nullable=True) # 主题JSON数组 + + # 位置信息 + start_char = db.Column(db.Integer, default=0) + end_char = db.Column(db.Integer, default=0) + + # 词频统计(用于BM25) + term_freq = db.Column(db.Text, nullable=True) # JSON对象 {term: count} + + created_at = db.Column(db.DateTime, default=datetime.utcnow) + + def get_keywords(self): + if self.keywords: + return json.loads(self.keywords) + return [] + + def set_keywords(self, keywords): + self.keywords = json.dumps(keywords, ensure_ascii=False) + + def get_term_freq(self): + if self.term_freq: + return json.loads(self.term_freq) + return {} + + def set_term_freq(self, tf): + self.term_freq = json.dumps(tf, ensure_ascii=False) + + def to_dict(self): + return { + 'id': self.id, + 'document_id': self.document_id, + 'chunk_index': self.chunk_index, + 'content': self.content[:200] + '...' if len(self.content) > 200 else self.content, + 'summary': self.summary, + 'keywords': self.get_keywords(), + } + + +class InvertedIndex(db.Model): + """倒排索引表""" + __tablename__ = 'inverted_index' + + id = db.Column(db.Integer, primary_key=True) + + # 索引项 + term = db.Column(db.String(100), nullable=False, index=True) # 关键词 + term_type = db.Column(db.String(20), default='keyword') # keyword, entity, topic + + # 文档频率 + doc_freq = db.Column(db.Integer, default=0) # 包含该词的文档数 + + # 倒排列表(JSON:[{doc_id, chunk_id, tf, positions}]) + postings = db.Column(db.Text, nullable=False) + + # 统计 + total_freq = db.Column(db.Integer, default=0) # 总词频 + + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + def get_postings(self): + if self.postings: + return json.loads(self.postings) + return [] + + def set_postings(self, postings): + self.postings = json.dumps(postings, ensure_ascii=False) + self.doc_freq = len(set(p['doc_id'] for p in postings)) + self.total_freq = sum(p.get('tf', 1) for p in postings) + + @staticmethod + def get_or_create(term, term_type='keyword'): + """获取或创建索引项""" + index = InvertedIndex.query.filter_by(term=term, term_type=term_type).first() + if not index: + index = InvertedIndex(term=term, term_type=term_type, postings='[]') + db.session.add(index) + return index + + +class QueryLog(db.Model): + """查询日志表""" + __tablename__ = 'query_logs' + + id = db.Column(db.Integer, primary_key=True) + + # 查询信息 + original_query = db.Column(db.Text, nullable=False) # 原始查询 + processed_query = db.Column(db.Text, nullable=True) # 处理后的查询 + expanded_terms = db.Column(db.Text, nullable=True) # 扩展词JSON数组 + + # 查询意图 + intent = db.Column(db.String(50), nullable=True) # 查询意图 + entities = db.Column(db.Text, nullable=True) # 识别的实体 + + # 检索结果 + result_count = db.Column(db.Integer, default=0) + top_doc_ids = db.Column(db.Text, nullable=True) # 返回的文档ID JSON数组 + + # 性能 + retrieval_time = db.Column(db.Float, default=0) # 检索耗时(秒) + total_time = db.Column(db.Float, default=0) # 总耗时(秒) + + # 用户反馈 + rating = db.Column(db.Integer, nullable=True) # 评分1-5 + feedback = db.Column(db.Text, nullable=True) # 反馈文本 + + created_at = db.Column(db.DateTime, default=datetime.utcnow) + + def to_dict(self): + return { + 'id': self.id, + 'query': self.original_query, + 'result_count': self.result_count, + 'retrieval_time': self.retrieval_time, + 'created_at': self.created_at.isoformat() if self.created_at else None, + } + + +class IndexStats(db.Model): + """索引统计表""" + __tablename__ = 'index_stats' + + id = db.Column(db.Integer, primary_key=True) + + # 统计信息 + total_documents = db.Column(db.Integer, default=0) + total_chunks = db.Column(db.Integer, default=0) + total_terms = db.Column(db.Integer, default=0) + total_words = db.Column(db.Integer, default=0) + + # 索引大小 + index_size_mb = db.Column(db.Float, default=0) + + # 最后更新 + last_indexed_at = db.Column(db.DateTime, nullable=True) + + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + @staticmethod + def get_stats(): + stats = IndexStats.query.first() + if not stats: + stats = IndexStats() + db.session.add(stats) + db.session.commit() + return stats + + def update_stats(self): + """更新统计信息""" + self.total_documents = Document.query.filter_by(status='indexed').count() + self.total_chunks = DocumentChunk.query.count() + self.total_terms = InvertedIndex.query.count() + self.total_words = db.session.query(db.func.sum(Document.word_count)).scalar() or 0 + self.last_indexed_at = datetime.utcnow() + db.session.commit() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ffb41ce --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +flask>=2.3.0 +flask-sqlalchemy>=3.0.0 +openai>=2.0.0 +pypdf>=6.0.0 +python-docx>=0.8.11 \ No newline at end of file diff --git a/services.py b/services.py new file mode 100644 index 0000000..4b08e77 --- /dev/null +++ b/services.py @@ -0,0 +1,655 @@ +""" +文档索引服务 +使用LLM分析文档并构建索引 +""" + +import os +import re +import json +import math +from datetime import datetime +from collections import Counter +from openai import OpenAI +from flask import current_app + +from config import LLM_CONFIG, DOC_CONFIG, INDEX_CONFIG +from models import db, Document, DocumentChunk, InvertedIndex, IndexStats + + +class LLMService: + """LLM服务封装""" + + def __init__(self): + self.client = OpenAI( + api_key=LLM_CONFIG['api_key'], + base_url=LLM_CONFIG['api_base'], + ) + self.model = LLM_CONFIG['model'] + self.max_tokens = LLM_CONFIG['max_tokens'] + self.temperature = LLM_CONFIG['temperature'] + + def analyze_document(self, content, title=None): + """ + 分析文档,提取关键信息 + + Returns: + dict: {summary, keywords, topics, entities, category} + """ + prompt = f"""请分析以下文档内容,提取关键信息。 + +{'文档标题:' + title if title else ''} + +文档内容(前3000字): +{content[:3000]} + +请以JSON格式返回以下信息: +{{ + "summary": "文档摘要(100-200字)", + "keywords": ["关键词1", "关键词2", ...最多10个], + "topics": ["主题1", "主题2", ...最多5个], + "category": "文档分类", + "entities": {{ + "persons": ["人名"], + "organizations": ["组织名"], + "locations": ["地点"], + "dates": ["日期"], + "others": ["其他实体"] + }} +}} + +只返回JSON,不要其他内容。""" + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + max_tokens=1000, + temperature=0.3, + ) + + result = response.choices[0].message.content.strip() + # 清理可能的markdown标记 + result = re.sub(r'^```json\s*', '', result) + result = re.sub(r'\s*```$', '', result) + + return json.loads(result) + + except Exception as e: + print(f"LLM分析失败: {e}") + return { + "summary": "", + "keywords": [], + "topics": [], + "category": "", + "entities": {} + } + + def analyze_chunk(self, content): + """ + 分析文档块,提取关键词 + + Returns: + dict: {summary, keywords, topics} + """ + prompt = f"""分析以下文本片段,提取关键信息。 + +文本: +{content[:1500]} + +请以JSON格式返回: +{{ + "summary": "片段摘要(50字以内)", + "keywords": ["关键词", ...最多8个], + "topics": ["主题", ...最多3个] +}} + +只返回JSON。""" + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + max_tokens=500, + temperature=0.3, + ) + + result = response.choices[0].message.content.strip() + result = re.sub(r'^```json\s*', '', result) + result = re.sub(r'\s*```$', '', result) + + return json.loads(result) + + except Exception as e: + return {"summary": "", "keywords": [], "topics": []} + + def process_query(self, query): + """ + 处理查询,提取意图和关键词 + + Returns: + dict: {intent, keywords, expanded_terms, entities} + """ + prompt = f"""分析以下查询,提取搜索意图和关键词。 + +查询:{query} + +请以JSON格式返回: +{{ + "intent": "查询意图(如:查找信息、比较、解释、列表等)", + "keywords": ["主要关键词", ...最多10个], + "expanded_terms": ["同义词或相关词", ...最多5个], + "entities": {{ + "persons": [], + "organizations": [], + "locations": [], + "dates": [], + "others": [] + }} +}} + +只返回JSON。""" + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + max_tokens=500, + temperature=0.3, + ) + + result = response.choices[0].message.content.strip() + result = re.sub(r'^```json\s*', '', result) + result = re.sub(r'\s*```$', '', result) + + return json.loads(result) + + except Exception as e: + return { + "intent": "search", + "keywords": query.split(), + "expanded_terms": [], + "entities": {} + } + + +class DocumentIndexer: + """文档索引器""" + + def __init__(self): + self.llm = LLMService() + self.chunk_size = DOC_CONFIG['chunk_size'] + self.chunk_overlap = DOC_CONFIG['chunk_overlap'] + + def index_document(self, doc_id): + """ + 索引单个文档 + + Args: + doc_id: 文档ID + + Returns: + bool: 是否成功 + """ + doc = Document.query.get(doc_id) + if not doc: + return False + + try: + doc.status = 'processing' + db.session.commit() + + # 读取文档内容 + content = self._read_document(doc.filepath) + if not content: + raise Exception("无法读取文档内容") + + # 存储原文 + doc.content = content + doc.word_count = len(content) + + # 使用LLM分析整个文档 + print(f" 正在分析文档: {doc.filename}") + analysis = self.llm.analyze_document(content, doc.title) + + doc.summary = analysis.get('summary', '') + doc.set_keywords(analysis.get('keywords', [])) + doc.set_topics(analysis.get('topics', [])) + doc.category = analysis.get('category', '') + doc.set_entities(analysis.get('entities', {})) + + # 分块处理 + chunks = self._split_content(content) + doc.chunk_count = len(chunks) + + # 清理旧分块 + DocumentChunk.query.filter_by(document_id=doc.id).delete() + + # 索引每个分块 + for i, chunk_content in enumerate(chunks): + chunk = DocumentChunk( + document_id=doc.id, + chunk_index=i, + content=chunk_content, + start_char=0, + end_char=len(chunk_content) + ) + + # LLM分析分块 + chunk_analysis = self.llm.analyze_chunk(chunk_content) + chunk.summary = chunk_analysis.get('summary', '') + chunk.set_keywords(chunk_analysis.get('keywords', [])) + + # 计算词频 + term_freq = self._compute_term_freq(chunk_content) + chunk.set_term_freq(term_freq) + + db.session.add(chunk) + + db.session.commit() + + # 更新倒排索引 + self._update_inverted_index(doc.id) + + # 标记完成 + doc.status = 'indexed' + doc.indexed_at = datetime.utcnow() + db.session.commit() + + # 更新统计 + IndexStats.get_stats().update_stats() + + print(f" ✓ 文档索引完成: {doc.filename}") + return True + + except Exception as e: + doc.status = 'failed' + doc.error_message = str(e) + db.session.commit() + print(f" ✗ 索引失败: {e}") + return False + + def _read_document(self, filepath): + """读取文档内容""" + ext = os.path.splitext(filepath)[1].lower() + + if ext in ['.txt', '.md', '.json', '.html']: + with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: + return f.read() + + elif ext == '.pdf': + try: + from pypdf import PdfReader + reader = PdfReader(filepath) + text = '' + for page in reader.pages: + text += page.extract_text() + '\n' + return text + except: + pass + + elif ext == '.docx': + try: + from docx import Document as DocxDocument + doc = DocxDocument(filepath) + return '\n'.join([p.text for p in doc.paragraphs]) + except: + pass + + return None + + def _split_content(self, content): + """ + 分割内容为块 + + Args: + content: 文档内容 + + Returns: + list: 内容块列表 + """ + chunks = [] + + # 按段落分割 + paragraphs = content.split('\n\n') + + current_chunk = "" + for para in paragraphs: + if len(current_chunk) + len(para) < self.chunk_size: + current_chunk += para + '\n\n' + else: + if current_chunk.strip(): + chunks.append(current_chunk.strip()) + current_chunk = para + '\n\n' + + if current_chunk.strip(): + chunks.append(current_chunk.strip()) + + return chunks if chunks else [content[:self.chunk_size]] + + def _compute_term_freq(self, content): + """计算词频""" + # 简单分词(中英文混合) + # 中文按字符,英文按空格 + terms = [] + + # 提取中文词汇(简单按字,实际可用jieba) + chinese = re.findall(r'[\u4e00-\u9fff]+', content) + for text in chinese: + # 简单的双字词分割 + if len(text) >= 2: + for i in range(len(text) - 1): + terms.append(text[i:i+2]) + terms.extend(list(text)) + + # 提取英文单词 + english = re.findall(r'[a-zA-Z]+', content.lower()) + terms.extend(english) + + # 统计词频 + return dict(Counter(terms)) + + def _update_inverted_index(self, doc_id): + """更新倒排索引""" + chunks = DocumentChunk.query.filter_by(document_id=doc_id).all() + + # 收集所有词及其位置 + term_postings = {} + + for chunk in chunks: + # 从词频获取词 + tf = chunk.get_term_freq() + + # 从关键词获取词 + keywords = chunk.get_keywords() + + for kw in keywords: + if kw not in term_postings: + term_postings[kw] = [] + term_postings[kw].append({ + 'doc_id': doc_id, + 'chunk_id': chunk.id, + 'tf': tf.get(kw, 1), + 'weight': INDEX_CONFIG['keyword_weight'] + }) + + for term, freq in tf.items(): + if term not in term_postings: + term_postings[term] = [] + term_postings[term].append({ + 'doc_id': doc_id, + 'chunk_id': chunk.id, + 'tf': freq, + 'weight': INDEX_CONFIG['content_weight'] + }) + + # 更新数据库 + for term, postings in term_postings.items(): + index = InvertedIndex.get_or_create(term) + existing = index.get_postings() + + # 合并postings(去除旧的同一文档的记录) + existing = [p for p in existing if p['doc_id'] != doc_id] + existing.extend(postings) + + index.set_postings(existing) + + db.session.commit() + + +class SearchEngine: + """搜索引擎""" + + def __init__(self): + self.llm = LLMService() + self.k1 = INDEX_CONFIG['bm25_k1'] + self.b = INDEX_CONFIG['bm25_b'] + + def search(self, query, top_k=10): + """ + 搜索文档 + + Args: + query: 查询字符串 + top_k: 返回结果数 + + Returns: + list: 搜索结果 [{doc, score, highlights}] + """ + start_time = datetime.now() + + # 1. LLM处理查询 + print(f"处理查询: {query}") + query_analysis = self.llm.process_query(query) + + keywords = query_analysis.get('keywords', []) + expanded = query_analysis.get('expanded_terms', []) + + # 合并关键词 + all_terms = keywords + expanded + + print(f" 关键词: {keywords}") + print(f" 扩展词: {expanded}") + + # 2. 检索 + results = self._retrieve(all_terms) + + # 3. 计算BM25分数 + scored_results = self._score_results(results, all_terms) + + # 4. 排序 + scored_results.sort(key=lambda x: x['score'], reverse=True) + + # 5. 返回top_k + final_results = scored_results[:top_k] + + retrieval_time = (datetime.now() - start_time).total_seconds() + + # 6. 记录日志 + self._log_query(query, query_analysis, final_results, retrieval_time) + + return final_results + + def _retrieve(self, terms): + """ + 检索包含关键词的文档 + + Args: + terms: 关键词列表 + + Returns: + dict: {doc_id: {chunks: [], terms: []}} + """ + results = {} + + for term in terms: + # 查询倒排索引 + index = InvertedIndex.query.filter( + InvertedIndex.term.ilike(f'%{term}%') + ).all() + + for idx in index: + postings = idx.get_postings() + + for p in postings: + doc_id = p['doc_id'] + + if doc_id not in results: + results[doc_id] = { + 'chunks': set(), + 'terms': {}, + 'postings': [] + } + + results[doc_id]['chunks'].add(p['chunk_id']) + results[doc_id]['terms'][term] = results[doc_id]['terms'].get(term, 0) + p.get('tf', 1) + results[doc_id]['postings'].append({ + 'term': term, + 'chunk_id': p['chunk_id'], + 'tf': p.get('tf', 1), + 'weight': p.get('weight', 1.0) + }) + + return results + + def _score_results(self, results, query_terms): + """ + 使用BM25计算分数 + + Args: + results: 检索结果 + query_terms: 查询词 + + Returns: + list: [{doc, score, chunks}] + """ + scored = [] + + # 计算平均文档长度 + total_docs = Document.query.filter_by(status='indexed').count() + if total_docs == 0: + return [] + + avg_doc_len = db.session.query( + db.func.avg(Document.word_count) + ).filter(Document.status == 'indexed').scalar() or 1000 + + for doc_id, data in results.items(): + doc = Document.query.get(doc_id) + if not doc or doc.status != 'indexed': + continue + + # BM25计算 + score = 0 + doc_len = doc.word_count or 1000 + + for term in query_terms: + # 查询倒排索引获取文档频率 + index = InvertedIndex.query.filter_by(term=term).first() + df = index.doc_freq if index else 1 + + # IDF + idf = math.log((total_docs - df + 0.5) / (df + 0.5) + 1) + + # TF + tf = data['terms'].get(term, 0) + + # BM25公式 + tf_component = (tf * (self.k1 + 1)) / ( + tf + self.k1 * (1 - self.b + self.b * doc_len / avg_doc_len) + ) + + score += idf * tf_component + + # 获取匹配的chunk内容 + chunk_ids = list(data['chunks'])[:3] # 最多取3个chunk + chunks = DocumentChunk.query.filter(DocumentChunk.id.in_(chunk_ids)).all() + + scored.append({ + 'doc': doc.to_dict(), + 'score': score, + 'matched_chunks': [c.to_dict() for c in chunks], + 'matched_terms': list(data['terms'].keys()) + }) + + return scored + + def _log_query(self, query, analysis, results, retrieval_time): + """记录查询日志""" + log = QueryLog( + original_query=query, + processed_query=' '.join(analysis.get('keywords', [])), + expanded_terms=json.dumps(analysis.get('expanded_terms', [])), + intent=analysis.get('intent'), + entities=json.dumps(analysis.get('entities', {})), + result_count=len(results), + top_doc_ids=json.dumps([r['doc']['id'] for r in results[:5]]), + retrieval_time=retrieval_time, + total_time=retrieval_time + ) + db.session.add(log) + db.session.commit() + + +class RAGGenerator: + """RAG生成器""" + + def __init__(self): + self.llm = LLMService() + self.search_engine = SearchEngine() + + def answer(self, query, top_k=5): + """ + RAG回答 + + Args: + query: 用户查询 + top_k: 检索文档数 + + Returns: + dict: {answer, sources, confidence} + """ + # 1. 检索相关文档 + results = self.search_engine.search(query, top_k) + + if not results: + return { + 'answer': '抱歉,没有找到相关信息。', + 'sources': [], + 'confidence': 0 + } + + # 2. 构建上下文 + context_parts = [] + sources = [] + + for i, r in enumerate(results[:3]): # 最多使用3个文档 + doc = r['doc'] + chunks = r['matched_chunks'] + + context_parts.append(f"【文档{i+1}】{doc.get('title', doc['filename'])}") + + for chunk in chunks[:2]: # 每个文档最多2个chunk + context_parts.append(chunk.get('content', '')[:500]) + + sources.append({ + 'id': doc['id'], + 'title': doc.get('title', doc['filename']), + 'score': r['score'] + }) + + context = '\n\n'.join(context_parts) + + # 3. LLM生成回答 + prompt = f"""基于以下参考信息回答问题。如果参考信息中没有相关内容,请说明。 + +问题:{query} + +参考信息: +{context} + +请给出准确、简洁的回答,并标注信息来源。""" + + try: + client = OpenAI( + api_key=LLM_CONFIG['api_key'], + base_url=LLM_CONFIG['api_base'], + ) + + response = client.chat.completions.create( + model=LLM_CONFIG['model'], + messages=[{"role": "user", "content": prompt}], + max_tokens=1000, + temperature=0.5, + ) + + answer = response.choices[0].message.content + + except Exception as e: + answer = f"生成回答时出错: {e}" + + # 4. 返回结果 + return { + 'answer': answer, + 'sources': sources, + 'confidence': min(1.0, results[0]['score'] / 10) if results else 0 + } \ No newline at end of file diff --git a/templates/documents.html b/templates/documents.html new file mode 100644 index 0000000..be4fb25 --- /dev/null +++ b/templates/documents.html @@ -0,0 +1,217 @@ + + + + + 文档管理 - LLM Index RAG + + + + + + + +
+
+

文档管理

+
+ + +
+
+ + +
+
+
上传文档
+
+
+
+
+
+ +
+
+ +
+
+ +
+
+
+
+
+ + +
+
+
文档列表
+
+
+ + + + + + + + + + + + + + + +
文件名类型大小分块数状态索引时间操作
加载中...
+
+
+ + + +
+ + + + \ No newline at end of file diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..8dc96c6 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,162 @@ + + + + + + LLM Index RAG - 基于索引的知识检索系统 + + + + + + + + + +
+
+

LLM Index RAG

+

基于索引和搜索的知识检索系统(不使用向量模型)

+

使用LLM构建索引 • 关键词检索 • BM25排序

+
+
+ + +
+ +
+ + +
+
+
+
+ +

{{ stats.total_documents or 0 }}

+ 文档数量 +
+
+
+
+ +

{{ stats.total_chunks or 0 }}

+ 文档分块 +
+
+
+
+ +

{{ stats.total_terms or 0 }}

+ 索引词条 +
+
+
+
+ +

{{ "{:,}".format(stats.total_words or 0) }}

+ 总字数 +
+
+
+
+ + +
+ + + + +
+ + +
+

工作原理

+
+
+
+
+ +
1. 文档索引
+

使用LLM分析文档,提取关键词、摘要、主题、实体等信息构建索引

+
+
+
+
+
+
+ +
2. 查询处理
+

LLM分析查询意图,提取关键词并进行查询扩展

+
+
+
+
+
+
+ +
3. BM25检索
+

基于倒排索引和BM25算法计算相关性得分,返回最相关文档

+
+
+
+
+
+ +
+
+

LLM Index RAG v1.0.0 | 基于索引的知识检索系统

+
+
+ + + + + \ No newline at end of file diff --git a/templates/search.html b/templates/search.html new file mode 100644 index 0000000..ceb5862 --- /dev/null +++ b/templates/search.html @@ -0,0 +1,195 @@ + + + + + 知识检索 - LLM Index RAG + + + + + + + + + + + +
+ + + + + + + + +
+ + + + \ No newline at end of file