"""Web backend - FastAPI.

Exposes the HTTP API and static front-end for the visual recording system:
configuration, scheduler control, image/event browsing, person management
and per-day statistics.
"""

import re
import sqlite3
from contextlib import closing
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional

import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles

from camera import CameraCapture, list_available_cameras
from config import DATA_DIR, WEB_HOST, WEB_PORT, config_mgr
from database import db
from scheduler import scheduler

# Create the application
app = FastAPI(title="视觉记录系统")

# Static file directory
STATIC_DIR = Path(__file__).parent / "static"
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")

# Markers that precede a head count in event descriptions, tried in this
# order; the count itself is followed by _COUNT_SUFFIX.
_COUNT_MARKERS = ('共 ', '当前 ', '当前剩 ')
_COUNT_SUFFIX = ' 人'
# Event types whose descriptions may enumerate people as "#1", "#2", ...
_PERSON_EVENT_TYPES = ('人物活动', '人员进出')
_PERSON_INDEX_RE = re.compile(r'#(\d+)')


# Image files - looked up through the database rather than a fixed directory
@app.get("/images/{filename}")
async def get_image_file(filename: str):
    """Serve an image file by its file name.

    The most recent 1000 image records are scanned and the first one whose
    stored path has exactly this base name is returned.

    Raises:
        HTTPException: 404 when no scanned record has that file name.
    """
    for img in db.get_images(limit=1000):
        # Compare the exact base name: a suffix test would let "1.jpg"
        # match "cam_11.jpg".
        if Path(img['path']).name == filename:
            return FileResponse(img['path'])
    raise HTTPException(status_code=404, detail="图片不存在")


# ============== Page routes ==============

