Files
xian-favor/xian_favor/db.py
hubian 0864c99b75 feat: 新增重点关注功能
- 数据库新增 is_starred 字段,兼容旧数据库自动添加
- 所有类别数据支持一键设置/取消重点关注
- 侧边栏新增"重点关注"过滤选项,重点关注数据优先显示
- 新增数据时可直接勾选"设为重点关注"开关
- 编辑时可切换重点关注状态
- 卡片显示重点关注标记(星标图标)和特殊样式
- API新增 /api/items/<id>/star 接口用于切换重点关注状态
- 重点关注数据按创建时间倒序排列并优先显示
2026-04-19 00:11:24 +08:00

768 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""数据库操作"""
import sqlite3
import json
import shutil
import os
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from .config import DATABASE_URL, TODO_STATUS, PRIORITY_LEVELS
# Backup directory: a "backups" folder next to the database file.
BACKUP_DIR = os.path.join(os.path.split(DATABASE_URL)[0], 'backups')
class Database:
"""SQLite数据库管理"""
def __init__(self, db_path: str = DATABASE_URL):
    """Remember where the database lives; no I/O happens here.

    Args:
        db_path: Path later handed to ``sqlite3.connect``.
    """
    # The schema is created on demand by _ensure_init(), not at construction.
    self._initialized = False
    self.db_path = db_path
def _ensure_init(self):
    """Create the schema the first time it is needed; later calls do nothing."""
    if not self._initialized:
        self._init_db()
        # Flag is set only after _init_db() returns, so a failed init is retried.
        self._initialized = True
@contextmanager
def get_conn(self):
    """Yield a SQLite connection to ``self.db_path`` and always close it.

    Rows are returned as ``sqlite3.Row``. Nothing is committed here:
    callers must call ``conn.commit()`` themselves.

    NOTE(review): ``PRAGMA foreign_keys`` is not enabled, so the
    ``ON DELETE CASCADE`` clauses in the schema are not enforced on
    default SQLite builds.
    """
    conn = sqlite3.connect(self.db_path, timeout=30.0)
    try:
        # Setup runs inside the try so a failing PRAGMA cannot leak the
        # already-open connection.
        conn.row_factory = sqlite3.Row
        # WAL journal mode for better concurrent access.
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA busy_timeout=30000")
        yield conn
    finally:
        conn.close()
def _init_db(self):
    """Create all tables and indexes, upgrading databases that lack ``is_starred``.

    Every CREATE uses ``IF NOT EXISTS`` and the column is only added when
    missing, so the method can be run repeatedly.
    """
    with self.get_conn() as conn:
        cursor = conn.cursor()
        # Main content table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS items (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                type TEXT NOT NULL DEFAULT 'text',
                title TEXT,
                content TEXT,
                url TEXT,
                source TEXT,
                status TEXT DEFAULT 'pending',
                priority TEXT DEFAULT 'medium',
                due_date TEXT,
                note TEXT,
                is_starred INTEGER DEFAULT 0,
                created_at TEXT NOT NULL,
                updated_at TEXT NOT NULL
            )
        """)
        # Tag table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS tags (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                color TEXT DEFAULT '#3498db',
                created_at TEXT NOT NULL
            )
        """)
        # Item <-> tag association table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS item_tags (
                item_id INTEGER NOT NULL,
                tag_id INTEGER NOT NULL,
                PRIMARY KEY (item_id, tag_id),
                FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE,
                FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
            )
        """)
        # Email address table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS emails (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                email TEXT NOT NULL UNIQUE,
                name TEXT,
                created_at TEXT NOT NULL
            )
        """)
        # Email send log table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS email_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                item_id INTEGER NOT NULL,
                email TEXT NOT NULL,
                sent_at TEXT NOT NULL,
                success INTEGER DEFAULT 1,
                FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE
            )
        """)
        # Migration for databases created before is_starred existed. It must
        # run BEFORE the index on that column is created: on an old database
        # CREATE TABLE IF NOT EXISTS is a no-op, and indexing a missing column
        # raises OperationalError, which previously aborted initialisation.
        cursor.execute("PRAGMA table_info(items)")
        existing_columns = {column[1] for column in cursor.fetchall()}
        if "is_starred" not in existing_columns:
            cursor.execute("ALTER TABLE items ADD COLUMN is_starred INTEGER DEFAULT 0")
        # Indexes
        index_statements = (
            "CREATE INDEX IF NOT EXISTS idx_items_type ON items(type)",
            "CREATE INDEX IF NOT EXISTS idx_items_status ON items(status)",
            "CREATE INDEX IF NOT EXISTS idx_items_created ON items(created_at)",
            "CREATE INDEX IF NOT EXISTS idx_items_starred ON items(is_starred)",
            "CREATE INDEX IF NOT EXISTS idx_item_tags_item ON item_tags(item_id)",
            "CREATE INDEX IF NOT EXISTS idx_item_tags_tag ON item_tags(tag_id)",
        )
        for statement in index_statements:
            cursor.execute(statement)
        conn.commit()
