From cdaadef10ce20f61673af7808acd75c0645470cf Mon Sep 17 00:00:00 2001 From: coder Date: Tue, 7 Apr 2026 23:48:06 +0800 Subject: [PATCH] =?UTF-8?q?V1.0.0:=20=E5=9F=BA=E4=BA=8E=E7=B4=A2=E5=BC=95?= =?UTF-8?q?=E7=9A=84=E7=9F=A5=E8=AF=86=E6=A3=80=E7=B4=A2=E7=B3=BB=E7=BB=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 核心功能: - 文档索引:使用LLM分析提取关键词/摘要/主题/实体 - 查询处理:LLM分析查询意图并扩展关键词 - BM25检索:基于倒排索引的相关性排序 - RAG问答:检索增强生成 技术栈: - Flask + SQLAlchemy - OpenAI API兼容LLM - BM25算法 特点: 不依赖向量模型和向量库 --- .gitignore | 5 + README.md | 179 +++++++++++ app.py | 321 +++++++++++++++++++ config.py | 65 ++++ models.py | 275 ++++++++++++++++ requirements.txt | 5 + services.py | 655 +++++++++++++++++++++++++++++++++++++++ templates/documents.html | 217 +++++++++++++ templates/index.html | 162 ++++++++++ templates/search.html | 195 ++++++++++++ 10 files changed, 2079 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 app.py create mode 100644 config.py create mode 100644 models.py create mode 100644 requirements.txt create mode 100644 services.py create mode 100644 templates/documents.html create mode 100644 templates/index.html create mode 100644 templates/search.html diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0b298dd --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +instance/ +*.db +__pycache__/ +*.pyc +.env \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..79d88f4 --- /dev/null +++ b/README.md @@ -0,0 +1,179 @@ +# LLM Index RAG + +基于索引和搜索的知识检索系统(不使用向量模型和向量库) + +## 特点 + +- **不依赖向量模型** - 使用传统信息检索技术 +- **LLM增强索引** - 使用大语言模型提取关键词、摘要、实体 +- **BM25排序** - 经典相关性算法 +- **智能查询处理** - LLM分析查询意图并扩展关键词 + +## 工作原理 + +### 1. 文档索引阶段 + +``` +文档 → LLM分析 → 提取关键词/摘要/主题/实体 → 构建倒排索引 +``` + +- 对每个文档使用LLM进行分析 +- 提取:关键词、摘要、主题分类、命名实体 +- 分块处理,计算词频 +- 构建倒排索引(term → [doc_ids]) + +### 2. 
查询处理阶段 + +``` +Query → LLM分析 → 提取意图/关键词 → 查询扩展 → 检索 +``` + +- LLM分析查询意图(查找/比较/解释等) +- 提取主要关键词 +- 生成同义词/相关词扩展 +- 使用BM25计算相关性得分 + +### 3. 检索与生成 + +- **文档检索模式**:返回相关文档列表 +- **智能问答模式**:RAG生成回答 + +## 快速开始 + +```bash +# 安装依赖 +pip install -r requirements.txt + +# 启动服务 +python app.py +``` + +访问 http://localhost:19001 + +## API接口 + +### 文档管理 + +| 接口 | 方法 | 说明 | +|------|------|------| +| `/api/documents` | GET | 获取文档列表 | +| `/api/documents` | POST | 上传文档 | +| `/api/documents/<doc_id>` | GET | 获取文档详情 | +| `/api/documents/<doc_id>` | DELETE | 删除文档 | + +### 索引管理 + +| 接口 | 方法 | 说明 | +|------|------|------| +| `/api/index/<doc_id>` | POST | 索引单个文档 | +| `/api/index/batch` | POST | 批量索引 | +| `/api/index/rebuild` | POST | 重建索引 | +| `/api/stats` | GET | 获取统计信息 | + +### 搜索 + +| 接口 | 方法 | 说明 | +|------|------|------| +| `/api/search` | POST | 文档检索 | +| `/api/rag/answer` | POST | 智能问答 | + +## 使用示例 + +### 上传并索引文档 + +```python +import requests + +# 上传文档 +files = {'file': open('document.pdf', 'rb')} +r = requests.post('http://localhost:19001/api/documents', files=files) +doc_id = r.json()['document']['id'] + +# 索引文档 +requests.post(f'http://localhost:19001/api/index/{doc_id}') +``` + +### 搜索 + +```python +import requests + +# 文档检索 +r = requests.post('http://localhost:19001/api/search', json={ + 'query': '机器学习的基本原理', + 'top_k': 10 +}) +results = r.json()['results'] + +# 智能问答 +r = requests.post('http://localhost:19001/api/rag/answer', json={ + 'query': '什么是深度学习?' 
+}) +answer = r.json()['answer'] +``` + +## 配置说明 + +修改 `config.py`: + +```python +# LLM配置 +LLM_CONFIG = { + "api_base": "http://192.168.2.5:1234/v1", + "model": "qwen/qwen3.5-35b-a3b", +} + +# 索引配置 +INDEX_CONFIG = { + "bm25_k1": 1.5, # BM25参数 + "bm25_b": 0.75, + "max_results": 20, +} + +# 文档处理 +DOC_CONFIG = { + "chunk_size": 2000, # 分块大小 + "max_keywords": 20, # 最大关键词数 +} +``` + +## 目录结构 + +``` +llm-index-rag/ +├── app.py # Flask应用 +├── config.py # 配置文件 +├── models.py # 数据模型 +├── services.py # 核心服务 +├── requirements.txt # 依赖 +├── documents/ # 文档存储 +├── indexes/ # 索引文件 +├── templates/ # HTML模板 +│ ├── index.html +│ ├── documents.html +│ └── search.html +└── static/ # 静态资源 +``` + +## 对比向量RAG + +| 维度 | 向量RAG | 索引RAG(本系统) | +|------|---------|------------------| +| 依赖 | 向量模型、向量库 | 仅LLM | +| 精确匹配 | 较弱 | 强 | +| 语义理解 | 强 | 中等 | +| 可解释性 | 弱 | 强 | +| 部署复杂度 | 高 | 低 | +| 资源消耗 | 高 | 中等 | + +## 扩展建议 + +1. **中文分词** - 集成jieba等专业分词 +2. **同义词库** - 添加领域同义词扩展 +3. **混合检索** - 结合向量检索 +4. **增量索引** - 支持实时更新 +5. **分布式** - 支持大规模文档 + +## License + +MIT \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..a94eb09 --- /dev/null +++ b/app.py @@ -0,0 +1,321 @@ +""" +LLM Index RAG 主应用 +""" + +import os +import json +from datetime import datetime +from flask import Flask, request, jsonify, render_template, send_file +from flask_sqlalchemy import SQLAlchemy +from werkzeug.utils import secure_filename + +from config import * +from models import db, Document, DocumentChunk, InvertedIndex, QueryLog, IndexStats +from services import DocumentIndexer, SearchEngine, RAGGenerator + +# ==================== 创建应用 ==================== +app = Flask(__name__) +app.config['SECRET_KEY'] = SECRET_KEY +app.config['SQLALCHEMY_DATABASE_URI'] = DATABASE_URL +app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False +app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024 # 50MB + +# 初始化数据库 +db.init_app(app) + +# 初始化服务 +indexer = DocumentIndexer() +search_engine = SearchEngine() 
+rag_generator = RAGGenerator() + + +# ==================== 初始化函数 ==================== +def init_app(): + """初始化应用""" + # 创建目录 + for dir_name in [DOCUMENT_DIR, INDEX_DIR, LOG_DIR]: + if not os.path.exists(dir_name): + os.makedirs(dir_name) + + # 创建数据库表 + with app.app_context(): + db.create_all() + # 初始化统计 + IndexStats.get_stats() + + +# ==================== 页面路由 ==================== +@app.route('/') +def index(): + """首页""" + stats = IndexStats.get_stats() + return render_template('index.html', stats=stats) + + +@app.route('/documents') +def documents(): + """文档列表页""" + page = request.args.get('page', 1, type=int) + docs = Document.query.order_by(Document.created_at.desc()).paginate(page=page, per_page=20) + return render_template('documents.html', docs=docs) + + +@app.route('/search') +def search_page(): + """搜索页""" + return render_template('search.html') + + +# ==================== API路由 ==================== + +# === 文档管理 === + +@app.route('/api/documents', methods=['GET']) +def api_list_documents(): + """获取文档列表""" + page = request.args.get('page', 1, type=int) + status = request.args.get('status', '') + + query = Document.query + + if status: + query = query.filter_by(status=status) + + docs = query.order_by(Document.created_at.desc()).paginate(page=page, per_page=20) + + return jsonify({ + 'documents': [d.to_dict() for d in docs.items], + 'total': docs.total, + 'pages': docs.pages, + 'current_page': docs.page + }) + + +@app.route('/api/documents', methods=['POST']) +def api_upload_document(): + """上传文档""" + if 'file' not in request.files: + return jsonify({'error': '未上传文件'}), 400 + + file = request.files['file'] + if file.filename == '': + return jsonify({'error': '未选择文件'}), 400 + + # 检查文件类型 + ext = os.path.splitext(file.filename)[1].lower() + if ext not in SUPPORTED_FORMATS: + return jsonify({'error': f'不支持的文件格式: {ext}'}), 400 + + # 保存文件 + filename = secure_filename(file.filename) + filepath = os.path.join(DOCUMENT_DIR, filename) + + # 避免重名 + if 
os.path.exists(filepath): + base, ext = os.path.splitext(filename) + filename = f"{base}_{datetime.now().strftime('%Y%m%d%H%M%S')}{ext}" + filepath = os.path.join(DOCUMENT_DIR, filename) + + file.save(filepath) + + # 创建文档记录 + doc = Document( + filename=filename, + filepath=filepath, + file_type=ext, + file_size=os.path.getsize(filepath), + title=request.form.get('title', filename), + source=request.form.get('source', ''), + ) + db.session.add(doc) + db.session.commit() + + return jsonify({ + 'success': True, + 'document': doc.to_dict() + }) + + +@app.route('/api/documents/', methods=['GET']) +def api_get_document(doc_id): + """获取文档详情""" + doc = Document.query.get_or_404(doc_id) + + chunks = DocumentChunk.query.filter_by(document_id=doc_id).all() + + return jsonify({ + 'document': doc.to_dict(), + 'chunks': [c.to_dict() for c in chunks] + }) + + +@app.route('/api/documents/', methods=['DELETE']) +def api_delete_document(doc_id): + """删除文档""" + doc = Document.query.get_or_404(doc_id) + + # 删除文件 + if os.path.exists(doc.filepath): + os.remove(doc.filepath) + + # 删除数据库记录(级联删除chunks) + db.session.delete(doc) + db.session.commit() + + # 更新统计 + IndexStats.get_stats().update_stats() + + return jsonify({'success': True}) + + +# === 索引管理 === + +@app.route('/api/index/', methods=['POST']) +def api_index_document(doc_id): + """索引单个文档""" + success = indexer.index_document(doc_id) + + if success: + return jsonify({'success': True, 'message': '索引完成'}) + else: + return jsonify({'error': '索引失败'}), 500 + + +@app.route('/api/index/batch', methods=['POST']) +def api_batch_index(): + """批量索引所有待索引文档""" + pending_docs = Document.query.filter_by(status='pending').all() + + results = {'success': 0, 'failed': 0, 'total': len(pending_docs)} + + for doc in pending_docs: + if indexer.index_document(doc.id): + results['success'] += 1 + else: + results['failed'] += 1 + + return jsonify(results) + + +@app.route('/api/index/rebuild', methods=['POST']) +def api_rebuild_index(): + """重建所有索引""" + # 
清除旧索引 + InvertedIndex.query.delete() + DocumentChunk.query.delete() + + docs = Document.query.all() + for doc in docs: + doc.status = 'pending' + doc.indexed_at = None + + db.session.commit() + + # 批量索引 + return api_batch_index() + + +@app.route('/api/stats', methods=['GET']) +def api_get_stats(): + """获取索引统计""" + stats = IndexStats.get_stats() + + return jsonify({ + 'total_documents': stats.total_documents, + 'total_chunks': stats.total_chunks, + 'total_terms': stats.total_terms, + 'total_words': stats.total_words, + 'last_indexed_at': stats.last_indexed_at.isoformat() if stats.last_indexed_at else None + }) + + +# === 搜索 === + +@app.route('/api/search', methods=['POST']) +def api_search(): + """搜索文档""" + data = request.json + query = data.get('query', '') + top_k = data.get('top_k', 10) + + if not query: + return jsonify({'error': '查询不能为空'}), 400 + + results = search_engine.search(query, top_k) + + return jsonify({ + 'query': query, + 'results': results, + 'total': len(results) + }) + + +@app.route('/api/search/suggestions', methods=['GET']) +def api_search_suggestions(): + """获取搜索建议(自动补全)""" + prefix = request.args.get('prefix', '') + + if len(prefix) < 2: + return jsonify({'suggestions': []}) + + # 从倒排索引中查找匹配的词 + terms = InvertedIndex.query.filter( + InvertedIndex.term.ilike(f'{prefix}%') + ).order_by(InvertedIndex.total_freq.desc()).limit(10).all() + + return jsonify({ + 'suggestions': [t.term for t in terms] + }) + + +# === RAG === + +@app.route('/api/rag/answer', methods=['POST']) +def api_rag_answer(): + """RAG问答""" + data = request.json + query = data.get('query', '') + top_k = data.get('top_k', 5) + + if not query: + return jsonify({'error': '查询不能为空'}), 400 + + result = rag_generator.answer(query, top_k) + + return jsonify(result) + + +# === 查询日志 === + +@app.route('/api/logs', methods=['GET']) +def api_get_logs(): + """获取查询日志""" + page = request.args.get('page', 1, type=int) + + logs = 
QueryLog.query.order_by(QueryLog.created_at.desc()).paginate(page=page, per_page=50) + + return jsonify({ + 'logs': [l.to_dict() for l in logs.items], + 'total': logs.total, + 'pages': logs.pages + }) + + +@app.route('/api/logs//feedback', methods=['POST']) +def api_log_feedback(log_id): + """提交查询反馈""" + log = QueryLog.query.get_or_404(log_id) + + data = request.json + log.rating = data.get('rating') + log.feedback = data.get('feedback') + + db.session.commit() + + return jsonify({'success': True}) + + +# ==================== 启动 ==================== +if __name__ == '__main__': + init_app() + app.run(host=API_HOST, port=API_PORT, debug=True) \ No newline at end of file diff --git a/config.py b/config.py new file mode 100644 index 0000000..c4fbe78 --- /dev/null +++ b/config.py @@ -0,0 +1,65 @@ +""" +LLM Index RAG 配置文件 +基于索引和搜索的RAG系统(不使用向量模型) +""" + +# ==================== 应用配置 ==================== +APP_NAME = "LLM Index RAG" +APP_VERSION = "1.0.0" +SECRET_KEY = "llm-index-rag-secret-key" + +# ==================== LLM配置 ==================== +LLM_CONFIG = { + "api_base": "http://192.168.2.5:1234/v1", + "api_key": "sk-lm-fuP5tGU8:Hi7YU87jHyDP6Ay8Tl2j", + "model": "qwen/qwen3.5-35b-a3b", + "max_tokens": 4000, + "temperature": 0.3, + "timeout": 120, +} + +# ==================== 文档配置 ==================== +DOCUMENT_DIR = "documents" # 文档存储目录 +INDEX_DIR = "indexes" # 索引存储目录 +LOG_DIR = "logs" # 日志目录 + +SUPPORTED_FORMATS = ['.txt', '.md', '.pdf', '.docx', '.html', '.json'] + +# 文档处理配置 +DOC_CONFIG = { + "chunk_size": 2000, # 文档分块大小 + "chunk_overlap": 200, # 分块重叠 + "max_keywords": 20, # 每个文档块最大关键词数 + "max_summary_length": 500, # 摘要最大长度 +} + +# ==================== 索引配置 ==================== +INDEX_CONFIG = { + # BM25参数 + "bm25_k1": 1.5, # 词频饱和参数 + "bm25_b": 0.75, # 文档长度归一化参数 + + # 检索配置 + "max_results": 20, # 最大返回结果数 + "min_score": 0.1, # 最低相关性分数 + + # 关键词权重 + "title_weight": 3.0, # 标题关键词权重 + "keyword_weight": 2.0, # 显式关键词权重 + "content_weight": 1.0, # 内容关键词权重 + "summary_weight": 
1.5, # 摘要关键词权重 +} + +# ==================== 查询配置 ==================== +QUERY_CONFIG = { + "max_expansion_terms": 5, # 查询扩展最大词数 + "use_query_expansion": True, # 是否启用查询扩展 + "use_rerank": True, # 是否使用重排 +} + +# ==================== 数据库配置 ==================== +DATABASE_URL = "sqlite:///llm_index_rag.db" + +# ==================== API配置 ==================== +API_HOST = "0.0.0.0" +API_PORT = 19001 \ No newline at end of file diff --git a/models.py b/models.py new file mode 100644 index 0000000..44ad38c --- /dev/null +++ b/models.py @@ -0,0 +1,275 @@ +""" +数据库模型定义 +""" + +from datetime import datetime +from flask_sqlalchemy import SQLAlchemy +import json + +db = SQLAlchemy() + + +class Document(db.Model): + """文档表""" + __tablename__ = 'documents' + + id = db.Column(db.Integer, primary_key=True) + + # 文件信息 + filename = db.Column(db.String(255), nullable=False) + filepath = db.Column(db.String(512), nullable=False) + file_type = db.Column(db.String(20), nullable=False) + file_size = db.Column(db.Integer, default=0) + + # 文档元数据 + title = db.Column(db.String(500), nullable=True) + author = db.Column(db.String(100), nullable=True) + source = db.Column(db.String(255), nullable=True) + + # 处理状态 + status = db.Column(db.String(20), default='pending') # pending, processing, indexed, failed + error_message = db.Column(db.Text, nullable=True) + + # 文档内容(可选存储原文) + content = db.Column(db.Text, nullable=True) + + # 文档摘要(LLM生成) + summary = db.Column(db.Text, nullable=True) + + # 主要关键词(JSON数组) + keywords = db.Column(db.Text, nullable=True) + + # 文档分类/主题 + category = db.Column(db.String(100), nullable=True) + topics = db.Column(db.Text, nullable=True) # JSON数组 + + # 文档实体(人物、地点、组织等) + entities = db.Column(db.Text, nullable=True) # JSON对象 + + # 统计信息 + chunk_count = db.Column(db.Integer, default=0) # 分块数量 + word_count = db.Column(db.Integer, default=0) # 字数 + + # 时间戳 + created_at = db.Column(db.DateTime, default=datetime.utcnow) + indexed_at = db.Column(db.DateTime, nullable=True) + updated_at 
= db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + # 关系 + chunks = db.relationship('DocumentChunk', backref='document', lazy=True, cascade='all, delete-orphan') + + def get_keywords(self): + """获取关键词列表""" + if self.keywords: + return json.loads(self.keywords) + return [] + + def set_keywords(self, keywords): + """设置关键词""" + self.keywords = json.dumps(keywords, ensure_ascii=False) + + def get_topics(self): + """获取主题列表""" + if self.topics: + return json.loads(self.topics) + return [] + + def set_topics(self, topics): + """设置主题""" + self.topics = json.dumps(topics, ensure_ascii=False) + + def get_entities(self): + """获取实体""" + if self.entities: + return json.loads(self.entities) + return {} + + def set_entities(self, entities): + """设置实体""" + self.entities = json.dumps(entities, ensure_ascii=False) + + def to_dict(self): + return { + 'id': self.id, + 'filename': self.filename, + 'title': self.title, + 'status': self.status, + 'summary': self.summary, + 'keywords': self.get_keywords(), + 'category': self.category, + 'chunk_count': self.chunk_count, + 'word_count': self.word_count, + 'created_at': self.created_at.isoformat() if self.created_at else None, + 'indexed_at': self.indexed_at.isoformat() if self.indexed_at else None, + } + + +class DocumentChunk(db.Model): + """文档分块表""" + __tablename__ = 'document_chunks' + + id = db.Column(db.Integer, primary_key=True) + document_id = db.Column(db.Integer, db.ForeignKey('documents.id'), nullable=False) + + # 分块信息 + chunk_index = db.Column(db.Integer, default=0) # 块序号 + content = db.Column(db.Text, nullable=False) # 块内容 + + # LLM生成的索引信息 + summary = db.Column(db.Text, nullable=True) # 块摘要 + keywords = db.Column(db.Text, nullable=True) # 关键词JSON数组 + topics = db.Column(db.Text, nullable=True) # 主题JSON数组 + + # 位置信息 + start_char = db.Column(db.Integer, default=0) + end_char = db.Column(db.Integer, default=0) + + # 词频统计(用于BM25) + term_freq = db.Column(db.Text, nullable=True) # JSON对象 {term: count} + + 
created_at = db.Column(db.DateTime, default=datetime.utcnow) + + def get_keywords(self): + if self.keywords: + return json.loads(self.keywords) + return [] + + def set_keywords(self, keywords): + self.keywords = json.dumps(keywords, ensure_ascii=False) + + def get_term_freq(self): + if self.term_freq: + return json.loads(self.term_freq) + return {} + + def set_term_freq(self, tf): + self.term_freq = json.dumps(tf, ensure_ascii=False) + + def to_dict(self): + return { + 'id': self.id, + 'document_id': self.document_id, + 'chunk_index': self.chunk_index, + 'content': self.content[:200] + '...' if len(self.content) > 200 else self.content, + 'summary': self.summary, + 'keywords': self.get_keywords(), + } + + +class InvertedIndex(db.Model): + """倒排索引表""" + __tablename__ = 'inverted_index' + + id = db.Column(db.Integer, primary_key=True) + + # 索引项 + term = db.Column(db.String(100), nullable=False, index=True) # 关键词 + term_type = db.Column(db.String(20), default='keyword') # keyword, entity, topic + + # 文档频率 + doc_freq = db.Column(db.Integer, default=0) # 包含该词的文档数 + + # 倒排列表(JSON:[{doc_id, chunk_id, tf, positions}]) + postings = db.Column(db.Text, nullable=False) + + # 统计 + total_freq = db.Column(db.Integer, default=0) # 总词频 + + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + def get_postings(self): + if self.postings: + return json.loads(self.postings) + return [] + + def set_postings(self, postings): + self.postings = json.dumps(postings, ensure_ascii=False) + self.doc_freq = len(set(p['doc_id'] for p in postings)) + self.total_freq = sum(p.get('tf', 1) for p in postings) + + @staticmethod + def get_or_create(term, term_type='keyword'): + """获取或创建索引项""" + index = InvertedIndex.query.filter_by(term=term, term_type=term_type).first() + if not index: + index = InvertedIndex(term=term, term_type=term_type, postings='[]') + db.session.add(index) + return index + + +class QueryLog(db.Model): + """查询日志表""" + __tablename__ = 
'query_logs' + + id = db.Column(db.Integer, primary_key=True) + + # 查询信息 + original_query = db.Column(db.Text, nullable=False) # 原始查询 + processed_query = db.Column(db.Text, nullable=True) # 处理后的查询 + expanded_terms = db.Column(db.Text, nullable=True) # 扩展词JSON数组 + + # 查询意图 + intent = db.Column(db.String(50), nullable=True) # 查询意图 + entities = db.Column(db.Text, nullable=True) # 识别的实体 + + # 检索结果 + result_count = db.Column(db.Integer, default=0) + top_doc_ids = db.Column(db.Text, nullable=True) # 返回的文档ID JSON数组 + + # 性能 + retrieval_time = db.Column(db.Float, default=0) # 检索耗时(秒) + total_time = db.Column(db.Float, default=0) # 总耗时(秒) + + # 用户反馈 + rating = db.Column(db.Integer, nullable=True) # 评分1-5 + feedback = db.Column(db.Text, nullable=True) # 反馈文本 + + created_at = db.Column(db.DateTime, default=datetime.utcnow) + + def to_dict(self): + return { + 'id': self.id, + 'query': self.original_query, + 'result_count': self.result_count, + 'retrieval_time': self.retrieval_time, + 'created_at': self.created_at.isoformat() if self.created_at else None, + } + + +class IndexStats(db.Model): + """索引统计表""" + __tablename__ = 'index_stats' + + id = db.Column(db.Integer, primary_key=True) + + # 统计信息 + total_documents = db.Column(db.Integer, default=0) + total_chunks = db.Column(db.Integer, default=0) + total_terms = db.Column(db.Integer, default=0) + total_words = db.Column(db.Integer, default=0) + + # 索引大小 + index_size_mb = db.Column(db.Float, default=0) + + # 最后更新 + last_indexed_at = db.Column(db.DateTime, nullable=True) + + updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + @staticmethod + def get_stats(): + stats = IndexStats.query.first() + if not stats: + stats = IndexStats() + db.session.add(stats) + db.session.commit() + return stats + + def update_stats(self): + """更新统计信息""" + self.total_documents = Document.query.filter_by(status='indexed').count() + self.total_chunks = DocumentChunk.query.count() + self.total_terms = 
InvertedIndex.query.count() + self.total_words = db.session.query(db.func.sum(Document.word_count)).scalar() or 0 + self.last_indexed_at = datetime.utcnow() + db.session.commit() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ffb41ce --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +flask>=2.3.0 +flask-sqlalchemy>=3.0.0 +openai>=2.0.0 +pypdf>=6.0.0 +python-docx>=0.8.11 \ No newline at end of file diff --git a/services.py b/services.py new file mode 100644 index 0000000..4b08e77 --- /dev/null +++ b/services.py @@ -0,0 +1,655 @@ +""" +文档索引服务 +使用LLM分析文档并构建索引 +""" + +import os +import re +import json +import math +from datetime import datetime +from collections import Counter +from openai import OpenAI +from flask import current_app + +from config import LLM_CONFIG, DOC_CONFIG, INDEX_CONFIG +from models import db, Document, DocumentChunk, InvertedIndex, IndexStats + + +class LLMService: + """LLM服务封装""" + + def __init__(self): + self.client = OpenAI( + api_key=LLM_CONFIG['api_key'], + base_url=LLM_CONFIG['api_base'], + ) + self.model = LLM_CONFIG['model'] + self.max_tokens = LLM_CONFIG['max_tokens'] + self.temperature = LLM_CONFIG['temperature'] + + def analyze_document(self, content, title=None): + """ + 分析文档,提取关键信息 + + Returns: + dict: {summary, keywords, topics, entities, category} + """ + prompt = f"""请分析以下文档内容,提取关键信息。 + +{'文档标题:' + title if title else ''} + +文档内容(前3000字): +{content[:3000]} + +请以JSON格式返回以下信息: +{{ + "summary": "文档摘要(100-200字)", + "keywords": ["关键词1", "关键词2", ...最多10个], + "topics": ["主题1", "主题2", ...最多5个], + "category": "文档分类", + "entities": {{ + "persons": ["人名"], + "organizations": ["组织名"], + "locations": ["地点"], + "dates": ["日期"], + "others": ["其他实体"] + }} +}} + +只返回JSON,不要其他内容。""" + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + max_tokens=1000, + temperature=0.3, + ) + + result = 
response.choices[0].message.content.strip() + # 清理可能的markdown标记 + result = re.sub(r'^```json\s*', '', result) + result = re.sub(r'\s*```$', '', result) + + return json.loads(result) + + except Exception as e: + print(f"LLM分析失败: {e}") + return { + "summary": "", + "keywords": [], + "topics": [], + "category": "", + "entities": {} + } + + def analyze_chunk(self, content): + """ + 分析文档块,提取关键词 + + Returns: + dict: {summary, keywords, topics} + """ + prompt = f"""分析以下文本片段,提取关键信息。 + +文本: +{content[:1500]} + +请以JSON格式返回: +{{ + "summary": "片段摘要(50字以内)", + "keywords": ["关键词", ...最多8个], + "topics": ["主题", ...最多3个] +}} + +只返回JSON。""" + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + max_tokens=500, + temperature=0.3, + ) + + result = response.choices[0].message.content.strip() + result = re.sub(r'^```json\s*', '', result) + result = re.sub(r'\s*```$', '', result) + + return json.loads(result) + + except Exception as e: + return {"summary": "", "keywords": [], "topics": []} + + def process_query(self, query): + """ + 处理查询,提取意图和关键词 + + Returns: + dict: {intent, keywords, expanded_terms, entities} + """ + prompt = f"""分析以下查询,提取搜索意图和关键词。 + +查询:{query} + +请以JSON格式返回: +{{ + "intent": "查询意图(如:查找信息、比较、解释、列表等)", + "keywords": ["主要关键词", ...最多10个], + "expanded_terms": ["同义词或相关词", ...最多5个], + "entities": {{ + "persons": [], + "organizations": [], + "locations": [], + "dates": [], + "others": [] + }} +}} + +只返回JSON。""" + + try: + response = self.client.chat.completions.create( + model=self.model, + messages=[{"role": "user", "content": prompt}], + max_tokens=500, + temperature=0.3, + ) + + result = response.choices[0].message.content.strip() + result = re.sub(r'^```json\s*', '', result) + result = re.sub(r'\s*```$', '', result) + + return json.loads(result) + + except Exception as e: + return { + "intent": "search", + "keywords": query.split(), + "expanded_terms": [], + "entities": {} + } + + +class 
DocumentIndexer: + """文档索引器""" + + def __init__(self): + self.llm = LLMService() + self.chunk_size = DOC_CONFIG['chunk_size'] + self.chunk_overlap = DOC_CONFIG['chunk_overlap'] + + def index_document(self, doc_id): + """ + 索引单个文档 + + Args: + doc_id: 文档ID + + Returns: + bool: 是否成功 + """ + doc = Document.query.get(doc_id) + if not doc: + return False + + try: + doc.status = 'processing' + db.session.commit() + + # 读取文档内容 + content = self._read_document(doc.filepath) + if not content: + raise Exception("无法读取文档内容") + + # 存储原文 + doc.content = content + doc.word_count = len(content) + + # 使用LLM分析整个文档 + print(f" 正在分析文档: {doc.filename}") + analysis = self.llm.analyze_document(content, doc.title) + + doc.summary = analysis.get('summary', '') + doc.set_keywords(analysis.get('keywords', [])) + doc.set_topics(analysis.get('topics', [])) + doc.category = analysis.get('category', '') + doc.set_entities(analysis.get('entities', {})) + + # 分块处理 + chunks = self._split_content(content) + doc.chunk_count = len(chunks) + + # 清理旧分块 + DocumentChunk.query.filter_by(document_id=doc.id).delete() + + # 索引每个分块 + for i, chunk_content in enumerate(chunks): + chunk = DocumentChunk( + document_id=doc.id, + chunk_index=i, + content=chunk_content, + start_char=0, + end_char=len(chunk_content) + ) + + # LLM分析分块 + chunk_analysis = self.llm.analyze_chunk(chunk_content) + chunk.summary = chunk_analysis.get('summary', '') + chunk.set_keywords(chunk_analysis.get('keywords', [])) + + # 计算词频 + term_freq = self._compute_term_freq(chunk_content) + chunk.set_term_freq(term_freq) + + db.session.add(chunk) + + db.session.commit() + + # 更新倒排索引 + self._update_inverted_index(doc.id) + + # 标记完成 + doc.status = 'indexed' + doc.indexed_at = datetime.utcnow() + db.session.commit() + + # 更新统计 + IndexStats.get_stats().update_stats() + + print(f" ✓ 文档索引完成: {doc.filename}") + return True + + except Exception as e: + doc.status = 'failed' + doc.error_message = str(e) + db.session.commit() + print(f" ✗ 索引失败: {e}") + return 
False + + def _read_document(self, filepath): + """读取文档内容""" + ext = os.path.splitext(filepath)[1].lower() + + if ext in ['.txt', '.md', '.json', '.html']: + with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: + return f.read() + + elif ext == '.pdf': + try: + from pypdf import PdfReader + reader = PdfReader(filepath) + text = '' + for page in reader.pages: + text += page.extract_text() + '\n' + return text + except: + pass + + elif ext == '.docx': + try: + from docx import Document as DocxDocument + doc = DocxDocument(filepath) + return '\n'.join([p.text for p in doc.paragraphs]) + except: + pass + + return None + + def _split_content(self, content): + """ + 分割内容为块 + + Args: + content: 文档内容 + + Returns: + list: 内容块列表 + """ + chunks = [] + + # 按段落分割 + paragraphs = content.split('\n\n') + + current_chunk = "" + for para in paragraphs: + if len(current_chunk) + len(para) < self.chunk_size: + current_chunk += para + '\n\n' + else: + if current_chunk.strip(): + chunks.append(current_chunk.strip()) + current_chunk = para + '\n\n' + + if current_chunk.strip(): + chunks.append(current_chunk.strip()) + + return chunks if chunks else [content[:self.chunk_size]] + + def _compute_term_freq(self, content): + """计算词频""" + # 简单分词(中英文混合) + # 中文按字符,英文按空格 + terms = [] + + # 提取中文词汇(简单按字,实际可用jieba) + chinese = re.findall(r'[\u4e00-\u9fff]+', content) + for text in chinese: + # 简单的双字词分割 + if len(text) >= 2: + for i in range(len(text) - 1): + terms.append(text[i:i+2]) + terms.extend(list(text)) + + # 提取英文单词 + english = re.findall(r'[a-zA-Z]+', content.lower()) + terms.extend(english) + + # 统计词频 + return dict(Counter(terms)) + + def _update_inverted_index(self, doc_id): + """更新倒排索引""" + chunks = DocumentChunk.query.filter_by(document_id=doc_id).all() + + # 收集所有词及其位置 + term_postings = {} + + for chunk in chunks: + # 从词频获取词 + tf = chunk.get_term_freq() + + # 从关键词获取词 + keywords = chunk.get_keywords() + + for kw in keywords: + if kw not in term_postings: + term_postings[kw] = 
class SearchEngine:
    """Keyword search over the inverted index, ranked with BM25.

    A query is first analysed by the LLM service (keywords plus expanded
    terms), the inverted index is probed for every term, and the matching
    documents are scored with the BM25 formula using the ``bm25_k1`` and
    ``bm25_b`` values from ``INDEX_CONFIG``.
    """

    def __init__(self):
        self.llm = LLMService()
        self.k1 = INDEX_CONFIG['bm25_k1']
        self.b = INDEX_CONFIG['bm25_b']

    def search(self, query, top_k=10):
        """Search indexed documents for *query*.

        Args:
            query: Raw query string entered by the user.
            top_k: Maximum number of results to return.

        Returns:
            list: Result dicts with the keys ``doc``, ``score``,
            ``matched_chunks`` and ``matched_terms``, best match first.
        """
        start_time = datetime.now()

        # 1. Let the LLM analyse the query.
        print(f"处理查询: {query}")
        query_analysis = self.llm.process_query(query) or {}

        keywords = self._normalize_terms(query_analysis.get('keywords'))
        expanded = self._normalize_terms(query_analysis.get('expanded_terms'))

        # Keywords and expansions may overlap; a duplicated term would be
        # counted twice in both retrieval and scoring, so merge them uniquely.
        all_terms = self._normalize_terms(keywords + expanded)

        print(f" 关键词: {keywords}")
        print(f" 扩展词: {expanded}")

        # 2. Retrieve candidates, 3. score them, 4. rank, 5. truncate.
        results = self._retrieve(all_terms)
        scored_results = self._score_results(results, all_terms)
        scored_results.sort(key=lambda item: item['score'], reverse=True)
        final_results = scored_results[:top_k]

        retrieval_time = (datetime.now() - start_time).total_seconds()

        # 6. Record the query (best effort, see _log_query).
        self._log_query(query, query_analysis, final_results, retrieval_time)

        return final_results

    @staticmethod
    def _normalize_terms(terms):
        """Return the unique, non-empty string terms of *terms* in order.

        Accepts ``None``, a single string, or an iterable. Non-string
        items and blank strings are dropped. Duplicates are detected
        case-insensitively (the index lookup is case-insensitive too) and
        the first spelling wins.
        """
        if terms is None:
            return []
        if isinstance(terms, str):
            terms = [terms]

        seen = set()
        unique = []
        for term in terms:
            if not isinstance(term, str):
                continue
            cleaned = term.strip()
            key = cleaned.casefold()
            if cleaned and key not in seen:
                seen.add(key)
                unique.append(cleaned)
        return unique

    @staticmethod
    def _escape_like(term):
        """Escape LIKE wildcards in *term* using a backslash escape char."""
        return (
            term.replace('\\', '\\\\')
            .replace('%', '\\%')
            .replace('_', '\\_')
        )

    def _retrieve(self, terms):
        """Collect postings for every index entry that contains a term.

        Index terms are matched as case-insensitive substrings.

        Args:
            terms: Query terms.

        Returns:
            dict: ``{doc_id: {'chunks': set of chunk ids,
            'terms': {query term: summed tf},
            'postings': [{'term', 'chunk_id', 'tf', 'weight'}, ...]}}``
        """
        results = {}

        for term in self._normalize_terms(terms):
            # Escape % and _ so they match literally instead of acting as
            # wildcards inside the LIKE pattern.
            pattern = f"%{self._escape_like(term)}%"
            index_rows = InvertedIndex.query.filter(
                InvertedIndex.term.ilike(pattern, escape='\\')
            ).all()

            for row in index_rows:
                for posting in row.get_postings() or []:
                    doc_id = posting['doc_id']
                    chunk_id = posting['chunk_id']
                    tf = posting.get('tf', 1)

                    entry = results.setdefault(doc_id, {
                        'chunks': set(),
                        'terms': {},
                        'postings': [],
                    })
                    entry['chunks'].add(chunk_id)
                    entry['terms'][term] = entry['terms'].get(term, 0) + tf
                    entry['postings'].append({
                        'term': term,
                        'chunk_id': chunk_id,
                        'tf': tf,
                        'weight': posting.get('weight', 1.0),
                    })

        return results

    @staticmethod
    def _idf(total_docs, df):
        """Return the BM25 inverse document frequency.

        ``df`` is clamped to ``[0, total_docs]``: the stored document
        frequency can exceed the number of currently indexed documents,
        which would otherwise produce a negative IDF.
        """
        df = min(max(df, 0), total_docs)
        return math.log((total_docs - df + 0.5) / (df + 0.5) + 1)

    def _bm25_term_score(self, tf, idf, doc_len, avg_doc_len):
        """Return one term's BM25 contribution to a document's score."""
        if tf <= 0:
            # Also avoids a zero denominator when k1 is configured as 0.
            return 0.0
        length_norm = 1 - self.b + self.b * doc_len / avg_doc_len
        return idf * ((tf * (self.k1 + 1)) / (tf + self.k1 * length_norm))

    @staticmethod
    def _top_chunk_ids(data, limit=3):
        """Return up to *limit* chunk ids, best weighted term frequency first.

        Ties are broken by the chunk id's string form so that the selection
        is deterministic (plain set iteration order is not).
        """
        weights = {}
        for posting in data.get('postings', []):
            chunk_id = posting['chunk_id']
            weights[chunk_id] = weights.get(chunk_id, 0) + (
                posting.get('tf', 1) * posting.get('weight', 1.0)
            )
        ranked = sorted(
            data['chunks'],
            key=lambda cid: (-weights.get(cid, 0), str(cid)),
        )
        return ranked[:limit]

    def _score_results(self, results, query_terms):
        """Score retrieved documents with BM25.

        Args:
            results: Mapping produced by ``_retrieve``.
            query_terms: Query terms used for retrieval.

        Returns:
            list: ``[{'doc', 'score', 'matched_chunks', 'matched_terms'}]``
            for every retrieved document whose status is ``'indexed'``
            (unsorted).
        """
        if not results:
            return []

        total_docs = Document.query.filter_by(status='indexed').count()
        if total_docs == 0:
            return []

        # avg() may come back as Decimal on some backends; force float so the
        # arithmetic below cannot mix Decimal and float.
        avg_doc_len = float(
            db.session.query(db.func.avg(Document.word_count))
            .filter(Document.status == 'indexed')
            .scalar() or 1000
        )

        # Number of retrieved documents matched by each term; used as the
        # document frequency when the index has no exact row for the term.
        matched_docs = {}
        for data in results.values():
            for term in data['terms']:
                matched_docs[term] = matched_docs.get(term, 0) + 1

        # IDF depends only on the term, so compute it once per term instead
        # of once per (document, term) pair. Terms that matched nothing
        # contribute 0 and need no lookup at all.
        idf_by_term = {}
        for term in self._normalize_terms(query_terms):
            if term not in matched_docs:
                continue
            index = InvertedIndex.query.filter_by(term=term).first()
            df = (index.doc_freq if index else 0) or matched_docs[term]
            idf_by_term[term] = self._idf(total_docs, df)

        scored = []
        for doc_id, data in results.items():
            doc = Document.query.get(doc_id)
            if not doc or doc.status != 'indexed':
                continue

            doc_len = doc.word_count or 1000
            score = 0
            for term, idf in idf_by_term.items():
                score += self._bm25_term_score(
                    data['terms'].get(term, 0), idf, doc_len, avg_doc_len
                )

            # Load the (at most three) best matching chunks, in rank order.
            chunk_ids = self._top_chunk_ids(data, limit=3)
            chunks = []
            if chunk_ids:
                chunks = DocumentChunk.query.filter(
                    DocumentChunk.id.in_(chunk_ids)
                ).all()
                position = {cid: pos for pos, cid in enumerate(chunk_ids)}
                chunks.sort(key=lambda c: position.get(c.id, len(position)))

            scored.append({
                'doc': doc.to_dict(),
                'score': score,
                'matched_chunks': [c.to_dict() for c in chunks],
                'matched_terms': list(data['terms'].keys()),
            })

        return scored

    def _log_query(self, query, analysis, results, retrieval_time):
        """Persist a ``QueryLog`` row for this search (best effort).

        A failure to store the log must not fail the search itself, so the
        session is rolled back and the error is printed instead of raised.
        """
        analysis = analysis or {}
        keywords = analysis.get('keywords') or []
        if isinstance(keywords, str):
            keywords = [keywords]

        try:
            log = QueryLog(
                original_query=query,
                processed_query=' '.join(str(k) for k in keywords),
                expanded_terms=json.dumps(analysis.get('expanded_terms') or []),
                intent=analysis.get('intent'),
                entities=json.dumps(analysis.get('entities') or {}),
                result_count=len(results),
                top_doc_ids=json.dumps([r['doc']['id'] for r in results[:5]]),
                retrieval_time=retrieval_time,
                total_time=retrieval_time,
            )
            db.session.add(log)
            db.session.commit()
        except Exception as e:  # logging is telemetry; keep the search alive
            # Roll back so the session stays usable for later requests.
            db.session.rollback()
            print(f"query log failed: {e}")


