Files
vision-record/web/app.py

158 lines
4.2 KiB
Python

"""
Web 后端 - FastAPI
"""
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, HTMLResponse
from pathlib import Path
import uvicorn
import json
from config import WEB_PORT, WEB_HOST, IMAGES_DIR
from database import db
from scheduler import scheduler
from camera import list_available_cameras, CameraCapture
# 创建应用
app = FastAPI(title="视觉记录系统")
# 静态文件目录
STATIC_DIR = Path(__file__).parent / "static"
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
app.mount("/images", StaticFiles(directory=str(IMAGES_DIR)), name="images")
# ============== 页面路由 ==============
@app.get("/", response_class=HTMLResponse)
async def index():
"""主页"""
index_file = STATIC_DIR / "index.html"
if index_file.exists():
return FileResponse(str(index_file))
return HTMLResponse("<h1>请创建 index.html</h1>")
# ============== API 路由 ==============
@app.get("/api/status")
async def get_status():
"""获取系统状态"""
stats = db.get_stats()
scheduler_status = scheduler.get_status()
return {
"scheduler": scheduler_status,
"stats": stats
}
@app.get("/api/cameras")
async def get_cameras():
"""获取可用摄像头"""
cameras = list_available_cameras()
details = []
for cam_index in cameras:
cam = CameraCapture(cam_index)
details.append(cam.get_camera_info())
cam.close()
return {"cameras": details}
@app.post("/api/scheduler/start")
async def start_scheduler(interval: int = None, auto_analyze: bool = None):
"""启动定时拍照"""
result = scheduler.start(interval=interval, auto_analyze=auto_analyze)
if not result['success']:
raise HTTPException(status_code=400, detail=result['error'])
return result
@app.post("/api/scheduler/stop")
async def stop_scheduler():
"""停止定时拍照"""
return scheduler.stop()
@app.post("/api/scheduler/interval")
async def set_interval(interval: int):
"""设置拍照间隔"""
return scheduler.set_interval(interval)
@app.post("/api/capture")
async def capture_now():
"""立即拍照"""
result = scheduler.capture_now()
if not result['success']:
raise HTTPException(status_code=500, detail=result['error'])
return result
@app.post("/api/analyze/{image_id}")
async def analyze_image(image_id: int):
"""分析指定图片"""
result = scheduler.analyze_now(image_id)
if not result['success']:
raise HTTPException(status_code=500, detail=result['error'])
return result
@app.post("/api/analyze/unanalyzed")
async def analyze_unanalyzed():
"""分析所有未分析的图片"""
results = scheduler.analyze_unanalyzed()
return {"results": results}
@app.get("/api/images")
async def get_images(limit: int = 50, offset: int = 0, analyzed_only: bool = False):
"""获取图片列表"""
images = db.get_images(limit=limit, offset=offset, analyzed_only=analyzed_only)
return {"images": images, "total": len(images)}
@app.get("/api/images/{image_id}")
async def get_image(image_id: int):
"""获取单个图片详情"""
image = db.get_image_by_id(image_id)
if not image:
raise HTTPException(status_code=404, detail="图片不存在")
events = db.get_events_by_image(image_id)
image['events'] = events
return image
@app.delete("/api/images/{image_id}")
async def delete_image(image_id: int):
"""删除图片"""
success = db.delete_image(image_id)
if not success:
raise HTTPException(status_code=404, detail="图片不存在")
return {"success": True}
@app.get("/api/events")
async def get_events(limit: int = 50, offset: int = 0, event_type: str = None):
"""获取事件列表"""
events = db.get_events(limit=limit, offset=offset, event_type=event_type)
return {"events": events, "total": len(events)}
@app.get("/api/stats")
async def get_stats():
"""获取统计信息"""
return db.get_stats()
# ============== 启动 ==============
def run_web():
"""启动 Web 服务"""
uvicorn.run(app, host=WEB_HOST, port=WEB_PORT)
if __name__ == "__main__":
run_web()