Files
xian-favor/xian_favor/db.py
hubian 79e4eb4de0 feat: 新增回收站功能
- 数据库添加 is_deleted 和 deleted_at 字段
- 删除数据改为移动到回收站(软删除)
- 回收站支持查看、恢复、彻底删除
- 支持一键清空回收站
- 侧边栏添加回收站入口
2026-04-19 16:57:47 +08:00

893 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""数据库操作"""
import sqlite3
import json
import shutil
import os
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from .config import DATABASE_URL, TODO_STATUS, PRIORITY_LEVELS
# Backup directory: a "backups" folder that sits next to the database file.
BACKUP_DIR = os.path.join(os.path.split(DATABASE_URL)[0], 'backups')
class Database:
    """SQLite persistence layer for items, tags, e-mail addresses and backups.

    Every data method opens a short-lived connection through ``get_conn``.
    ``get_conn`` creates/migrates the schema on first use, so no method
    depends on another one having been called first.

    Items are soft-deleted: ``delete_item`` only sets ``is_deleted``; rows are
    physically removed by ``delete_permanently`` / ``empty_trash``.
    """

    # Number of manual backups kept by _cleanup_old_backups (newest first).
    _MAX_MANUAL_BACKUPS = 10
    # Automatic backups at most this many days old are always kept.
    _BACKUP_RETENTION_DAYS = 30

    def __init__(self, db_path: str = DATABASE_URL):
        """Store the database path; the file is not touched until first use."""
        self.db_path = db_path
        self._initialized = False

    def _ensure_init(self):
        """Create/migrate the schema once per instance."""
        if self._initialized:
            return
        self._init_db()
        self._initialized = True

    @contextmanager
    def _connect(self):
        """Yield a raw connection without triggering schema initialisation.

        Used by ``_init_db`` (which must not recurse into ``_ensure_init``)
        and by ``get_conn``.
        """
        conn = sqlite3.connect(self.db_path, timeout=30.0)
        try:
            conn.row_factory = sqlite3.Row
            # WAL mode improves concurrent access.
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA busy_timeout=30000")
            yield conn
        finally:
            # Also reached when a PRAGMA fails, so the handle never leaks.
            conn.close()

    @contextmanager
    def get_conn(self):
        """Yield a connection to an initialised database.

        The connection uses ``sqlite3.Row`` rows and is closed on exit.
        Nothing is committed automatically; callers commit explicitly.
        """
        self._ensure_init()
        with self._connect() as conn:
            yield conn

    @staticmethod
    def _ensure_column(cursor, column: str, definition: str):
        """Add ``column`` to ``items`` when an older database lacks it.

        Only called with constant identifiers from ``_init_db``; the values
        are interpolated into SQL and must never come from user input.
        """
        try:
            cursor.execute(f"SELECT {column} FROM items LIMIT 1")
        except sqlite3.OperationalError:
            cursor.execute(f"ALTER TABLE items ADD COLUMN {column} {definition}")

    def _init_db(self):
        """Create all tables and indexes and migrate older schemas."""
        with self._connect() as conn:
            cursor = conn.cursor()
            # Main content table.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS items (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    type TEXT NOT NULL DEFAULT 'text',
                    title TEXT,
                    content TEXT,
                    url TEXT,
                    source TEXT,
                    status TEXT DEFAULT 'pending',
                    priority TEXT DEFAULT 'medium',
                    due_date TEXT,
                    note TEXT,
                    is_starred INTEGER DEFAULT 0,
                    views INTEGER DEFAULT 0,
                    is_deleted INTEGER DEFAULT 0,
                    deleted_at TEXT,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
            """)
            # Tag table.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS tags (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL UNIQUE,
                    color TEXT DEFAULT '#3498db',
                    created_at TEXT NOT NULL
                )
            """)
            # Item <-> tag association table.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS item_tags (
                    item_id INTEGER NOT NULL,
                    tag_id INTEGER NOT NULL,
                    PRIMARY KEY (item_id, tag_id),
                    FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE,
                    FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
                )
            """)
            # E-mail address table.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS emails (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    email TEXT NOT NULL UNIQUE,
                    name TEXT,
                    created_at TEXT NOT NULL
                )
            """)
            # E-mail send log table.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS email_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    item_id INTEGER NOT NULL,
                    email TEXT NOT NULL,
                    sent_at TEXT NOT NULL,
                    success INTEGER DEFAULT 1,
                    FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE
                )
            """)
            # Indexes.
            for ddl in (
                "CREATE INDEX IF NOT EXISTS idx_items_type ON items(type)",
                "CREATE INDEX IF NOT EXISTS idx_items_status ON items(status)",
                "CREATE INDEX IF NOT EXISTS idx_items_created ON items(created_at)",
                "CREATE INDEX IF NOT EXISTS idx_item_tags_item ON item_tags(item_id)",
                "CREATE INDEX IF NOT EXISTS idx_item_tags_tag ON item_tags(tag_id)",
            ):
                cursor.execute(ddl)
            # Columns added after the first release (older databases lack them).
            self._ensure_column(cursor, "is_starred", "INTEGER DEFAULT 0")
            # The index can only be created once the column exists.
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_starred ON items(is_starred)")
            self._ensure_column(cursor, "views", "INTEGER DEFAULT 0")
            self._ensure_column(cursor, "is_deleted", "INTEGER DEFAULT 0")
            self._ensure_column(cursor, "deleted_at", "TEXT")
            conn.commit()

    # ============ Item operations ============
    def create_item(self, type: str = "text", title: str = None, content: str = None,
                    url: str = None, source: str = None, status: str = "pending",
                    priority: str = "medium", due_date: str = None, note: str = None,
                    tags: List[str] = None, is_starred: bool = False) -> int:
        """Insert a new item and return its id.

        For ``type == "todo"`` a status outside ``TODO_STATUS`` falls back to
        ``"pending"``; a priority outside ``PRIORITY_LEVELS`` falls back to
        ``"medium"``. Tags are created on demand.
        """
        now = datetime.now().isoformat()
        if type == "todo" and status not in TODO_STATUS:
            status = "pending"
        if priority not in PRIORITY_LEVELS:
            priority = "medium"
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO items (type, title, content, url, source, status, priority, due_date, note, is_starred, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (type, title, content, url, source, status, priority, due_date, note, 1 if is_starred else 0, now, now))
            item_id = cursor.lastrowid
            if tags:
                self._add_tags_to_item(conn, item_id, tags)
            conn.commit()
            return item_id

    def get_item(self, item_id: int) -> Optional[Dict[str, Any]]:
        """Return the item (including trashed ones) with its tags, or None."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM items WHERE id = ?", (item_id,))
            row = cursor.fetchone()
            if not row:
                return None
            item = dict(row)
            item['tags'] = self._get_item_tags(conn, item_id)
            return item

    @staticmethod
    def _item_filters(type, status, tag, keyword, starred):
        """Build the JOIN clause, WHERE clause and parameters shared by
        ``list_items`` and ``count_items`` so both always agree.

        Trashed rows (``is_deleted = 1``) are always excluded.
        """
        joins = ""
        conditions = ["i.is_deleted = 0"]
        params = []
        if tag:
            # Tag filtering needs the association tables.
            joins = " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id"
            conditions.append("t.name = ?")
            params.append(tag)
        if type:
            conditions.append("i.type = ?")
            params.append(type)
        if status:
            conditions.append("i.status = ?")
            params.append(status)
        if starred is not None:
            conditions.append("i.is_starred = ?")
            params.append(1 if starred else 0)
        if keyword:
            conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)")
            keyword_pattern = f"%{keyword}%"
            params.extend([keyword_pattern, keyword_pattern, keyword_pattern])
        return joins, " WHERE " + " AND ".join(conditions), params

    def _rows_with_tags(self, conn, rows) -> List[Dict[str, Any]]:
        """Convert item rows to dicts and attach each item's tag names."""
        items = []
        for row in rows:
            item = dict(row)
            item['tags'] = self._get_item_tags(conn, item['id'])
            items.append(item)
        return items

    def list_items(self, type: str = None, status: str = None, tag: str = None,
                   keyword: str = None, starred: bool = None, sort_by: str = None,
                   sort_order: str = None, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
        """List non-deleted items matching the filters.

        sort_by: ``updated_at`` sorts by update time; any other non-empty
            value sorts by ``created_at``. When omitted, starred items come
            first, then newest first (``sort_order`` is ignored).
        sort_order: ``asc`` for ascending; anything else means descending.
        """
        joins, where, params = self._item_filters(type, status, tag, keyword, starred)
        if sort_by:
            order_field = 'i.updated_at' if sort_by == 'updated_at' else 'i.created_at'
            order_dir = 'ASC' if sort_order == 'asc' else 'DESC'
            order_by = f"{order_field} {order_dir}"
        else:
            order_by = "i.is_starred DESC, i.created_at DESC"
        query = f"SELECT DISTINCT i.* FROM items i{joins}{where} ORDER BY {order_by} LIMIT ? OFFSET ?"
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params + [limit, offset])
            return self._rows_with_tags(conn, cursor.fetchall())

    def count_items(self, type: str = None, status: str = None, tag: str = None,
                    keyword: str = None, starred: bool = None) -> int:
        """Count non-deleted items matching the same filters as ``list_items``."""
        joins, where, params = self._item_filters(type, status, tag, keyword, starred)
        query = f"SELECT COUNT(DISTINCT i.id) as count FROM items i{joins}{where}"
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            return cursor.fetchone()['count']

    def update_item(self, item_id: int, **kwargs) -> bool:
        """Update whitelisted columns and/or replace the item's tags.

        Returns False when nothing updatable was passed or the item does not
        exist. ``updated_at`` is bumped for every successful update,
        including tag-only changes.
        """
        allowed_fields = ['type', 'title', 'content', 'url', 'source', 'status', 'priority', 'due_date', 'note', 'is_starred']
        update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}
        # A change of tags alone still counts as a valid update.
        if not update_fields and 'tags' not in kwargs:
            return False
        now = datetime.now().isoformat()
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT id FROM items WHERE id = ?", (item_id,))
            if not cursor.fetchone():
                return False
            # Column names come from allowed_fields only, never from callers.
            assignments = [f"{k} = ?" for k in update_fields] + ["updated_at = ?"]
            values = list(update_fields.values()) + [now, item_id]
            cursor.execute(f"UPDATE items SET {', '.join(assignments)} WHERE id = ?", values)
            if 'tags' in kwargs:
                # Replace the whole tag set: drop old links, then re-add.
                cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,))
                if kwargs['tags']:
                    self._add_tags_to_item(conn, item_id, kwargs['tags'])
            conn.commit()
            return True

    def delete_item(self, item_id: int) -> bool:
        """Soft-delete an item (move it to the trash)."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            now = datetime.now().isoformat()
            cursor.execute("UPDATE items SET is_deleted = 1, deleted_at = ? WHERE id = ?", (now, item_id))
            conn.commit()
            return cursor.rowcount > 0

    def list_trash(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
        """List trashed items, most recently deleted first."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM items WHERE is_deleted = 1 ORDER BY deleted_at DESC LIMIT ? OFFSET ?", (limit, offset))
            return self._rows_with_tags(conn, cursor.fetchall())

    def count_trash(self) -> int:
        """Return the number of items in the trash."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) as count FROM items WHERE is_deleted = 1")
            return cursor.fetchone()['count']

    def restore_item(self, item_id: int) -> bool:
        """Move an item out of the trash."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("UPDATE items SET is_deleted = 0, deleted_at = NULL WHERE id = ?", (item_id,))
            conn.commit()
            return cursor.rowcount > 0

    def delete_permanently(self, item_id: int) -> bool:
        """Physically delete an item together with its tag links and mail logs."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            # Dependent rows are removed by hand: the connection never enables
            # foreign-key enforcement, so ON DELETE CASCADE does not fire.
            cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,))
            cursor.execute("DELETE FROM email_logs WHERE item_id = ?", (item_id,))
            cursor.execute("DELETE FROM items WHERE id = ?", (item_id,))
            conn.commit()
            # rowcount reflects the last statement, i.e. the items DELETE.
            return cursor.rowcount > 0

    def empty_trash(self) -> int:
        """Permanently delete every trashed item; return how many were removed."""
        trashed_ids = "SELECT id FROM items WHERE is_deleted = 1"
        with self.get_conn() as conn:
            cursor = conn.cursor()
            # Dependent rows first, in one statement per table.
            cursor.execute(f"DELETE FROM item_tags WHERE item_id IN ({trashed_ids})")
            cursor.execute(f"DELETE FROM email_logs WHERE item_id IN ({trashed_ids})")
            cursor.execute("DELETE FROM items WHERE is_deleted = 1")
            deleted_count = cursor.rowcount
            conn.commit()
            return deleted_count

    def toggle_star(self, item_id: int) -> bool:
        """Flip the starred flag; return False when the item does not exist."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT is_starred FROM items WHERE id = ?", (item_id,))
            row = cursor.fetchone()
            if not row:
                return False
            new_status = 0 if row['is_starred'] else 1
            now = datetime.now().isoformat()
            cursor.execute("UPDATE items SET is_starred = ?, updated_at = ? WHERE id = ?", (new_status, now, item_id))
            conn.commit()
            return True

    def set_star(self, item_id: int, starred: bool = True) -> bool:
        """Set the starred flag explicitly."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            now = datetime.now().isoformat()
            cursor.execute("UPDATE items SET is_starred = ?, updated_at = ? WHERE id = ?", (1 if starred else 0, now, item_id))
            conn.commit()
            return cursor.rowcount > 0

    def increment_views(self, item_id: int) -> bool:
        """Increase the view counter by one (``updated_at`` is left alone)."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("UPDATE items SET views = views + 1 WHERE id = ?", (item_id,))
            conn.commit()
            return cursor.rowcount > 0

    # ============ Tag operations ============
    def create_tag(self, name: str, color: str = "#3498db") -> int:
        """Create a tag and return its id; return the existing id on a name clash."""
        now = datetime.now().isoformat()
        with self.get_conn() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, ?, ?)",
                               (name, color, now))
                conn.commit()
                return cursor.lastrowid
            except sqlite3.IntegrityError:
                # UNIQUE(name) violated: the tag already exists.
                cursor.execute("SELECT id FROM tags WHERE name = ?", (name,))
                return cursor.fetchone()['id']

    def list_tags(self) -> List[Dict[str, Any]]:
        """List all tags by name, each with ``item_count`` of non-deleted items."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            # The second LEFT JOIN keeps only live items, so trashed items and
            # dangling links do not inflate the count.
            cursor.execute("""
                SELECT t.*, COUNT(i.id) as item_count
                FROM tags t
                LEFT JOIN item_tags it ON t.id = it.tag_id
                LEFT JOIN items i ON i.id = it.item_id AND i.is_deleted = 0
                GROUP BY t.id
                ORDER BY t.name
            """)
            return [dict(row) for row in cursor.fetchall()]

    def update_tag(self, tag_id: int, name: str) -> bool:
        """Rename a tag; return False if the name is taken or the tag is missing."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            # Reject a name already used by a different tag.
            cursor.execute("SELECT id FROM tags WHERE name = ? AND id != ?", (name, tag_id))
            if cursor.fetchone():
                return False
            cursor.execute("UPDATE tags SET name = ? WHERE id = ?", (name, tag_id))
            conn.commit()
            return cursor.rowcount > 0

    def delete_tag(self, tag_id: int = None, name: str = None) -> bool:
        """Delete a tag by name (preferred when given) or by id.

        The tag's item links are removed as well. Returns False when neither
        argument is given or no such tag exists.
        """
        with self.get_conn() as conn:
            cursor = conn.cursor()
            if name:
                cursor.execute("SELECT id FROM tags WHERE name = ?", (name,))
            elif tag_id:
                cursor.execute("SELECT id FROM tags WHERE id = ?", (tag_id,))
            else:
                return False
            row = cursor.fetchone()
            if not row:
                return False
            # Foreign keys are not enforced on this connection, so the links
            # must be removed explicitly to avoid orphan item_tags rows.
            cursor.execute("DELETE FROM item_tags WHERE tag_id = ?", (row['id'],))
            cursor.execute("DELETE FROM tags WHERE id = ?", (row['id'],))
            conn.commit()
            return cursor.rowcount > 0

    # ============ Helpers ============
    def _add_tags_to_item(self, conn, item_id: int, tags: List[str]):
        """Link ``item_id`` to each tag name, creating missing tags.

        Runs on the caller's connection and does not commit. Blank names are
        skipped; names are stripped of surrounding whitespace.
        """
        cursor = conn.cursor()
        for tag_name in tags:
            tag_name = tag_name.strip()
            if not tag_name:
                continue
            cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,))
            row = cursor.fetchone()
            if row:
                tag_id = row['id']
            else:
                now = datetime.now().isoformat()
                cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, '#3498db', ?)",
                               (tag_name, now))
                tag_id = cursor.lastrowid
            # OR IGNORE: the same tag may appear twice in the input.
            cursor.execute("INSERT OR IGNORE INTO item_tags (item_id, tag_id) VALUES (?, ?)",
                           (item_id, tag_id))

    def _get_item_tags(self, conn, item_id: int) -> List[str]:
        """Return the item's tag names sorted alphabetically."""
        cursor = conn.cursor()
        cursor.execute("""
            SELECT t.name FROM tags t
            JOIN item_tags it ON t.id = it.tag_id
            WHERE it.item_id = ?
            ORDER BY t.name
        """, (item_id,))
        return [row['name'] for row in cursor.fetchall()]

    # ============ Email operations ============
    def create_email(self, email: str, name: str = None) -> int:
        """Store an address and return its id; return the existing id on a clash."""
        now = datetime.now().isoformat()
        with self.get_conn() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute("INSERT INTO emails (email, name, created_at) VALUES (?, ?, ?)",
                               (email, name, now))
                conn.commit()
                return cursor.lastrowid
            except sqlite3.IntegrityError:
                # UNIQUE(email) violated: return the existing row's id.
                cursor.execute("SELECT id FROM emails WHERE email = ?", (email,))
                return cursor.fetchone()['id']

    def list_emails(self) -> List[Dict[str, Any]]:
        """List all stored addresses, newest first."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM emails ORDER BY created_at DESC")
            return [dict(row) for row in cursor.fetchall()]

    def get_email(self, email_id: int) -> Optional[Dict[str, Any]]:
        """Return one stored address by id, or None."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM emails WHERE id = ?", (email_id,))
            row = cursor.fetchone()
            return dict(row) if row else None

    def update_email(self, email_id: int, email: str = None, name: str = None) -> bool:
        """Update the address and/or display name.

        Returns False when nothing was passed, the address is already used by
        another row, or the row does not exist.
        """
        with self.get_conn() as conn:
            cursor = conn.cursor()
            if email:
                # Reject an address already used by a different row.
                cursor.execute("SELECT id FROM emails WHERE email = ? AND id != ?", (email, email_id))
                if cursor.fetchone():
                    return False
            if email and name:
                cursor.execute("UPDATE emails SET email = ?, name = ? WHERE id = ?", (email, name, email_id))
            elif email:
                cursor.execute("UPDATE emails SET email = ? WHERE id = ?", (email, email_id))
            elif name:
                cursor.execute("UPDATE emails SET name = ? WHERE id = ?", (name, email_id))
            else:
                return False
            conn.commit()
            return cursor.rowcount > 0

    def delete_email(self, email_id: int) -> bool:
        """Delete a stored address."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM emails WHERE id = ?", (email_id,))
            conn.commit()
            return cursor.rowcount > 0

    # ============ Email log operations ============
    def log_email_send(self, item_id: int, email: str, success: bool = True) -> int:
        """Record one send attempt for an item and return the log row id."""
        now = datetime.now().isoformat()
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO email_logs (item_id, email, sent_at, success)
                VALUES (?, ?, ?, ?)
            """, (item_id, email, now, 1 if success else 0))
            conn.commit()
            return cursor.lastrowid

    def get_email_logs(self, item_id: int) -> List[Dict[str, Any]]:
        """Return an item's send log, newest first."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT * FROM email_logs
                WHERE item_id = ?
                ORDER BY sent_at DESC
            """, (item_id,))
            return [dict(row) for row in cursor.fetchall()]

    def stats(self) -> Dict[str, Any]:
        """Return counts of non-deleted items (total, by type, todo status) and of tags."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            stats = {}
            cursor.execute("SELECT COUNT(*) as count FROM items WHERE is_deleted = 0")
            stats['total'] = cursor.fetchone()['count']
            cursor.execute("SELECT type, COUNT(*) as count FROM items WHERE is_deleted = 0 GROUP BY type")
            stats['by_type'] = {row['type']: row['count'] for row in cursor.fetchall()}
            cursor.execute("SELECT status, COUNT(*) as count FROM items WHERE type = 'todo' AND is_deleted = 0 GROUP BY status")
            stats['todo_status'] = {row['status']: row['count'] for row in cursor.fetchall()}
            cursor.execute("SELECT COUNT(*) as count FROM tags")
            stats['tags'] = cursor.fetchone()['count']
            return stats

    # ============ Reminders ============
    @staticmethod
    def _parse_due_date(raw) -> Optional[datetime]:
        """Parse a stored due date; return None when it cannot be parsed.

        Accepted forms: ``YYYY-MM-DD`` (treated as 23:59:59 of that day) and
        ``YYYY-MM-DDTHH:MM`` or ``YYYY-MM-DD HH:MM``; anything after the
        minutes (seconds, fractions) is ignored.
        """
        try:
            text = raw.strip()
            if len(text) == 10:
                return datetime.strptime(text, '%Y-%m-%d').replace(hour=23, minute=59, second=59)
            return datetime.strptime(text[:16].replace(' ', 'T'), '%Y-%m-%dT%H:%M')
        except (ValueError, AttributeError, TypeError):
            return None

    def get_reminders(self) -> Dict[str, Any]:
        """Collect unfinished, non-deleted todos that are overdue or due within 24 hours.

        Returns a dict with lists ``overdue`` (each item gets ``days_overdue``),
        ``due_today`` and ``due_soon`` (within 24 hours but on the next
        calendar day), plus ``total``. Items with unparseable due dates are
        skipped.
        """
        with self.get_conn() as conn:
            cursor = conn.cursor()
            now = datetime.now()
            reminders = {
                'overdue': [],
                'due_today': [],
                'due_soon': []
            }
            cursor.execute("""
                SELECT * FROM items
                WHERE type = 'todo'
                AND status != 'completed'
                AND is_deleted = 0
                AND due_date IS NOT NULL
                AND due_date != ''
                ORDER BY due_date ASC
            """)
            for row in cursor.fetchall():
                item = dict(row)
                due_date = self._parse_due_date(item['due_date'])
                if due_date is None:
                    continue
                item['tags'] = self._get_item_tags(conn, item['id'])
                seconds_left = (due_date - now).total_seconds()
                if seconds_left < 0:
                    item['days_overdue'] = abs(int(seconds_left / 86400))
                    reminders['overdue'].append(item)
                elif seconds_left < 86400:
                    # Less than 24 hours away: either later today or tomorrow.
                    # (A due date 24h+ away can never fall on today's date.)
                    bucket = 'due_today' if due_date.date() == now.date() else 'due_soon'
                    reminders[bucket].append(item)
            reminders['total'] = len(reminders['overdue']) + len(reminders['due_today']) + len(reminders['due_soon'])
            return reminders

    # ============ Backup operations ============
    @staticmethod
    def _backup_path(backup_name: str) -> Optional[str]:
        """Map a backup name to its file path inside BACKUP_DIR.

        Returns None for empty names or names containing path separators, so
        a caller-supplied name cannot address files outside BACKUP_DIR.
        """
        if not backup_name or os.path.basename(backup_name) != backup_name:
            return None
        return os.path.join(BACKUP_DIR, f'{backup_name}.db')

    @staticmethod
    def _copy_database(src_path: str, dst_path: str):
        """Copy one SQLite database into another via SQLite's backup API.

        A plain file copy is not safe here: the database runs in WAL mode, so
        recent commits may live only in the ``-wal`` side file, and overwriting
        a live database file leaves its old ``-wal`` file behind.

        Raises FileNotFoundError when ``src_path`` does not exist.
        """
        if not os.path.isfile(src_path):
            raise FileNotFoundError(src_path)
        src = sqlite3.connect(src_path, timeout=30.0)
        try:
            dst = sqlite3.connect(dst_path, timeout=30.0)
            try:
                src.backup(dst)
            finally:
                dst.close()
        finally:
            src.close()

    def create_backup(self, manual: bool = False) -> Dict[str, Any]:
        """Create a timestamped backup, record it in the metadata and prune old ones.

        Returns the metadata dict of the new backup.
        """
        os.makedirs(BACKUP_DIR, exist_ok=True)
        now = datetime.now()
        backup_name = now.strftime('%Y-%m-%d_%H%M%S')
        if manual:
            backup_name += '_manual'
        backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
        self._copy_database(self.db_path, backup_path)
        backup_info = {
            'name': backup_name,
            'path': backup_path,
            'size': os.path.getsize(backup_path),
            'created_at': now.isoformat(),
            'manual': manual,
            'is_first_of_month': now.day == 1
        }
        self._save_backup_meta(backup_info)
        self._cleanup_old_backups()
        return backup_info

    def list_backups(self) -> List[Dict[str, Any]]:
        """List backups, newest first.

        Reads ``backup_meta.json`` when present; otherwise rebuilds the
        entries from the ``.db`` files found in BACKUP_DIR.
        """
        if not os.path.exists(BACKUP_DIR):
            return []
        meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
        if os.path.exists(meta_path):
            with open(meta_path, 'r', encoding='utf-8') as fh:
                backups = json.load(fh)
        else:
            backups = []
            for filename in os.listdir(BACKUP_DIR):
                if not filename.endswith('.db'):
                    continue
                path = os.path.join(BACKUP_DIR, filename)
                backups.append({
                    # Strip only the trailing extension.
                    'name': filename[:-len('.db')],
                    'path': path,
                    'size': os.path.getsize(path),
                    'created_at': datetime.fromtimestamp(os.path.getmtime(path)).isoformat(),
                    'manual': '_manual' in filename,
                    'is_first_of_month': self._is_first_of_month_filename(filename)
                })
        backups.sort(key=lambda x: x['created_at'], reverse=True)
        return backups

    def restore_backup(self, backup_name: str) -> bool:
        """Replace the live database with the named backup.

        The current database is first saved next to itself with the suffix
        ``.before_restore``. Returns False for an invalid or unknown name.
        """
        backup_path = self._backup_path(backup_name)
        if backup_path is None or not os.path.exists(backup_path):
            return False
        # Safety copy of the current state, in case the restore is a mistake.
        if os.path.isfile(self.db_path):
            self._copy_database(self.db_path, self.db_path + '.before_restore')
        self._copy_database(backup_path, self.db_path)
        # The backup may predate newer columns: re-run migrations on next use.
        self._initialized = False
        return True

    def delete_backup(self, backup_name: str) -> bool:
        """Delete a backup file and its metadata entry.

        Returns False for an invalid name or when the file does not exist; a
        leftover metadata entry for a missing file is still removed.
        """
        backup_path = self._backup_path(backup_name)
        if backup_path is None:
            return False
        existed = os.path.exists(backup_path)
        if existed:
            os.remove(backup_path)
        self._remove_backup_meta(backup_name)
        return existed

    def _save_backup_meta(self, backup_info: Dict[str, Any]):
        """Append one entry to ``backup_meta.json`` (created when missing)."""
        meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
        backups = []
        if os.path.exists(meta_path):
            with open(meta_path, 'r', encoding='utf-8') as fh:
                backups = json.load(fh)
        backups.append(backup_info)
        with open(meta_path, 'w', encoding='utf-8') as fh:
            json.dump(backups, fh, indent=2)

    def _remove_backup_meta(self, backup_name: str):
        """Drop the named entry from ``backup_meta.json`` if the file exists."""
        meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
        if not os.path.exists(meta_path):
            return
        with open(meta_path, 'r', encoding='utf-8') as fh:
            backups = json.load(fh)
        backups = [b for b in backups if b['name'] != backup_name]
        with open(meta_path, 'w', encoding='utf-8') as fh:
            json.dump(backups, fh, indent=2)

    def _is_first_of_month_filename(self, filename: str) -> bool:
        """Tell whether a backup file name starts with a first-of-month date.

        Expected shape: ``2026-04-01_040000.db``; anything else yields False.
        """
        try:
            date_part = filename.split('_')[0]
            return int(date_part.split('-')[2]) == 1
        except (IndexError, ValueError):
            return False

    def _cleanup_old_backups(self):
        """Prune backups according to the retention policy.

        Kept: the newest ``_MAX_MANUAL_BACKUPS`` manual backups, automatic
        backups at most ``_BACKUP_RETENTION_DAYS`` days old, and automatic
        backups flagged ``is_first_of_month``. Everything else is deleted.
        """
        now = datetime.now()
        manual_seen = 0
        # list_backups() is sorted newest first, so the first manual entries
        # encountered are the ones to keep.
        for backup in self.list_backups():
            if backup['manual']:
                manual_seen += 1
                if manual_seen > self._MAX_MANUAL_BACKUPS:
                    self.delete_backup(backup['name'])
                continue
            days_old = (now - datetime.fromisoformat(backup['created_at'])).days
            if days_old <= self._BACKUP_RETENTION_DAYS or backup['is_first_of_month']:
                continue
            self.delete_backup(backup['name'])
# Module-level Database instance, constructed with the default path
# (DATABASE_URL) when this module is imported.
db = Database()