Files
xian-favor/xian_favor/db.py
hubian 105f4d5492 feat(v4.3.0): 新增/编辑弹框中添加文件夹选择
- 新增数据时可选择所属文件夹
- 编辑数据时可更改所属文件夹
- 切换类型时自动更新文件夹列表
2026-04-22 18:08:41 +08:00

1274 lines
50 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""数据库操作"""
import sqlite3
import json
import shutil
import os
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from .config import DATABASE_URL, TODO_STATUS, PRIORITY_LEVELS
# 备份目录
BACKUP_DIR = os.path.join(os.path.dirname(DATABASE_URL), 'backups')
class Database:
"""SQLite数据库管理"""
def __init__(self, db_path: str = DATABASE_URL):
self.db_path = db_path
self._initialized = False
def _ensure_init(self):
"""确保数据库已初始化"""
if self._initialized:
return
self._init_db()
self._initialized = True
@contextmanager
def get_conn(self):
"""获取数据库连接"""
conn = sqlite3.connect(self.db_path, timeout=30.0)
conn.row_factory = sqlite3.Row
# 启用WAL模式提高并发性能
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=30000")
try:
yield conn
finally:
conn.close()
def _init_db(self):
"""初始化数据库表"""
with self.get_conn() as conn:
cursor = conn.cursor()
# 主内容表
cursor.execute("""
CREATE TABLE IF NOT EXISTS items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL DEFAULT 'text',
title TEXT,
content TEXT,
url TEXT,
source TEXT,
status TEXT DEFAULT 'pending',
priority TEXT DEFAULT 'medium',
due_date TEXT,
note TEXT,
is_starred INTEGER DEFAULT 0,
views INTEGER DEFAULT 0,
is_deleted INTEGER DEFAULT 0,
deleted_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
# 标签表
cursor.execute("""
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
color TEXT DEFAULT '#3498db',
created_at TEXT NOT NULL
)
""")
# 内容-标签关联表
cursor.execute("""
CREATE TABLE IF NOT EXISTS item_tags (
item_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
PRIMARY KEY (item_id, tag_id),
FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
)
""")
# 邮箱表
cursor.execute("""
CREATE TABLE IF NOT EXISTS emails (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL UNIQUE,
name TEXT,
created_at TEXT NOT NULL
)
""")
# 草稿表
cursor.execute("""
CREATE TABLE IF NOT EXISTS drafts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL DEFAULT 'text',
title TEXT,
content TEXT,
url TEXT,
source TEXT,
status TEXT DEFAULT 'pending',
priority TEXT DEFAULT 'medium',
due_date TEXT,
note TEXT,
tags TEXT,
is_starred INTEGER DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
# 邮件发送记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS email_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
item_id INTEGER NOT NULL,
email TEXT NOT NULL,
sent_at TEXT NOT NULL,
success INTEGER DEFAULT 1,
FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE
)
""")
# 文件夹表
cursor.execute("""
CREATE TABLE IF NOT EXISTS folders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
type TEXT NOT NULL,
parent_id INTEGER DEFAULT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (parent_id) REFERENCES folders(id) ON DELETE CASCADE
)
""")
# 创建索引
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_type ON items(type)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_status ON items(status)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_created ON items(created_at)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_item_tags_item ON item_tags(item_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_item_tags_tag ON item_tags(tag_id)")
# 检查并添加 is_starred 字段(兼容旧数据库)
try:
cursor.execute("SELECT is_starred FROM items LIMIT 1")
except sqlite3.OperationalError:
cursor.execute("ALTER TABLE items ADD COLUMN is_starred INTEGER DEFAULT 0")
# 创建 is_starred 索引(字段添加后再创建)
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_starred ON items(is_starred)")
# 检查并添加 views 字段(兼容旧数据库)
try:
cursor.execute("SELECT views FROM items LIMIT 1")
except sqlite3.OperationalError:
cursor.execute("ALTER TABLE items ADD COLUMN views INTEGER DEFAULT 0")
# 检查并添加 is_deleted 和 deleted_at 字段(兼容旧数据库)
try:
cursor.execute("SELECT is_deleted FROM items LIMIT 1")
except sqlite3.OperationalError:
cursor.execute("ALTER TABLE items ADD COLUMN is_deleted INTEGER DEFAULT 0")
try:
cursor.execute("SELECT deleted_at FROM items LIMIT 1")
except sqlite3.OperationalError:
cursor.execute("ALTER TABLE items ADD COLUMN deleted_at TEXT")
# 检查并添加 folder_id 字段(兼容旧数据库)
try:
cursor.execute("SELECT folder_id FROM items LIMIT 1")
except sqlite3.OperationalError:
cursor.execute("ALTER TABLE items ADD COLUMN folder_id INTEGER DEFAULT NULL")
# 创建 folder 相关索引
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_folder ON items(folder_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_folders_type ON folders(type)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_folders_parent ON folders(parent_id)")
# 待办事务表关联到文本类别的items
cursor.execute("""
CREATE TABLE IF NOT EXISTS todo_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
item_id INTEGER NOT NULL,
content TEXT NOT NULL,
remaining_days INTEGER DEFAULT 1,
is_completed INTEGER DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE
)
""")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_todo_events_item ON todo_events(item_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_todo_events_created ON todo_events(created_at)")
conn.commit()
# ============ Item 操作 ============
def create_item(self, type: str = "text", title: str = None, content: str = None,
url: str = None, source: str = None, status: str = "pending",
priority: str = "medium", due_date: str = None, note: str = None,
tags: List[str] = None, is_starred: bool = False,
folder_id: int = None) -> int:
"""创建新条目"""
self._ensure_init()
now = datetime.now().isoformat()
# 验证状态
if type == "todo" and status not in TODO_STATUS:
status = "pending"
if priority not in PRIORITY_LEVELS:
priority = "medium"
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO items (type, title, content, url, source, status, priority, due_date, note, is_starred, folder_id, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (type, title, content, url, source, status, priority, due_date, note, 1 if is_starred else 0, folder_id, now, now))
item_id = cursor.lastrowid
# 添加标签
if tags:
self._add_tags_to_item(conn, item_id, tags)
conn.commit()
return item_id
def get_item(self, item_id: int) -> Optional[Dict[str, Any]]:
"""获取单个条目"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM items WHERE id = ?", (item_id,))
row = cursor.fetchone()
if not row:
return None
item = dict(row)
item['tags'] = self._get_item_tags(conn, item_id)
return item
def list_items(self, type: str = None, status: str = None, tag: str = None,
keyword: str = None, starred: bool = None, folder_id: int = None,
sort_by: str = None, sort_order: str = None,
limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
"""列出条目
sort_by: created_at, updated_at
sort_order: desc, asc
folder_id: 文件夹IDNone表示不限制-1表示未分类folder_id为null
"""
with self.get_conn() as conn:
cursor = conn.cursor()
query = "SELECT DISTINCT i.* FROM items i"
params = []
conditions = []
# 只显示未删除的数据
conditions.append("i.is_deleted = 0")
# 标签过滤需要JOIN
if tag:
query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id"
conditions.append("t.name = ?")
params.append(tag)
if type:
conditions.append("i.type = ?")
params.append(type)
if status:
conditions.append("i.status = ?")
params.append(status)
if starred is not None:
conditions.append("i.is_starred = ?")
params.append(1 if starred else 0)
# 文件夹过滤
if folder_id is not None:
if folder_id == -1:
# -1 表示未分类folder_id 为 null
conditions.append("i.folder_id IS NULL")
else:
conditions.append("i.folder_id = ?")
params.append(folder_id)
if keyword:
conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)")
keyword_pattern = f"%{keyword}%"
params.extend([keyword_pattern, keyword_pattern, keyword_pattern])
if conditions:
query += " WHERE " + " AND ".join(conditions)
# 排序逻辑
if sort_by == 'updated_at':
order_field = 'i.updated_at'
elif sort_by == 'created_at':
order_field = 'i.created_at'
else:
# 默认:重点关注优先 + 创建时间降序
order_field = 'i.created_at'
order_dir = 'DESC' if (sort_order == 'asc' or sort_order is None) else 'ASC'
# 这里反转逻辑:用户选择"降序"时用DESC选择"升序"时用ASC
if sort_order == 'asc':
order_dir = 'ASC'
elif sort_order == 'desc':
order_dir = 'DESC'
else:
order_dir = 'DESC' # 默认降序
# 如果有指定排序字段,按该字段排序;否则默认重点关注优先
if sort_by:
query += f" ORDER BY {order_field} {order_dir} LIMIT ? OFFSET ?"
else:
# 默认:重点关注优先,然后创建时间降序
query += f" ORDER BY i.is_starred DESC, i.created_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
cursor.execute(query, params)
items = []
for row in cursor.fetchall():
item = dict(row)
item['tags'] = self._get_item_tags(conn, item['id'])
items.append(item)
return items
def count_items(self, type: str = None, status: str = None, tag: str = None,
keyword: str = None, starred: bool = None, folder_id: int = None) -> int:
"""计算符合条件的条目总数"""
with self.get_conn() as conn:
cursor = conn.cursor()
query = "SELECT COUNT(DISTINCT i.id) as count FROM items i"
params = []
conditions = []
# 标签过滤需要JOIN
if tag:
query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id"
conditions.append("t.name = ?")
params.append(tag)
if type:
conditions.append("i.type = ?")
params.append(type)
if status:
conditions.append("i.status = ?")
params.append(status)
if starred is not None:
conditions.append("i.is_starred = ?")
params.append(1 if starred else 0)
if keyword:
conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)")
keyword_pattern = f"%{keyword}%"
params.extend([keyword_pattern, keyword_pattern, keyword_pattern])
# 文件夹过滤
if folder_id is not None:
if folder_id == -1:
# 未分类folder_id为NULL
conditions.append("i.folder_id IS NULL")
else:
conditions.append("i.folder_id = ?")
params.append(folder_id)
if conditions:
query += " WHERE " + " AND ".join(conditions)
cursor.execute(query, params)
return cursor.fetchone()['count']
def update_item(self, item_id: int, **kwargs) -> bool:
"""更新条目"""
allowed_fields = ['type', 'title', 'content', 'url', 'source', 'status', 'priority', 'due_date', 'note', 'is_starred', 'folder_id']
update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}
# 只有 tags 变化也算有效更新
if not update_fields and 'tags' not in kwargs:
return False
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
# 检查条目是否存在
cursor.execute("SELECT id FROM items WHERE id = ?", (item_id,))
if not cursor.fetchone():
return False
if update_fields:
set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys())
set_clause += ", updated_at = ?"
values = list(update_fields.values()) + [now, item_id]
cursor.execute(f"UPDATE items SET {set_clause} WHERE id = ?", values)
if 'tags' in kwargs:
# 先删除旧标签关联
cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,))
# 添加新标签
if kwargs['tags']:
self._add_tags_to_item(conn, item_id, kwargs['tags'])
conn.commit()
return True
def delete_item(self, item_id: int) -> bool:
"""删除条目(移动到回收站)"""
with self.get_conn() as conn:
cursor = conn.cursor()
now = datetime.now().isoformat()
cursor.execute("UPDATE items SET is_deleted = 1, deleted_at = ? WHERE id = ?", (now, item_id))
conn.commit()
return cursor.rowcount > 0
def list_trash(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
"""列出回收站数据"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM items WHERE is_deleted = 1 ORDER BY deleted_at DESC LIMIT ? OFFSET ?", (limit, offset))
items = []
for row in cursor.fetchall():
item = dict(row)
item['tags'] = self._get_item_tags(conn, item['id'])
items.append(item)
return items
def count_trash(self) -> int:
"""计算回收站数据总数"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) as count FROM items WHERE is_deleted = 1")
return cursor.fetchone()['count']
def restore_item(self, item_id: int) -> bool:
"""从回收站恢复数据"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("UPDATE items SET is_deleted = 0, deleted_at = NULL WHERE id = ?", (item_id,))
conn.commit()
return cursor.rowcount > 0
def delete_permanently(self, item_id: int) -> bool:
"""彻底删除数据(从数据库中删除)"""
with self.get_conn() as conn:
cursor = conn.cursor()
# 删除标签关联
cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,))
# 删除邮件发送记录
cursor.execute("DELETE FROM email_logs WHERE item_id = ?", (item_id,))
# 删除数据
cursor.execute("DELETE FROM items WHERE id = ?", (item_id,))
conn.commit()
return cursor.rowcount > 0
def empty_trash(self) -> int:
"""清空回收站"""
with self.get_conn() as conn:
cursor = conn.cursor()
# 获取所有回收站数据ID
cursor.execute("SELECT id FROM items WHERE is_deleted = 1")
ids = [row['id'] for row in cursor.fetchall()]
# 删除所有关联数据
for item_id in ids:
cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,))
cursor.execute("DELETE FROM email_logs WHERE item_id = ?", (item_id,))
# 删除所有回收站数据
cursor.execute("DELETE FROM items WHERE is_deleted = 1")
deleted_count = cursor.rowcount
conn.commit()
return deleted_count
def toggle_star(self, item_id: int) -> bool:
"""切换重点关注状态"""
with self.get_conn() as conn:
cursor = conn.cursor()
# 先获取当前状态
cursor.execute("SELECT is_starred FROM items WHERE id = ?", (item_id,))
row = cursor.fetchone()
if not row:
return False
new_status = 0 if row['is_starred'] else 1
now = datetime.now().isoformat()
cursor.execute("UPDATE items SET is_starred = ?, updated_at = ? WHERE id = ?", (new_status, now, item_id))
conn.commit()
return True
def set_star(self, item_id: int, starred: bool = True) -> bool:
"""设置重点关注状态"""
with self.get_conn() as conn:
cursor = conn.cursor()
now = datetime.now().isoformat()
cursor.execute("UPDATE items SET is_starred = ?, updated_at = ? WHERE id = ?", (1 if starred else 0, now, item_id))
conn.commit()
return cursor.rowcount > 0
def increment_views(self, item_id: int) -> bool:
"""增加阅读数"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("UPDATE items SET views = views + 1 WHERE id = ?", (item_id,))
conn.commit()
return cursor.rowcount > 0
def move_item_to_folder(self, item_id: int, folder_id: int) -> bool:
"""将条目移动到文件夹"""
with self.get_conn() as conn:
cursor = conn.cursor()
now = datetime.now().isoformat()
cursor.execute("UPDATE items SET folder_id = ?, updated_at = ? WHERE id = ?", (folder_id, now, item_id))
conn.commit()
return cursor.rowcount > 0
# ============ Folder 文件夹操作 ============
def create_folder(self, name: str, type: str, parent_id: int = None) -> int:
"""创建文件夹
Args:
name: 文件夹名称
type: 类别类型text/link/column/todo
parent_id: 父文件夹ID目前不支持嵌套预留
"""
self._ensure_init()
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO folders (name, type, parent_id, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
""", (name, type, parent_id, now, now))
folder_id = cursor.lastrowid
conn.commit()
return folder_id
def get_folder(self, folder_id: int) -> Optional[Dict[str, Any]]:
"""获取文件夹信息"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM folders WHERE id = ?", (folder_id,))
row = cursor.fetchone()
if not row:
return None
folder = dict(row)
# 获取文件夹内条目数量
cursor.execute("SELECT COUNT(*) FROM items WHERE folder_id = ? AND is_deleted = 0", (folder_id,))
folder['item_count'] = cursor.fetchone()[0]
return folder
def list_folders(self, type: str = None) -> List[Dict[str, Any]]:
"""列出文件夹
Args:
type: 按类型过滤None表示列出所有
"""
with self.get_conn() as conn:
cursor = conn.cursor()
if type:
cursor.execute("SELECT * FROM folders WHERE type = ? ORDER BY created_at DESC", (type,))
else:
cursor.execute("SELECT * FROM folders ORDER BY created_at DESC")
folders = []
for row in cursor.fetchall():
folder = dict(row)
# 获取文件夹内条目数量
cursor.execute("SELECT COUNT(*) FROM items WHERE folder_id = ? AND is_deleted = 0", (folder['id'],))
folder['item_count'] = cursor.fetchone()[0]
folders.append(folder)
return folders
def update_folder(self, folder_id: int, name: str = None) -> bool:
"""更新文件夹"""
with self.get_conn() as conn:
cursor = conn.cursor()
now = datetime.now().isoformat()
if name:
cursor.execute("UPDATE folders SET name = ?, updated_at = ? WHERE id = ?", (name, now, folder_id))
conn.commit()
return cursor.rowcount > 0
return False
def delete_folder(self, folder_id: int, move_items_to_root: bool = True) -> bool:
"""删除文件夹
Args:
folder_id: 文件夹ID
move_items_to_root: 是否将条目移出文件夹到未分类True则移动False则一起删除
"""
with self.get_conn() as conn:
cursor = conn.cursor()
if move_items_to_root:
# 将条目移出文件夹folder_id设为null
cursor.execute("UPDATE items SET folder_id = NULL WHERE folder_id = ?", (folder_id,))
else:
# 删除文件夹内的所有条目
cursor.execute("DELETE FROM items WHERE folder_id = ?", (folder_id,))
# 删除文件夹
cursor.execute("DELETE FROM folders WHERE id = ?", (folder_id,))
conn.commit()
return cursor.rowcount > 0
def count_items_by_folder(self, folder_id: int) -> int:
"""统计文件夹内的条目数量"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM items WHERE folder_id = ? AND is_deleted = 0", (folder_id,))
return cursor.fetchone()[0]
# ============ Draft 草稿操作 ============
def save_draft(self, type: str = "text", title: str = None, content: str = None,
url: str = None, source: str = None, status: str = "pending",
priority: str = "medium", due_date: str = None, note: str = None,
tags: str = None, is_starred: bool = False) -> int:
"""保存草稿"""
self._ensure_init()
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO drafts (type, title, content, url, source, status, priority, due_date, note, tags, is_starred, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (type, title, content, url, source, status, priority, due_date, note, tags, 1 if is_starred else 0, now, now))
draft_id = cursor.lastrowid
conn.commit()
return draft_id
def update_draft(self, draft_id: int, **kwargs) -> bool:
"""更新草稿"""
allowed_fields = ['type', 'title', 'content', 'url', 'source', 'status', 'priority', 'due_date', 'note', 'tags', 'is_starred']
update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}
if not update_fields:
return False
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT id FROM drafts WHERE id = ?", (draft_id,))
if not cursor.fetchone():
return False
set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys())
set_clause += ", updated_at = ?"
values = list(update_fields.values()) + [now, draft_id]
cursor.execute(f"UPDATE drafts SET {set_clause} WHERE id = ?", values)
conn.commit()
return True
def list_drafts(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
"""列出草稿"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM drafts ORDER BY updated_at DESC LIMIT ? OFFSET ?", (limit, offset))
return [dict(row) for row in cursor.fetchall()]
def count_drafts(self) -> int:
"""计算草稿总数"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) as count FROM drafts")
return cursor.fetchone()['count']
def get_draft(self, draft_id: int) -> Optional[Dict[str, Any]]:
"""获取单个草稿"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM drafts WHERE id = ?", (draft_id,))
row = cursor.fetchone()
return dict(row) if row else None
def delete_draft(self, draft_id: int) -> bool:
"""删除草稿"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM drafts WHERE id = ?", (draft_id,))
conn.commit()
return cursor.rowcount > 0
def draft_to_item(self, draft_id: int) -> Optional[int]:
"""将草稿转为正式条目"""
draft = self.get_draft(draft_id)
if not draft:
return None
# 创建条目
tags_list = draft['tags'].split(',') if draft['tags'] else []
tags_list = [t.strip() for t in tags_list if t.strip()]
item_id = self.create_item(
type=draft['type'],
title=draft['title'],
content=draft['content'],
url=draft['url'],
source=draft['source'],
status=draft['status'],
priority=draft['priority'],
due_date=draft['due_date'],
note=draft['note'],
tags=tags_list,
is_starred=draft['is_starred']
)
# 删除草稿
if item_id:
self.delete_draft(draft_id)
return item_id
# ============ Tag 操作 ============
def create_tag(self, name: str, color: str = "#3498db") -> int:
"""创建标签"""
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, ?, ?)",
(name, color, now))
conn.commit()
return cursor.lastrowid
except sqlite3.IntegrityError:
# 标签已存在
cursor.execute("SELECT id FROM tags WHERE name = ?", (name,))
return cursor.fetchone()['id']
def list_tags(self) -> List[Dict[str, Any]]:
"""列出所有标签"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT t.*, COUNT(it.item_id) as item_count
FROM tags t
LEFT JOIN item_tags it ON t.id = it.tag_id
GROUP BY t.id
ORDER BY t.name
""")
return [dict(row) for row in cursor.fetchall()]
def update_tag(self, tag_id: int, name: str) -> bool:
"""更新标签名称"""
with self.get_conn() as conn:
cursor = conn.cursor()
# 检查名称是否已存在(排除自己)
cursor.execute("SELECT id FROM tags WHERE name = ? AND id != ?", (name, tag_id))
if cursor.fetchone():
return False # 名称已存在
cursor.execute("UPDATE tags SET name = ? WHERE id = ?", (name, tag_id))
conn.commit()
return cursor.rowcount > 0
def delete_tag(self, tag_id: int = None, name: str = None) -> bool:
"""删除标签"""
with self.get_conn() as conn:
cursor = conn.cursor()
if name:
cursor.execute("DELETE FROM tags WHERE name = ?", (name,))
elif tag_id:
cursor.execute("DELETE FROM tags WHERE id = ?", (tag_id,))
conn.commit()
return cursor.rowcount > 0
# ============ 辅助方法 ============
def _add_tags_to_item(self, conn, item_id: int, tags: List[str]):
"""为条目添加标签"""
cursor = conn.cursor()
for tag_name in tags:
tag_name = tag_name.strip()
if not tag_name:
continue
# 确保标签存在 - 使用同一个连接
cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,))
row = cursor.fetchone()
if row:
tag_id = row['id']
else:
# 创建新标签
now = datetime.now().isoformat()
cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, '#3498db', ?)",
(tag_name, now))
tag_id = cursor.lastrowid
# 创建关联
cursor.execute("INSERT OR IGNORE INTO item_tags (item_id, tag_id) VALUES (?, ?)",
(item_id, tag_id))
def _get_item_tags(self, conn, item_id: int) -> List[str]:
"""获取条目的标签"""
cursor = conn.cursor()
cursor.execute("""
SELECT t.name FROM tags t
JOIN item_tags it ON t.id = it.tag_id
WHERE it.item_id = ?
ORDER BY t.name
""", (item_id,))
return [row['name'] for row in cursor.fetchall()]
# ============ Email 操作 ============
def create_email(self, email: str, name: str = None) -> int:
"""创建邮箱"""
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO emails (email, name, created_at) VALUES (?, ?, ?)",
(email, name, now))
conn.commit()
return cursor.lastrowid
except sqlite3.IntegrityError:
# 邮箱已存在返回已有ID
cursor.execute("SELECT id FROM emails WHERE email = ?", (email,))
return cursor.fetchone()['id']
def list_emails(self) -> List[Dict[str, Any]]:
"""列出所有邮箱"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM emails ORDER BY created_at DESC")
return [dict(row) for row in cursor.fetchall()]
def get_email(self, email_id: int) -> Optional[Dict[str, Any]]:
"""获取单个邮箱"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM emails WHERE id = ?", (email_id,))
row = cursor.fetchone()
return dict(row) if row else None
def update_email(self, email_id: int, email: str = None, name: str = None) -> bool:
"""更新邮箱"""
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
if email:
# 检查邮箱是否已存在(排除自己)
cursor.execute("SELECT id FROM emails WHERE email = ? AND id != ?", (email, email_id))
if cursor.fetchone():
return False # 邎箱已存在
if email and name:
cursor.execute("UPDATE emails SET email = ?, name = ? WHERE id = ?", (email, name, email_id))
elif email:
cursor.execute("UPDATE emails SET email = ? WHERE id = ?", (email, email_id))
elif name:
cursor.execute("UPDATE emails SET name = ? WHERE id = ?", (name, email_id))
conn.commit()
return cursor.rowcount > 0
def delete_email(self, email_id: int) -> bool:
"""删除邮箱"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM emails WHERE id = ?", (email_id,))
conn.commit()
return cursor.rowcount > 0
# ============ 邮件日志 操作 ============
def log_email_send(self, item_id: int, email: str, success: bool = True) -> int:
"""记录邮件发送"""
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO email_logs (item_id, email, sent_at, success)
VALUES (?, ?, ?, ?)
""", (item_id, email, now, 1 if success else 0))
conn.commit()
return cursor.lastrowid
def get_email_logs(self, item_id: int) -> List[Dict[str, Any]]:
"""获取收藏的邮件发送记录"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM email_logs
WHERE item_id = ?
ORDER BY sent_at DESC
""", (item_id,))
return [dict(row) for row in cursor.fetchall()]
def stats(self) -> Dict[str, Any]:
"""获取统计信息"""
self._ensure_init()
with self.get_conn() as conn:
cursor = conn.cursor()
stats = {}
# 总数
cursor.execute("SELECT COUNT(*) as count FROM items")
stats['total'] = cursor.fetchone()['count']
# 按类型统计
cursor.execute("SELECT type, COUNT(*) as count FROM items GROUP BY type")
stats['by_type'] = {row['type']: row['count'] for row in cursor.fetchall()}
# 待办状态统计
cursor.execute("SELECT status, COUNT(*) as count FROM items WHERE type = 'todo' GROUP BY status")
stats['todo_status'] = {row['status']: row['count'] for row in cursor.fetchall()}
# 标签数
cursor.execute("SELECT COUNT(*) as count FROM tags")
stats['tags'] = cursor.fetchone()['count']
return stats
# ============ 提醒相关 ============
def get_reminders(self) -> Dict[str, Any]:
"""获取提醒信息:即将到期和已过期的待办"""
self._ensure_init()
with self.get_conn() as conn:
cursor = conn.cursor()
now = datetime.now()
reminders = {
'overdue': [], # 已过期
'due_today': [], # 今天到期
'due_soon': [] # 24小时内到期不含今天
}
# 查询未完成的待办(有截止日期的)
cursor.execute("""
SELECT * FROM items
WHERE type = 'todo'
AND status != 'completed'
AND due_date IS NOT NULL
AND due_date != ''
ORDER BY due_date ASC
""")
for row in cursor.fetchall():
item = dict(row)
item['tags'] = self._get_item_tags(conn, item['id'])
try:
due_date_str = item['due_date']
# 支持多种日期格式
if 'T' in due_date_str:
# ISO 格式2026-04-16T14:30
due_date = datetime.strptime(due_date_str[:16], '%Y-%m-%dT%H:%M')
elif len(due_date_str) == 10:
# 只有日期2026-04-16视为当天 23:59:59
due_date = datetime.strptime(due_date_str, '%Y-%m-%d').replace(hour=23, minute=59, second=59)
else:
# 其他格式,尝试解析
due_date = datetime.strptime(due_date_str.split('.')[0], '%Y-%m-%dT%H:%M:%S')
# 计算距离到期的时间
time_left = due_date - now
if time_left.total_seconds() < 0:
# 已过期
days_overdue = abs(int(time_left.total_seconds() / 86400))
item['days_overdue'] = days_overdue
reminders['overdue'].append(item)
elif time_left.total_seconds() < 86400: # 24小时内
# 判断是今天还是明天
if due_date.date() == now.date():
reminders['due_today'].append(item)
else:
reminders['due_soon'].append(item)
elif due_date.date() == now.date():
# 今天到期超过24小时的情况比如现在凌晨截止时间是晚上
reminders['due_today'].append(item)
except (ValueError, AttributeError) as e:
# 日期格式错误,跳过
continue
# 统计总数
reminders['total'] = len(reminders['overdue']) + len(reminders['due_today']) + len(reminders['due_soon'])
return reminders
# ============ 备份操作 ============
def create_backup(self, manual: bool = False) -> Dict[str, Any]:
"""创建数据库备份"""
import os
# 确保备份目录存在
os.makedirs(BACKUP_DIR, exist_ok=True)
now = datetime.now()
backup_name = now.strftime('%Y-%m-%d_%H%M%S')
if manual:
backup_name += '_manual'
backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
# 复制数据库文件
shutil.copy2(self.db_path, backup_path)
# 获取备份信息
backup_info = {
'name': backup_name,
'path': backup_path,
'size': os.path.getsize(backup_path),
'created_at': now.isoformat(),
'manual': manual,
'is_first_of_month': now.day == 1
}
# 保存备份元数据
self._save_backup_meta(backup_info)
# 清理旧备份
self._cleanup_old_backups()
return backup_info
def list_backups(self) -> List[Dict[str, Any]]:
"""列出所有备份"""
import os
if not os.path.exists(BACKUP_DIR):
return []
# 读取备份元数据
meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
if os.path.exists(meta_path):
with open(meta_path, 'r') as f:
backups = json.load(f)
else:
# 从文件重建元数据
backups = []
for f in os.listdir(BACKUP_DIR):
if f.endswith('.db'):
path = os.path.join(BACKUP_DIR, f)
backups.append({
'name': f.replace('.db', ''),
'path': path,
'size': os.path.getsize(path),
'created_at': datetime.fromtimestamp(os.path.getmtime(path)).isoformat(),
'manual': '_manual' in f,
'is_first_of_month': self._is_first_of_month_filename(f)
})
# 按时间倒序排列
backups.sort(key=lambda x: x['created_at'], reverse=True)
return backups
def restore_backup(self, backup_name: str) -> bool:
"""恢复备份"""
import os
backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
if not os.path.exists(backup_path):
return False
# 先备份当前数据库(以防万一)
current_backup = self.db_path + '.before_restore'
shutil.copy2(self.db_path, current_backup)
# 恢复备份
shutil.copy2(backup_path, self.db_path)
return True
def delete_backup(self, backup_name: str) -> bool:
"""删除备份"""
import os
backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
if not os.path.exists(backup_path):
return False
os.remove(backup_path)
# 更新元数据
self._remove_backup_meta(backup_name)
return True
def _save_backup_meta(self, backup_info: Dict[str, Any]):
"""保存备份元数据"""
import os
meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
# 读取现有元数据
backups = []
if os.path.exists(meta_path):
with open(meta_path, 'r') as f:
backups = json.load(f)
# 添加新备份
backups.append(backup_info)
# 保存
with open(meta_path, 'w') as f:
json.dump(backups, f, indent=2)
def _remove_backup_meta(self, backup_name: str):
"""从元数据中删除备份"""
import os
meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
if not os.path.exists(meta_path):
return
with open(meta_path, 'r') as f:
backups = json.load(f)
backups = [b for b in backups if b['name'] != backup_name]
with open(meta_path, 'w') as f:
json.dump(backups, f, indent=2)
def _is_first_of_month_filename(self, filename: str) -> bool:
"""判断是否是每月第一天的备份"""
# 格式2026-04-01_040000.db 或 2026-05-01_...
try:
date_part = filename.split('_')[0]
day = int(date_part.split('-')[2])
return day == 1
except:
return False
def _cleanup_old_backups(self):
"""清理旧备份保留30天 + 每月第一天"""
import os
backups = self.list_backups()
now = datetime.now()
keep_paths = []
for backup in backups:
backup_date = datetime.fromisoformat(backup['created_at'])
days_old = (now - backup_date).days
# 保留条件:
# 1. 手动备份永久保留最多10个
# 2. 30天内的备份
# 3. 每月第一天的备份
if backup['manual']:
# 手动备份保留但最多10个
manual_backups = [b for b in backups if b['manual']]
if manual_backups.index(backup) < 10:
keep_paths.append(backup['path'])
else:
self.delete_backup(backup['name'])
elif days_old <= 30:
keep_paths.append(backup['path'])
elif backup['is_first_of_month']:
# 每月第一天永久保留
keep_paths.append(backup['path'])
else:
# 删除
self.delete_backup(backup['name'])
# ============ Todo Events 待办事务操作 ============
def create_todo_event(self, item_id: int, content: str, remaining_days: int = 1) -> int:
"""创建待办事务"""
self._ensure_init()
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO todo_events (item_id, content, remaining_days, is_completed, created_at, updated_at)
VALUES (?, ?, ?, 0, ?, ?)
""", (item_id, content, remaining_days, now, now))
conn.commit()
return cursor.lastrowid
def list_todo_events(self, item_id: int, limit: int = 10, offset: int = 0) -> List[Dict[str, Any]]:
"""列出待办事务(按时间倒序)"""
self._ensure_init()
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM todo_events
WHERE item_id = ?
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""", (item_id, limit, offset))
rows = cursor.fetchall()
return [dict(row) for row in rows]
def count_todo_events(self, item_id: int) -> int:
"""计算待办事务总数"""
self._ensure_init()
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) as count FROM todo_events WHERE item_id = ?", (item_id,))
return cursor.fetchone()['count']
def get_todo_event(self, event_id: int) -> Optional[Dict[str, Any]]:
"""获取单个待办事务"""
self._ensure_init()
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM todo_events WHERE id = ?", (event_id,))
row = cursor.fetchone()
return dict(row) if row else None
def update_todo_event(self, event_id: int, content: str = None, remaining_days: int = None, is_completed: bool = None) -> bool:
"""更新待办事务"""
self._ensure_init()
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
# 获取当前数据
cursor.execute("SELECT * FROM todo_events WHERE id = ?", (event_id,))
row = cursor.fetchone()
if not row:
return False
# 更新字段
new_content = content if content is not None else row['content']
new_days = remaining_days if remaining_days is not None else row['remaining_days']
new_completed = 1 if is_completed is True else (0 if is_completed is False else row['is_completed'])
cursor.execute("""
UPDATE todo_events
SET content = ?, remaining_days = ?, is_completed = ?, updated_at = ?
WHERE id = ?
""", (new_content, new_days, new_completed, now, event_id))
conn.commit()
return True
def delete_todo_event(self, event_id: int) -> bool:
"""删除待办事务"""
self._ensure_init()
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM todo_events WHERE id = ?", (event_id,))
conn.commit()
return cursor.rowcount > 0
# 全局数据库实例
db = Database()