class RAGGenerator:
    """Retrieval-augmented answer generation on top of ``SearchEngine``."""

    def __init__(self):
        self.llm = LLMService()
        self.search_engine = SearchEngine()

    @staticmethod
    def _build_context(results, max_docs=3, max_chunks=2, max_chars=500):
        """Build the prompt context and the source list from search results.

        Args:
            results: Search results as returned by ``SearchEngine.search``.
            max_docs: Number of top documents to include.
            max_chunks: Number of matched chunks to quote per document.
            max_chars: Maximum characters quoted from each chunk.

        Returns:
            tuple: ``(context, sources)`` where *context* is the joined text
            and *sources* is ``[{'id', 'title', 'score'}]``.
        """
        context_parts = []
        sources = []

        for position, result in enumerate(results[:max_docs], start=1):
            doc = result['doc']
            # Fall back to the filename when the title is missing or empty.
            title = doc.get('title') or doc.get('filename') or ''

            context_parts.append(f"【文档{position}】{title}")
            for chunk in result.get('matched_chunks', [])[:max_chunks]:
                # Content may be missing or None; treat both as empty text.
                context_parts.append((chunk.get('content') or '')[:max_chars])

            sources.append({
                'id': doc['id'],
                'title': title,
                'score': result['score'],
            })

        return '\n\n'.join(context_parts), sources

    @staticmethod
    def _confidence(score):
        """Map a BM25 score to a confidence value clamped to ``[0, 1]``."""
        return max(0.0, min(1.0, score / 10))

    def answer(self, query, top_k=5):
        """Answer *query* from the indexed documents.

        Args:
            query: User question.
            top_k: Number of documents to retrieve.

        Returns:
            dict: ``{'answer', 'sources', 'confidence'}``. When the LLM call
            fails, ``answer`` carries the error message instead.
        """
        # 1. Retrieve relevant documents.
        results = self.search_engine.search(query, top_k)

        if not results:
            return {
                'answer': '抱歉,没有找到相关信息。',
                'sources': [],
                'confidence': 0
            }

        # 2. Build the context from the best documents.
        context, sources = self._build_context(results)

        # 3. Ask the LLM for an answer grounded in that context.
        prompt = f"""基于以下参考信息回答问题。如果参考信息中没有相关内容,请说明。

问题:{query}

参考信息:
{context}

请给出准确、简洁的回答,并标注信息来源。"""

        try:
            client = OpenAI(
                api_key=LLM_CONFIG['api_key'],
                base_url=LLM_CONFIG['api_base'],
            )

            response = client.chat.completions.create(
                model=LLM_CONFIG['model'],
                messages=[{"role": "user", "content": prompt}],
                max_tokens=1000,
                temperature=0.5,
            )

            answer = response.choices[0].message.content

        except Exception as e:
            answer = f"生成回答时出错: {e}"

        # 4. Return the answer with its sources.
        return {
            'answer': answer,
            'sources': sources,
            'confidence': self._confidence(results[0]['score'])
        }