# ============ Item 操作 ============
def create_item(self, type: str = "text", title: str = None, content: str = None,
                url: str = None, source: str = None, status: str = "pending",
                priority: str = "medium", due_date: str = None, note: str = None,
                tags: List[str] = None, is_starred: bool = False) -> int:
    """Insert a new item, attach its tags, and return the new row id.

    For ``type == "todo"`` a status outside ``TODO_STATUS`` becomes
    ``"pending"``; a priority outside ``PRIORITY_LEVELS`` becomes
    ``"medium"``. ``created_at`` and ``updated_at`` receive the same
    timestamp.
    """
    self._ensure_init()
    timestamp = datetime.now().isoformat()
    # Unknown values fall back to the defaults instead of raising.
    if type == "todo" and status not in TODO_STATUS:
        status = "pending"
    if priority not in PRIORITY_LEVELS:
        priority = "medium"
    starred_flag = 1 if is_starred else 0
    row_values = (type, title, content, url, source, status, priority,
                  due_date, note, starred_flag, timestamp, timestamp)
    with self.get_conn() as conn:
        cur = conn.cursor()
        cur.execute("""
            INSERT INTO items (type, title, content, url, source, status, priority, due_date, note, is_starred, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, row_values)
        new_id = cur.lastrowid
        # Tags are linked on the same connection so they commit with the item.
        if tags:
            self._add_tags_to_item(conn, new_id, tags)
        conn.commit()
        return new_id
def get_item(self, item_id: int) -> Optional[Dict[str, Any]]:
    """Return the item with ``item_id`` as a dict with a ``tags`` list.

    Returns ``None`` when no such row exists.
    """
    # Make sure the schema exists (and is migrated) before querying, the
    # same way create_item()/stats() do.
    self._ensure_init()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM items WHERE id = ?", (item_id,))
        row = cursor.fetchone()
        if row is None:
            return None
        item = dict(row)
        item['tags'] = self._get_item_tags(conn, item_id)
        return item
def list_items(self, type: str = None, status: str = None, tag: str = None,
               keyword: str = None, starred: bool = None, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
    """Return one page of items matching the given filters.

    Filters that are falsy (``starred``: ``None``) are ignored; the rest
    are ANDed. ``keyword`` is a substring match on title, content and
    note. Starred items come first, then newest ``created_at`` first.
    Each item dict carries a ``tags`` list.
    """
    # Make sure the schema exists (and is migrated) before querying, the
    # same way create_item()/stats() do.
    self._ensure_init()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        query = "SELECT DISTINCT i.* FROM items i"
        params = []
        conditions = []
        # Filtering by tag needs the association tables joined in.
        if tag:
            query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id"
            conditions.append("t.name = ?")
            params.append(tag)
        if type:
            conditions.append("i.type = ?")
            params.append(type)
        if status:
            conditions.append("i.status = ?")
            params.append(status)
        if starred is not None:
            conditions.append("i.is_starred = ?")
            params.append(1 if starred else 0)
        if keyword:
            conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)")
            keyword_pattern = f"%{keyword}%"
            params.extend([keyword_pattern] * 3)
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        # Starred items are listed first.
        query += " ORDER BY i.is_starred DESC, i.created_at DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])
        cursor.execute(query, params)
        items = []
        for row in cursor.fetchall():
            item = dict(row)
            item['tags'] = self._get_item_tags(conn, item['id'])
            items.append(item)
        return items
def count_items(self, type: str = None, status: str = None, tag: str = None,
                keyword: str = None, starred: bool = None) -> int:
    """Return how many distinct items match the given filters.

    The filters have the same meaning as in ``list_items``.
    """
    # Make sure the schema exists (and is migrated) before querying, the
    # same way create_item()/stats() do.
    self._ensure_init()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        query = "SELECT COUNT(DISTINCT i.id) as count FROM items i"
        params = []
        conditions = []
        # Filtering by tag needs the association tables joined in.
        if tag:
            query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id"
            conditions.append("t.name = ?")
            params.append(tag)
        if type:
            conditions.append("i.type = ?")
            params.append(type)
        if status:
            conditions.append("i.status = ?")
            params.append(status)
        if starred is not None:
            conditions.append("i.is_starred = ?")
            params.append(1 if starred else 0)
        if keyword:
            conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)")
            keyword_pattern = f"%{keyword}%"
            params.extend([keyword_pattern] * 3)
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        cursor.execute(query, params)
        return cursor.fetchone()['count']
def update_item(self, item_id: int, **kwargs) -> bool:
    """Update columns and/or tags of an existing item.

    Args:
        item_id: Id of the item to change.
        **kwargs: Any of the whitelisted column names below, plus
            ``tags`` (a list that replaces the item's current tags;
            an empty or ``None`` value clears them). Other keys are
            ignored.

    Returns:
        ``True`` when the item exists and the change was applied,
        ``False`` when there was nothing to apply or no such item.
    """
    allowed_fields = ['type', 'title', 'content', 'url', 'source', 'status', 'priority', 'due_date', 'note', 'is_starred']
    update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}
    if not update_fields and 'tags' not in kwargs:
        return False
    # Make sure the schema exists (and is migrated) before touching it.
    self._ensure_init()
    now = datetime.now().isoformat()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        # Check existence up front: the old return value read rowcount of
        # whichever statement ran last (the tag DELETE), so a tags-only
        # update reported failure, and tags could be linked to a missing item.
        cursor.execute("SELECT 1 FROM items WHERE id = ?", (item_id,))
        if cursor.fetchone() is None:
            return False
        if update_fields:
            # Column names come from the whitelist above, never from the caller.
            set_clause = ", ".join(f"{k} = ?" for k in update_fields)
            set_clause += ", updated_at = ?"
            values = list(update_fields.values()) + [now, item_id]
            cursor.execute(f"UPDATE items SET {set_clause} WHERE id = ?", values)
        if 'tags' in kwargs:
            # Replace the whole tag set: drop the old links, then add the new.
            cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,))
            if kwargs['tags']:
                self._add_tags_to_item(conn, item_id, kwargs['tags'])
        conn.commit()
        return True
def delete_item(self, item_id: int) -> bool:
    """Delete an item together with its tag links and email log rows.

    Returns:
        ``True`` if an item row was removed, ``False`` if none matched.
    """
    # Make sure the schema exists before touching it.
    self._ensure_init()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        # get_conn() never enables PRAGMA foreign_keys, so the schema's
        # ON DELETE CASCADE does not fire on default SQLite builds; remove
        # the dependent rows explicitly to avoid orphans.
        cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,))
        cursor.execute("DELETE FROM email_logs WHERE item_id = ?", (item_id,))
        cursor.execute("DELETE FROM items WHERE id = ?", (item_id,))
        deleted = cursor.rowcount > 0
        conn.commit()
        return deleted
def toggle_star(self, item_id: int) -> bool:
    """Flip the ``is_starred`` flag of an item and refresh ``updated_at``.

    Returns:
        ``True`` if the item exists and was toggled, ``False`` otherwise.
    """
    # Make sure the schema exists (and is migrated) before touching it.
    self._ensure_init()
    now = datetime.now().isoformat()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        # Toggle in a single statement so two concurrent toggles cannot both
        # read the same old value. NULL counts as "not starred".
        cursor.execute(
            "UPDATE items SET "
            "is_starred = CASE WHEN COALESCE(is_starred, 0) != 0 THEN 0 ELSE 1 END, "
            "updated_at = ? WHERE id = ?",
            (now, item_id),
        )
        toggled = cursor.rowcount > 0
        conn.commit()
        return toggled
def set_star(self, item_id: int, starred: bool = True) -> bool:
    """Set the ``is_starred`` flag of an item and refresh ``updated_at``.

    Returns:
        ``True`` if an item row was updated, ``False`` if none matched.
    """
    # Make sure the schema exists (and is migrated) before touching it.
    self._ensure_init()
    now = datetime.now().isoformat()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE items SET is_starred = ?, updated_at = ? WHERE id = ?",
            (1 if starred else 0, now, item_id),
        )
        updated = cursor.rowcount > 0
        conn.commit()
        return updated
# ============ Tag 操作 ============
def create_tag(self, name: str, color: str = "#3498db") -> int:
    """Create a tag and return its id; return the existing id if the name is taken."""
    # Make sure the schema exists before touching it.
    self._ensure_init()
    now = datetime.now().isoformat()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, ?, ?)",
                           (name, color, now))
        except sqlite3.IntegrityError:
            # The UNIQUE constraint on name fired: the tag already exists.
            cursor.execute("SELECT id FROM tags WHERE name = ?", (name,))
            return cursor.fetchone()['id']
        conn.commit()
        return cursor.lastrowid
def list_tags(self) -> List[Dict[str, Any]]:
    """Return every tag, ordered by name, each with an ``item_count`` usage figure."""
    # Make sure the schema exists before querying.
    self._ensure_init()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        # LEFT JOIN keeps tags that are not attached to any item (count 0).
        cursor.execute("""
            SELECT t.*, COUNT(it.item_id) as item_count
            FROM tags t
            LEFT JOIN item_tags it ON t.id = it.tag_id
            GROUP BY t.id
            ORDER BY t.name
        """)
        return [dict(row) for row in cursor.fetchall()]
def update_tag(self, tag_id: int, name: str) -> bool:
    """Rename a tag.

    Returns:
        ``True`` if the tag was renamed; ``False`` if another tag already
        uses ``name`` or ``tag_id`` does not exist.
    """
    # Make sure the schema exists before touching it.
    self._ensure_init()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        # Reject names already used by a different tag.
        cursor.execute("SELECT id FROM tags WHERE name = ? AND id != ?", (name, tag_id))
        if cursor.fetchone():
            return False
        try:
            cursor.execute("UPDATE tags SET name = ? WHERE id = ?", (name, tag_id))
        except sqlite3.IntegrityError:
            # Another writer claimed the name between the check and the update.
            return False
        renamed = cursor.rowcount > 0
        conn.commit()
        return renamed
def delete_tag(self, tag_id: int = None, name: str = None) -> bool:
    """Delete a tag, by name if given, otherwise by id, along with its item links.

    Returns:
        ``True`` if a tag row was removed; ``False`` if nothing matched or
        neither ``name`` nor ``tag_id`` was supplied.
    """
    if not name and tag_id is None:
        return False
    # Make sure the schema exists before touching it.
    self._ensure_init()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        # get_conn() never enables PRAGMA foreign_keys, so ON DELETE CASCADE
        # does not fire on default SQLite builds; remove the links explicitly.
        if name:
            cursor.execute(
                "DELETE FROM item_tags WHERE tag_id IN (SELECT id FROM tags WHERE name = ?)",
                (name,),
            )
            cursor.execute("DELETE FROM tags WHERE name = ?", (name,))
        else:
            cursor.execute("DELETE FROM item_tags WHERE tag_id = ?", (tag_id,))
            cursor.execute("DELETE FROM tags WHERE id = ?", (tag_id,))
        deleted = cursor.rowcount > 0
        conn.commit()
        return deleted
# ============ 辅助方法 ============
def _add_tags_to_item(self, conn, item_id: int, tags: List[str]):
    """Link ``item_id`` to each named tag, creating tags that do not exist yet.

    Names are stripped; blank names are skipped. Work is done on the
    caller's connection and is not committed here.
    """
    cur = conn.cursor()
    for raw_name in tags:
        label = raw_name.strip()
        if label:
            # Look the tag up on the same connection the caller is using.
            existing = cur.execute("SELECT id FROM tags WHERE name = ?", (label,)).fetchone()
            if not existing:
                # Unknown tag: create it with the default colour.
                created_at = datetime.now().isoformat()
                cur.execute("INSERT INTO tags (name, color, created_at) VALUES (?, '#3498db', ?)",
                            (label, created_at))
                tag_id = cur.lastrowid
            else:
                tag_id = existing['id']
            # OR IGNORE makes repeated links a no-op.
            cur.execute("INSERT OR IGNORE INTO item_tags (item_id, tag_id) VALUES (?, ?)",
                        (item_id, tag_id))
def _get_item_tags(self, conn, item_id: int) -> List[str]:
    """Return the names of the tags attached to ``item_id``, sorted by name."""
    result = conn.cursor().execute("""
        SELECT t.name FROM tags t
        JOIN item_tags it ON t.id = it.tag_id
        WHERE it.item_id = ?
        ORDER BY t.name
    """, (item_id,))
    names = []
    for record in result.fetchall():
        names.append(record['name'])
    return names
# ============ Email 操作 ============
def create_email(self, email: str, name: str = None) -> int:
    """Store an email address and return its id; return the existing id if already stored."""
    # Make sure the schema exists before touching it.
    self._ensure_init()
    now = datetime.now().isoformat()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute("INSERT INTO emails (email, name, created_at) VALUES (?, ?, ?)",
                           (email, name, now))
        except sqlite3.IntegrityError:
            # The UNIQUE constraint on email fired: hand back the existing row's id.
            cursor.execute("SELECT id FROM emails WHERE email = ?", (email,))
            return cursor.fetchone()['id']
        conn.commit()
        return cursor.lastrowid
def list_emails(self) -> List[Dict[str, Any]]:
    """Return all stored email addresses, newest ``created_at`` first."""
    # Make sure the schema exists before querying.
    self._ensure_init()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM emails ORDER BY created_at DESC")
        return [dict(row) for row in cursor.fetchall()]
def get_email(self, email_id: int) -> Optional[Dict[str, Any]]:
    """Return the email row with ``email_id`` as a dict, or ``None`` if absent."""
    # Make sure the schema exists before querying.
    self._ensure_init()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM emails WHERE id = ?", (email_id,))
        row = cursor.fetchone()
        return dict(row) if row else None
def update_email(self, email_id: int, email: str = None, name: str = None) -> bool:
    """Change the address and/or display name of a stored email.

    Falsy ``email`` / ``name`` values are treated as "leave unchanged".

    Returns:
        ``True`` if a row was updated; ``False`` if nothing was requested,
        the address already belongs to another row, or ``email_id`` does
        not exist.
    """
    assignments = []
    values = []
    if email:
        assignments.append("email = ?")
        values.append(email)
    if name:
        assignments.append("name = ?")
        values.append(name)
    if not assignments:
        return False
    # Make sure the schema exists before touching it.
    self._ensure_init()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        if email:
            # Reject an address already used by a different row.
            cursor.execute("SELECT id FROM emails WHERE email = ? AND id != ?", (email, email_id))
            if cursor.fetchone():
                return False
        try:
            # The SET clause is assembled from the fixed fragments above only.
            cursor.execute(
                f"UPDATE emails SET {', '.join(assignments)} WHERE id = ?",
                values + [email_id],
            )
        except sqlite3.IntegrityError:
            # Another writer claimed the address between the check and the update.
            return False
        updated = cursor.rowcount > 0
        conn.commit()
        return updated
def delete_email(self, email_id: int) -> bool:
    """Delete a stored email address; return ``True`` if a row was removed."""
    # Make sure the schema exists before touching it.
    self._ensure_init()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        cursor.execute("DELETE FROM emails WHERE id = ?", (email_id,))
        deleted = cursor.rowcount > 0
        conn.commit()
        return deleted
# ============ 邮件日志 操作 ============
def log_email_send(self, item_id: int, email: str, success: bool = True) -> int:
    """Record one attempt to email an item and return the log row id.

    Args:
        item_id: Item that was sent.
        email: Recipient address.
        success: Whether the send succeeded; stored as 1 or 0.
    """
    # Make sure the schema exists before touching it.
    self._ensure_init()
    now = datetime.now().isoformat()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO email_logs (item_id, email, sent_at, success)
            VALUES (?, ?, ?, ?)
        """, (item_id, email, now, 1 if success else 0))
        conn.commit()
        return cursor.lastrowid
