"""数据库操作""" import sqlite3 import json import shutil import os from datetime import datetime, timedelta from typing import Optional, List, Dict, Any from contextlib import contextmanager from .config import DATABASE_URL, TODO_STATUS, PRIORITY_LEVELS # 备份目录 BACKUP_DIR = os.path.join(os.path.dirname(DATABASE_URL), 'backups') class Database: """SQLite数据库管理""" def __init__(self, db_path: str = DATABASE_URL): self.db_path = db_path self._initialized = False def _ensure_init(self): """确保数据库已初始化""" if self._initialized: return self._init_db() self._initialized = True @contextmanager def get_conn(self): """获取数据库连接""" conn = sqlite3.connect(self.db_path, timeout=30.0) conn.row_factory = sqlite3.Row # 启用WAL模式,提高并发性能 conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA busy_timeout=30000") try: yield conn finally: conn.close() def _init_db(self): """初始化数据库表""" with self.get_conn() as conn: cursor = conn.cursor() # 主内容表 cursor.execute(""" CREATE TABLE IF NOT EXISTS items ( id INTEGER PRIMARY KEY AUTOINCREMENT, type TEXT NOT NULL DEFAULT 'text', title TEXT, content TEXT, url TEXT, source TEXT, status TEXT DEFAULT 'pending', priority TEXT DEFAULT 'medium', due_date TEXT, note TEXT, is_starred INTEGER DEFAULT 0, views INTEGER DEFAULT 0, is_deleted INTEGER DEFAULT 0, deleted_at TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) # 标签表 cursor.execute(""" CREATE TABLE IF NOT EXISTS tags ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, color TEXT DEFAULT '#3498db', created_at TEXT NOT NULL ) """) # 内容-标签关联表 cursor.execute(""" CREATE TABLE IF NOT EXISTS item_tags ( item_id INTEGER NOT NULL, tag_id INTEGER NOT NULL, PRIMARY KEY (item_id, tag_id), FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE, FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE ) """) # 邮箱表 cursor.execute(""" CREATE TABLE IF NOT EXISTS emails ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, name TEXT, created_at TEXT NOT NULL ) """) # 草稿表 cursor.execute(""" CREATE TABLE IF NOT EXISTS drafts ( id INTEGER PRIMARY KEY AUTOINCREMENT, type TEXT NOT NULL DEFAULT 'text', title TEXT, content TEXT, url TEXT, source TEXT, status TEXT DEFAULT 'pending', priority TEXT DEFAULT 'medium', due_date TEXT, note TEXT, tags TEXT, is_starred INTEGER DEFAULT 0, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) # 邮件发送记录表 cursor.execute(""" CREATE TABLE IF NOT EXISTS email_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, item_id INTEGER NOT NULL, email TEXT NOT NULL, sent_at TEXT NOT NULL, success INTEGER DEFAULT 1, FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE ) """) # 文件夹表 cursor.execute(""" CREATE TABLE IF NOT EXISTS folders ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, type TEXT NOT NULL, parent_id INTEGER DEFAULT NULL, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY (parent_id) REFERENCES folders(id) ON DELETE CASCADE ) """) # 创建索引 cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_type ON items(type)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_status ON items(status)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_created ON items(created_at)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_item_tags_item ON item_tags(item_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_item_tags_tag ON item_tags(tag_id)") # 检查并添加 is_starred 字段(兼容旧数据库) try: cursor.execute("SELECT is_starred FROM items LIMIT 1") except sqlite3.OperationalError: cursor.execute("ALTER TABLE items ADD COLUMN is_starred INTEGER DEFAULT 0") # 创建 is_starred 索引(字段添加后再创建) cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_starred ON items(is_starred)") # 检查并添加 views 字段(兼容旧数据库) try: cursor.execute("SELECT views FROM items LIMIT 1") except sqlite3.OperationalError: cursor.execute("ALTER TABLE items ADD COLUMN views INTEGER DEFAULT 0") # 检查并添加 is_deleted 和 deleted_at 字段(兼容旧数据库) try: cursor.execute("SELECT is_deleted FROM items LIMIT 1") except sqlite3.OperationalError: cursor.execute("ALTER TABLE items ADD COLUMN is_deleted INTEGER DEFAULT 0") try: cursor.execute("SELECT deleted_at FROM items LIMIT 1") except sqlite3.OperationalError: cursor.execute("ALTER TABLE items ADD COLUMN deleted_at TEXT") # 检查并添加 folder_id 字段(兼容旧数据库) try: cursor.execute("SELECT folder_id FROM items LIMIT 1") except sqlite3.OperationalError: cursor.execute("ALTER TABLE items ADD COLUMN folder_id INTEGER DEFAULT NULL") # 创建 folder 相关索引 cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_folder ON items(folder_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_folders_type ON folders(type)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_folders_parent ON folders(parent_id)") # 检查并添加 is_pinned 字段(置顶功能,兼容旧数据库) try: cursor.execute("SELECT is_pinned FROM items LIMIT 1") except sqlite3.OperationalError: cursor.execute("ALTER TABLE items ADD COLUMN is_pinned INTEGER DEFAULT 0") # 创建 is_pinned 索引 cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_pinned ON items(is_pinned)") # 待办事务表(关联到文本类别的items) cursor.execute(""" CREATE TABLE IF NOT EXISTS todo_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, item_id INTEGER NOT NULL, content TEXT NOT NULL, remaining_days INTEGER DEFAULT 1, is_completed INTEGER DEFAULT 0, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_todo_events_item ON todo_events(item_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_todo_events_created ON todo_events(created_at)") conn.commit() # ============ Item 操作 ============ def create_item(self, type: str = "text", title: str = None, content: str = None, url: str = None, source: str = None, status: str = "pending", priority: str = "medium", due_date: str = None, note: str = None, tags: List[str] = None, is_starred: bool = False, folder_id: int = None) -> int: """创建新条目""" self._ensure_init() now = datetime.now().isoformat() # 验证状态 if type == "todo" and status not in TODO_STATUS: status = "pending" if priority not in PRIORITY_LEVELS: priority = "medium" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO items (type, title, content, url, source, status, priority, due_date, note, is_starred, folder_id, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (type, title, content, url, source, status, priority, due_date, note, 1 if is_starred else 0, folder_id, now, now)) item_id = cursor.lastrowid # 添加标签 if tags: self._add_tags_to_item(conn, item_id, tags) conn.commit() return item_id def get_item(self, item_id: int) -> Optional[Dict[str, Any]]: """获取单个条目""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM items WHERE id = ?", (item_id,)) row = cursor.fetchone() if not row: return None item = dict(row) item['tags'] = self._get_item_tags(conn, item_id) return item def list_items(self, type: str = None, status: str = None, tag: str = None, keyword: str = None, starred: bool = None, folder_id: int = None, sort_by: str = None, sort_order: str = None, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]: """列出条目 sort_by: created_at, updated_at sort_order: desc, asc folder_id: 文件夹ID,None表示不限制,-1表示未分类(folder_id为null) """ with self.get_conn() as conn: cursor = conn.cursor() query = "SELECT DISTINCT i.* FROM items i" params = [] conditions = [] # 只显示未删除的数据 conditions.append("i.is_deleted = 0") # 标签过滤需要JOIN if tag: query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id" conditions.append("t.name = ?") params.append(tag) if type: conditions.append("i.type = ?") params.append(type) if status: conditions.append("i.status = ?") params.append(status) if starred is not None: conditions.append("i.is_starred = ?") params.append(1 if starred else 0) # 文件夹过滤 if folder_id is not None: if folder_id == -1: # -1 表示未分类(folder_id 为 null) conditions.append("i.folder_id IS NULL") else: conditions.append("i.folder_id = ?") params.append(folder_id) if keyword: conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)") keyword_pattern = f"%{keyword}%" params.extend([keyword_pattern, keyword_pattern, keyword_pattern]) if conditions: query += " WHERE " + " AND ".join(conditions) # 排序逻辑:置顶 > 关注 > 创建时间 if sort_by == 'updated_at': order_field = 'i.updated_at' elif sort_by == 'created_at': order_field = 'i.created_at' else: # 默认:置顶优先 + 关注优先 + 创建时间降序 order_field = 'i.created_at' # 确定排序方向 if sort_order == 'asc': order_dir = 'ASC' elif sort_order == 'desc': order_dir = 'DESC' else: order_dir = 'DESC' # 默认降序 # 如果有指定排序字段,按该字段排序,但置顶始终优先 if sort_by: query += f" ORDER BY i.is_pinned DESC, {order_field} {order_dir} LIMIT ? OFFSET ?" else: # 默认:置顶 > 关注 > 创建时间降序 query += f" ORDER BY i.is_pinned DESC, i.is_starred DESC, i.created_at DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) cursor.execute(query, params) items = [] for row in cursor.fetchall(): item = dict(row) item['tags'] = self._get_item_tags(conn, item['id']) items.append(item) return items def count_items(self, type: str = None, status: str = None, tag: str = None, keyword: str = None, starred: bool = None, folder_id: int = None) -> int: """计算符合条件的条目总数""" with self.get_conn() as conn: cursor = conn.cursor() query = "SELECT COUNT(DISTINCT i.id) as count FROM items i" params = [] conditions = [] # 标签过滤需要JOIN if tag: query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id" conditions.append("t.name = ?") params.append(tag) if type: conditions.append("i.type = ?") params.append(type) if status: conditions.append("i.status = ?") params.append(status) if starred is not None: conditions.append("i.is_starred = ?") params.append(1 if starred else 0) if keyword: conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)") keyword_pattern = f"%{keyword}%" params.extend([keyword_pattern, keyword_pattern, keyword_pattern]) # 文件夹过滤 if folder_id is not None: if folder_id == -1: # 未分类:folder_id为NULL conditions.append("i.folder_id IS NULL") else: conditions.append("i.folder_id = ?") params.append(folder_id) if conditions: query += " WHERE " + " AND ".join(conditions) cursor.execute(query, params) return cursor.fetchone()['count'] def update_item(self, item_id: int, **kwargs) -> bool: """更新条目""" allowed_fields = ['type', 'title', 'content', 'url', 'source', 'status', 'priority', 'due_date', 'note', 'is_starred', 'folder_id'] update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields} # 只有 tags 变化也算有效更新 if not update_fields and 'tags' not in kwargs: return False now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() # 检查条目是否存在 cursor.execute("SELECT id FROM items WHERE id = ?", (item_id,)) if not cursor.fetchone(): return False if update_fields: set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys()) set_clause += ", updated_at = ?" values = list(update_fields.values()) + [now, item_id] cursor.execute(f"UPDATE items SET {set_clause} WHERE id = ?", values) if 'tags' in kwargs: # 先删除旧标签关联 cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,)) # 添加新标签 if kwargs['tags']: self._add_tags_to_item(conn, item_id, kwargs['tags']) conn.commit() return True def delete_item(self, item_id: int) -> bool: """删除条目(移动到回收站)""" with self.get_conn() as conn: cursor = conn.cursor() now = datetime.now().isoformat() cursor.execute("UPDATE items SET is_deleted = 1, deleted_at = ? WHERE id = ?", (now, item_id)) conn.commit() return cursor.rowcount > 0 def list_trash(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]: """列出回收站数据""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM items WHERE is_deleted = 1 ORDER BY deleted_at DESC LIMIT ? OFFSET ?", (limit, offset)) items = [] for row in cursor.fetchall(): item = dict(row) item['tags'] = self._get_item_tags(conn, item['id']) items.append(item) return items def count_trash(self) -> int: """计算回收站数据总数""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT COUNT(*) as count FROM items WHERE is_deleted = 1") return cursor.fetchone()['count'] def restore_item(self, item_id: int) -> bool: """从回收站恢复数据""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("UPDATE items SET is_deleted = 0, deleted_at = NULL WHERE id = ?", (item_id,)) conn.commit() return cursor.rowcount > 0 def delete_permanently(self, item_id: int) -> bool: """彻底删除数据(从数据库中删除)""" with self.get_conn() as conn: cursor = conn.cursor() # 删除标签关联 cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,)) # 删除邮件发送记录 cursor.execute("DELETE FROM email_logs WHERE item_id = ?", (item_id,)) # 删除数据 cursor.execute("DELETE FROM items WHERE id = ?", (item_id,)) conn.commit() return cursor.rowcount > 0 def empty_trash(self) -> int: """清空回收站""" with self.get_conn() as conn: cursor = conn.cursor() # 获取所有回收站数据ID cursor.execute("SELECT id FROM items WHERE is_deleted = 1") ids = [row['id'] for row in cursor.fetchall()] # 删除所有关联数据 for item_id in ids: cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,)) cursor.execute("DELETE FROM email_logs WHERE item_id = ?", (item_id,)) # 删除所有回收站数据 cursor.execute("DELETE FROM items WHERE is_deleted = 1") deleted_count = cursor.rowcount conn.commit() return deleted_count def toggle_star(self, item_id: int) -> bool: """切换重点关注状态""" with self.get_conn() as conn: cursor = conn.cursor() # 先获取当前状态 cursor.execute("SELECT is_starred FROM items WHERE id = ?", (item_id,)) row = cursor.fetchone() if not row: return False new_status = 0 if row['is_starred'] else 1 now = datetime.now().isoformat() cursor.execute("UPDATE items SET is_starred = ?, updated_at = ? WHERE id = ?", (new_status, now, item_id)) conn.commit() return True def set_star(self, item_id: int, starred: bool = True) -> bool: """设置重点关注状态""" with self.get_conn() as conn: cursor = conn.cursor() now = datetime.now().isoformat() cursor.execute("UPDATE items SET is_starred = ?, updated_at = ? WHERE id = ?", (1 if starred else 0, now, item_id)) conn.commit() return cursor.rowcount > 0 def toggle_pin(self, item_id: int) -> bool: """切换置顶状态""" with self.get_conn() as conn: cursor = conn.cursor() # 先获取当前状态 cursor.execute("SELECT is_pinned FROM items WHERE id = ?", (item_id,)) row = cursor.fetchone() if not row: return False new_status = 0 if row['is_pinned'] else 1 now = datetime.now().isoformat() cursor.execute("UPDATE items SET is_pinned = ?, updated_at = ? WHERE id = ?", (new_status, now, item_id)) conn.commit() return True def set_pin(self, item_id: int, pinned: bool = True) -> bool: """设置置顶状态""" with self.get_conn() as conn: cursor = conn.cursor() now = datetime.now().isoformat() cursor.execute("UPDATE items SET is_pinned = ?, updated_at = ? WHERE id = ?", (1 if pinned else 0, now, item_id)) conn.commit() return cursor.rowcount > 0 def increment_views(self, item_id: int) -> bool: """增加阅读数""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("UPDATE items SET views = views + 1 WHERE id = ?", (item_id,)) conn.commit() return cursor.rowcount > 0 def move_item_to_folder(self, item_id: int, folder_id: int) -> bool: """将条目移动到文件夹""" with self.get_conn() as conn: cursor = conn.cursor() now = datetime.now().isoformat() cursor.execute("UPDATE items SET folder_id = ?, updated_at = ? WHERE id = ?", (folder_id, now, item_id)) conn.commit() return cursor.rowcount > 0 # ============ Folder 文件夹操作 ============ def create_folder(self, name: str, type: str, parent_id: int = None) -> int: """创建文件夹 Args: name: 文件夹名称 type: 类别类型(text/link/column/todo) parent_id: 父文件夹ID(目前不支持嵌套,预留) """ self._ensure_init() now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO folders (name, type, parent_id, created_at, updated_at) VALUES (?, ?, ?, ?, ?) """, (name, type, parent_id, now, now)) folder_id = cursor.lastrowid conn.commit() return folder_id def get_folder(self, folder_id: int) -> Optional[Dict[str, Any]]: """获取文件夹信息""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM folders WHERE id = ?", (folder_id,)) row = cursor.fetchone() if not row: return None folder = dict(row) # 获取文件夹内条目数量 cursor.execute("SELECT COUNT(*) FROM items WHERE folder_id = ? AND is_deleted = 0", (folder_id,)) folder['item_count'] = cursor.fetchone()[0] return folder def list_folders(self, type: str = None) -> List[Dict[str, Any]]: """列出文件夹 Args: type: 按类型过滤,None表示列出所有 """ with self.get_conn() as conn: cursor = conn.cursor() if type: cursor.execute("SELECT * FROM folders WHERE type = ? ORDER BY created_at DESC", (type,)) else: cursor.execute("SELECT * FROM folders ORDER BY created_at DESC") folders = [] for row in cursor.fetchall(): folder = dict(row) # 获取文件夹内条目数量 cursor.execute("SELECT COUNT(*) FROM items WHERE folder_id = ? AND is_deleted = 0", (folder['id'],)) folder['item_count'] = cursor.fetchone()[0] folders.append(folder) return folders def update_folder(self, folder_id: int, name: str = None) -> bool: """更新文件夹""" with self.get_conn() as conn: cursor = conn.cursor() now = datetime.now().isoformat() if name: cursor.execute("UPDATE folders SET name = ?, updated_at = ? WHERE id = ?", (name, now, folder_id)) conn.commit() return cursor.rowcount > 0 return False def delete_folder(self, folder_id: int, move_items_to_root: bool = True) -> bool: """删除文件夹 Args: folder_id: 文件夹ID move_items_to_root: 是否将条目移出文件夹(到未分类),True则移动,False则一起删除 """ with self.get_conn() as conn: cursor = conn.cursor() if move_items_to_root: # 将条目移出文件夹(folder_id设为null) cursor.execute("UPDATE items SET folder_id = NULL WHERE folder_id = ?", (folder_id,)) else: # 删除文件夹内的所有条目 cursor.execute("DELETE FROM items WHERE folder_id = ?", (folder_id,)) # 删除文件夹 cursor.execute("DELETE FROM folders WHERE id = ?", (folder_id,)) conn.commit() return cursor.rowcount > 0 def count_items_by_folder(self, folder_id: int) -> int: """统计文件夹内的条目数量""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM items WHERE folder_id = ? AND is_deleted = 0", (folder_id,)) return cursor.fetchone()[0] # ============ Draft 草稿操作 ============ def save_draft(self, type: str = "text", title: str = None, content: str = None, url: str = None, source: str = None, status: str = "pending", priority: str = "medium", due_date: str = None, note: str = None, tags: str = None, is_starred: bool = False) -> int: """保存草稿""" self._ensure_init() now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO drafts (type, title, content, url, source, status, priority, due_date, note, tags, is_starred, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (type, title, content, url, source, status, priority, due_date, note, tags, 1 if is_starred else 0, now, now)) draft_id = cursor.lastrowid conn.commit() return draft_id def update_draft(self, draft_id: int, **kwargs) -> bool: """更新草稿""" allowed_fields = ['type', 'title', 'content', 'url', 'source', 'status', 'priority', 'due_date', 'note', 'tags', 'is_starred'] update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields} if not update_fields: return False now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT id FROM drafts WHERE id = ?", (draft_id,)) if not cursor.fetchone(): return False set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys()) set_clause += ", updated_at = ?" values = list(update_fields.values()) + [now, draft_id] cursor.execute(f"UPDATE drafts SET {set_clause} WHERE id = ?", values) conn.commit() return True def list_drafts(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]: """列出草稿""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM drafts ORDER BY updated_at DESC LIMIT ? OFFSET ?", (limit, offset)) return [dict(row) for row in cursor.fetchall()] def count_drafts(self) -> int: """计算草稿总数""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT COUNT(*) as count FROM drafts") return cursor.fetchone()['count'] def get_draft(self, draft_id: int) -> Optional[Dict[str, Any]]: """获取单个草稿""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM drafts WHERE id = ?", (draft_id,)) row = cursor.fetchone() return dict(row) if row else None def delete_draft(self, draft_id: int) -> bool: """删除草稿""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM drafts WHERE id = ?", (draft_id,)) conn.commit() return cursor.rowcount > 0 def draft_to_item(self, draft_id: int) -> Optional[int]: """将草稿转为正式条目""" draft = self.get_draft(draft_id) if not draft: return None # 创建条目 tags_list = draft['tags'].split(',') if draft['tags'] else [] tags_list = [t.strip() for t in tags_list if t.strip()] item_id = self.create_item( type=draft['type'], title=draft['title'], content=draft['content'], url=draft['url'], source=draft['source'], status=draft['status'], priority=draft['priority'], due_date=draft['due_date'], note=draft['note'], tags=tags_list, is_starred=draft['is_starred'] ) # 删除草稿 if item_id: self.delete_draft(draft_id) return item_id # ============ Tag 操作 ============ def create_tag(self, name: str, color: str = "#3498db") -> int: """创建标签""" now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() try: cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, ?, ?)", (name, color, now)) conn.commit() return cursor.lastrowid except sqlite3.IntegrityError: # 标签已存在 cursor.execute("SELECT id FROM tags WHERE name = ?", (name,)) return cursor.fetchone()['id'] def list_tags(self) -> List[Dict[str, Any]]: """列出所有标签""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute(""" SELECT t.*, COUNT(it.item_id) as item_count FROM tags t LEFT JOIN item_tags it ON t.id = it.tag_id GROUP BY t.id ORDER BY t.name """) return [dict(row) for row in cursor.fetchall()] def update_tag(self, tag_id: int, name: str) -> bool: """更新标签名称""" with self.get_conn() as conn: cursor = conn.cursor() # 检查名称是否已存在(排除自己) cursor.execute("SELECT id FROM tags WHERE name = ? AND id != ?", (name, tag_id)) if cursor.fetchone(): return False # 名称已存在 cursor.execute("UPDATE tags SET name = ? WHERE id = ?", (name, tag_id)) conn.commit() return cursor.rowcount > 0 def delete_tag(self, tag_id: int = None, name: str = None) -> bool: """删除标签""" with self.get_conn() as conn: cursor = conn.cursor() if name: cursor.execute("DELETE FROM tags WHERE name = ?", (name,)) elif tag_id: cursor.execute("DELETE FROM tags WHERE id = ?", (tag_id,)) conn.commit() return cursor.rowcount > 0 # ============ 辅助方法 ============ def _add_tags_to_item(self, conn, item_id: int, tags: List[str]): """为条目添加标签""" cursor = conn.cursor() for tag_name in tags: tag_name = tag_name.strip() if not tag_name: continue # 确保标签存在 - 使用同一个连接 cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)) row = cursor.fetchone() if row: tag_id = row['id'] else: # 创建新标签 now = datetime.now().isoformat() cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, '#3498db', ?)", (tag_name, now)) tag_id = cursor.lastrowid # 创建关联 cursor.execute("INSERT OR IGNORE INTO item_tags (item_id, tag_id) VALUES (?, ?)", (item_id, tag_id)) def _get_item_tags(self, conn, item_id: int) -> List[str]: """获取条目的标签""" cursor = conn.cursor() cursor.execute(""" SELECT t.name FROM tags t JOIN item_tags it ON t.id = it.tag_id WHERE it.item_id = ? ORDER BY t.name """, (item_id,)) return [row['name'] for row in cursor.fetchall()] # ============ Email 操作 ============ def create_email(self, email: str, name: str = None) -> int: """创建邮箱""" now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() try: cursor.execute("INSERT INTO emails (email, name, created_at) VALUES (?, ?, ?)", (email, name, now)) conn.commit() return cursor.lastrowid except sqlite3.IntegrityError: # 邮箱已存在,返回已有ID cursor.execute("SELECT id FROM emails WHERE email = ?", (email,)) return cursor.fetchone()['id'] def list_emails(self) -> List[Dict[str, Any]]: """列出所有邮箱""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM emails ORDER BY created_at DESC") return [dict(row) for row in cursor.fetchall()] def get_email(self, email_id: int) -> Optional[Dict[str, Any]]: """获取单个邮箱""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM emails WHERE id = ?", (email_id,)) row = cursor.fetchone() return dict(row) if row else None def update_email(self, email_id: int, email: str = None, name: str = None) -> bool: """更新邮箱""" now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() if email: # 检查邮箱是否已存在(排除自己) cursor.execute("SELECT id FROM emails WHERE email = ? AND id != ?", (email, email_id)) if cursor.fetchone(): return False # 邎箱已存在 if email and name: cursor.execute("UPDATE emails SET email = ?, name = ? WHERE id = ?", (email, name, email_id)) elif email: cursor.execute("UPDATE emails SET email = ? WHERE id = ?", (email, email_id)) elif name: cursor.execute("UPDATE emails SET name = ? WHERE id = ?", (name, email_id)) conn.commit() return cursor.rowcount > 0 def delete_email(self, email_id: int) -> bool: """删除邮箱""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM emails WHERE id = ?", (email_id,)) conn.commit() return cursor.rowcount > 0 # ============ 邮件日志 操作 ============ def log_email_send(self, item_id: int, email: str, success: bool = True) -> int: """记录邮件发送""" now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO email_logs (item_id, email, sent_at, success) VALUES (?, ?, ?, ?) """, (item_id, email, now, 1 if success else 0)) conn.commit() return cursor.lastrowid def get_email_logs(self, item_id: int) -> List[Dict[str, Any]]: """获取收藏的邮件发送记录""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute(""" SELECT * FROM email_logs WHERE item_id = ? ORDER BY sent_at DESC """, (item_id,)) return [dict(row) for row in cursor.fetchall()] def stats(self) -> Dict[str, Any]: """获取统计信息""" self._ensure_init() with self.get_conn() as conn: cursor = conn.cursor() stats = {} # 总数 cursor.execute("SELECT COUNT(*) as count FROM items WHERE is_deleted = 0") stats['total'] = cursor.fetchone()['count'] # 按类型统计 cursor.execute("SELECT type, COUNT(*) as count FROM items WHERE is_deleted = 0 GROUP BY type") stats['by_type'] = {row['type']: row['count'] for row in cursor.fetchall()} # 未读数量统计(views = 0) cursor.execute("SELECT COUNT(*) as count FROM items WHERE is_deleted = 0 AND (views IS NULL OR views = 0)") stats['unread'] = cursor.fetchone()['count'] # 按类型统计未读数量 cursor.execute("SELECT type, COUNT(*) as count FROM items WHERE is_deleted = 0 AND (views IS NULL OR views = 0) GROUP BY type") stats['unread_by_type'] = {row['type']: row['count'] for row in cursor.fetchall()} # 待办状态统计 cursor.execute("SELECT status, COUNT(*) as count FROM items WHERE type = 'todo' AND is_deleted = 0 GROUP BY status") stats['todo_status'] = {row['status']: row['count'] for row in cursor.fetchall()} # 标签数 cursor.execute("SELECT COUNT(*) as count FROM tags") stats['tags'] = cursor.fetchone()['count'] return stats # ============ 提醒相关 ============ def get_reminders(self) -> Dict[str, Any]: """获取提醒信息:即将到期和已过期的待办""" self._ensure_init() with self.get_conn() as conn: cursor = conn.cursor() now = datetime.now() reminders = { 'overdue': [], # 已过期 'due_today': [], # 今天到期 'due_soon': [] # 24小时内到期(不含今天) } # 查询未完成的待办(有截止日期的) cursor.execute(""" SELECT * FROM items WHERE type = 'todo' AND status != 'completed' AND due_date IS NOT NULL AND due_date != '' ORDER BY due_date ASC """) for row in cursor.fetchall(): item = dict(row) item['tags'] = self._get_item_tags(conn, item['id']) try: due_date_str = item['due_date'] # 支持多种日期格式 if 'T' in due_date_str: # ISO 格式:2026-04-16T14:30 due_date = datetime.strptime(due_date_str[:16], '%Y-%m-%dT%H:%M') elif len(due_date_str) == 10: # 只有日期:2026-04-16,视为当天 23:59:59 due_date = datetime.strptime(due_date_str, '%Y-%m-%d').replace(hour=23, minute=59, second=59) else: # 其他格式,尝试解析 due_date = datetime.strptime(due_date_str.split('.')[0], '%Y-%m-%dT%H:%M:%S') # 计算距离到期的时间 time_left = due_date - now if time_left.total_seconds() < 0: # 已过期 days_overdue = abs(int(time_left.total_seconds() / 86400)) item['days_overdue'] = days_overdue reminders['overdue'].append(item) elif time_left.total_seconds() < 86400: # 24小时内 # 判断是今天还是明天 if due_date.date() == now.date(): reminders['due_today'].append(item) else: reminders['due_soon'].append(item) elif due_date.date() == now.date(): # 今天到期(超过24小时的情况,比如现在凌晨,截止时间是晚上) reminders['due_today'].append(item) except (ValueError, AttributeError) as e: # 日期格式错误,跳过 continue # 统计总数 reminders['total'] = len(reminders['overdue']) + len(reminders['due_today']) + len(reminders['due_soon']) return reminders # ============ 备份操作 ============ def create_backup(self, manual: bool = False) -> Dict[str, Any]: """创建数据库备份""" import os # 确保备份目录存在 os.makedirs(BACKUP_DIR, exist_ok=True) now = datetime.now() backup_name = now.strftime('%Y-%m-%d_%H%M%S') if manual: backup_name += '_manual' backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db') # 复制数据库文件 shutil.copy2(self.db_path, backup_path) # 获取备份信息 backup_info = { 'name': backup_name, 'path': backup_path, 'size': os.path.getsize(backup_path), 'created_at': now.isoformat(), 'manual': manual, 'is_first_of_month': now.day == 1 } # 保存备份元数据 self._save_backup_meta(backup_info) # 清理旧备份 self._cleanup_old_backups() return backup_info def list_backups(self) -> List[Dict[str, Any]]: """列出所有备份""" import os if not os.path.exists(BACKUP_DIR): return [] # 读取备份元数据 meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json') if os.path.exists(meta_path): with open(meta_path, 'r') as f: backups = json.load(f) else: # 从文件重建元数据 backups = [] for f in os.listdir(BACKUP_DIR): if f.endswith('.db'): path = os.path.join(BACKUP_DIR, f) backups.append({ 'name': f.replace('.db', ''), 'path': path, 'size': os.path.getsize(path), 'created_at': datetime.fromtimestamp(os.path.getmtime(path)).isoformat(), 'manual': '_manual' in f, 'is_first_of_month': self._is_first_of_month_filename(f) }) # 按时间倒序排列 backups.sort(key=lambda x: x['created_at'], reverse=True) return backups def restore_backup(self, backup_name: str) -> bool: """恢复备份""" import os backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db') if not os.path.exists(backup_path): return False # 先备份当前数据库(以防万一) current_backup = self.db_path + '.before_restore' shutil.copy2(self.db_path, current_backup) # 恢复备份 shutil.copy2(backup_path, self.db_path) return True def delete_backup(self, backup_name: str) -> bool: """删除备份""" import os backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db') if not os.path.exists(backup_path): return False os.remove(backup_path) # 更新元数据 self._remove_backup_meta(backup_name) return True def _save_backup_meta(self, backup_info: Dict[str, Any]): """保存备份元数据""" import os meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json') # 读取现有元数据 backups = [] if os.path.exists(meta_path): with open(meta_path, 'r') as f: backups = json.load(f) # 添加新备份 backups.append(backup_info) # 保存 with open(meta_path, 'w') as f: json.dump(backups, f, indent=2) def _remove_backup_meta(self, backup_name: str): """从元数据中删除备份""" import os meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json') if not os.path.exists(meta_path): return with open(meta_path, 'r') as f: backups = json.load(f) backups = [b for b in backups if b['name'] != backup_name] with open(meta_path, 'w') as f: json.dump(backups, f, indent=2) def _is_first_of_month_filename(self, filename: str) -> bool: """判断是否是每月第一天的备份""" # 格式:2026-04-01_040000.db 或 2026-05-01_... try: date_part = filename.split('_')[0] day = int(date_part.split('-')[2]) return day == 1 except: return False def _cleanup_old_backups(self): """清理旧备份:保留30天 + 每月第一天""" import os backups = self.list_backups() now = datetime.now() keep_paths = [] for backup in backups: backup_date = datetime.fromisoformat(backup['created_at']) days_old = (now - backup_date).days # 保留条件: # 1. 手动备份永久保留(最多10个) # 2. 30天内的备份 # 3. 每月第一天的备份 if backup['manual']: # 手动备份保留,但最多10个 manual_backups = [b for b in backups if b['manual']] if manual_backups.index(backup) < 10: keep_paths.append(backup['path']) else: self.delete_backup(backup['name']) elif days_old <= 30: keep_paths.append(backup['path']) elif backup['is_first_of_month']: # 每月第一天永久保留 keep_paths.append(backup['path']) else: # 删除 self.delete_backup(backup['name']) # ============ Todo Events 待办事务操作 ============ def create_todo_event(self, item_id: int, content: str, remaining_days: int = 1) -> int: """创建待办事务""" self._ensure_init() now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO todo_events (item_id, content, remaining_days, is_completed, created_at, updated_at) VALUES (?, ?, ?, 0, ?, ?) """, (item_id, content, remaining_days, now, now)) conn.commit() return cursor.lastrowid def list_todo_events(self, item_id: int, limit: int = 10, offset: int = 0) -> List[Dict[str, Any]]: """列出待办事务(按时间倒序)""" self._ensure_init() with self.get_conn() as conn: cursor = conn.cursor() cursor.execute(""" SELECT * FROM todo_events WHERE item_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ? """, (item_id, limit, offset)) rows = cursor.fetchall() return [dict(row) for row in rows] def count_todo_events(self, item_id: int) -> int: """计算待办事务总数""" self._ensure_init() with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT COUNT(*) as count FROM todo_events WHERE item_id = ?", (item_id,)) return cursor.fetchone()['count'] def get_todo_event(self, event_id: int) -> Optional[Dict[str, Any]]: """获取单个待办事务""" self._ensure_init() with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM todo_events WHERE id = ?", (event_id,)) row = cursor.fetchone() return dict(row) if row else None def update_todo_event(self, event_id: int, content: str = None, remaining_days: int = None, is_completed: bool = None) -> bool: """更新待办事务""" self._ensure_init() now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() # 获取当前数据 cursor.execute("SELECT * FROM todo_events WHERE id = ?", (event_id,)) row = cursor.fetchone() if not row: return False # 更新字段 new_content = content if content is not None else row['content'] new_days = remaining_days if remaining_days is not None else row['remaining_days'] new_completed = 1 if is_completed is True else (0 if is_completed is False else row['is_completed']) cursor.execute(""" UPDATE todo_events SET content = ?, remaining_days = ?, is_completed = ?, updated_at = ? WHERE id = ? """, (new_content, new_days, new_completed, now, event_id)) conn.commit() return True def delete_todo_event(self, event_id: int) -> bool: """删除待办事务""" self._ensure_init() with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM todo_events WHERE id = ?", (event_id,)) conn.commit() return cursor.rowcount > 0 # 全局数据库实例 db = Database()