feat: 顶部添加连接状态指示器,掉线时禁止编辑和新增操作

This commit is contained in:
2026-04-20 18:10:37 +08:00
parent 94efc524c6
commit ca7dc10e92
10 changed files with 14541 additions and 73 deletions

View File

@@ -2,12 +2,17 @@
import sqlite3
import json
from datetime import datetime
import shutil
import os
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from .config import DATABASE_URL, TODO_STATUS, PRIORITY_LEVELS
# 备份目录
BACKUP_DIR = os.path.join(os.path.dirname(DATABASE_URL), 'backups')
class Database:
"""SQLite数据库管理"""
@@ -80,6 +85,28 @@ class Database:
)
""")
# 邮箱表
cursor.execute("""
CREATE TABLE IF NOT EXISTS emails (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL UNIQUE,
name TEXT,
created_at TEXT NOT NULL
)
""")
# 邮件发送记录表
cursor.execute("""
CREATE TABLE IF NOT EXISTS email_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
item_id INTEGER NOT NULL,
email TEXT NOT NULL,
sent_at TEXT NOT NULL,
success INTEGER DEFAULT 1,
FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE
)
""")
# 创建索引
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_type ON items(type)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_status ON items(status)")
@@ -177,6 +204,41 @@ class Database:
return items
def count_items(self, type: str = None, status: str = None, tag: str = None,
keyword: str = None) -> int:
"""计算符合条件的条目总数"""
with self.get_conn() as conn:
cursor = conn.cursor()
query = "SELECT COUNT(DISTINCT i.id) as count FROM items i"
params = []
conditions = []
# 标签过滤需要JOIN
if tag:
query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id"
conditions.append("t.name = ?")
params.append(tag)
if type:
conditions.append("i.type = ?")
params.append(type)
if status:
conditions.append("i.status = ?")
params.append(status)
if keyword:
conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)")
keyword_pattern = f"%{keyword}%"
params.extend([keyword_pattern, keyword_pattern, keyword_pattern])
if conditions:
query += " WHERE " + " AND ".join(conditions)
cursor.execute(query, params)
return cursor.fetchone()['count']
def update_item(self, item_id: int, **kwargs) -> bool:
"""更新条目"""
allowed_fields = ['type', 'title', 'content', 'url', 'source', 'status', 'priority', 'due_date', 'note']
@@ -244,6 +306,18 @@ class Database:
""")
return [dict(row) for row in cursor.fetchall()]
def update_tag(self, tag_id: int, name: str) -> bool:
"""更新标签名称"""
with self.get_conn() as conn:
cursor = conn.cursor()
# 检查名称是否已存在(排除自己)
cursor.execute("SELECT id FROM tags WHERE name = ? AND id != ?", (name, tag_id))
if cursor.fetchone():
return False # 名称已存在
cursor.execute("UPDATE tags SET name = ? WHERE id = ?", (name, tag_id))
conn.commit()
return cursor.rowcount > 0
def delete_tag(self, tag_id: int = None, name: str = None) -> bool:
"""删除标签"""
with self.get_conn() as conn:
@@ -290,6 +364,90 @@ class Database:
""", (item_id,))
return [row['name'] for row in cursor.fetchall()]
# ============ Email 操作 ============
def create_email(self, email: str, name: str = None) -> int:
"""创建邮箱"""
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO emails (email, name, created_at) VALUES (?, ?, ?)",
(email, name, now))
conn.commit()
return cursor.lastrowid
except sqlite3.IntegrityError:
# 邮箱已存在返回已有ID
cursor.execute("SELECT id FROM emails WHERE email = ?", (email,))
return cursor.fetchone()['id']
def list_emails(self) -> List[Dict[str, Any]]:
"""列出所有邮箱"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM emails ORDER BY created_at DESC")
return [dict(row) for row in cursor.fetchall()]
def get_email(self, email_id: int) -> Optional[Dict[str, Any]]:
"""获取单个邮箱"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM emails WHERE id = ?", (email_id,))
row = cursor.fetchone()
return dict(row) if row else None
def update_email(self, email_id: int, email: str = None, name: str = None) -> bool:
"""更新邮箱"""
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
if email:
# 检查邮箱是否已存在(排除自己)
cursor.execute("SELECT id FROM emails WHERE email = ? AND id != ?", (email, email_id))
if cursor.fetchone():
return False # 邎箱已存在
if email and name:
cursor.execute("UPDATE emails SET email = ?, name = ? WHERE id = ?", (email, name, email_id))
elif email:
cursor.execute("UPDATE emails SET email = ? WHERE id = ?", (email, email_id))
elif name:
cursor.execute("UPDATE emails SET name = ? WHERE id = ?", (name, email_id))
conn.commit()
return cursor.rowcount > 0
def delete_email(self, email_id: int) -> bool:
"""删除邮箱"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM emails WHERE id = ?", (email_id,))
conn.commit()
return cursor.rowcount > 0
# ============ 邮件日志 操作 ============
def log_email_send(self, item_id: int, email: str, success: bool = True) -> int:
"""记录邮件发送"""
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO email_logs (item_id, email, sent_at, success)
VALUES (?, ?, ?, ?)
""", (item_id, email, now, 1 if success else 0))
conn.commit()
return cursor.lastrowid
def get_email_logs(self, item_id: int) -> List[Dict[str, Any]]:
"""获取收藏的邮件发送记录"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM email_logs
WHERE item_id = ?
ORDER BY sent_at DESC
""", (item_id,))
return [dict(row) for row in cursor.fetchall()]
def stats(self) -> Dict[str, Any]:
"""获取统计信息"""
self._ensure_init()
@@ -315,6 +473,252 @@ class Database:
stats['tags'] = cursor.fetchone()['count']
return stats
# ============ 提醒相关 ============
def get_reminders(self) -> Dict[str, Any]:
"""获取提醒信息:即将到期和已过期的待办"""
self._ensure_init()
with self.get_conn() as conn:
cursor = conn.cursor()
now = datetime.now()
reminders = {
'overdue': [], # 已过期
'due_today': [], # 今天到期
'due_soon': [] # 24小时内到期不含今天
}
# 查询未完成的待办(有截止日期的)
cursor.execute("""
SELECT * FROM items
WHERE type = 'todo'
AND status != 'completed'
AND due_date IS NOT NULL
AND due_date != ''
ORDER BY due_date ASC
""")
for row in cursor.fetchall():
item = dict(row)
item['tags'] = self._get_item_tags(conn, item['id'])
try:
due_date_str = item['due_date']
# 支持多种日期格式
if 'T' in due_date_str:
# ISO 格式2026-04-16T14:30
due_date = datetime.strptime(due_date_str[:16], '%Y-%m-%dT%H:%M')
elif len(due_date_str) == 10:
# 只有日期2026-04-16视为当天 23:59:59
due_date = datetime.strptime(due_date_str, '%Y-%m-%d').replace(hour=23, minute=59, second=59)
else:
# 其他格式,尝试解析
due_date = datetime.strptime(due_date_str.split('.')[0], '%Y-%m-%dT%H:%M:%S')
# 计算距离到期的时间
time_left = due_date - now
if time_left.total_seconds() < 0:
# 已过期
days_overdue = abs(int(time_left.total_seconds() / 86400))
item['days_overdue'] = days_overdue
reminders['overdue'].append(item)
elif time_left.total_seconds() < 86400: # 24小时内
# 判断是今天还是明天
if due_date.date() == now.date():
reminders['due_today'].append(item)
else:
reminders['due_soon'].append(item)
elif due_date.date() == now.date():
# 今天到期超过24小时的情况比如现在凌晨截止时间是晚上
reminders['due_today'].append(item)
except (ValueError, AttributeError) as e:
# 日期格式错误,跳过
continue
# 统计总数
reminders['total'] = len(reminders['overdue']) + len(reminders['due_today']) + len(reminders['due_soon'])
return reminders
# ============ 备份操作 ============
def create_backup(self, manual: bool = False) -> Dict[str, Any]:
"""创建数据库备份"""
import os
# 确保备份目录存在
os.makedirs(BACKUP_DIR, exist_ok=True)
now = datetime.now()
backup_name = now.strftime('%Y-%m-%d_%H%M%S')
if manual:
backup_name += '_manual'
backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
# 复制数据库文件
shutil.copy2(self.db_path, backup_path)
# 获取备份信息
backup_info = {
'name': backup_name,
'path': backup_path,
'size': os.path.getsize(backup_path),
'created_at': now.isoformat(),
'manual': manual,
'is_first_of_month': now.day == 1
}
# 保存备份元数据
self._save_backup_meta(backup_info)
# 清理旧备份
self._cleanup_old_backups()
return backup_info
def list_backups(self) -> List[Dict[str, Any]]:
"""列出所有备份"""
import os
if not os.path.exists(BACKUP_DIR):
return []
# 读取备份元数据
meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
if os.path.exists(meta_path):
with open(meta_path, 'r') as f:
backups = json.load(f)
else:
# 从文件重建元数据
backups = []
for f in os.listdir(BACKUP_DIR):
if f.endswith('.db'):
path = os.path.join(BACKUP_DIR, f)
backups.append({
'name': f.replace('.db', ''),
'path': path,
'size': os.path.getsize(path),
'created_at': datetime.fromtimestamp(os.path.getmtime(path)).isoformat(),
'manual': '_manual' in f,
'is_first_of_month': self._is_first_of_month_filename(f)
})
# 按时间倒序排列
backups.sort(key=lambda x: x['created_at'], reverse=True)
return backups
def restore_backup(self, backup_name: str) -> bool:
"""恢复备份"""
import os
backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
if not os.path.exists(backup_path):
return False
# 先备份当前数据库(以防万一)
current_backup = self.db_path + '.before_restore'
shutil.copy2(self.db_path, current_backup)
# 恢复备份
shutil.copy2(backup_path, self.db_path)
return True
def delete_backup(self, backup_name: str) -> bool:
"""删除备份"""
import os
backup_path = os.path.join(BACKUP_DIR, f'{backup_name}.db')
if not os.path.exists(backup_path):
return False
os.remove(backup_path)
# 更新元数据
self._remove_backup_meta(backup_name)
return True
def _save_backup_meta(self, backup_info: Dict[str, Any]):
"""保存备份元数据"""
import os
meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
# 读取现有元数据
backups = []
if os.path.exists(meta_path):
with open(meta_path, 'r') as f:
backups = json.load(f)
# 添加新备份
backups.append(backup_info)
# 保存
with open(meta_path, 'w') as f:
json.dump(backups, f, indent=2)
def _remove_backup_meta(self, backup_name: str):
"""从元数据中删除备份"""
import os
meta_path = os.path.join(BACKUP_DIR, 'backup_meta.json')
if not os.path.exists(meta_path):
return
with open(meta_path, 'r') as f:
backups = json.load(f)
backups = [b for b in backups if b['name'] != backup_name]
with open(meta_path, 'w') as f:
json.dump(backups, f, indent=2)
def _is_first_of_month_filename(self, filename: str) -> bool:
"""判断是否是每月第一天的备份"""
# 格式2026-04-01_040000.db 或 2026-05-01_...
try:
date_part = filename.split('_')[0]
day = int(date_part.split('-')[2])
return day == 1
except:
return False
def _cleanup_old_backups(self):
"""清理旧备份保留30天 + 每月第一天"""
import os
backups = self.list_backups()
now = datetime.now()
keep_paths = []
for backup in backups:
backup_date = datetime.fromisoformat(backup['created_at'])
days_old = (now - backup_date).days
# 保留条件:
# 1. 手动备份永久保留最多10个
# 2. 30天内的备份
# 3. 每月第一天的备份
if backup['manual']:
# 手动备份保留但最多10个
manual_backups = [b for b in backups if b['manual']]
if manual_backups.index(backup) < 10:
keep_paths.append(backup['path'])
else:
self.delete_backup(backup['name'])
elif days_old <= 30:
keep_paths.append(backup['path'])
elif backup['is_first_of_month']:
# 每月第一天永久保留
keep_paths.append(backup['path'])
else:
# 删除
self.delete_backup(backup['name'])
# 全局数据库实例