"""Database management module.

Provides a thin wrapper around a SQLite file that stores image records,
the events attached to them, and a key/value ``config`` table.
"""

import contextlib
import datetime
import logging
import sqlite3
from pathlib import Path
from typing import Any, Dict, Iterator, List, Optional

from config import DB_PATH, DB_SCHEMA

_logger = logging.getLogger(__name__)


class Database:
    """SQLite database manager.

    Every public method opens a short-lived connection, performs its work,
    and closes the connection again, even when the statement raises.
    """

    def __init__(self):
        self.db_path = DB_PATH
        self._init_db()

    # ------------------------------------------------------------------
    # Connection helpers
    # ------------------------------------------------------------------

    def _get_conn(self):
        """Return a new connection to the database file."""
        return sqlite3.connect(self.db_path)

    @contextlib.contextmanager
    def _connection(self, use_rows: bool = False) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is always closed on exit.

        ``sqlite3.Connection`` used directly as a context manager only
        commits or rolls back; it does not close. This helper guarantees
        the handle is released even if the body raises.

        Args:
            use_rows: When true, rows are returned as ``sqlite3.Row`` so
                they can be converted to dictionaries.
        """
        conn = self._get_conn()
        try:
            if use_rows:
                conn.row_factory = sqlite3.Row
            yield conn
        finally:
            # Closing without a commit discards any uncommitted changes.
            conn.close()

    def _fetch_dicts(self, query: str, params=()) -> List[Dict[str, Any]]:
        """Run a SELECT and return every row as a plain ``dict``."""
        with self._connection(use_rows=True) as conn:
            return [dict(row) for row in conn.execute(query, params).fetchall()]

    # ------------------------------------------------------------------
    # Schema setup
    # ------------------------------------------------------------------

    def _init_db(self):
        """Create the schema, or migrate an existing database in place.

        A database without an ``images`` table is treated as new and gets
        the full ``DB_SCHEMA``. An existing one has the ``date_folder``
        and ``camera_id`` columns added if they are missing. The
        ``config`` table is created in both cases when absent.
        """
        with self._connection() as conn:
            images_table = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='images'"
            ).fetchone()

            if images_table is None:
                # Fresh database: build everything from the schema script.
                conn.executescript(DB_SCHEMA)
            else:
                # Migration: add columns introduced after the first release.
                columns = {
                    row[1] for row in conn.execute("PRAGMA table_info(images)")
                }
                if 'date_folder' not in columns:
                    conn.execute("ALTER TABLE images ADD COLUMN date_folder TEXT")
                if 'camera_id' not in columns:
                    conn.execute(
                        "ALTER TABLE images ADD COLUMN camera_id INTEGER DEFAULT 0"
                    )

            # IF NOT EXISTS makes a separate existence probe unnecessary.
            conn.execute(
                "CREATE TABLE IF NOT EXISTS config (key TEXT PRIMARY KEY, value TEXT)"
            )
            conn.commit()

    # ------------------------------------------------------------------
    # Writes
    # ------------------------------------------------------------------

    def add_image(self, path, camera_id=0, date_folder=None):
        """Insert an image record stamped with the current local time.

        Args:
            path: Location of the image file.
            camera_id: Identifier of the capturing camera.
            date_folder: Optional date-folder label used for grouping.

        Returns:
            int: ID of the newly inserted image row.
        """
        with self._connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "INSERT INTO images (path, camera_id, timestamp, date_folder) VALUES (?, ?, ?, ?)",
                (path, camera_id, datetime.datetime.now().isoformat(), date_folder),
            )
            image_id = cursor.lastrowid
            conn.commit()
            return image_id

    def mark_image_analyzed(self, image_id):
        """Set the ``analyzed`` flag of the given image to 1."""
        with self._connection() as conn:
            conn.execute("UPDATE images SET analyzed = 1 WHERE id = ?", (image_id,))
            conn.commit()

    def add_event(self, image_id, event_type, description, confidence):
        """Insert an event attached to an image, stamped with the current local time."""
        with self._connection() as conn:
            conn.execute(
                "INSERT INTO events (image_id, event_type, description, confidence, timestamp) VALUES (?, ?, ?, ?, ?)",
                (
                    image_id,
                    event_type,
                    description,
                    confidence,
                    datetime.datetime.now().isoformat(),
                ),
            )
            conn.commit()

    def delete_image(self, image_id):
        """Delete an image, its events, and the image file on disk.

        The two DELETE statements are committed together. Removing the
        file afterwards is best effort: a failure is logged and does not
        undo the database deletion.

        Returns:
            bool: True if the image row existed and was deleted, False if
            no row has that ID.
        """
        with self._connection() as conn:
            row = conn.execute(
                "SELECT path FROM images WHERE id = ?", (image_id,)
            ).fetchone()
            if row is None:
                return False

            image_path = row[0]
            conn.execute("DELETE FROM events WHERE image_id = ?", (image_id,))
            conn.execute("DELETE FROM images WHERE id = ?", (image_id,))
            conn.commit()

        try:
            Path(image_path).unlink(missing_ok=True)
        except (OSError, TypeError, ValueError) as exc:
            # TypeError/ValueError cover a NULL or malformed stored path.
            _logger.warning("Could not remove image file %r: %s", image_path, exc)
        return True

    # ------------------------------------------------------------------
    # Reads
    # ------------------------------------------------------------------

    def get_images(self, limit=50, offset=0, analyzed_only=False, date_folder=None):
        """Return a page of image rows, newest first.

        Args:
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.
            analyzed_only: When true, only rows with ``analyzed = 1``.
            date_folder: When truthy, only rows with this date folder.

        Returns:
            list[dict]: One dictionary per image row.
        """
        conditions = []
        params = []
        if analyzed_only:
            conditions.append("analyzed = 1")
        if date_folder:
            conditions.append("date_folder = ?")
            params.append(date_folder)

        query = "SELECT * FROM images"
        if conditions:
            query += " WHERE " + " AND ".join(conditions)
        query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])

        return self._fetch_dicts(query, params)

    def get_events(self, limit=50, offset=0, event_type=None):
        """Return a page of events joined with their image, newest first.

        Each row carries the event columns plus ``image_path``,
        ``image_timestamp`` and ``image_id`` from the joined image.

        Args:
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.
            event_type: When truthy, only events of this type.

        Returns:
            list[dict]: One dictionary per event row.
        """
        query = (
            "SELECT e.*, i.path as image_path, i.timestamp as image_timestamp, "
            "i.id as image_id "
            "FROM events e JOIN images i ON e.image_id = i.id"
        )
        params = []
        if event_type:
            query += " WHERE e.event_type = ?"
            params.append(event_type)
        query += " ORDER BY e.timestamp DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])

        return self._fetch_dicts(query, params)

    def get_image_by_id(self, image_id) -> Optional[Dict[str, Any]]:
        """Return the image row with the given ID, or None if absent."""
        with self._connection(use_rows=True) as conn:
            row = conn.execute(
                "SELECT * FROM images WHERE id = ?", (image_id,)
            ).fetchone()
            return dict(row) if row else None

    def get_events_by_image(self, image_id):
        """Return all events attached to an image, newest first."""
        return self._fetch_dicts(
            "SELECT * FROM events WHERE image_id = ? ORDER BY timestamp DESC",
            (image_id,),
        )

    def get_stats(self):
        """Return aggregate counts over images and events.

        Returns:
            dict: ``total_images``, ``analyzed_images``,
            ``unanalyzed_images``, ``total_events``, ``event_types``
            (list of ``{'type', 'count'}``, most frequent first) and
            ``date_folders`` (list of ``{'date', 'count'}``, descending
            by folder name).
        """
        with self._connection() as conn:
            # Image counts
            total_images = conn.execute("SELECT COUNT(*) FROM images").fetchone()[0]
            analyzed_images = conn.execute(
                "SELECT COUNT(*) FROM images WHERE analyzed = 1"
            ).fetchone()[0]

            # Event counts
            total_events = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]

            # Per-event-type breakdown
            event_types = [
                {'type': row[0], 'count': row[1]}
                for row in conn.execute(
                    "SELECT event_type, COUNT(*) as count FROM events GROUP BY event_type ORDER BY count DESC"
                )
            ]

            # Per-date-folder breakdown
            date_folders = [
                {'date': row[0], 'count': row[1]}
                for row in conn.execute(
                    "SELECT date_folder, COUNT(*) as count FROM images WHERE date_folder IS NOT NULL GROUP BY date_folder ORDER BY date_folder DESC"
                )
            ]

        return {
            'total_images': total_images,
            'analyzed_images': analyzed_images,
            'unanalyzed_images': total_images - analyzed_images,
            'total_events': total_events,
            'event_types': event_types,
            'date_folders': date_folders,
        }

    def get_unanalyzed_images(self, limit=10):
        """Return up to ``limit`` images with ``analyzed = 0``, oldest first."""
        return self._fetch_dicts(
            "SELECT * FROM images WHERE analyzed = 0 ORDER BY timestamp ASC LIMIT ?",
            (limit,),
        )

    def get_date_folders(self):
        """Return the distinct non-NULL date folders, in descending order."""
        with self._connection() as conn:
            return [
                row[0]
                for row in conn.execute(
                    "SELECT DISTINCT date_folder FROM images WHERE date_folder IS NOT NULL ORDER BY date_folder DESC"
                )
            ]


# Global instance
db = Database()