Files
vision-record/web/app.py

479 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Web 后端 - FastAPI
"""
import sqlite3
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from pathlib import Path
import uvicorn
from config import WEB_PORT, WEB_HOST, config_mgr, DATA_DIR
from database import db
from scheduler import scheduler
from camera import list_available_cameras, CameraCapture
# Create the FastAPI application.
app = FastAPI(title="视觉记录系统")
# Directory holding the static front-end files, located next to this module.
STATIC_DIR = Path(__file__).parent / "static"
# Serve the contents of STATIC_DIR under the /static URL prefix.
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
# 图片目录 - 使用配置中的目录
@app.get("/images/{filename}")
async def get_image(filename: str):
    """Serve a stored image looked up by its file name.

    Up to 1000 image records are read through ``db.get_images`` and the
    first one whose path ends in a file called ``filename`` is returned.

    Args:
        filename: File name (final path component) of the wanted image.

    Returns:
        FileResponse for the matching record's path.

    Raises:
        HTTPException: 404 when no record has that file name.
    """
    for img in db.get_images(limit=1000):
        # Compare the whole file name; a plain suffix test would let
        # "1.jpg" match a record stored as ".../11.jpg".
        if Path(img['path']).name == filename:
            return FileResponse(img['path'])
    raise HTTPException(status_code=404, detail="图片不存在")
# ============== 页面路由 ==============
@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the main page.

    When ``index.html`` exists in the static directory, its references to
    the stylesheet and script get a ``?v=1.1`` suffix before it is
    returned; otherwise a short placeholder page is returned.
    """
    page_path = STATIC_DIR / "index.html"
    if not page_path.exists():
        return HTMLResponse("<h1>请创建 index.html</h1>")

    html = page_path.read_text(encoding='utf-8')
    # The version query string is there to defeat browser caching.
    for asset in ('/static/style.css', '/static/app.js'):
        html = html.replace(asset, asset + '?v=1.1')
    return HTMLResponse(content=html)
# ============== 配置 API ==============
@app.get("/api/config")
async def get_config():
    """Return everything ``config_mgr.get_all()`` reports."""
    all_settings = config_mgr.get_all()
    return all_settings
@app.post("/api/config")
async def update_config(data: dict):
    """Apply the posted settings and return the resulting configuration."""
    config_mgr.update(data)
    refreshed = config_mgr.get_all()
    return {"success": True, "config": refreshed}
@app.get("/api/config/images-dir")
async def get_images_dir():
    """Return the image save directory settings.

    ``images_dir`` is the stored ``images_dir`` config value and
    ``today_folder`` is the string form of ``config_mgr.get_images_dir()``.
    """
    configured_dir = config_mgr.get('images_dir')
    resolved_dir = str(config_mgr.get_images_dir())
    return {"images_dir": configured_dir, "today_folder": resolved_dir}
@app.post("/api/config/images-dir")
async def set_images_dir(path: str):
    """Create ``path`` if needed and store it as the image save directory.

    Raises:
        HTTPException: 400 carrying the error text when creating the
            directory or saving the setting fails.
    """
    try:
        target = Path(path)
        target.mkdir(parents=True, exist_ok=True)
        config_mgr.set('images_dir', path)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    return {"success": True, "images_dir": path}
# ============== 状态 API ==============
@app.get("/api/status")
async def get_status():
    """Return scheduler status, database statistics and the configuration."""
    db_stats = db.get_stats()
    return {
        "scheduler": scheduler.get_status(),
        "stats": db_stats,
        "config": config_mgr.get_all(),
    }
@app.get("/api/cameras")
async def get_cameras():
    """List the available cameras together with their details.

    Every index reported by ``list_available_cameras()`` is assigned to a
    fresh ``CameraCapture`` and that object's ``get_camera_info()`` result
    is collected.

    Returns:
        dict: ``{"cameras": [...]}`` with one info entry per camera index.
    """
    details = []
    for cam_index in list_available_cameras():
        cam = CameraCapture()
        try:
            cam.camera_index = cam_index
            details.append(cam.get_camera_info())
        finally:
            # Close the capture object even when querying it fails, so a
            # failing camera does not leave an open handle behind.
            cam.close()
    return {"cameras": details}
