"""数据库操作""" import sqlite3 import json import shutil import os from datetime import datetime, timedelta from typing import Optional, List, Dict, Any from contextlib import contextmanager from .config import DATABASE_URL, TODO_STATUS, PRIORITY_LEVELS # 备份目录 BACKUP_DIR = os.path.join(os.path.dirname(DATABASE_URL), 'backups') class Database: """SQLite数据库管理""" def __init__(self, db_path: str = DATABASE_URL): self.db_path = db_path self._initialized = False def _ensure_init(self): """确保数据库已初始化""" if self._initialized: return self._init_db() self._initialized = True @contextmanager def get_conn(self): """获取数据库连接""" conn = sqlite3.connect(self.db_path, timeout=30.0) conn.row_factory = sqlite3.Row # 启用WAL模式,提高并发性能 conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA busy_timeout=30000") try: yield conn finally: conn.close() def _init_db(self): """初始化数据库表""" with self.get_conn() as conn: cursor = conn.cursor() # 主内容表 cursor.execute(""" CREATE TABLE IF NOT EXISTS items ( id INTEGER PRIMARY KEY AUTOINCREMENT, type TEXT NOT NULL DEFAULT 'text', title TEXT, content TEXT, url TEXT, source TEXT, status TEXT DEFAULT 'pending', priority TEXT DEFAULT 'medium', due_date TEXT, note TEXT, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ) """) # 标签表 cursor.execute(""" CREATE TABLE IF NOT EXISTS tags ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, color TEXT DEFAULT '#3498db', created_at TEXT NOT NULL ) """) # 内容-标签关联表 cursor.execute(""" CREATE TABLE IF NOT EXISTS item_tags ( item_id INTEGER NOT NULL, tag_id INTEGER NOT NULL, PRIMARY KEY (item_id, tag_id), FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE, FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE ) """) # 邮箱表 cursor.execute(""" CREATE TABLE IF NOT EXISTS emails ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, name TEXT, created_at TEXT NOT NULL ) """) # 邮件发送记录表 cursor.execute(""" CREATE TABLE IF NOT EXISTS email_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, item_id INTEGER NOT NULL, email TEXT NOT NULL, sent_at TEXT NOT NULL, success INTEGER DEFAULT 1, FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE ) """) # 创建索引 cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_type ON items(type)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_status ON items(status)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_created ON items(created_at)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_item_tags_item ON item_tags(item_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_item_tags_tag ON item_tags(tag_id)") conn.commit() # ============ Item 操作 ============ def create_item(self, type: str = "text", title: str = None, content: str = None, url: str = None, source: str = None, status: str = "pending", priority: str = "medium", due_date: str = None, note: str = None, tags: List[str] = None) -> int: """创建新条目""" self._ensure_init() now = datetime.now().isoformat() # 验证状态 if type == "todo" and status not in TODO_STATUS: status = "pending" if priority not in PRIORITY_LEVELS: priority = "medium" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO items (type, title, content, url, source, status, priority, due_date, note, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, (type, title, content, url, source, status, priority, due_date, note, now, now)) item_id = cursor.lastrowid # 添加标签 if tags: self._add_tags_to_item(conn, item_id, tags) conn.commit() return item_id def get_item(self, item_id: int) -> Optional[Dict[str, Any]]: """获取单个条目""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM items WHERE id = ?", (item_id,)) row = cursor.fetchone() if not row: return None item = dict(row) item['tags'] = self._get_item_tags(conn, item_id) return item def list_items(self, type: str = None, status: str = None, tag: str = None, keyword: str = None, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]: """列出条目""" with self.get_conn() as conn: cursor = conn.cursor() query = "SELECT DISTINCT i.* FROM items i" params = [] conditions = [] # 标签过滤需要JOIN if tag: query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id" conditions.append("t.name = ?") params.append(tag) if type: conditions.append("i.type = ?") params.append(type) if status: conditions.append("i.status = ?") params.append(status) if keyword: conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)") keyword_pattern = f"%{keyword}%" params.extend([keyword_pattern, keyword_pattern, keyword_pattern]) if conditions: query += " WHERE " + " AND ".join(conditions) query += " ORDER BY i.created_at DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) cursor.execute(query, params) items = [] for row in cursor.fetchall(): item = dict(row) item['tags'] = self._get_item_tags(conn, item['id']) items.append(item) return items def count_items(self, type: str = None, status: str = None, tag: str = None, keyword: str = None) -> int: """计算符合条件的条目总数""" with self.get_conn() as conn: cursor = conn.cursor() query = "SELECT COUNT(DISTINCT i.id) as count FROM items i" params = [] conditions = [] # 标签过滤需要JOIN if tag: query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id" conditions.append("t.name = ?") params.append(tag) if type: conditions.append("i.type = ?") params.append(type) if status: conditions.append("i.status = ?") params.append(status) if keyword: conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)") keyword_pattern = f"%{keyword}%" params.extend([keyword_pattern, keyword_pattern, keyword_pattern]) if conditions: query += " WHERE " + " AND ".join(conditions) cursor.execute(query, params) return cursor.fetchone()['count'] def update_item(self, item_id: int, **kwargs) -> bool: """更新条目""" allowed_fields = ['type', 'title', 'content', 'url', 'source', 'status', 'priority', 'due_date', 'note'] update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields} if not update_fields and 'tags' not in kwargs: return False now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() if update_fields: set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys()) set_clause += ", updated_at = ?" values = list(update_fields.values()) + [now, item_id] cursor.execute(f"UPDATE items SET {set_clause} WHERE id = ?", values) if 'tags' in kwargs: # 先删除旧标签关联 cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,)) # 添加新标签 if kwargs['tags']: self._add_tags_to_item(conn, item_id, kwargs['tags']) conn.commit() return cursor.rowcount > 0 def delete_item(self, item_id: int) -> bool: """删除条目""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM items WHERE id = ?", (item_id,)) conn.commit() return cursor.rowcount > 0 # ============ Tag 操作 ============ def create_tag(self, name: str, color: str = "#3498db") -> int: """创建标签""" now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() try: cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, ?, ?)", (name, color, now)) conn.commit() return cursor.lastrowid except sqlite3.IntegrityError: # 标签已存在 cursor.execute("SELECT id FROM tags WHERE name = ?", (name,)) return cursor.fetchone()['id'] def list_tags(self) -> List[Dict[str, Any]]: """列出所有标签""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute(""" SELECT t.*, COUNT(it.item_id) as item_count FROM tags t LEFT JOIN item_tags it ON t.id = it.tag_id GROUP BY t.id ORDER BY t.name """) return [dict(row) for row in cursor.fetchall()] def update_tag(self, tag_id: int, name: str) -> bool: """更新标签名称""" with self.get_conn() as conn: cursor = conn.cursor() # 检查名称是否已存在(排除自己) cursor.execute("SELECT id FROM tags WHERE name = ? AND id != ?", (name, tag_id)) if cursor.fetchone(): return False # 名称已存在 cursor.execute("UPDATE tags SET name = ? WHERE id = ?", (name, tag_id)) conn.commit() return cursor.rowcount > 0 def delete_tag(self, tag_id: int = None, name: str = None) -> bool: """删除标签""" with self.get_conn() as conn: cursor = conn.cursor() if name: cursor.execute("DELETE FROM tags WHERE name = ?", (name,)) elif tag_id: cursor.execute("DELETE FROM tags WHERE id = ?", (tag_id,)) conn.commit() return cursor.rowcount > 0 # ============ 辅助方法 ============ def _add_tags_to_item(self, conn, item_id: int, tags: List[str]): """为条目添加标签""" cursor = conn.cursor() for tag_name in tags: tag_name = tag_name.strip() if not tag_name: continue # 确保标签存在 - 使用同一个连接 cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)) row = cursor.fetchone() if row: tag_id = row['id'] else: # 创建新标签 now = datetime.now().isoformat() cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, '#3498db', ?)", (tag_name, now)) tag_id = cursor.lastrowid # 创建关联 cursor.execute("INSERT OR IGNORE INTO item_tags (item_id, tag_id) VALUES (?, ?)", (item_id, tag_id)) def _get_item_tags(self, conn, item_id: int) -> List[str]: """获取条目的标签""" cursor = conn.cursor() cursor.execute(""" SELECT t.name FROM tags t JOIN item_tags it ON t.id = it.tag_id WHERE it.item_id = ? ORDER BY t.name """, (item_id,)) return [row['name'] for row in cursor.fetchall()] # ============ Email 操作 ============ def create_email(self, email: str, name: str = None) -> int: """创建邮箱""" now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() try: cursor.execute("INSERT INTO emails (email, name, created_at) VALUES (?, ?, ?)", (email, name, now)) conn.commit() return cursor.lastrowid except sqlite3.IntegrityError: # 邮箱已存在,返回已有ID cursor.execute("SELECT id FROM emails WHERE email = ?", (email,)) return cursor.fetchone()['id'] def list_emails(self) -> List[Dict[str, Any]]: """列出所有邮箱""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM emails ORDER BY created_at DESC") return [dict(row) for row in cursor.fetchall()] def get_email(self, email_id: int) -> Optional[Dict[str, Any]]: """获取单个邮箱""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM emails WHERE id = ?", (email_id,)) row = cursor.fetchone() return dict(row) if row else None def update_email(self, email_id: int, email: str = None, name: str = None) -> bool: """更新邮箱""" now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() if email: # 检查邮箱是否已存在(排除自己) cursor.execute("SELECT id FROM emails WHERE email = ? AND id != ?", (email, email_id)) if cursor.fetchone(): return False # 邎箱已存在 if email and name: cursor.execute("UPDATE emails SET email = ?, name = ? WHERE id = ?", (email, name, email_id)) elif email: cursor.execute("UPDATE emails SET email = ? WHERE id = ?", (email, email_id)) elif name: cursor.execute("UPDATE emails SET name = ? WHERE id = ?", (name, email_id)) conn.commit() return cursor.rowcount > 0 def delete_email(self, email_id: int) -> bool: """删除邮箱""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM emails WHERE id = ?", (email_id,)) conn.commit() return cursor.rowcount > 0 # ============ 邮件日志 操作 ============ def log_email_send(self, item_id: int, email: str, success: bool = True) -> int: """记录邮件发送""" now = datetime.now().isoformat() with self.get_conn() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO email_logs (item_id, email, sent_at, success) VALUES (?, ?, ?, ?) """, (item_id, email, now, 1 if success else 0)) conn.commit() return cursor.lastrowid def get_email_logs(self, item_id: int) -> List[Dict[str, Any]]: """获取收藏的邮件发送记录""" with self.get_conn() as conn: cursor = conn.cursor() cursor.execute(""" SELECT * FROM email_logs WHERE item_id = ? ORDER BY sent_at DESC """, (item_id,)) return [dict(row) for row in cursor.fetchall()] def stats(self) -> Dict[str, Any]: """获取统计信息""" self._ensure_init() with self.get_conn() as conn: cursor = conn.cursor() stats = {} # 总数 cursor.execute("SELECT COUNT(*) as count FROM items") stats['total'] = cursor.fetchone()['count'] # 按类型统计 cursor.execute("SELECT type, COUNT(*) as count FROM items GROUP BY type") stats['by_type'] = {row['type']: row['count'] for row in cursor.fetchall()} # 待办状态统计 cursor.execute("SELECT status, COUNT(*) as count FROM items WHERE type = 'todo' GROUP BY status") stats['todo_status'] = {row['status']: row['count'] for row in cursor.fetchall()} # 标签数 cursor.execute("SELECT COUNT(*) as count FROM tags") stats['tags'] = cursor.fetchone()['count'] return stats # ============ 提醒相关 ============ def get_reminders(self) -> Dict[str, Any]: """获取提醒信息:即将到期和已过期的待办""" self._ensure_init() with self.get_conn() as conn: cursor = conn.cursor() now = datetime.now() reminders = { 'overdue': [], # 已过期 'due_today': [], # 今天到期 'due_soon': [] # 24小时内到期(不含今天) } # 查询未完成的待办(有截止日期的) cursor.execute(""" SELECT * FROM items WHERE type = 'todo' AND status != 'completed' AND due_date IS NOT NULL AND due_date != '' ORDER BY due_date ASC """) for row in cursor.fetchall(): item = dict(row) item['tags'] = self._get_item_tags(conn, item['id']) try: due_date_str = item['due_date'] # 支持多种日期格式 if 'T' in due_date_str: # ISO 格式:2026-04-16T14:30 due_date = datetime.strptime(due_date_str[:16], '%Y-%m-%dT%H:%M') elif len(due_date_str) == 10: # 只有日期:2026-04-16,视为当天 23:59:59 due_date = datetime.strptime(due_date_str, '%Y-%m-%d').replace(hour=23, minute=59, second=59) else: # 其他格式,尝试解析 due_date = datetime.strptime(due_date_str.split('.')[0], '%Y-%m-%dT%H:%M:%S') # 计算距离到期的时间 time_left = due_date - now if time_left.total_seconds() < 0: # 已过期 days_overdue = abs(int(time_left.total_seconds() / 86400)) item['days_overdue'] = days_overdue reminders['overdue'].append(item) elif time_left.total_seconds() < 86400: # 24小时内 # 判断是今天还是明天 if due_date.date() == now.date(): reminders['due_today'].append(item) else: reminders['due_soon'].append(item) elif due_date.date() == now.date(): # 今天到期(超过24小时的情况,比如现在凌晨,截止时间是晚上) reminders['due_today'].append(item) except (ValueError, AttributeError) as e: # 日期格式错误,跳过 continue # 统计总数 reminders['total'] = len(reminders['overdue']) + len(reminders['due_today']) + len(reminders['due_soon']) return reminders # ============ 备份操作 ============ def create_backup(self, manual: bool = False) -> Dict[str, Any]: """创建数据库备份""" import os # 确保备份目录存在 os.makedirs(BACKUP_DIR, exist_ok=True) now = datetime.now() backup_name = now.strftime('%Y-%m-%d_%H%M%S') if manual: backup_name += '_manual' backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db') # 复制数据库文件 shutil.copy2(self.db_path, backup_path) # 获取备份信息 backup_info = { 'name': backup_name, 'path': backup_path, 'size': os.path.getsize(backup_path), 'created_at': now.isoformat(), 'manual': manual, 'is_first_of_month': now.day == 1 } # 保存备份元数据 self._save_backup_meta(backup_info) # 清理旧备份 self._cleanup_old_backups() return backup_info def list_backups(self) -> List[Dict[str, Any]]: """列出所有备份""" import os if not os.path.exists(BACKUP_DIR): return [] # 读取备份元数据 meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json') if os.path.exists(meta_path): with open(meta_path, 'r') as f: backups = json.load(f) else: # 从文件重建元数据 backups = [] for f in os.listdir(BACKUP_DIR): if f.endswith('.db'): path = os.path.join(BACKUP_DIR, f) backups.append({ 'name': f.replace('.db', ''), 'path': path, 'size': os.path.getsize(path), 'created_at': datetime.fromtimestamp(os.path.getmtime(path)).isoformat(), 'manual': '_manual' in f, 'is_first_of_month': self._is_first_of_month_filename(f) }) # 按时间倒序排列 backups.sort(key=lambda x: x['created_at'], reverse=True) return backups def restore_backup(self, backup_name: str) -> bool: """恢复备份""" import os backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db') if not os.path.exists(backup_path): return False # 先备份当前数据库(以防万一) current_backup = self.db_path + '.before_restore' shutil.copy2(self.db_path, current_backup) # 恢复备份 shutil.copy2(backup_path, self.db_path) return True def delete_backup(self, backup_name: str) -> bool: """删除备份""" import os backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db') if not os.path.exists(backup_path): return False os.remove(backup_path) # 更新元数据 self._remove_backup_meta(backup_name) return True def _save_backup_meta(self, backup_info: Dict[str, Any]): """保存备份元数据""" import os meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json') # 读取现有元数据 backups = [] if os.path.exists(meta_path): with open(meta_path, 'r') as f: backups = json.load(f) # 添加新备份 backups.append(backup_info) # 保存 with open(meta_path, 'w') as f: json.dump(backups, f, indent=2) def _remove_backup_meta(self, backup_name: str): """从元数据中删除备份""" import os meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json') if not os.path.exists(meta_path): return with open(meta_path, 'r') as f: backups = json.load(f) backups = [b for b in backups if b['name'] != backup_name] with open(meta_path, 'w') as f: json.dump(backups, f, indent=2) def _is_first_of_month_filename(self, filename: str) -> bool: """判断是否是每月第一天的备份""" # 格式:2026-04-01_040000.db 或 2026-05-01_... try: date_part = filename.split('_')[0] day = int(date_part.split('-')[2]) return day == 1 except: return False def _cleanup_old_backups(self): """清理旧备份:保留30天 + 每月第一天""" import os backups = self.list_backups() now = datetime.now() keep_paths = [] for backup in backups: backup_date = datetime.fromisoformat(backup['created_at']) days_old = (now - backup_date).days # 保留条件: # 1. 手动备份永久保留(最多10个) # 2. 30天内的备份 # 3. 每月第一天的备份 if backup['manual']: # 手动备份保留,但最多10个 manual_backups = [b for b in backups if b['manual']] if manual_backups.index(backup) < 10: keep_paths.append(backup['path']) else: self.delete_backup(backup['name']) elif days_old <= 30: keep_paths.append(backup['path']) elif backup['is_first_of_month']: # 每月第一天永久保留 keep_paths.append(backup['path']) else: # 删除 self.delete_backup(backup['name']) # 全局数据库实例 db = Database()