Files
llm-index-rag/models.py
coder 8baecc520a fix: 修复文档列表显示问题和添加文档详情页面
修复:
- Document.to_dict() 添加 file_type 和 file_size 字段
- 文档列表正确显示类型和大小

新增:
- /documents/<id> 文档详情页面
- document_detail.html 模板(显示文档信息、分块内容、关键词等)
2026-04-09 17:02:17 +08:00

277 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
数据库模型定义
"""
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy
import json
db = SQLAlchemy()
class Document(db.Model):
    """A source document registered for indexing.

    Holds file metadata, processing status, LLM-derived index data
    (summary, keywords, topics, entities) and bookkeeping counters.
    JSON payloads are stored in text columns and accessed through the
    get_*/set_* helper methods.
    """
    __tablename__ = 'documents'

    id = db.Column(db.Integer, primary_key=True)

    # File information
    filename = db.Column(db.String(255), nullable=False)
    filepath = db.Column(db.String(512), nullable=False)
    file_type = db.Column(db.String(20), nullable=False)
    file_size = db.Column(db.Integer, default=0)

    # Document metadata
    title = db.Column(db.String(500), nullable=True)
    author = db.Column(db.String(100), nullable=True)
    source = db.Column(db.String(255), nullable=True)

    # Processing status: pending, processing, indexed, failed
    status = db.Column(db.String(20), default='pending')
    error_message = db.Column(db.Text, nullable=True)

    # Original document text (optional)
    content = db.Column(db.Text, nullable=True)

    # LLM-generated summary
    summary = db.Column(db.Text, nullable=True)

    # Main keywords, serialized as a JSON array
    keywords = db.Column(db.Text, nullable=True)

    # Classification / topics
    category = db.Column(db.String(100), nullable=True)
    topics = db.Column(db.Text, nullable=True)  # JSON array

    # Named entities (people, places, organizations, ...) as a JSON object
    entities = db.Column(db.Text, nullable=True)

    # Statistics
    chunk_count = db.Column(db.Integer, default=0)  # number of chunks
    word_count = db.Column(db.Integer, default=0)   # word count

    # Timestamps
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    indexed_at = db.Column(db.DateTime, nullable=True)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Chunks are deleted together with their parent document
    chunks = db.relationship('DocumentChunk', backref='document', lazy=True,
                             cascade='all, delete-orphan')

    def get_keywords(self):
        """Return the keyword list (empty list when unset)."""
        return json.loads(self.keywords) if self.keywords else []

    def set_keywords(self, keywords):
        """Persist *keywords* as a JSON array."""
        self.keywords = json.dumps(keywords, ensure_ascii=False)

    def get_topics(self):
        """Return the topic list (empty list when unset)."""
        return json.loads(self.topics) if self.topics else []

    def set_topics(self, topics):
        """Persist *topics* as a JSON array."""
        self.topics = json.dumps(topics, ensure_ascii=False)

    def get_entities(self):
        """Return the entity mapping (empty dict when unset)."""
        return json.loads(self.entities) if self.entities else {}

    def set_entities(self, entities):
        """Persist *entities* as a JSON object."""
        self.entities = json.dumps(entities, ensure_ascii=False)

    def to_dict(self):
        """Serialize the fields consumed by the document list/detail views."""
        created = self.created_at
        indexed = self.indexed_at
        return {
            'id': self.id,
            'filename': self.filename,
            'file_type': self.file_type,
            'file_size': self.file_size,
            'title': self.title,
            'status': self.status,
            'summary': self.summary,
            'keywords': self.get_keywords(),
            'category': self.category,
            'chunk_count': self.chunk_count,
            'word_count': self.word_count,
            'created_at': created.isoformat() if created else None,
            'indexed_at': indexed.isoformat() if indexed else None,
        }
class DocumentChunk(db.Model):
    """One chunk of a split document, together with its LLM index data."""
    __tablename__ = 'document_chunks'

    id = db.Column(db.Integer, primary_key=True)
    document_id = db.Column(db.Integer, db.ForeignKey('documents.id'), nullable=False)

    # Chunk data
    chunk_index = db.Column(db.Integer, default=0)  # position within the document
    content = db.Column(db.Text, nullable=False)    # chunk text

    # LLM-generated index information
    summary = db.Column(db.Text, nullable=True)   # chunk summary
    keywords = db.Column(db.Text, nullable=True)  # JSON array
    topics = db.Column(db.Text, nullable=True)    # JSON array

    # Character offsets into the original document
    start_char = db.Column(db.Integer, default=0)
    end_char = db.Column(db.Integer, default=0)

    # Term frequencies for BM25, JSON object {term: count}
    term_freq = db.Column(db.Text, nullable=True)

    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def get_keywords(self):
        """Return the keyword list (empty list when unset)."""
        return json.loads(self.keywords) if self.keywords else []

    def set_keywords(self, keywords):
        """Persist *keywords* as a JSON array."""
        self.keywords = json.dumps(keywords, ensure_ascii=False)

    def get_term_freq(self):
        """Return the {term: count} mapping (empty dict when unset)."""
        return json.loads(self.term_freq) if self.term_freq else {}

    def set_term_freq(self, tf):
        """Persist *tf* as a JSON object."""
        self.term_freq = json.dumps(tf, ensure_ascii=False)

    def to_dict(self):
        """Serialize for API responses; content is truncated to 200 chars."""
        preview = self.content
        if len(preview) > 200:
            preview = preview[:200] + '...'
        return {
            'id': self.id,
            'document_id': self.document_id,
            'chunk_index': self.chunk_index,
            'content': preview,
            'summary': self.summary,
            'keywords': self.get_keywords(),
        }
class InvertedIndex(db.Model):
    """Inverted-index entry: one term and its postings list."""
    __tablename__ = 'inverted_index'

    id = db.Column(db.Integer, primary_key=True)

    # Indexed term
    term = db.Column(db.String(100), nullable=False, index=True)
    term_type = db.Column(db.String(20), default='keyword')  # keyword, entity, topic

    # Number of documents containing this term
    doc_freq = db.Column(db.Integer, default=0)

    # Postings list, JSON: [{doc_id, chunk_id, tf, positions}]
    postings = db.Column(db.Text, nullable=False)

    # Total term frequency across all postings
    total_freq = db.Column(db.Integer, default=0)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def get_postings(self):
        """Return the postings list (empty list when unset)."""
        return json.loads(self.postings) if self.postings else []

    def set_postings(self, postings):
        """Persist *postings* and refresh the derived frequency counters."""
        self.postings = json.dumps(postings, ensure_ascii=False)
        self.doc_freq = len({p['doc_id'] for p in postings})
        self.total_freq = sum(p.get('tf', 1) for p in postings)

    @staticmethod
    def get_or_create(term, term_type='keyword'):
        """Fetch the entry for (term, term_type), creating it if absent.

        A new entry starts with an empty postings list; it is added to
        the session but not committed here.
        """
        entry = InvertedIndex.query.filter_by(term=term, term_type=term_type).first()
        if entry is None:
            entry = InvertedIndex(term=term, term_type=term_type, postings='[]')
            db.session.add(entry)
        return entry
class QueryLog(db.Model):
    """Record of a single search query, its results and timings."""
    __tablename__ = 'query_logs'

    id = db.Column(db.Integer, primary_key=True)

    # Query text
    original_query = db.Column(db.Text, nullable=False)  # raw user query
    processed_query = db.Column(db.Text, nullable=True)  # normalized query
    expanded_terms = db.Column(db.Text, nullable=True)   # expansion terms, JSON array

    # Query understanding
    intent = db.Column(db.String(50), nullable=True)  # detected intent
    entities = db.Column(db.Text, nullable=True)      # recognized entities

    # Retrieval results
    result_count = db.Column(db.Integer, default=0)
    top_doc_ids = db.Column(db.Text, nullable=True)  # returned document ids, JSON array

    # Timings (seconds)
    retrieval_time = db.Column(db.Float, default=0)
    total_time = db.Column(db.Float, default=0)

    # User feedback
    rating = db.Column(db.Integer, nullable=True)  # score 1-5
    feedback = db.Column(db.Text, nullable=True)   # free-form feedback

    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def to_dict(self):
        """Serialize the summary fields shown in query-history views."""
        ts = self.created_at
        return {
            'id': self.id,
            'query': self.original_query,
            'result_count': self.result_count,
            'retrieval_time': self.retrieval_time,
            'created_at': ts.isoformat() if ts else None,
        }
class IndexStats(db.Model):
    """Aggregate index statistics; intended to hold a single row."""
    __tablename__ = 'index_stats'

    id = db.Column(db.Integer, primary_key=True)

    # Corpus counters
    total_documents = db.Column(db.Integer, default=0)
    total_chunks = db.Column(db.Integer, default=0)
    total_terms = db.Column(db.Integer, default=0)
    total_words = db.Column(db.Integer, default=0)

    # Index size on disk
    index_size_mb = db.Column(db.Float, default=0)

    # Last refresh timestamps
    last_indexed_at = db.Column(db.DateTime, nullable=True)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    @staticmethod
    def get_stats():
        """Return the stats row, creating (and committing) it on first use."""
        row = IndexStats.query.first()
        if row is None:
            row = IndexStats()
            db.session.add(row)
            db.session.commit()
        return row

    def update_stats(self):
        """Recompute all counters from the database and commit."""
        self.total_documents = Document.query.filter_by(status='indexed').count()
        self.total_chunks = DocumentChunk.query.count()
        self.total_terms = InvertedIndex.query.count()
        word_sum = db.session.query(db.func.sum(Document.word_count)).scalar()
        self.total_words = word_sum or 0
        self.last_indexed_at = datetime.utcnow()
        db.session.commit()