1314 lines
52 KiB
Python
1314 lines
52 KiB
Python
"""数据库操作"""
|
||
|
||
import sqlite3
|
||
import json
|
||
import shutil
|
||
import os
|
||
from datetime import datetime, timedelta
|
||
from typing import Optional, List, Dict, Any
|
||
from contextlib import contextmanager
|
||
|
||
from .config import DATABASE_URL, TODO_STATUS, PRIORITY_LEVELS
|
||
|
||
# 备份目录
|
||
BACKUP_DIR = os.path.join(os.path.dirname(DATABASE_URL), 'backups')
|
||
|
||
|
||
class Database:
|
||
"""SQLite数据库管理"""
|
||
|
||
def __init__(self, db_path: str = DATABASE_URL):
|
||
self.db_path = db_path
|
||
self._initialized = False
|
||
|
||
def _ensure_init(self):
|
||
"""确保数据库已初始化"""
|
||
if self._initialized:
|
||
return
|
||
self._init_db()
|
||
self._initialized = True
|
||
|
||
@contextmanager
|
||
def get_conn(self):
|
||
"""获取数据库连接"""
|
||
conn = sqlite3.connect(self.db_path, timeout=30.0)
|
||
conn.row_factory = sqlite3.Row
|
||
# 启用WAL模式,提高并发性能
|
||
conn.execute("PRAGMA journal_mode=WAL")
|
||
conn.execute("PRAGMA busy_timeout=30000")
|
||
try:
|
||
yield conn
|
||
finally:
|
||
conn.close()
|
||
|
||
def _init_db(self):
|
||
"""初始化数据库表"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# 主内容表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS items (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
type TEXT NOT NULL DEFAULT 'text',
|
||
title TEXT,
|
||
content TEXT,
|
||
url TEXT,
|
||
source TEXT,
|
||
status TEXT DEFAULT 'pending',
|
||
priority TEXT DEFAULT 'medium',
|
||
due_date TEXT,
|
||
note TEXT,
|
||
is_starred INTEGER DEFAULT 0,
|
||
views INTEGER DEFAULT 0,
|
||
is_deleted INTEGER DEFAULT 0,
|
||
deleted_at TEXT,
|
||
created_at TEXT NOT NULL,
|
||
updated_at TEXT NOT NULL
|
||
)
|
||
""")
|
||
|
||
# 标签表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS tags (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
name TEXT NOT NULL UNIQUE,
|
||
color TEXT DEFAULT '#3498db',
|
||
created_at TEXT NOT NULL
|
||
)
|
||
""")
|
||
|
||
# 内容-标签关联表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS item_tags (
|
||
item_id INTEGER NOT NULL,
|
||
tag_id INTEGER NOT NULL,
|
||
PRIMARY KEY (item_id, tag_id),
|
||
FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE,
|
||
FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
|
||
)
|
||
""")
|
||
|
||
# 邮箱表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS emails (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
email TEXT NOT NULL UNIQUE,
|
||
name TEXT,
|
||
created_at TEXT NOT NULL
|
||
)
|
||
""")
|
||
|
||
# 草稿表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS drafts (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
type TEXT NOT NULL DEFAULT 'text',
|
||
title TEXT,
|
||
content TEXT,
|
||
url TEXT,
|
||
source TEXT,
|
||
status TEXT DEFAULT 'pending',
|
||
priority TEXT DEFAULT 'medium',
|
||
due_date TEXT,
|
||
note TEXT,
|
||
tags TEXT,
|
||
is_starred INTEGER DEFAULT 0,
|
||
created_at TEXT NOT NULL,
|
||
updated_at TEXT NOT NULL
|
||
)
|
||
""")
|
||
|
||
# 邮件发送记录表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS email_logs (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
item_id INTEGER NOT NULL,
|
||
email TEXT NOT NULL,
|
||
sent_at TEXT NOT NULL,
|
||
success INTEGER DEFAULT 1,
|
||
FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE
|
||
)
|
||
""")
|
||
|
||
# 文件夹表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS folders (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
name TEXT NOT NULL,
|
||
type TEXT NOT NULL,
|
||
parent_id INTEGER DEFAULT NULL,
|
||
created_at TEXT NOT NULL,
|
||
updated_at TEXT NOT NULL,
|
||
FOREIGN KEY (parent_id) REFERENCES folders(id) ON DELETE CASCADE
|
||
)
|
||
""")
|
||
|
||
# 创建索引
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_type ON items(type)")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_status ON items(status)")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_created ON items(created_at)")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_item_tags_item ON item_tags(item_id)")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_item_tags_tag ON item_tags(tag_id)")
|
||
|
||
# 检查并添加 is_starred 字段(兼容旧数据库)
|
||
try:
|
||
cursor.execute("SELECT is_starred FROM items LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
cursor.execute("ALTER TABLE items ADD COLUMN is_starred INTEGER DEFAULT 0")
|
||
|
||
# 创建 is_starred 索引(字段添加后再创建)
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_starred ON items(is_starred)")
|
||
|
||
# 检查并添加 views 字段(兼容旧数据库)
|
||
try:
|
||
cursor.execute("SELECT views FROM items LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
cursor.execute("ALTER TABLE items ADD COLUMN views INTEGER DEFAULT 0")
|
||
|
||
# 检查并添加 is_deleted 和 deleted_at 字段(兼容旧数据库)
|
||
try:
|
||
cursor.execute("SELECT is_deleted FROM items LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
cursor.execute("ALTER TABLE items ADD COLUMN is_deleted INTEGER DEFAULT 0")
|
||
try:
|
||
cursor.execute("SELECT deleted_at FROM items LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
cursor.execute("ALTER TABLE items ADD COLUMN deleted_at TEXT")
|
||
|
||
# 检查并添加 folder_id 字段(兼容旧数据库)
|
||
try:
|
||
cursor.execute("SELECT folder_id FROM items LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
cursor.execute("ALTER TABLE items ADD COLUMN folder_id INTEGER DEFAULT NULL")
|
||
|
||
# 创建 folder 相关索引
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_folder ON items(folder_id)")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_folders_type ON folders(type)")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_folders_parent ON folders(parent_id)")
|
||
|
||
# 检查并添加 is_pinned 字段(置顶功能,兼容旧数据库)
|
||
try:
|
||
cursor.execute("SELECT is_pinned FROM items LIMIT 1")
|
||
except sqlite3.OperationalError:
|
||
cursor.execute("ALTER TABLE items ADD COLUMN is_pinned INTEGER DEFAULT 0")
|
||
|
||
# 创建 is_pinned 索引
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_pinned ON items(is_pinned)")
|
||
|
||
# 待办事务表(关联到文本类别的items)
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS todo_events (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
item_id INTEGER NOT NULL,
|
||
content TEXT NOT NULL,
|
||
remaining_days INTEGER DEFAULT 1,
|
||
is_completed INTEGER DEFAULT 0,
|
||
created_at TEXT NOT NULL,
|
||
updated_at TEXT NOT NULL,
|
||
FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE
|
||
)
|
||
""")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_todo_events_item ON todo_events(item_id)")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_todo_events_created ON todo_events(created_at)")
|
||
|
||
conn.commit()
|
||
|
||
# ============ Item 操作 ============
|
||
|
||
def create_item(self, type: str = "text", title: str = None, content: str = None,
|
||
url: str = None, source: str = None, status: str = "pending",
|
||
priority: str = "medium", due_date: str = None, note: str = None,
|
||
tags: List[str] = None, is_starred: bool = False,
|
||
folder_id: int = None) -> int:
|
||
"""创建新条目"""
|
||
self._ensure_init()
|
||
now = datetime.now().isoformat()
|
||
|
||
# 验证状态
|
||
if type == "todo" and status not in TODO_STATUS:
|
||
status = "pending"
|
||
if priority not in PRIORITY_LEVELS:
|
||
priority = "medium"
|
||
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO items (type, title, content, url, source, status, priority, due_date, note, is_starred, folder_id, created_at, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""", (type, title, content, url, source, status, priority, due_date, note, 1 if is_starred else 0, folder_id, now, now))
|
||
item_id = cursor.lastrowid
|
||
|
||
# 添加标签
|
||
if tags:
|
||
self._add_tags_to_item(conn, item_id, tags)
|
||
|
||
conn.commit()
|
||
return item_id
|
||
|
||
def get_item(self, item_id: int) -> Optional[Dict[str, Any]]:
|
||
"""获取单个条目"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT * FROM items WHERE id = ?", (item_id,))
|
||
row = cursor.fetchone()
|
||
if not row:
|
||
return None
|
||
|
||
item = dict(row)
|
||
item['tags'] = self._get_item_tags(conn, item_id)
|
||
return item
|
||
|
||
def list_items(self, type: str = None, status: str = None, tag: str = None,
|
||
keyword: str = None, starred: bool = None, folder_id: int = None,
|
||
sort_by: str = None, sort_order: str = None,
|
||
limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
|
||
"""列出条目
|
||
|
||
sort_by: created_at, updated_at
|
||
sort_order: desc, asc
|
||
folder_id: 文件夹ID,None表示不限制,-1表示未分类(folder_id为null)
|
||
"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
query = "SELECT DISTINCT i.* FROM items i"
|
||
params = []
|
||
conditions = []
|
||
|
||
# 只显示未删除的数据
|
||
conditions.append("i.is_deleted = 0")
|
||
|
||
# 标签过滤需要JOIN
|
||
if tag:
|
||
query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id"
|
||
conditions.append("t.name = ?")
|
||
params.append(tag)
|
||
|
||
if type:
|
||
conditions.append("i.type = ?")
|
||
params.append(type)
|
||
|
||
if status:
|
||
conditions.append("i.status = ?")
|
||
params.append(status)
|
||
|
||
if starred is not None:
|
||
conditions.append("i.is_starred = ?")
|
||
params.append(1 if starred else 0)
|
||
|
||
# 文件夹过滤
|
||
if folder_id is not None:
|
||
if folder_id == -1:
|
||
# -1 表示未分类(folder_id 为 null)
|
||
conditions.append("i.folder_id IS NULL")
|
||
else:
|
||
conditions.append("i.folder_id = ?")
|
||
params.append(folder_id)
|
||
|
||
if keyword:
|
||
conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)")
|
||
keyword_pattern = f"%{keyword}%"
|
||
params.extend([keyword_pattern, keyword_pattern, keyword_pattern])
|
||
|
||
if conditions:
|
||
query += " WHERE " + " AND ".join(conditions)
|
||
|
||
# 排序逻辑:置顶 > 关注 > 创建时间
|
||
if sort_by == 'updated_at':
|
||
order_field = 'i.updated_at'
|
||
elif sort_by == 'created_at':
|
||
order_field = 'i.created_at'
|
||
else:
|
||
# 默认:置顶优先 + 关注优先 + 创建时间降序
|
||
order_field = 'i.created_at'
|
||
|
||
# 确定排序方向
|
||
if sort_order == 'asc':
|
||
order_dir = 'ASC'
|
||
elif sort_order == 'desc':
|
||
order_dir = 'DESC'
|
||
else:
|
||
order_dir = 'DESC' # 默认降序
|
||
|
||
# 如果有指定排序字段,按该字段排序,但置顶始终优先
|
||
if sort_by:
|
||
query += f" ORDER BY i.is_pinned DESC, {order_field} {order_dir} LIMIT ? OFFSET ?"
|
||
else:
|
||
# 默认:置顶 > 关注 > 创建时间降序
|
||
query += f" ORDER BY i.is_pinned DESC, i.is_starred DESC, i.created_at DESC LIMIT ? OFFSET ?"
|
||
params.extend([limit, offset])
|
||
|
||
cursor.execute(query, params)
|
||
items = []
|
||
for row in cursor.fetchall():
|
||
item = dict(row)
|
||
item['tags'] = self._get_item_tags(conn, item['id'])
|
||
items.append(item)
|
||
|
||
return items
|
||
|
||
def count_items(self, type: str = None, status: str = None, tag: str = None,
|
||
keyword: str = None, starred: bool = None, folder_id: int = None) -> int:
|
||
"""计算符合条件的条目总数"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
query = "SELECT COUNT(DISTINCT i.id) as count FROM items i"
|
||
params = []
|
||
conditions = []
|
||
|
||
# 标签过滤需要JOIN
|
||
if tag:
|
||
query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id"
|
||
conditions.append("t.name = ?")
|
||
params.append(tag)
|
||
|
||
if type:
|
||
conditions.append("i.type = ?")
|
||
params.append(type)
|
||
|
||
if status:
|
||
conditions.append("i.status = ?")
|
||
params.append(status)
|
||
|
||
if starred is not None:
|
||
conditions.append("i.is_starred = ?")
|
||
params.append(1 if starred else 0)
|
||
|
||
if keyword:
|
||
conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)")
|
||
keyword_pattern = f"%{keyword}%"
|
||
params.extend([keyword_pattern, keyword_pattern, keyword_pattern])
|
||
|
||
# 文件夹过滤
|
||
if folder_id is not None:
|
||
if folder_id == -1:
|
||
# 未分类:folder_id为NULL
|
||
conditions.append("i.folder_id IS NULL")
|
||
else:
|
||
conditions.append("i.folder_id = ?")
|
||
params.append(folder_id)
|
||
|
||
if conditions:
|
||
query += " WHERE " + " AND ".join(conditions)
|
||
|
||
cursor.execute(query, params)
|
||
return cursor.fetchone()['count']
|
||
|
||
def update_item(self, item_id: int, **kwargs) -> bool:
|
||
"""更新条目"""
|
||
allowed_fields = ['type', 'title', 'content', 'url', 'source', 'status', 'priority', 'due_date', 'note', 'is_starred', 'folder_id']
|
||
update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}
|
||
|
||
# 只有 tags 变化也算有效更新
|
||
if not update_fields and 'tags' not in kwargs:
|
||
return False
|
||
|
||
now = datetime.now().isoformat()
|
||
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# 检查条目是否存在
|
||
cursor.execute("SELECT id FROM items WHERE id = ?", (item_id,))
|
||
if not cursor.fetchone():
|
||
return False
|
||
|
||
if update_fields:
|
||
set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys())
|
||
set_clause += ", updated_at = ?"
|
||
values = list(update_fields.values()) + [now, item_id]
|
||
cursor.execute(f"UPDATE items SET {set_clause} WHERE id = ?", values)
|
||
|
||
if 'tags' in kwargs:
|
||
# 先删除旧标签关联
|
||
cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,))
|
||
# 添加新标签
|
||
if kwargs['tags']:
|
||
self._add_tags_to_item(conn, item_id, kwargs['tags'])
|
||
|
||
conn.commit()
|
||
return True
|
||
|
||
def delete_item(self, item_id: int) -> bool:
|
||
"""删除条目(移动到回收站)"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
now = datetime.now().isoformat()
|
||
cursor.execute("UPDATE items SET is_deleted = 1, deleted_at = ? WHERE id = ?", (now, item_id))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
def list_trash(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
|
||
"""列出回收站数据"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT * FROM items WHERE is_deleted = 1 ORDER BY deleted_at DESC LIMIT ? OFFSET ?", (limit, offset))
|
||
items = []
|
||
for row in cursor.fetchall():
|
||
item = dict(row)
|
||
item['tags'] = self._get_item_tags(conn, item['id'])
|
||
items.append(item)
|
||
return items
|
||
|
||
def count_trash(self) -> int:
|
||
"""计算回收站数据总数"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT COUNT(*) as count FROM items WHERE is_deleted = 1")
|
||
return cursor.fetchone()['count']
|
||
|
||
def restore_item(self, item_id: int) -> bool:
|
||
"""从回收站恢复数据"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("UPDATE items SET is_deleted = 0, deleted_at = NULL WHERE id = ?", (item_id,))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
def delete_permanently(self, item_id: int) -> bool:
|
||
"""彻底删除数据(从数据库中删除)"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
# 删除标签关联
|
||
cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,))
|
||
# 删除邮件发送记录
|
||
cursor.execute("DELETE FROM email_logs WHERE item_id = ?", (item_id,))
|
||
# 删除数据
|
||
cursor.execute("DELETE FROM items WHERE id = ?", (item_id,))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
def empty_trash(self) -> int:
|
||
"""清空回收站"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
# 获取所有回收站数据ID
|
||
cursor.execute("SELECT id FROM items WHERE is_deleted = 1")
|
||
ids = [row['id'] for row in cursor.fetchall()]
|
||
|
||
# 删除所有关联数据
|
||
for item_id in ids:
|
||
cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,))
|
||
cursor.execute("DELETE FROM email_logs WHERE item_id = ?", (item_id,))
|
||
|
||
# 删除所有回收站数据
|
||
cursor.execute("DELETE FROM items WHERE is_deleted = 1")
|
||
deleted_count = cursor.rowcount
|
||
conn.commit()
|
||
return deleted_count
|
||
|
||
def toggle_star(self, item_id: int) -> bool:
|
||
"""切换重点关注状态"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
# 先获取当前状态
|
||
cursor.execute("SELECT is_starred FROM items WHERE id = ?", (item_id,))
|
||
row = cursor.fetchone()
|
||
if not row:
|
||
return False
|
||
|
||
new_status = 0 if row['is_starred'] else 1
|
||
now = datetime.now().isoformat()
|
||
cursor.execute("UPDATE items SET is_starred = ?, updated_at = ? WHERE id = ?", (new_status, now, item_id))
|
||
conn.commit()
|
||
return True
|
||
|
||
def set_star(self, item_id: int, starred: bool = True) -> bool:
|
||
"""设置重点关注状态"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
now = datetime.now().isoformat()
|
||
cursor.execute("UPDATE items SET is_starred = ?, updated_at = ? WHERE id = ?", (1 if starred else 0, now, item_id))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
def toggle_pin(self, item_id: int) -> bool:
|
||
"""切换置顶状态"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
# 先获取当前状态
|
||
cursor.execute("SELECT is_pinned FROM items WHERE id = ?", (item_id,))
|
||
row = cursor.fetchone()
|
||
if not row:
|
||
return False
|
||
|
||
new_status = 0 if row['is_pinned'] else 1
|
||
now = datetime.now().isoformat()
|
||
cursor.execute("UPDATE items SET is_pinned = ?, updated_at = ? WHERE id = ?", (new_status, now, item_id))
|
||
conn.commit()
|
||
return True
|
||
|
||
def set_pin(self, item_id: int, pinned: bool = True) -> bool:
|
||
"""设置置顶状态"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
now = datetime.now().isoformat()
|
||
cursor.execute("UPDATE items SET is_pinned = ?, updated_at = ? WHERE id = ?", (1 if pinned else 0, now, item_id))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
def increment_views(self, item_id: int) -> bool:
|
||
"""增加阅读数"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("UPDATE items SET views = views + 1 WHERE id = ?", (item_id,))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
def move_item_to_folder(self, item_id: int, folder_id: int) -> bool:
|
||
"""将条目移动到文件夹"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
now = datetime.now().isoformat()
|
||
cursor.execute("UPDATE items SET folder_id = ?, updated_at = ? WHERE id = ?", (folder_id, now, item_id))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
# ============ Folder 文件夹操作 ============
|
||
|
||
def create_folder(self, name: str, type: str, parent_id: int = None) -> int:
|
||
"""创建文件夹
|
||
|
||
Args:
|
||
name: 文件夹名称
|
||
type: 类别类型(text/link/column/todo)
|
||
parent_id: 父文件夹ID(目前不支持嵌套,预留)
|
||
"""
|
||
self._ensure_init()
|
||
now = datetime.now().isoformat()
|
||
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO folders (name, type, parent_id, created_at, updated_at)
|
||
VALUES (?, ?, ?, ?, ?)
|
||
""", (name, type, parent_id, now, now))
|
||
folder_id = cursor.lastrowid
|
||
conn.commit()
|
||
return folder_id
|
||
|
||
def get_folder(self, folder_id: int) -> Optional[Dict[str, Any]]:
|
||
"""获取文件夹信息"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT * FROM folders WHERE id = ?", (folder_id,))
|
||
row = cursor.fetchone()
|
||
if not row:
|
||
return None
|
||
folder = dict(row)
|
||
# 获取文件夹内条目数量
|
||
cursor.execute("SELECT COUNT(*) FROM items WHERE folder_id = ? AND is_deleted = 0", (folder_id,))
|
||
folder['item_count'] = cursor.fetchone()[0]
|
||
return folder
|
||
|
||
def list_folders(self, type: str = None) -> List[Dict[str, Any]]:
|
||
"""列出文件夹
|
||
|
||
Args:
|
||
type: 按类型过滤,None表示列出所有
|
||
"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
if type:
|
||
cursor.execute("SELECT * FROM folders WHERE type = ? ORDER BY created_at DESC", (type,))
|
||
else:
|
||
cursor.execute("SELECT * FROM folders ORDER BY created_at DESC")
|
||
|
||
folders = []
|
||
for row in cursor.fetchall():
|
||
folder = dict(row)
|
||
# 获取文件夹内条目数量
|
||
cursor.execute("SELECT COUNT(*) FROM items WHERE folder_id = ? AND is_deleted = 0", (folder['id'],))
|
||
folder['item_count'] = cursor.fetchone()[0]
|
||
folders.append(folder)
|
||
|
||
return folders
|
||
|
||
def update_folder(self, folder_id: int, name: str = None) -> bool:
|
||
"""更新文件夹"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
now = datetime.now().isoformat()
|
||
|
||
if name:
|
||
cursor.execute("UPDATE folders SET name = ?, updated_at = ? WHERE id = ?", (name, now, folder_id))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
return False
|
||
|
||
def delete_folder(self, folder_id: int, move_items_to_root: bool = True) -> bool:
|
||
"""删除文件夹
|
||
|
||
Args:
|
||
folder_id: 文件夹ID
|
||
move_items_to_root: 是否将条目移出文件夹(到未分类),True则移动,False则一起删除
|
||
"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
if move_items_to_root:
|
||
# 将条目移出文件夹(folder_id设为null)
|
||
cursor.execute("UPDATE items SET folder_id = NULL WHERE folder_id = ?", (folder_id,))
|
||
else:
|
||
# 删除文件夹内的所有条目
|
||
cursor.execute("DELETE FROM items WHERE folder_id = ?", (folder_id,))
|
||
|
||
# 删除文件夹
|
||
cursor.execute("DELETE FROM folders WHERE id = ?", (folder_id,))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
def count_items_by_folder(self, folder_id: int) -> int:
|
||
"""统计文件夹内的条目数量"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT COUNT(*) FROM items WHERE folder_id = ? AND is_deleted = 0", (folder_id,))
|
||
return cursor.fetchone()[0]
|
||
|
||
# ============ Draft 草稿操作 ============
|
||
|
||
def save_draft(self, type: str = "text", title: str = None, content: str = None,
|
||
url: str = None, source: str = None, status: str = "pending",
|
||
priority: str = "medium", due_date: str = None, note: str = None,
|
||
tags: str = None, is_starred: bool = False) -> int:
|
||
"""保存草稿"""
|
||
self._ensure_init()
|
||
now = datetime.now().isoformat()
|
||
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO drafts (type, title, content, url, source, status, priority, due_date, note, tags, is_starred, created_at, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""", (type, title, content, url, source, status, priority, due_date, note, tags, 1 if is_starred else 0, now, now))
|
||
draft_id = cursor.lastrowid
|
||
conn.commit()
|
||
return draft_id
|
||
|
||
def update_draft(self, draft_id: int, **kwargs) -> bool:
|
||
"""更新草稿"""
|
||
allowed_fields = ['type', 'title', 'content', 'url', 'source', 'status', 'priority', 'due_date', 'note', 'tags', 'is_starred']
|
||
update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}
|
||
|
||
if not update_fields:
|
||
return False
|
||
|
||
now = datetime.now().isoformat()
|
||
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT id FROM drafts WHERE id = ?", (draft_id,))
|
||
if not cursor.fetchone():
|
||
return False
|
||
|
||
set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys())
|
||
set_clause += ", updated_at = ?"
|
||
values = list(update_fields.values()) + [now, draft_id]
|
||
cursor.execute(f"UPDATE drafts SET {set_clause} WHERE id = ?", values)
|
||
conn.commit()
|
||
return True
|
||
|
||
def list_drafts(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
|
||
"""列出草稿"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT * FROM drafts ORDER BY updated_at DESC LIMIT ? OFFSET ?", (limit, offset))
|
||
return [dict(row) for row in cursor.fetchall()]
|
||
|
||
def count_drafts(self) -> int:
|
||
"""计算草稿总数"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT COUNT(*) as count FROM drafts")
|
||
return cursor.fetchone()['count']
|
||
|
||
def get_draft(self, draft_id: int) -> Optional[Dict[str, Any]]:
|
||
"""获取单个草稿"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT * FROM drafts WHERE id = ?", (draft_id,))
|
||
row = cursor.fetchone()
|
||
return dict(row) if row else None
|
||
|
||
def delete_draft(self, draft_id: int) -> bool:
|
||
"""删除草稿"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("DELETE FROM drafts WHERE id = ?", (draft_id,))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
def draft_to_item(self, draft_id: int) -> Optional[int]:
|
||
"""将草稿转为正式条目"""
|
||
draft = self.get_draft(draft_id)
|
||
if not draft:
|
||
return None
|
||
|
||
# 创建条目
|
||
tags_list = draft['tags'].split(',') if draft['tags'] else []
|
||
tags_list = [t.strip() for t in tags_list if t.strip()]
|
||
|
||
item_id = self.create_item(
|
||
type=draft['type'],
|
||
title=draft['title'],
|
||
content=draft['content'],
|
||
url=draft['url'],
|
||
source=draft['source'],
|
||
status=draft['status'],
|
||
priority=draft['priority'],
|
||
due_date=draft['due_date'],
|
||
note=draft['note'],
|
||
tags=tags_list,
|
||
is_starred=draft['is_starred']
|
||
)
|
||
|
||
# 删除草稿
|
||
if item_id:
|
||
self.delete_draft(draft_id)
|
||
|
||
return item_id
|
||
|
||
# ============ Tag 操作 ============
|
||
|
||
def create_tag(self, name: str, color: str = "#3498db") -> int:
|
||
"""创建标签"""
|
||
now = datetime.now().isoformat()
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
try:
|
||
cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, ?, ?)",
|
||
(name, color, now))
|
||
conn.commit()
|
||
return cursor.lastrowid
|
||
except sqlite3.IntegrityError:
|
||
# 标签已存在
|
||
cursor.execute("SELECT id FROM tags WHERE name = ?", (name,))
|
||
return cursor.fetchone()['id']
|
||
|
||
def list_tags(self) -> List[Dict[str, Any]]:
|
||
"""列出所有标签"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
SELECT t.*, COUNT(it.item_id) as item_count
|
||
FROM tags t
|
||
LEFT JOIN item_tags it ON t.id = it.tag_id
|
||
GROUP BY t.id
|
||
ORDER BY t.name
|
||
""")
|
||
return [dict(row) for row in cursor.fetchall()]
|
||
|
||
def update_tag(self, tag_id: int, name: str) -> bool:
|
||
"""更新标签名称"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
# 检查名称是否已存在(排除自己)
|
||
cursor.execute("SELECT id FROM tags WHERE name = ? AND id != ?", (name, tag_id))
|
||
if cursor.fetchone():
|
||
return False # 名称已存在
|
||
cursor.execute("UPDATE tags SET name = ? WHERE id = ?", (name, tag_id))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
def delete_tag(self, tag_id: int = None, name: str = None) -> bool:
|
||
"""删除标签"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
if name:
|
||
cursor.execute("DELETE FROM tags WHERE name = ?", (name,))
|
||
elif tag_id:
|
||
cursor.execute("DELETE FROM tags WHERE id = ?", (tag_id,))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
# ============ 辅助方法 ============
|
||
|
||
def _add_tags_to_item(self, conn, item_id: int, tags: List[str]):
|
||
"""为条目添加标签"""
|
||
cursor = conn.cursor()
|
||
for tag_name in tags:
|
||
tag_name = tag_name.strip()
|
||
if not tag_name:
|
||
continue
|
||
# 确保标签存在 - 使用同一个连接
|
||
cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,))
|
||
row = cursor.fetchone()
|
||
if row:
|
||
tag_id = row['id']
|
||
else:
|
||
# 创建新标签
|
||
now = datetime.now().isoformat()
|
||
cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, '#3498db', ?)",
|
||
(tag_name, now))
|
||
tag_id = cursor.lastrowid
|
||
# 创建关联
|
||
cursor.execute("INSERT OR IGNORE INTO item_tags (item_id, tag_id) VALUES (?, ?)",
|
||
(item_id, tag_id))
|
||
|
||
def _get_item_tags(self, conn, item_id: int) -> List[str]:
|
||
"""获取条目的标签"""
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
SELECT t.name FROM tags t
|
||
JOIN item_tags it ON t.id = it.tag_id
|
||
WHERE it.item_id = ?
|
||
ORDER BY t.name
|
||
""", (item_id,))
|
||
return [row['name'] for row in cursor.fetchall()]
|
||
|
||
# ============ Email 操作 ============
|
||
|
||
def create_email(self, email: str, name: str = None) -> int:
|
||
"""创建邮箱"""
|
||
now = datetime.now().isoformat()
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
try:
|
||
cursor.execute("INSERT INTO emails (email, name, created_at) VALUES (?, ?, ?)",
|
||
(email, name, now))
|
||
conn.commit()
|
||
return cursor.lastrowid
|
||
except sqlite3.IntegrityError:
|
||
# 邮箱已存在,返回已有ID
|
||
cursor.execute("SELECT id FROM emails WHERE email = ?", (email,))
|
||
return cursor.fetchone()['id']
|
||
|
||
def list_emails(self) -> List[Dict[str, Any]]:
|
||
"""列出所有邮箱"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT * FROM emails ORDER BY created_at DESC")
|
||
return [dict(row) for row in cursor.fetchall()]
|
||
|
||
def get_email(self, email_id: int) -> Optional[Dict[str, Any]]:
|
||
"""获取单个邮箱"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT * FROM emails WHERE id = ?", (email_id,))
|
||
row = cursor.fetchone()
|
||
return dict(row) if row else None
|
||
|
||
def update_email(self, email_id: int, email: str = None, name: str = None) -> bool:
|
||
"""更新邮箱"""
|
||
now = datetime.now().isoformat()
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
if email:
|
||
# 检查邮箱是否已存在(排除自己)
|
||
cursor.execute("SELECT id FROM emails WHERE email = ? AND id != ?", (email, email_id))
|
||
if cursor.fetchone():
|
||
return False # 邎箱已存在
|
||
if email and name:
|
||
cursor.execute("UPDATE emails SET email = ?, name = ? WHERE id = ?", (email, name, email_id))
|
||
elif email:
|
||
cursor.execute("UPDATE emails SET email = ? WHERE id = ?", (email, email_id))
|
||
elif name:
|
||
cursor.execute("UPDATE emails SET name = ? WHERE id = ?", (name, email_id))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
def delete_email(self, email_id: int) -> bool:
|
||
"""删除邮箱"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("DELETE FROM emails WHERE id = ?", (email_id,))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
# ============ 邮件日志 操作 ============
|
||
|
||
def log_email_send(self, item_id: int, email: str, success: bool = True) -> int:
|
||
"""记录邮件发送"""
|
||
now = datetime.now().isoformat()
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO email_logs (item_id, email, sent_at, success)
|
||
VALUES (?, ?, ?, ?)
|
||
""", (item_id, email, now, 1 if success else 0))
|
||
conn.commit()
|
||
return cursor.lastrowid
|
||
|
||
def get_email_logs(self, item_id: int) -> List[Dict[str, Any]]:
|
||
"""获取收藏的邮件发送记录"""
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
SELECT * FROM email_logs
|
||
WHERE item_id = ?
|
||
ORDER BY sent_at DESC
|
||
""", (item_id,))
|
||
return [dict(row) for row in cursor.fetchall()]
|
||
|
||
def stats(self) -> Dict[str, Any]:
|
||
"""获取统计信息"""
|
||
self._ensure_init()
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
stats = {}
|
||
|
||
# 总数
|
||
cursor.execute("SELECT COUNT(*) as count FROM items WHERE is_deleted = 0")
|
||
stats['total'] = cursor.fetchone()['count']
|
||
|
||
# 按类型统计
|
||
cursor.execute("SELECT type, COUNT(*) as count FROM items WHERE is_deleted = 0 GROUP BY type")
|
||
stats['by_type'] = {row['type']: row['count'] for row in cursor.fetchall()}
|
||
|
||
# 未读数量统计(views = 0)
|
||
cursor.execute("SELECT COUNT(*) as count FROM items WHERE is_deleted = 0 AND (views IS NULL OR views = 0)")
|
||
stats['unread'] = cursor.fetchone()['count']
|
||
|
||
# 按类型统计未读数量
|
||
cursor.execute("SELECT type, COUNT(*) as count FROM items WHERE is_deleted = 0 AND (views IS NULL OR views = 0) GROUP BY type")
|
||
stats['unread_by_type'] = {row['type']: row['count'] for row in cursor.fetchall()}
|
||
|
||
# 待办状态统计
|
||
cursor.execute("SELECT status, COUNT(*) as count FROM items WHERE type = 'todo' AND is_deleted = 0 GROUP BY status")
|
||
stats['todo_status'] = {row['status']: row['count'] for row in cursor.fetchall()}
|
||
|
||
# 标签数
|
||
cursor.execute("SELECT COUNT(*) as count FROM tags")
|
||
stats['tags'] = cursor.fetchone()['count']
|
||
|
||
return stats
|
||
|
||
# ============ 提醒相关 ============
|
||
|
||
def get_reminders(self) -> Dict[str, Any]:
|
||
"""获取提醒信息:即将到期和已过期的待办"""
|
||
self._ensure_init()
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
now = datetime.now()
|
||
|
||
reminders = {
|
||
'overdue': [], # 已过期
|
||
'due_today': [], # 今天到期
|
||
'due_soon': [] # 24小时内到期(不含今天)
|
||
}
|
||
|
||
# 查询未完成的待办(有截止日期的)
|
||
cursor.execute("""
|
||
SELECT * FROM items
|
||
WHERE type = 'todo'
|
||
AND status != 'completed'
|
||
AND due_date IS NOT NULL
|
||
AND due_date != ''
|
||
ORDER BY due_date ASC
|
||
""")
|
||
|
||
for row in cursor.fetchall():
|
||
item = dict(row)
|
||
item['tags'] = self._get_item_tags(conn, item['id'])
|
||
|
||
try:
|
||
due_date_str = item['due_date']
|
||
# 支持多种日期格式
|
||
if 'T' in due_date_str:
|
||
# ISO 格式:2026-04-16T14:30
|
||
due_date = datetime.strptime(due_date_str[:16], '%Y-%m-%dT%H:%M')
|
||
elif len(due_date_str) == 10:
|
||
# 只有日期:2026-04-16,视为当天 23:59:59
|
||
due_date = datetime.strptime(due_date_str, '%Y-%m-%d').replace(hour=23, minute=59, second=59)
|
||
else:
|
||
# 其他格式,尝试解析
|
||
due_date = datetime.strptime(due_date_str.split('.')[0], '%Y-%m-%dT%H:%M:%S')
|
||
|
||
# 计算距离到期的时间
|
||
time_left = due_date - now
|
||
|
||
if time_left.total_seconds() < 0:
|
||
# 已过期
|
||
days_overdue = abs(int(time_left.total_seconds() / 86400))
|
||
item['days_overdue'] = days_overdue
|
||
reminders['overdue'].append(item)
|
||
elif time_left.total_seconds() < 86400: # 24小时内
|
||
# 判断是今天还是明天
|
||
if due_date.date() == now.date():
|
||
reminders['due_today'].append(item)
|
||
else:
|
||
reminders['due_soon'].append(item)
|
||
elif due_date.date() == now.date():
|
||
# 今天到期(超过24小时的情况,比如现在凌晨,截止时间是晚上)
|
||
reminders['due_today'].append(item)
|
||
except (ValueError, AttributeError) as e:
|
||
# 日期格式错误,跳过
|
||
continue
|
||
|
||
# 统计总数
|
||
reminders['total'] = len(reminders['overdue']) + len(reminders['due_today']) + len(reminders['due_soon'])
|
||
|
||
return reminders
|
||
|
||
# ============ 备份操作 ============
|
||
|
||
def create_backup(self, manual: bool = False) -> Dict[str, Any]:
|
||
"""创建数据库备份"""
|
||
import os
|
||
|
||
# 确保备份目录存在
|
||
os.makedirs(BACKUP_DIR, exist_ok=True)
|
||
|
||
now = datetime.now()
|
||
backup_name = now.strftime('%Y-%m-%d_%H%M%S')
|
||
if manual:
|
||
backup_name += '_manual'
|
||
backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
|
||
|
||
# 复制数据库文件
|
||
shutil.copy2(self.db_path, backup_path)
|
||
|
||
# 获取备份信息
|
||
backup_info = {
|
||
'name': backup_name,
|
||
'path': backup_path,
|
||
'size': os.path.getsize(backup_path),
|
||
'created_at': now.isoformat(),
|
||
'manual': manual,
|
||
'is_first_of_month': now.day == 1
|
||
}
|
||
|
||
# 保存备份元数据
|
||
self._save_backup_meta(backup_info)
|
||
|
||
# 清理旧备份
|
||
self._cleanup_old_backups()
|
||
|
||
return backup_info
|
||
|
||
def list_backups(self) -> List[Dict[str, Any]]:
|
||
"""列出所有备份"""
|
||
import os
|
||
|
||
if not os.path.exists(BACKUP_DIR):
|
||
return []
|
||
|
||
# 读取备份元数据
|
||
meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
|
||
if os.path.exists(meta_path):
|
||
with open(meta_path, 'r') as f:
|
||
backups = json.load(f)
|
||
else:
|
||
# 从文件重建元数据
|
||
backups = []
|
||
for f in os.listdir(BACKUP_DIR):
|
||
if f.endswith('.db'):
|
||
path = os.path.join(BACKUP_DIR, f)
|
||
backups.append({
|
||
'name': f.replace('.db', ''),
|
||
'path': path,
|
||
'size': os.path.getsize(path),
|
||
'created_at': datetime.fromtimestamp(os.path.getmtime(path)).isoformat(),
|
||
'manual': '_manual' in f,
|
||
'is_first_of_month': self._is_first_of_month_filename(f)
|
||
})
|
||
|
||
# 按时间倒序排列
|
||
backups.sort(key=lambda x: x['created_at'], reverse=True)
|
||
|
||
return backups
|
||
|
||
def restore_backup(self, backup_name: str) -> bool:
|
||
"""恢复备份"""
|
||
import os
|
||
|
||
backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
|
||
if not os.path.exists(backup_path):
|
||
return False
|
||
|
||
# 先备份当前数据库(以防万一)
|
||
current_backup = self.db_path + '.before_restore'
|
||
shutil.copy2(self.db_path, current_backup)
|
||
|
||
# 恢复备份
|
||
shutil.copy2(backup_path, self.db_path)
|
||
|
||
return True
|
||
|
||
def delete_backup(self, backup_name: str) -> bool:
|
||
"""删除备份"""
|
||
import os
|
||
|
||
backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
|
||
if not os.path.exists(backup_path):
|
||
return False
|
||
|
||
os.remove(backup_path)
|
||
|
||
# 更新元数据
|
||
self._remove_backup_meta(backup_name)
|
||
|
||
return True
|
||
|
||
def _save_backup_meta(self, backup_info: Dict[str, Any]):
|
||
"""保存备份元数据"""
|
||
import os
|
||
|
||
meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
|
||
|
||
# 读取现有元数据
|
||
backups = []
|
||
if os.path.exists(meta_path):
|
||
with open(meta_path, 'r') as f:
|
||
backups = json.load(f)
|
||
|
||
# 添加新备份
|
||
backups.append(backup_info)
|
||
|
||
# 保存
|
||
with open(meta_path, 'w') as f:
|
||
json.dump(backups, f, indent=2)
|
||
|
||
def _remove_backup_meta(self, backup_name: str):
|
||
"""从元数据中删除备份"""
|
||
import os
|
||
|
||
meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
|
||
if not os.path.exists(meta_path):
|
||
return
|
||
|
||
with open(meta_path, 'r') as f:
|
||
backups = json.load(f)
|
||
|
||
backups = [b for b in backups if b['name'] != backup_name]
|
||
|
||
with open(meta_path, 'w') as f:
|
||
json.dump(backups, f, indent=2)
|
||
|
||
def _is_first_of_month_filename(self, filename: str) -> bool:
|
||
"""判断是否是每月第一天的备份"""
|
||
# 格式:2026-04-01_040000.db 或 2026-05-01_...
|
||
try:
|
||
date_part = filename.split('_')[0]
|
||
day = int(date_part.split('-')[2])
|
||
return day == 1
|
||
except:
|
||
return False
|
||
|
||
def _cleanup_old_backups(self):
|
||
"""清理旧备份:保留30天 + 每月第一天"""
|
||
import os
|
||
|
||
backups = self.list_backups()
|
||
now = datetime.now()
|
||
keep_paths = []
|
||
|
||
for backup in backups:
|
||
backup_date = datetime.fromisoformat(backup['created_at'])
|
||
days_old = (now - backup_date).days
|
||
|
||
# 保留条件:
|
||
# 1. 手动备份永久保留(最多10个)
|
||
# 2. 30天内的备份
|
||
# 3. 每月第一天的备份
|
||
|
||
if backup['manual']:
|
||
# 手动备份保留,但最多10个
|
||
manual_backups = [b for b in backups if b['manual']]
|
||
if manual_backups.index(backup) < 10:
|
||
keep_paths.append(backup['path'])
|
||
else:
|
||
self.delete_backup(backup['name'])
|
||
elif days_old <= 30:
|
||
keep_paths.append(backup['path'])
|
||
elif backup['is_first_of_month']:
|
||
# 每月第一天永久保留
|
||
keep_paths.append(backup['path'])
|
||
else:
|
||
# 删除
|
||
self.delete_backup(backup['name'])
|
||
|
||
# ============ Todo Events 待办事务操作 ============
|
||
|
||
def create_todo_event(self, item_id: int, content: str, remaining_days: int = 1) -> int:
|
||
"""创建待办事务"""
|
||
self._ensure_init()
|
||
now = datetime.now().isoformat()
|
||
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO todo_events (item_id, content, remaining_days, is_completed, created_at, updated_at)
|
||
VALUES (?, ?, ?, 0, ?, ?)
|
||
""", (item_id, content, remaining_days, now, now))
|
||
conn.commit()
|
||
return cursor.lastrowid
|
||
|
||
def list_todo_events(self, item_id: int, limit: int = 10, offset: int = 0) -> List[Dict[str, Any]]:
|
||
"""列出待办事务(按时间倒序)"""
|
||
self._ensure_init()
|
||
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
SELECT * FROM todo_events
|
||
WHERE item_id = ?
|
||
ORDER BY created_at DESC
|
||
LIMIT ? OFFSET ?
|
||
""", (item_id, limit, offset))
|
||
rows = cursor.fetchall()
|
||
return [dict(row) for row in rows]
|
||
|
||
def count_todo_events(self, item_id: int) -> int:
|
||
"""计算待办事务总数"""
|
||
self._ensure_init()
|
||
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT COUNT(*) as count FROM todo_events WHERE item_id = ?", (item_id,))
|
||
return cursor.fetchone()['count']
|
||
|
||
def get_todo_event(self, event_id: int) -> Optional[Dict[str, Any]]:
|
||
"""获取单个待办事务"""
|
||
self._ensure_init()
|
||
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT * FROM todo_events WHERE id = ?", (event_id,))
|
||
row = cursor.fetchone()
|
||
return dict(row) if row else None
|
||
|
||
def update_todo_event(self, event_id: int, content: str = None, remaining_days: int = None, is_completed: bool = None) -> bool:
|
||
"""更新待办事务"""
|
||
self._ensure_init()
|
||
now = datetime.now().isoformat()
|
||
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# 获取当前数据
|
||
cursor.execute("SELECT * FROM todo_events WHERE id = ?", (event_id,))
|
||
row = cursor.fetchone()
|
||
if not row:
|
||
return False
|
||
|
||
# 更新字段
|
||
new_content = content if content is not None else row['content']
|
||
new_days = remaining_days if remaining_days is not None else row['remaining_days']
|
||
new_completed = 1 if is_completed is True else (0 if is_completed is False else row['is_completed'])
|
||
|
||
cursor.execute("""
|
||
UPDATE todo_events
|
||
SET content = ?, remaining_days = ?, is_completed = ?, updated_at = ?
|
||
WHERE id = ?
|
||
""", (new_content, new_days, new_completed, now, event_id))
|
||
conn.commit()
|
||
return True
|
||
|
||
def delete_todo_event(self, event_id: int) -> bool:
|
||
"""删除待办事务"""
|
||
self._ensure_init()
|
||
|
||
with self.get_conn() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("DELETE FROM todo_events WHERE id = ?", (event_id,))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
|
||
# 全局数据库实例
|
||
db = Database() |