Files
xian-favor/xian_favor/db.py

402 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""数据库操作"""
import sqlite3
import json
from datetime import datetime
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from .config import DATABASE_URL, TODO_STATUS, PRIORITY_LEVELS
class Database:
    """SQLite-backed store for items, tags, item/tag links and e-mail addresses.

    The schema is created lazily the first time a connection is requested
    through :meth:`get_conn`, so every public method works on a fresh
    database file. Each connection enables foreign-key enforcement so that
    the ``ON DELETE CASCADE`` clauses declared on ``item_tags`` take effect.
    """

    def __init__(self, db_path: str = DATABASE_URL):
        """Remember the database path; nothing is opened or created yet.

        Args:
            db_path: Path handed to ``sqlite3.connect``.
        """
        self.db_path = db_path
        self._initialized = False

    def _ensure_init(self):
        """Create the tables and indexes once per instance."""
        if self._initialized:
            return
        self._init_db()
        # Only mark as initialized after the schema was created successfully,
        # so a failed attempt is retried on the next call.
        self._initialized = True

    @contextmanager
    def _connect(self):
        """Open a configured connection and close it when the block exits.

        This helper does not trigger schema creation, which lets
        :meth:`_init_db` use it without recursing through :meth:`get_conn`.
        """
        conn = sqlite3.connect(self.db_path, timeout=30.0)
        try:
            conn.row_factory = sqlite3.Row
            # WAL mode improves concurrent read/write behaviour.
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA busy_timeout=30000")
            # SQLite ignores FOREIGN KEY clauses (including ON DELETE CASCADE)
            # unless enforcement is switched on for each connection.
            conn.execute("PRAGMA foreign_keys=ON")
            yield conn
        finally:
            conn.close()

    @contextmanager
    def get_conn(self):
        """Yield a database connection, creating the schema on first use.

        The connection is closed when the ``with`` block exits. Changes that
        were not committed by the caller are discarded on close.
        """
        self._ensure_init()
        with self._connect() as conn:
            yield conn

    def _init_db(self):
        """Create all tables and indexes if they do not exist yet."""
        with self._connect() as conn:
            cursor = conn.cursor()
            # Main content table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS items (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    type TEXT NOT NULL DEFAULT 'text',
                    title TEXT,
                    content TEXT,
                    url TEXT,
                    source TEXT,
                    status TEXT DEFAULT 'pending',
                    priority TEXT DEFAULT 'medium',
                    due_date TEXT,
                    note TEXT,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
            """)
            # Tag table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS tags (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL UNIQUE,
                    color TEXT DEFAULT '#3498db',
                    created_at TEXT NOT NULL
                )
            """)
            # Item <-> tag association table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS item_tags (
                    item_id INTEGER NOT NULL,
                    tag_id INTEGER NOT NULL,
                    PRIMARY KEY (item_id, tag_id),
                    FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE,
                    FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
                )
            """)
            # E-mail table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS emails (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    email TEXT NOT NULL UNIQUE,
                    name TEXT,
                    created_at TEXT NOT NULL
                )
            """)
            # Indexes
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_type ON items(type)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_status ON items(status)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_created ON items(created_at)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_item_tags_item ON item_tags(item_id)")
            cursor.execute("CREATE INDEX IF NOT EXISTS idx_item_tags_tag ON item_tags(tag_id)")
            conn.commit()

    # ============ Item operations ============

    def create_item(self, type: str = "text", title: Optional[str] = None,
                    content: Optional[str] = None, url: Optional[str] = None,
                    source: Optional[str] = None, status: str = "pending",
                    priority: str = "medium", due_date: Optional[str] = None,
                    note: Optional[str] = None,
                    tags: Optional[List[str]] = None) -> int:
        """Insert a new item and return its id.

        For ``type == "todo"`` a status outside ``TODO_STATUS`` is replaced by
        ``"pending"``; a priority outside ``PRIORITY_LEVELS`` is replaced by
        ``"medium"``. Tags that do not exist yet are created on the fly.
        """
        now = datetime.now().isoformat()
        # Fall back to defaults for unknown status / priority values
        if type == "todo" and status not in TODO_STATUS:
            status = "pending"
        if priority not in PRIORITY_LEVELS:
            priority = "medium"
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO items (type, title, content, url, source, status, priority, due_date, note, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (type, title, content, url, source, status, priority, due_date, note, now, now))
            item_id = cursor.lastrowid
            # Attach tags inside the same transaction
            if tags:
                self._add_tags_to_item(conn, item_id, tags)
            conn.commit()
            return item_id

    def get_item(self, item_id: int) -> Optional[Dict[str, Any]]:
        """Return the item as a dict with a ``tags`` list, or ``None`` if absent."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM items WHERE id = ?", (item_id,))
            row = cursor.fetchone()
            if not row:
                return None
            item = dict(row)
            item['tags'] = self._get_item_tags(conn, item_id)
            return item

    def list_items(self, type: Optional[str] = None, status: Optional[str] = None,
                   tag: Optional[str] = None, keyword: Optional[str] = None,
                   limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
        """List items, newest first, optionally filtered.

        Args:
            type: Keep only items of this type.
            status: Keep only items with this status.
            tag: Keep only items carrying the tag with this exact name.
            keyword: Substring matched with ``LIKE`` against title, content
                and note.
            limit: Maximum number of rows returned.
            offset: Number of rows skipped.

        Returns:
            A list of item dicts, each with a ``tags`` list of tag names.
        """
        with self.get_conn() as conn:
            cursor = conn.cursor()
            query = "SELECT DISTINCT i.* FROM items i"
            params = []
            conditions = []
            # Filtering by tag requires joining through the association table
            if tag:
                query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id"
                conditions.append("t.name = ?")
                params.append(tag)
            if type:
                conditions.append("i.type = ?")
                params.append(type)
            if status:
                conditions.append("i.status = ?")
                params.append(status)
            if keyword:
                conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)")
                keyword_pattern = f"%{keyword}%"
                params.extend([keyword_pattern, keyword_pattern, keyword_pattern])
            if conditions:
                query += " WHERE " + " AND ".join(conditions)
            query += " ORDER BY i.created_at DESC LIMIT ? OFFSET ?"
            params.extend([limit, offset])
            cursor.execute(query, params)
            items = []
            for row in cursor.fetchall():
                item = dict(row)
                item['tags'] = self._get_item_tags(conn, item['id'])
                items.append(item)
            return items

    def update_item(self, item_id: int, **kwargs) -> bool:
        """Update fields and/or tags of an existing item.

        Recognised keyword arguments are the item columns ``type``, ``title``,
        ``content``, ``url``, ``source``, ``status``, ``priority``,
        ``due_date`` and ``note``, plus ``tags`` (a list that replaces the
        item's current tags; an empty or ``None`` value removes them all).
        Other keywords are ignored.

        Returns:
            ``True`` if the item exists and was updated, ``False`` if nothing
            applicable was passed or no item has this id.
        """
        allowed_fields = ['type', 'title', 'content', 'url', 'source', 'status', 'priority', 'due_date', 'note']
        update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}
        if not update_fields and 'tags' not in kwargs:
            return False
        now = datetime.now().isoformat()
        with self.get_conn() as conn:
            cursor = conn.cursor()
            # Column names come from the allowed_fields whitelist above, so
            # interpolating them into the statement is safe. updated_at is
            # always written, which also makes this UPDATE double as the
            # existence check for tag-only updates.
            assignments = [f"{column} = ?" for column in update_fields]
            assignments.append("updated_at = ?")
            values = [*update_fields.values(), now, item_id]
            cursor.execute(f"UPDATE items SET {', '.join(assignments)} WHERE id = ?", values)
            if cursor.rowcount <= 0:
                # Unknown item: do not create tag links pointing at nothing.
                return False
            if 'tags' in kwargs:
                # Drop the old associations, then attach the new tag set
                cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,))
                if kwargs['tags']:
                    self._add_tags_to_item(conn, item_id, kwargs['tags'])
            conn.commit()
            return True

    def delete_item(self, item_id: int) -> bool:
        """Delete an item; its tag links are removed by the cascade.

        Returns:
            ``True`` if a row was deleted.
        """
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM items WHERE id = ?", (item_id,))
            conn.commit()
            return cursor.rowcount > 0

    # ============ Tag operations ============

    def create_tag(self, name: str, color: str = "#3498db") -> int:
        """Create a tag and return its id.

        If a tag with this name already exists, its id is returned instead.

        Raises:
            sqlite3.IntegrityError: If the insert is rejected for a reason
                other than a duplicate name.
        """
        now = datetime.now().isoformat()
        with self.get_conn() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, ?, ?)",
                               (name, color, now))
            except sqlite3.IntegrityError:
                # Most likely the tag already exists: hand back its id
                cursor.execute("SELECT id FROM tags WHERE name = ?", (name,))
                row = cursor.fetchone()
                if row is None:
                    # Not a duplicate after all; surface the real error
                    raise
                return row['id']
            conn.commit()
            return cursor.lastrowid

    def list_tags(self) -> List[Dict[str, Any]]:
        """Return all tags ordered by name, each with an ``item_count``."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT t.*, COUNT(it.item_id) as item_count
                FROM tags t
                LEFT JOIN item_tags it ON t.id = it.tag_id
                GROUP BY t.id
                ORDER BY t.name
            """)
            return [dict(row) for row in cursor.fetchall()]

    def update_tag(self, tag_id: int, name: str) -> bool:
        """Rename a tag.

        Returns:
            ``False`` if another tag already uses ``name`` or no tag has this
            id, ``True`` otherwise.
        """
        with self.get_conn() as conn:
            cursor = conn.cursor()
            # Refuse the rename if a different tag already has this name
            cursor.execute("SELECT id FROM tags WHERE name = ? AND id != ?", (name, tag_id))
            if cursor.fetchone():
                return False
            cursor.execute("UPDATE tags SET name = ? WHERE id = ?", (name, tag_id))
            conn.commit()
            return cursor.rowcount > 0

    def delete_tag(self, tag_id: Optional[int] = None, name: Optional[str] = None) -> bool:
        """Delete a tag selected by name or, if no name is given, by id.

        Links between the tag and items are removed by the cascade.

        Returns:
            ``True`` if a row was deleted; ``False`` if nothing matched or
            neither selector was supplied.
        """
        if name:
            statement, params = "DELETE FROM tags WHERE name = ?", (name,)
        elif tag_id:
            statement, params = "DELETE FROM tags WHERE id = ?", (tag_id,)
        else:
            # Nothing to select on
            return False
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute(statement, params)
            conn.commit()
            return cursor.rowcount > 0

    # ============ Helpers ============

    def _add_tags_to_item(self, conn, item_id: int, tags: List[str]):
        """Link ``item_id`` to each named tag, creating missing tags.

        Works on the caller's connection and does not commit, so the links
        become part of the caller's transaction. Names are stripped of
        surrounding whitespace and empty names are skipped.
        """
        cursor = conn.cursor()
        for tag_name in tags:
            tag_name = tag_name.strip()
            if not tag_name:
                continue
            # Look the tag up on the same connection so uncommitted tags
            # created earlier in this loop are visible
            cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,))
            row = cursor.fetchone()
            if row:
                tag_id = row['id']
            else:
                # Create the tag with the default colour
                now = datetime.now().isoformat()
                cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, '#3498db', ?)",
                               (tag_name, now))
                tag_id = cursor.lastrowid
            # OR IGNORE keeps repeated names in ``tags`` from raising
            cursor.execute("INSERT OR IGNORE INTO item_tags (item_id, tag_id) VALUES (?, ?)",
                           (item_id, tag_id))

    def _get_item_tags(self, conn, item_id: int) -> List[str]:
        """Return the names of the tags linked to ``item_id``, sorted by name."""
        cursor = conn.cursor()
        cursor.execute("""
            SELECT t.name FROM tags t
            JOIN item_tags it ON t.id = it.tag_id
            WHERE it.item_id = ?
            ORDER BY t.name
        """, (item_id,))
        return [row['name'] for row in cursor.fetchall()]

    # ============ Email operations ============

    def create_email(self, email: str, name: Optional[str] = None) -> int:
        """Store an e-mail address and return its id.

        If the address is already stored, the existing id is returned.

        Raises:
            sqlite3.IntegrityError: If the insert is rejected for a reason
                other than a duplicate address.
        """
        now = datetime.now().isoformat()
        with self.get_conn() as conn:
            cursor = conn.cursor()
            try:
                cursor.execute("INSERT INTO emails (email, name, created_at) VALUES (?, ?, ?)",
                               (email, name, now))
            except sqlite3.IntegrityError:
                # Most likely the address already exists: hand back its id
                cursor.execute("SELECT id FROM emails WHERE email = ?", (email,))
                row = cursor.fetchone()
                if row is None:
                    # Not a duplicate after all; surface the real error
                    raise
                return row['id']
            conn.commit()
            return cursor.lastrowid

    def list_emails(self) -> List[Dict[str, Any]]:
        """Return all stored e-mail rows, newest first."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM emails ORDER BY created_at DESC")
            return [dict(row) for row in cursor.fetchall()]

    def get_email(self, email_id: int) -> Optional[Dict[str, Any]]:
        """Return one e-mail row as a dict, or ``None`` if absent."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM emails WHERE id = ?", (email_id,))
            row = cursor.fetchone()
            return dict(row) if row else None

    def update_email(self, email_id: int, email: Optional[str] = None,
                     name: Optional[str] = None) -> bool:
        """Change the address and/or display name of a stored e-mail.

        Empty or ``None`` arguments leave the corresponding column untouched.

        Returns:
            ``False`` if nothing was supplied, the new address is already
            used by another row, or no row has this id; ``True`` otherwise.
        """
        assignments = []
        values = []
        if email:
            assignments.append("email = ?")
            values.append(email)
        if name:
            assignments.append("name = ?")
            values.append(name)
        if not assignments:
            return False
        with self.get_conn() as conn:
            cursor = conn.cursor()
            if email:
                # Refuse the change if a different row already has this address
                cursor.execute("SELECT id FROM emails WHERE email = ? AND id != ?", (email, email_id))
                if cursor.fetchone():
                    return False
            cursor.execute(f"UPDATE emails SET {', '.join(assignments)} WHERE id = ?",
                           [*values, email_id])
            conn.commit()
            return cursor.rowcount > 0

    def delete_email(self, email_id: int) -> bool:
        """Delete a stored e-mail; return ``True`` if a row was deleted."""
        with self.get_conn() as conn:
            cursor = conn.cursor()
            cursor.execute("DELETE FROM emails WHERE id = ?", (email_id,))
            conn.commit()
            return cursor.rowcount > 0

    def stats(self) -> Dict[str, Any]:
        """Return summary counts.

        Returns:
            A dict with ``total`` (number of items), ``by_type`` (count per
            item type), ``todo_status`` (count per status among ``todo``
            items) and ``tags`` (number of tags).
        """
        with self.get_conn() as conn:
            cursor = conn.cursor()
            stats = {}
            # Total number of items
            cursor.execute("SELECT COUNT(*) as count FROM items")
            stats['total'] = cursor.fetchone()['count']
            # Items per type
            cursor.execute("SELECT type, COUNT(*) as count FROM items GROUP BY type")
            stats['by_type'] = {row['type']: row['count'] for row in cursor.fetchall()}
            # To-do items per status
            cursor.execute("SELECT status, COUNT(*) as count FROM items WHERE type = 'todo' GROUP BY status")
            stats['todo_status'] = {row['status']: row['count'] for row in cursor.fetchall()}
            # Number of tags
            cursor.execute("SELECT COUNT(*) as count FROM tags")
            stats['tags'] = cursor.fetchone()['count']
            return stats
# Global database instance
db = Database()