# ============== 调度控制 API ==============
@app.post("/api/scheduler/start")
async def start_scheduler():
    """Start scheduled capturing.

    Raises:
        HTTPException: 400 with the scheduler's error when starting fails.
    """
    outcome = scheduler.start()
    if outcome['success']:
        return outcome
    raise HTTPException(status_code=400, detail=outcome['error'])
@app.post("/api/scheduler/stop")
async def stop_scheduler():
    """Stop scheduled capturing and return the scheduler's answer."""
    stop_result = scheduler.stop()
    return stop_result
@app.post("/api/scheduler/interval")
async def set_interval(interval: int):
    """Set the capture interval.

    Raises:
        HTTPException: 400 when ``interval`` is below 10.
    """
    if interval >= 10:
        return scheduler.set_interval(interval)
    raise HTTPException(status_code=400, detail="间隔不能小于10秒")
@app.post("/api/capture")
async def capture_now():
    """Take a picture immediately.

    Raises:
        HTTPException: 500 with the scheduler's error when capturing fails.
    """
    outcome = scheduler.capture_now()
    if outcome['success']:
        return outcome
    raise HTTPException(status_code=500, detail=outcome['error'])
# ============== 分析 API ==============
# NOTE: the fixed path must be registered before the parameterised one.
# Routes are matched in registration order, so with the opposite order
# "/api/analyze/unanalyzed" is captured by "/api/analyze/{image_id}" and
# rejected because "unanalyzed" is not an integer.
@app.post("/api/analyze/unanalyzed")
async def analyze_unanalyzed():
    """Analyze every image that has not been analyzed yet.

    Returns:
        dict: ``{"results": ...}`` with what ``scheduler.analyze_unanalyzed()``
        returned.
    """
    results = scheduler.analyze_unanalyzed()
    return {"results": results}


@app.post("/api/analyze/{image_id}")
async def analyze_image(image_id: int):
    """Analyze the image with the given id.

    Returns:
        The scheduler's result dict when it reports success.

    Raises:
        HTTPException: 500 with the scheduler's error when analysis fails.
    """
    result = scheduler.analyze_now(image_id)
    if not result['success']:
        raise HTTPException(status_code=500, detail=result['error'])
    return result
@app.get("/api/persons")
async def get_persons():
    """Return the person list together with the person manager's stats."""
    from person_manager import person_manager

    roster = person_manager.get_persons_list()
    summary = person_manager.get_stats()
    return {"persons": roster, "stats": summary}
@app.delete("/api/persons/{person_id}")
async def delete_person(person_id: str):
    """Remove a person and persist the change.

    Raises:
        HTTPException: 404 when ``person_id`` is unknown.
    """
    from person_manager import person_manager

    known = person_manager.persons
    if person_id not in known:
        raise HTTPException(status_code=404, detail="人员不存在")
    del known[person_id]
    person_manager._save_persons_db()
    return {"success": True}
@app.post("/api/persons/{person_id}/rename")
async def rename_person(person_id: str, name: str):
    """Give a person a new name and persist the change.

    Raises:
        HTTPException: 404 when ``person_id`` is unknown.
    """
    from person_manager import person_manager

    known = person_manager.persons
    if person_id not in known:
        raise HTTPException(status_code=404, detail="人员不存在")
    known[person_id]['name'] = name
    person_manager._save_persons_db()
    return {"success": True, "name": name}
@app.get("/api/persons/{person_id}/face")
async def get_person_face(person_id: str):
    """Return the face image of a person.

    The path stored under the person's ``face_path`` is served when that
    file exists; otherwise ``<faces_dir>/<person_id>.jpg`` is tried.

    Raises:
        HTTPException: 404 when no face image can be found.
    """
    from person_manager import person_manager
    from pathlib import Path
    # NOTE(review): the nesting below assumes the faces_dir fallback only
    # applies to known persons — confirm against the original layout.
    if person_id in person_manager.persons:
        face_path = person_manager.persons[person_id].get('face_path', '')
        if face_path and Path(face_path).exists():
            return FileResponse(face_path)
        # Fall back to the default image inside faces_dir.
        face_file = person_manager.faces_dir / f"{person_id}.jpg"
        if face_file.exists():
            return FileResponse(str(face_file))
    raise HTTPException(status_code=404, detail="人员图片不存在")