@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the main page.

    Version query parameters are appended to the stylesheet and script URLs
    so that browsers do not reuse stale cached copies. When index.html is
    missing a short placeholder page is returned instead.
    """
    index_file = STATIC_DIR / "index.html"
    if index_file.exists():
        content = index_file.read_text(encoding='utf-8')
        content = content.replace('/static/style.css', '/static/style.css?v=1.1')
        content = content.replace('/static/app.js', '/static/app.js?v=1.1')
        return HTMLResponse(content=content)
    return HTMLResponse("<h1>请创建 index.html</h1>")


# ============== Config API ==============

@app.get("/api/config")
async def get_config():
    """Return the full configuration."""
    return config_mgr.get_all()


@app.post("/api/config")
async def update_config(data: dict):
    """Apply the given configuration values and return the resulting config."""
    config_mgr.update(data)
    return {"success": True, "config": config_mgr.get_all()}


@app.get("/api/config/images-dir")
async def get_images_dir():
    """Return the configured image directory and the folder reported for today."""
    return {
        "images_dir": config_mgr.get('images_dir'),
        "today_folder": str(config_mgr.get_images_dir()),
    }


@app.post("/api/config/images-dir")
async def set_images_dir(path: str):
    """Create the directory if needed and store it as the image directory.

    Raises:
        HTTPException: 400 when the directory cannot be created or the
            setting cannot be saved; the detail carries the error text.
    """
    try:
        Path(path).mkdir(parents=True, exist_ok=True)
        config_mgr.set('images_dir', path)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    return {"success": True, "images_dir": path}


# ============== Status API ==============

@app.get("/api/status")
async def get_status():
    """Return scheduler status, database statistics and the configuration."""
    stats = db.get_stats()
    scheduler_status = scheduler.get_status()
    config = config_mgr.get_all()
    return {
        "scheduler": scheduler_status,
        "stats": stats,
        "config": config,
    }


@app.get("/api/cameras")
async def get_cameras():
    """Return the info reported by each available camera."""
    details = []
    for cam_index in list_available_cameras():
        cam = CameraCapture()
        cam.camera_index = cam_index
        try:
            details.append(cam.get_camera_info())
        finally:
            # Close the camera even if querying its info fails.
            cam.close()
    return {"cameras": details}


# ============== Scheduler control API ==============

@app.post("/api/scheduler/start")
async def start_scheduler():
    """Start periodic capture.

    Raises:
        HTTPException: 400 when the scheduler reports failure.
    """
    result = scheduler.start()
    if not result['success']:
        raise HTTPException(status_code=400, detail=result['error'])
    return result


@app.post("/api/scheduler/stop")
async def stop_scheduler():
    """Stop periodic capture."""
    return scheduler.stop()


@app.post("/api/scheduler/interval")
async def set_interval(interval: int):
    """Set the capture interval.

    Raises:
        HTTPException: 400 when the interval is below 10.
    """
    if interval < 10:
        raise HTTPException(status_code=400, detail="间隔不能小于10秒")
    return scheduler.set_interval(interval)


@app.post("/api/capture")
async def capture_now():
    """Capture an image immediately.

    Raises:
        HTTPException: 500 when the capture reports failure.
    """
    result = scheduler.capture_now()
    if not result['success']:
        raise HTTPException(status_code=500, detail=result['error'])
    return result


# ============== Analysis API ==============

# NOTE: this literal route must be registered before
# "/api/analyze/{image_id}"; FastAPI matches routes in declaration order and
# would otherwise try to parse "unanalyzed" as an integer image id.
@app.post("/api/analyze/unanalyzed")
async def analyze_unanalyzed():
    """Analyze every image that has not been analyzed yet."""
    results = scheduler.analyze_unanalyzed()
    return {"results": results}


@app.post("/api/analyze/{image_id}")
async def analyze_image(image_id: int):
    """Analyze the image with the given id.

    Raises:
        HTTPException: 500 when the analysis reports failure.
    """
    result = scheduler.analyze_now(image_id)
    if not result['success']:
        raise HTTPException(status_code=500, detail=result['error'])
    return result


@app.get("/api/persons")
async def get_persons():
    """Return the person list together with person statistics."""
    from person_manager import person_manager
    return {
        "persons": person_manager.get_persons_list(),
        "stats": person_manager.get_stats(),
    }


@app.delete("/api/persons/{person_id}")
async def delete_person(person_id: str):
    """Delete a person record and persist the change.

    Raises:
        HTTPException: 404 when the person id is unknown.
    """
    from person_manager import person_manager
    if person_id not in person_manager.persons:
        raise HTTPException(status_code=404, detail="人员不存在")
    del person_manager.persons[person_id]
    person_manager._save_persons_db()
    return {"success": True}


@app.post("/api/persons/{person_id}/rename")
async def rename_person(person_id: str, name: str):
    """Rename a person and persist the change.

    Raises:
        HTTPException: 404 when the person id is unknown.
    """
    from person_manager import person_manager
    if person_id not in person_manager.persons:
        raise HTTPException(status_code=404, detail="人员不存在")
    person_manager.persons[person_id]['name'] = name
    person_manager._save_persons_db()
    return {"success": True, "name": name}


@app.get("/api/persons/{person_id}/face")
async def get_person_face(person_id: str):
    """Return the face image of a person.

    The path stored on the person record is preferred; otherwise
    "<person_id>.jpg" inside the faces directory is tried.

    Raises:
        HTTPException: 404 when neither file exists.
    """
    from person_manager import person_manager
    if person_id in person_manager.persons:
        face_path = person_manager.persons[person_id].get('face_path', '')
        if face_path and Path(face_path).exists():
            return FileResponse(face_path)
    # Fall back to the default image in faces_dir
    face_file = person_manager.faces_dir / f"{person_id}.jpg"
    if face_file.exists():
        return FileResponse(str(face_file))
    raise HTTPException(status_code=404, detail="人员图片不存在")


def _parse_count_after(desc, marker):
    """Return the integer between ``marker`` and ' 人' in ``desc``, else None."""
    if marker not in desc or _COUNT_SUFFIX not in desc:
        return None
    try:
        return int(desc.split(marker)[1].split(_COUNT_SUFFIX)[0])
    except (IndexError, ValueError):
        return None


def _extract_person_count(events):
    """Derive a head count from the events of a single image.

    Each marker in ``_COUNT_MARKERS`` is tried in turn; for a marker, the
    first event whose description parses decides the value, and a zero falls
    through to the next marker. As a last resort the largest "#N" index found
    in person-related events is used. Returns 0 when nothing matches.

    Args:
        events: rows supporting ``row['event_type']`` / ``row['description']``.
    """
    for marker in _COUNT_MARKERS:
        for event in events:
            # A NULL description is treated as empty text.
            count = _parse_count_after(event['description'] or '', marker)
            if count is None:
                continue
            if count:
                return count
            break  # parsed a zero: move on to the next marker

    max_index = 0
    for event in events:
        event_type = event['event_type'] or ''
        if not any(tag in event_type for tag in _PERSON_EVENT_TYPES):
            continue
        indices = [int(m) for m in _PERSON_INDEX_RE.findall(event['description'] or '')]
        if indices:
            max_index = max(max_index, max(indices))
    return max_index


@app.get("/api/stats/daily")
async def get_daily_stats(date: Optional[str] = None):
    """Return statistics for one day.

    Args:
        date: day to report, matched against ``images.date_folder``;
            defaults to yesterday formatted as YYYY-MM-DD.

    Returns:
        A dict with the date, a per-image timeline (time, image id, derived
        person count, analyzed flag), the image and event totals and the
        event count per event type.
    """
    if date is None:
        date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')

    # closing() guarantees the connection is released even if a query fails.
    with closing(sqlite3.connect(db.db_path)) as conn:
        conn.row_factory = sqlite3.Row

        # All images of the day, in chronological order
        cursor = conn.execute(
            "SELECT id, timestamp, analyzed FROM images WHERE date_folder = ? ORDER BY timestamp",
            (date,)
        )
        images = [dict(row) for row in cursor.fetchall()]

        # Person count at each point in time
        timeline = []
        for img in images:
            cursor = conn.execute(
                "SELECT event_type, description FROM events WHERE image_id = ?",
                (img['id'],)
            )
            events = cursor.fetchall()
            timeline.append({
                'time': img['timestamp'],
                'image_id': img['id'],
                'person_count': _extract_person_count(events),
                'analyzed': img['analyzed'],
            })

        # Summary totals
        total_images = len(images)
        cursor = conn.execute(
            "SELECT COUNT(*) FROM events WHERE image_id IN (SELECT id FROM images WHERE date_folder = ?)",
            (date,)
        )
        total_events = cursor.fetchone()[0]

        # Count per event type
        cursor = conn.execute(
            """SELECT event_type, COUNT(*) as count
               FROM events
               WHERE image_id IN (SELECT id FROM images WHERE date_folder = ?)
               GROUP BY event_type""",
            (date,)
        )
        event_types = [{'type': row[0], 'count': row[1]} for row in cursor.fetchall()]

    return {
        'date': date,
        'timeline': timeline,
        'total_images': total_images,
        'total_events': total_events,
        'event_types': event_types,
    }


@app.get("/api/stats/history")
async def get_history_stats(days: int = 7):
    """Return per-day totals for the most recent ``days`` days, newest first.

    Each entry holds the image count, the event count and the number of
    events whose type contains '人员进出'.
    """
    history = []
    # closing() guarantees the connection is released even if a query fails.
    with closing(db._get_conn()) as conn:
        for i in range(days):
            date = (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')

            total_images = conn.execute(
                "SELECT COUNT(*) FROM images WHERE date_folder = ?",
                (date,)
            ).fetchone()[0]

            total_events = conn.execute(
                "SELECT COUNT(*) FROM events WHERE image_id IN (SELECT id FROM images WHERE date_folder = ?)",
                (date,)
            ).fetchone()[0]

            # Person entry/exit events of the day
            cursor = conn.execute(
                """SELECT e.event_type, COUNT(*) as count
                   FROM events e
                   JOIN images i ON e.image_id = i.id
                   WHERE i.date_folder = ?
                   AND e.event_type LIKE '%人员进出%'
                   GROUP BY e.event_type""",
                (date,)
            )
            person_changes = sum(row[1] for row in cursor.fetchall())

            history.append({
                'date': date,
                'total_images': total_images,
                'total_events': total_events,
                'person_changes': person_changes,
            })

    return {'history': history, 'days': days}


# ============== Image API ==============

@app.get("/api/images")
async def get_images(
    limit: int = 50,
    offset: int = 0,
    analyzed_only: bool = False,
    date_folder: Optional[str] = None
):
    """Return a page of images, each annotated with an event summary.

    ``limit`` is capped at the configured ``display_limit`` (20 when unset).
    ``total`` in the response is the number of images in this page.
    """
    limit = min(limit, config_mgr.get('display_limit', 20))

    images = db.get_images(
        limit=limit,
        offset=offset,
        analyzed_only=analyzed_only,
        date_folder=date_folder,
    )

    # Attach a summary of the events recorded for each image
    for img in images:
        events = db.get_events_by_image(img['id'])
        img['events_summary'] = ', '.join(e['event_type'] for e in events) if events else '无'
        img['events_count'] = len(events)

    return {"images": images, "total": len(images)}


@app.get("/api/images/{image_id}")
async def get_image(image_id: int):
    """Return one image record together with its events.

    Raises:
        HTTPException: 404 when the image does not exist.
    """
    image = db.get_image_by_id(image_id)
    if not image:
        raise HTTPException(status_code=404, detail="图片不存在")
    image['events'] = db.get_events_by_image(image_id)
    return image


@app.delete("/api/images/{image_id}")
async def delete_image(image_id: int):
    """Delete an image.

    Raises:
        HTTPException: 404 when the database reports the delete failed.
    """
    if not db.delete_image(image_id):
        raise HTTPException(status_code=404, detail="图片不存在")
    return {"success": True}


@app.get("/api/date-folders")
async def get_date_folders():
    """Return the list of date folders taken from the database statistics."""
    stats = db.get_stats()
    return {"folders": stats['date_folders']}


# ============== Event API ==============

@app.get("/api/events")
async def get_events(limit: int = 50, offset: int = 0, event_type: Optional[str] = None):
    """Return a page of events, optionally filtered by event type.

    ``limit`` is capped at the configured ``display_limit`` (20 when unset).
    ``total`` in the response is the number of events in this page.
    """
    limit = min(limit, config_mgr.get('display_limit', 20))
    events = db.get_events(limit=limit, offset=offset, event_type=event_type)
    return {"events": events, "total": len(events)}


@app.get("/api/stats")
async def get_stats():
    """Return the database statistics."""
    return db.get_stats()


# ============== Startup ==============

def run_web():
    """Run the web server on WEB_HOST:WEB_PORT until it is asked to exit."""
    import signal

    config = uvicorn.Config(app, host=WEB_HOST, port=WEB_PORT, log_level="info")
    server = uvicorn.Server(config)

    def signal_handler(sig, frame):
        # Only request shutdown here; calling sys.exit() from the handler
        # would raise SystemExit in the middle of the server loop and skip
        # the graceful shutdown that should_exit triggers.
        print("\nShutting down...")
        server.should_exit = True

    signal.signal(signal.SIGINT, signal_handler)
    server.run()


if __name__ == "__main__":
    run_web()