def get_email_logs(self, item_id: int) -> List[Dict[str, Any]]:
    """Return the email send log rows for an item, most recent ``sent_at`` first."""
    # Make sure the schema exists before querying.
    self._ensure_init()
    with self.get_conn() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT * FROM email_logs
            WHERE item_id = ?
            ORDER BY sent_at DESC
        """, (item_id,))
        return [dict(row) for row in cursor.fetchall()]
def stats(self) -> Dict[str, Any]:
    """Return summary counts: total items, items per type, todos per status, tag count."""
    self._ensure_init()
    with self.get_conn() as conn:
        cur = conn.cursor()

        def scalar(sql):
            # Single-row COUNT(*) query.
            return cur.execute(sql).fetchone()['count']

        def grouped(sql, key):
            # GROUP BY query folded into {key value: count}.
            return {record[key]: record['count'] for record in cur.execute(sql).fetchall()}

        total_items = scalar("SELECT COUNT(*) as count FROM items")
        per_type = grouped("SELECT type, COUNT(*) as count FROM items GROUP BY type", 'type')
        per_todo_status = grouped(
            "SELECT status, COUNT(*) as count FROM items WHERE type = 'todo' GROUP BY status",
            'status',
        )
        tag_total = scalar("SELECT COUNT(*) as count FROM tags")
        return {
            'total': total_items,
            'by_type': per_type,
            'todo_status': per_todo_status,
            'tags': tag_total,
        }
# ============ 提醒相关 ============
def get_reminders(self) -> Dict[str, Any]:
    """Collect unfinished todos that are overdue or due soon.

    Returns:
        A dict with three lists of item dicts plus a ``total`` count:

        * ``overdue`` - due time already passed; each item also gets a
          ``days_overdue`` integer (whole days).
        * ``due_today`` - due later today.
        * ``due_soon`` - due within the next 24 hours, but not today.

        Items whose ``due_date`` cannot be parsed are skipped.
    """
    self._ensure_init()
    now = datetime.now()

    def parse_due(raw):
        """Return ``raw`` as a naive datetime, or ``None`` if it cannot be parsed."""
        if not isinstance(raw, str):
            return None
        text = raw.strip()
        try:
            if len(text) == 10:
                # Date only (2026-04-16): treat as the end of that day.
                return datetime.strptime(text, '%Y-%m-%d').replace(hour=23, minute=59, second=59)
            # Accept both "2026-04-16T14:30..." and "2026-04-16 14:30...".
            # Only minute precision is used; seconds and any suffix are ignored.
            # The old fallback branch demanded a 'T' that had already been
            # ruled out, so space-separated values were always dropped.
            return datetime.strptime(text[:16].replace(' ', 'T'), '%Y-%m-%dT%H:%M')
        except ValueError:
            return None

    with self.get_conn() as conn:
        cursor = conn.cursor()
        reminders = {
            'overdue': [],    # already past due
            'due_today': [],  # due later today
            'due_soon': []    # due within 24 hours, not today
        }
        # Unfinished todos that have a due date.
        cursor.execute("""
            SELECT * FROM items
            WHERE type = 'todo'
            AND status != 'completed'
            AND due_date IS NOT NULL
            AND due_date != ''
            ORDER BY due_date ASC
        """)
        for row in cursor.fetchall():
            item = dict(row)
            due_date = parse_due(item['due_date'])
            if due_date is None:
                # Unrecognised date format: skip this item.
                continue
            item['tags'] = self._get_item_tags(conn, item['id'])
            seconds_left = (due_date - now).total_seconds()
            if seconds_left < 0:
                item['days_overdue'] = abs(int(seconds_left / 86400))
                reminders['overdue'].append(item)
            elif due_date.date() == now.date():
                reminders['due_today'].append(item)
            elif seconds_left < 86400:
                reminders['due_soon'].append(item)
        reminders['total'] = (
            len(reminders['overdue']) + len(reminders['due_today']) + len(reminders['due_soon'])
        )
        return reminders
# ============ 备份操作 ============
def create_backup(self, manual: bool = False) -> Dict[str, Any]:
    """Write a timestamped copy of the database into ``BACKUP_DIR``.

    Args:
        manual: Marks the backup as user-triggered (``_manual`` name suffix).

    Returns:
        The metadata dict recorded for the backup (name, path, size,
        created_at, manual, is_first_of_month).

    Raises:
        FileNotFoundError: If the database file does not exist.
    """
    if not os.path.isfile(self.db_path):
        # Connecting would silently create an empty database; fail instead.
        raise FileNotFoundError(self.db_path)
    # Make sure the backup directory exists.
    os.makedirs(BACKUP_DIR, exist_ok=True)
    now = datetime.now()
    backup_name = now.strftime('%Y-%m-%d_%H%M%S')
    if manual:
        backup_name += '_manual'
    backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
    # get_conn() puts the database in WAL mode, so recent commits may live
    # only in the "-wal" file. A plain file copy can miss them; SQLite's
    # online backup API produces a consistent snapshot.
    source = sqlite3.connect(self.db_path, timeout=30.0)
    try:
        target = sqlite3.connect(backup_path)
        try:
            source.backup(target)
        finally:
            target.close()
    finally:
        source.close()
    backup_info = {
        'name': backup_name,
        'path': backup_path,
        'size': os.path.getsize(backup_path),
        'created_at': now.isoformat(),
        'manual': manual,
        'is_first_of_month': now.day == 1
    }
    # Record the metadata, then prune old backups.
    self._save_backup_meta(backup_info)
    self._cleanup_old_backups()
    return backup_info
def list_backups(self) -> List[Dict[str, Any]]:
    """Return backup metadata dicts, newest ``created_at`` first.

    Metadata is read from ``backup_meta.json`` in ``BACKUP_DIR``. When
    that file is missing, unreadable, or not a JSON list, the entries are
    rebuilt from the ``.db`` files found in the directory.
    """
    if not os.path.exists(BACKUP_DIR):
        return []
    meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
    backups = None
    if os.path.exists(meta_path):
        try:
            with open(meta_path, 'r') as meta_file:
                backups = json.load(meta_file)
        except (OSError, ValueError):
            # A corrupt or unreadable metadata file should not make backups
            # unlistable; fall through to the directory scan.
            backups = None
        if not isinstance(backups, list):
            backups = None
    if backups is None:
        # Rebuild the metadata from the files themselves.
        backups = []
        for filename in os.listdir(BACKUP_DIR):
            if not filename.endswith('.db'):
                continue
            path = os.path.join(BACKUP_DIR, filename)
            backups.append({
                # Strip only the trailing extension, not every ".db" occurrence.
                'name': filename[:-len('.db')],
                'path': path,
                'size': os.path.getsize(path),
                'created_at': datetime.fromtimestamp(os.path.getmtime(path)).isoformat(),
                'manual': '_manual' in filename,
                'is_first_of_month': self._is_first_of_month_filename(filename)
            })
    backups.sort(key=lambda entry: entry['created_at'], reverse=True)
    return backups
def restore_backup(self, backup_name: str) -> bool:
    """Replace the live database contents with the named backup.

    The current contents are first saved to ``<db_path>.before_restore``.

    Returns:
        ``True`` on success; ``False`` if ``backup_name`` is not a plain
        file name or no such backup exists.
    """
    # Only plain names are accepted so a caller-supplied value cannot point
    # outside BACKUP_DIR.
    if not backup_name or os.path.basename(backup_name) != backup_name:
        return False
    backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
    if not os.path.exists(backup_path):
        return False
    current_backup = self.db_path + '.before_restore'
    # Copying a file over the live database would leave its "-wal"/"-shm"
    # sidecar files (get_conn() enables WAL) next to unrelated content.
    # SQLite's backup API rewrites the destination through the engine.
    live = sqlite3.connect(self.db_path, timeout=30.0)
    try:
        # Safety copy of the current state, just in case.
        safety = sqlite3.connect(current_backup)
        try:
            live.backup(safety)
        finally:
            safety.close()
        source = sqlite3.connect(backup_path)
        try:
            source.backup(live)
        finally:
            source.close()
    finally:
        live.close()
    return True
def delete_backup(self, backup_name: str) -> bool:
    """Delete a backup file and drop its metadata entry.

    Returns:
        ``True`` if the file was removed; ``False`` if ``backup_name`` is
        not a plain file name or no such backup exists.
    """
    # Only plain names are accepted so a caller-supplied value cannot make
    # this remove a file outside BACKUP_DIR.
    if not backup_name or os.path.basename(backup_name) != backup_name:
        return False
    backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
    try:
        os.remove(backup_path)
    except FileNotFoundError:
        return False
    # Keep the metadata file in step with the directory.
    self._remove_backup_meta(backup_name)
    return True
def _save_backup_meta(self, backup_info: Dict[str, Any]):
    """Append ``backup_info`` to ``backup_meta.json`` in ``BACKUP_DIR``."""
    meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
    # Start from the entries already on disk, if there are any.
    if os.path.exists(meta_path):
        with open(meta_path, 'r') as reader:
            entries = json.load(reader)
    else:
        entries = []
    entries.append(backup_info)
    with open(meta_path, 'w') as writer:
        json.dump(entries, writer, indent=2)
def _remove_backup_meta(self, backup_name: str):
    """Drop every entry named ``backup_name`` from ``backup_meta.json``, if the file exists."""
    meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
    if os.path.exists(meta_path):
        with open(meta_path, 'r') as reader:
            entries = json.load(reader)
        remaining = list(filter(lambda entry: entry['name'] != backup_name, entries))
        with open(meta_path, 'w') as writer:
            json.dump(remaining, writer, indent=2)
def _is_first_of_month_filename(self, filename: str) -> bool:
    """Return ``True`` if a backup file name carries a first-of-month date.

    Expected shape: ``2026-04-01_040000.db``. Names that do not start with
    a ``YYYY-MM-DD_`` prefix yield ``False``.
    """
    try:
        date_part = filename.split('_')[0]
        return int(date_part.split('-')[2]) == 1
    except (IndexError, ValueError):
        # Too few "-" separated parts, or a non-numeric day component.
        return False
def _cleanup_old_backups(self, keep_days: int = 30, max_manual: int = 10):
    """Delete backups that fall outside the retention policy.

    Policy, applied to ``list_backups()`` in the order it returns them:

    * manual backups: the first ``max_manual`` are kept, the rest deleted;
    * automatic backups at most ``keep_days`` days old are kept;
    * older automatic backups are kept only if flagged ``is_first_of_month``.

    Args:
        keep_days: Age limit in days for ordinary automatic backups.
        max_manual: Number of manual backups to keep.
    """
    now = datetime.now()
    manual_seen = 0
    for backup in self.list_backups():
        if backup['manual']:
            # A running counter replaces rebuilding and searching the list of
            # manual backups on every iteration.
            manual_seen += 1
            if manual_seen > max_manual:
                self.delete_backup(backup['name'])
            continue
        days_old = (now - datetime.fromisoformat(backup['created_at'])).days
        if days_old <= keep_days or backup['is_first_of_month']:
            continue
        self.delete_backup(backup['name'])
# Module-level shared Database instance, created with the default db_path (DATABASE_URL).
db = Database()