def _extract_person_count(events):
    """Derive the number of people for one image from its event rows.

    Explicit totals in the descriptions are tried first, in this order:
    "共 X 人", "当前 X 人", "当前剩 X 人". For each format the first
    matching event decides; a value of 0 falls through to the next format.
    When no total is found, the highest "#N" index mentioned in
    person-activity / person-in-out events is used.

    Args:
        events: Rows with ``event_type`` and ``description`` keys.

    Returns:
        int: The derived count, or 0 when nothing could be extracted.
    """
    import re

    total_patterns = (
        r'共\s*(\d+)\s*人',
        r'当前\s*(\d+)\s*人',
        r'当前剩\s*(\d+)\s*人',
    )
    for pattern in total_patterns:
        for event in events:
            # A NULL description is treated as empty text.
            found = re.search(pattern, event['description'] or '')
            if found:
                count = int(found.group(1))
                if count:
                    return count
                # The first match settles this format; 0 means "keep looking"
                # with the next format.
                break

    # Fallback: infer the count from "#1", "#2", ... markers.
    indices = [
        int(number)
        for event in events
        if '人物活动' in (event['event_type'] or '')
        or '人员进出' in (event['event_type'] or '')
        for number in re.findall(r'#(\d+)', event['description'] or '')
    ]
    return max(indices, default=0)


@app.get("/api/stats/daily")
async def get_daily_stats(date: str = None):
    """Return the statistics for one day.

    Args:
        date: Day to report as ``YYYY-MM-DD`` (matched against
            ``images.date_folder``). Defaults to yesterday.

    Returns:
        dict with ``date``, ``timeline`` (one entry per image with ``time``,
        ``image_id``, ``person_count`` and ``analyzed``), ``total_images``,
        ``total_events`` and ``event_types`` (count per event type).
    """
    from contextlib import closing
    from datetime import datetime, timedelta

    # Default to yesterday's data.
    if date is None:
        date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')

    # closing() guarantees the connection is released even if a query fails.
    with closing(sqlite3.connect(db.db_path)) as conn:
        conn.row_factory = sqlite3.Row

        # All images of the day, oldest first.
        cursor = conn.execute(
            "SELECT id, timestamp, analyzed FROM images WHERE date_folder = ? ORDER BY timestamp",
            (date,)
        )
        images = [dict(row) for row in cursor.fetchall()]

        # Head count at every capture time.
        timeline = []
        for img in images:
            events = conn.execute(
                "SELECT event_type, description FROM events WHERE image_id = ?",
                (img['id'],)
            ).fetchall()
            timeline.append({
                'time': img['timestamp'],
                'image_id': img['id'],
                'person_count': _extract_person_count(events),
                'analyzed': img['analyzed']
            })

        # Totals for the day.
        total_events = conn.execute(
            "SELECT COUNT(*) FROM events WHERE image_id IN (SELECT id FROM images WHERE date_folder = ?)",
            (date,)
        ).fetchone()[0]

        # Number of events per event type.
        cursor = conn.execute(
            """SELECT event_type, COUNT(*) as count
            FROM events WHERE image_id IN
            (SELECT id FROM images WHERE date_folder = ?)
            GROUP BY event_type""",
            (date,)
        )
        event_types = [{'type': row[0], 'count': row[1]} for row in cursor.fetchall()]

    return {
        'date': date,
        'timeline': timeline,
        'total_images': len(images),
        'total_events': total_events,
        'event_types': event_types
    }
