feat: 收藏关注系统 v1.0.0 - 支持CLI/API/Web三种操作模式

This commit is contained in:
2026-04-12 01:34:13 +08:00
commit 184cc5b56b
26 changed files with 2953 additions and 0 deletions

321
build/lib/xian_favor/db.py Normal file
View File

@@ -0,0 +1,321 @@
"""数据库操作"""
import sqlite3
import json
from datetime import datetime
from typing import Optional, List, Dict, Any
from contextlib import contextmanager
from .config import DATABASE_URL, TODO_STATUS, PRIORITY_LEVELS
class Database:
"""SQLite数据库管理"""
def __init__(self, db_path: str = DATABASE_URL):
self.db_path = db_path
self._initialized = False
def _ensure_init(self):
"""确保数据库已初始化"""
if self._initialized:
return
self._init_db()
self._initialized = True
@contextmanager
def get_conn(self):
"""获取数据库连接"""
conn = sqlite3.connect(self.db_path, timeout=30.0)
conn.row_factory = sqlite3.Row
# 启用WAL模式提高并发性能
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA busy_timeout=30000")
try:
yield conn
finally:
conn.close()
def _init_db(self):
"""初始化数据库表"""
with self.get_conn() as conn:
cursor = conn.cursor()
# 主内容表
cursor.execute("""
CREATE TABLE IF NOT EXISTS items (
id INTEGER PRIMARY KEY AUTOINCREMENT,
type TEXT NOT NULL DEFAULT 'text',
title TEXT,
content TEXT,
url TEXT,
source TEXT,
status TEXT DEFAULT 'pending',
priority TEXT DEFAULT 'medium',
due_date TEXT,
note TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
)
""")
# 标签表
cursor.execute("""
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
color TEXT DEFAULT '#3498db',
created_at TEXT NOT NULL
)
""")
# 内容-标签关联表
cursor.execute("""
CREATE TABLE IF NOT EXISTS item_tags (
item_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
PRIMARY KEY (item_id, tag_id),
FOREIGN KEY (item_id) REFERENCES items(id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags(id) ON DELETE CASCADE
)
""")
# 创建索引
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_type ON items(type)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_status ON items(status)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_items_created ON items(created_at)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_item_tags_item ON item_tags(item_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_item_tags_tag ON item_tags(tag_id)")
conn.commit()
# ============ Item 操作 ============
def create_item(self, type: str = "text", title: str = None, content: str = None,
url: str = None, source: str = None, status: str = "pending",
priority: str = "medium", due_date: str = None, note: str = None,
tags: List[str] = None) -> int:
"""创建新条目"""
self._ensure_init()
now = datetime.now().isoformat()
# 验证状态
if type == "todo" and status not in TODO_STATUS:
status = "pending"
if priority not in PRIORITY_LEVELS:
priority = "medium"
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO items (type, title, content, url, source, status, priority, due_date, note, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (type, title, content, url, source, status, priority, due_date, note, now, now))
item_id = cursor.lastrowid
# 添加标签
if tags:
self._add_tags_to_item(conn, item_id, tags)
conn.commit()
return item_id
def get_item(self, item_id: int) -> Optional[Dict[str, Any]]:
"""获取单个条目"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM items WHERE id = ?", (item_id,))
row = cursor.fetchone()
if not row:
return None
item = dict(row)
item['tags'] = self._get_item_tags(conn, item_id)
return item
def list_items(self, type: str = None, status: str = None, tag: str = None,
keyword: str = None, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
"""列出条目"""
with self.get_conn() as conn:
cursor = conn.cursor()
query = "SELECT DISTINCT i.* FROM items i"
params = []
conditions = []
# 标签过滤需要JOIN
if tag:
query += " JOIN item_tags it ON i.id = it.item_id JOIN tags t ON it.tag_id = t.id"
conditions.append("t.name = ?")
params.append(tag)
if type:
conditions.append("i.type = ?")
params.append(type)
if status:
conditions.append("i.status = ?")
params.append(status)
if keyword:
conditions.append("(i.title LIKE ? OR i.content LIKE ? OR i.note LIKE ?)")
keyword_pattern = f"%{keyword}%"
params.extend([keyword_pattern, keyword_pattern, keyword_pattern])
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY i.created_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
cursor.execute(query, params)
items = []
for row in cursor.fetchall():
item = dict(row)
item['tags'] = self._get_item_tags(conn, item['id'])
items.append(item)
return items
def update_item(self, item_id: int, **kwargs) -> bool:
"""更新条目"""
allowed_fields = ['type', 'title', 'content', 'url', 'source', 'status', 'priority', 'due_date', 'note']
update_fields = {k: v for k, v in kwargs.items() if k in allowed_fields}
if not update_fields and 'tags' not in kwargs:
return False
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
if update_fields:
set_clause = ", ".join(f"{k} = ?" for k in update_fields.keys())
set_clause += ", updated_at = ?"
values = list(update_fields.values()) + [now, item_id]
cursor.execute(f"UPDATE items SET {set_clause} WHERE id = ?", values)
if 'tags' in kwargs:
# 先删除旧标签关联
cursor.execute("DELETE FROM item_tags WHERE item_id = ?", (item_id,))
# 添加新标签
if kwargs['tags']:
self._add_tags_to_item(conn, item_id, kwargs['tags'])
conn.commit()
return cursor.rowcount > 0
def delete_item(self, item_id: int) -> bool:
"""删除条目"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM items WHERE id = ?", (item_id,))
conn.commit()
return cursor.rowcount > 0
# ============ Tag 操作 ============
def create_tag(self, name: str, color: str = "#3498db") -> int:
"""创建标签"""
now = datetime.now().isoformat()
with self.get_conn() as conn:
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, ?, ?)",
(name, color, now))
conn.commit()
return cursor.lastrowid
except sqlite3.IntegrityError:
# 标签已存在
cursor.execute("SELECT id FROM tags WHERE name = ?", (name,))
return cursor.fetchone()['id']
def list_tags(self) -> List[Dict[str, Any]]:
"""列出所有标签"""
with self.get_conn() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT t.*, COUNT(it.item_id) as item_count
FROM tags t
LEFT JOIN item_tags it ON t.id = it.tag_id
GROUP BY t.id
ORDER BY t.name
""")
return [dict(row) for row in cursor.fetchall()]
def delete_tag(self, tag_id: int = None, name: str = None) -> bool:
"""删除标签"""
with self.get_conn() as conn:
cursor = conn.cursor()
if name:
cursor.execute("DELETE FROM tags WHERE name = ?", (name,))
elif tag_id:
cursor.execute("DELETE FROM tags WHERE id = ?", (tag_id,))
conn.commit()
return cursor.rowcount > 0
# ============ 辅助方法 ============
def _add_tags_to_item(self, conn, item_id: int, tags: List[str]):
"""为条目添加标签"""
cursor = conn.cursor()
for tag_name in tags:
tag_name = tag_name.strip()
if not tag_name:
continue
# 确保标签存在 - 使用同一个连接
cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,))
row = cursor.fetchone()
if row:
tag_id = row['id']
else:
# 创建新标签
now = datetime.now().isoformat()
cursor.execute("INSERT INTO tags (name, color, created_at) VALUES (?, '#3498db', ?)",
(tag_name, now))
tag_id = cursor.lastrowid
# 创建关联
cursor.execute("INSERT OR IGNORE INTO item_tags (item_id, tag_id) VALUES (?, ?)",
(item_id, tag_id))
def _get_item_tags(self, conn, item_id: int) -> List[str]:
"""获取条目的标签"""
cursor = conn.cursor()
cursor.execute("""
SELECT t.name FROM tags t
JOIN item_tags it ON t.id = it.tag_id
WHERE it.item_id = ?
ORDER BY t.name
""", (item_id,))
return [row['name'] for row in cursor.fetchall()]
def stats(self) -> Dict[str, Any]:
"""获取统计信息"""
self._ensure_init()
with self.get_conn() as conn:
cursor = conn.cursor()
stats = {}
# 总数
cursor.execute("SELECT COUNT(*) as count FROM items")
stats['total'] = cursor.fetchone()['count']
# 按类型统计
cursor.execute("SELECT type, COUNT(*) as count FROM items GROUP BY type")
stats['by_type'] = {row['type']: row['count'] for row in cursor.fetchall()}
# 待办状态统计
cursor.execute("SELECT status, COUNT(*) as count FROM items WHERE type = 'todo' GROUP BY status")
stats['todo_status'] = {row['status']: row['count'] for row in cursor.fetchall()}
# 标签数
cursor.execute("SELECT COUNT(*) as count FROM tags")
stats['tags'] = cursor.fetchone()['count']
return stats
# 全局数据库实例
db = Database()