+
+

文档管理

+
+ + +
+
+ + +
+
+
上传文档
+
+
+
+
+
+ +
+
+ +
+
+ +
+
+
+
+
+ + +
+
+
文档列表
+
+
+ + + + + + + + + + + + + + + +
文件名类型大小分块数状态索引时间操作
加载中...
+
+
+ + + +
+ + + + \ No newline at end of file diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..8dc96c6 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,162 @@ + + + + + + LLM Index RAG - 基于索引的知识检索系统 + + + + + + + + + +
+
+

LLM Index RAG

+

基于索引和搜索的知识检索系统(不使用向量模型)

+

使用LLM构建索引 • 关键词检索 • BM25排序

+
+
+ + +
+ +
+ + +
+
+
+
+ +

{{ stats.total_documents or 0 }}

+ 文档数量 +
+
+
+
+ +

{{ stats.total_chunks or 0 }}

+ 文档分块 +
+
+
+
+ +

{{ stats.total_terms or 0 }}

+ 索引词条 +
+
+
+
+ +

{{ "{:,}".format(stats.total_words or 0) }}

+ 总字数 +
+
+
+
+ + +
+ + + + +
+ + +
+

工作原理

+
+
+
+
+ +
1. 文档索引
+

使用LLM分析文档,提取关键词、摘要、主题、实体等信息构建索引

+
+
+
+
+
+
+ +
2. 查询处理
+

LLM分析查询意图,提取关键词并进行查询扩展

+
+
+
+
+
+
+ +
3. BM25检索
+

基于倒排索引和BM25算法计算相关性得分,返回最相关文档

+
+
+
+
+
+ +
+
+

LLM Index RAG v1.0.0 | 基于索引的知识检索系统

+
+
+ + + + + \ No newline at end of file diff --git a/templates/search.html b/templates/search.html new file mode 100644 index 0000000..ceb5862 --- /dev/null +++ b/templates/search.html @@ -0,0 +1,195 @@ + + + + + 知识检索 - LLM Index RAG + + + + + + + + + + + +
+ + + + + + + + +
+ + + + \ No newline at end of file