@app.get("/api/stats/history")
async def get_history_stats(days: int = 7):
    """Return per-day statistics for the most recent ``days`` days.

    Args:
        days: Number of days to report, counting back from today.

    Returns:
        dict with ``history`` (newest first; each entry has ``date``,
        ``total_images``, ``total_events`` and ``person_changes``) and the
        requested ``days``.
    """
    from datetime import datetime, timedelta

    # One reference time keeps the dates consecutive even if the request
    # happens to run across midnight.
    today = datetime.now()
    history = []
    conn = db._get_conn()
    try:
        for offset in range(days):
            date = (today - timedelta(days=offset)).strftime('%Y-%m-%d')
            # Totals for that day.
            total_images = conn.execute(
                "SELECT COUNT(*) FROM images WHERE date_folder = ?",
                (date,)
            ).fetchone()[0]
            total_events = conn.execute(
                "SELECT COUNT(*) FROM events WHERE image_id IN (SELECT id FROM images WHERE date_folder = ?)",
                (date,)
            ).fetchone()[0]
            # Number of person in/out events on that day.
            cursor = conn.execute(
                """SELECT e.event_type, COUNT(*) as count
                FROM events e JOIN images i ON e.image_id = i.id
                WHERE i.date_folder = ? AND e.event_type LIKE '%人员进出%'
                GROUP BY e.event_type""",
                (date,)
            )
            person_changes = sum(row[1] for row in cursor.fetchall())
            history.append({
                'date': date,
                'total_images': total_images,
                'total_events': total_events,
                'person_changes': person_changes
            })
    finally:
        # Release the connection even when one of the queries fails.
        conn.close()
    return {'history': history, 'days': days}
# ============== 图片 API ==============
@app.get("/api/images")
async def get_images(
    limit: int = 50,
    offset: int = 0,
    analyzed_only: bool = False,
    date_folder: str = None
):
    """Return a page of images, each with a summary of its events.

    ``limit`` is capped at the configured ``display_limit`` (20 when the
    setting is absent). ``total`` is the number of images in this response.
    """
    limit = min(limit, config_mgr.get('display_limit', 20))
    page = db.get_images(
        limit=limit,
        offset=offset,
        analyzed_only=analyzed_only,
        date_folder=date_folder,
    )
    # Attach the event types and the event count to every image.
    for entry in page:
        linked_events = db.get_events_by_image(entry['id'])
        if linked_events:
            entry['events_summary'] = ', '.join(ev['event_type'] for ev in linked_events)
        else:
            entry['events_summary'] = ''
        entry['events_count'] = len(linked_events)
    return {"images": page, "total": len(page)}
@app.get("/api/images/{image_id}")
async def get_image(image_id: int):
    """Return one image record with its events under the ``events`` key.

    Raises:
        HTTPException: 404 when the image does not exist.
    """
    record = db.get_image_by_id(image_id)
    if record:
        record['events'] = db.get_events_by_image(image_id)
        return record
    raise HTTPException(status_code=404, detail="图片不存在")
@app.delete("/api/images/{image_id}")
async def delete_image(image_id: int):
    """Delete an image.

    Raises:
        HTTPException: 404 when ``db.delete_image`` reports failure.
    """
    if db.delete_image(image_id):
        return {"success": True}
    raise HTTPException(status_code=404, detail="图片不存在")
@app.get("/api/date-folders")
async def get_date_folders():
    """Return the ``date_folders`` entry of the database statistics."""
    folders = db.get_stats()['date_folders']
    return {"folders": folders}
# ============== 事件 API ==============
@app.get("/api/events")
async def get_events(limit: int = 50, offset: int = 0, event_type: str = None):
    """Return a page of events.

    ``limit`` is capped at the configured ``display_limit`` (20 when the
    setting is absent). ``total`` is the number of events in this response.
    """
    limit = min(limit, config_mgr.get('display_limit', 20))
    page = db.get_events(limit=limit, offset=offset, event_type=event_type)
    return {"events": page, "total": len(page)}
@app.get("/api/stats")
async def get_stats():
    """Return the statistics reported by the database layer."""
    summary = db.get_stats()
    return summary
# ============== 启动 ==============
def run_web():
    """Start the web service.

    Builds a uvicorn server for ``app`` on ``WEB_HOST``:``WEB_PORT``,
    registers a SIGINT handler and runs the server.
    """
    import signal
    import sys
    import uvicorn

    server = uvicorn.Server(
        uvicorn.Config(app, host=WEB_HOST, port=WEB_PORT, log_level="info")
    )

    def _on_sigint(sig, frame):
        # Ask the server to stop, then leave the process.
        print("\nShutting down...")
        server.should_exit = True
        sys.exit(0)

    signal.signal(signal.SIGINT, _on_sigint)
    server.run()


if __name__ == "__main__":
    run_web()