479 lines
14 KiB
Python
479 lines
14 KiB
Python
"""
|
||
Web 后端 - FastAPI
|
||
"""
|
||
import sqlite3
|
||
from fastapi import FastAPI, HTTPException
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
|
||
from pathlib import Path
|
||
import uvicorn
|
||
|
||
from config import WEB_PORT, WEB_HOST, config_mgr, DATA_DIR
|
||
from database import db
|
||
from scheduler import scheduler
|
||
from camera import list_available_cameras, CameraCapture
|
||
|
||
|
||
# 创建应用
|
||
app = FastAPI(title="视觉记录系统")
|
||
|
||
# 静态文件目录
|
||
STATIC_DIR = Path(__file__).parent / "static"
|
||
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
|
||
|
||
# 图片目录 - 使用配置中的目录
|
||
@app.get("/images/{filename}")
|
||
async def get_image(filename: str):
|
||
"""获取图片(需要在实际目录中查找)"""
|
||
# 从数据库查找图片路径
|
||
images = db.get_images(limit=1000)
|
||
for img in images:
|
||
if img['path'].endswith(filename):
|
||
return FileResponse(img['path'])
|
||
raise HTTPException(status_code=404, detail="图片不存在")
|
||
|
||
|
||
# ============== 页面路由 ==============
|
||
|
||
@app.get("/", response_class=HTMLResponse)
|
||
async def index():
|
||
"""主页"""
|
||
# 添加时间戳参数防止缓存
|
||
index_file = STATIC_DIR / "index.html"
|
||
if index_file.exists():
|
||
content = index_file.read_text(encoding='utf-8')
|
||
# 在引用静态文件时添加版本参数
|
||
content = content.replace('/static/style.css', '/static/style.css?v=1.1')
|
||
content = content.replace('/static/app.js', '/static/app.js?v=1.1')
|
||
return HTMLResponse(content=content)
|
||
return HTMLResponse("<h1>请创建 index.html</h1>")
|
||
|
||
|
||
# ============== 配置 API ==============
|
||
|
||
@app.get("/api/config")
|
||
async def get_config():
|
||
"""获取所有配置"""
|
||
return config_mgr.get_all()
|
||
|
||
|
||
@app.post("/api/config")
|
||
async def update_config(data: dict):
|
||
"""更新配置"""
|
||
config_mgr.update(data)
|
||
return {"success": True, "config": config_mgr.get_all()}
|
||
|
||
|
||
@app.get("/api/config/images-dir")
|
||
async def get_images_dir():
|
||
"""获取图片保存目录"""
|
||
return {
|
||
"images_dir": config_mgr.get('images_dir'),
|
||
"today_folder": str(config_mgr.get_images_dir())
|
||
}
|
||
|
||
|
||
@app.post("/api/config/images-dir")
|
||
async def set_images_dir(path: str):
|
||
"""设置图片保存目录"""
|
||
from pathlib import Path
|
||
try:
|
||
Path(path).mkdir(parents=True, exist_ok=True)
|
||
config_mgr.set('images_dir', path)
|
||
return {"success": True, "images_dir": path}
|
||
except Exception as e:
|
||
raise HTTPException(status_code=400, detail=str(e))
|
||
|
||
|
||
# ============== 状态 API ==============
|
||
|
||
@app.get("/api/status")
|
||
async def get_status():
|
||
"""获取系统状态"""
|
||
stats = db.get_stats()
|
||
scheduler_status = scheduler.get_status()
|
||
config = config_mgr.get_all()
|
||
|
||
return {
|
||
"scheduler": scheduler_status,
|
||
"stats": stats,
|
||
"config": config
|
||
}
|
||
|
||
|
||
@app.get("/api/cameras")
|
||
async def get_cameras():
|
||
"""获取可用摄像头"""
|
||
cameras = list_available_cameras()
|
||
details = []
|
||
for cam_index in cameras:
|
||
cam = CameraCapture()
|
||
cam.camera_index = cam_index
|
||
details.append(cam.get_camera_info())
|
||
cam.close()
|
||
return {"cameras": details}
|
||
|
||
|
||
# ============== 调度控制 API ==============
|
||
|
||
@app.post("/api/scheduler/start")
|
||
async def start_scheduler():
|
||
"""启动定时拍照"""
|
||
result = scheduler.start()
|
||
if not result['success']:
|
||
raise HTTPException(status_code=400, detail=result['error'])
|
||
return result
|
||
|
||
|
||
@app.post("/api/scheduler/stop")
|
||
async def stop_scheduler():
|
||
"""停止定时拍照"""
|
||
return scheduler.stop()
|
||
|
||
|
||
@app.post("/api/scheduler/interval")
|
||
async def set_interval(interval: int):
|
||
"""设置拍照间隔"""
|
||
if interval < 10:
|
||
raise HTTPException(status_code=400, detail="间隔不能小于10秒")
|
||
return scheduler.set_interval(interval)
|
||
|
||
|
||
@app.post("/api/capture")
|
||
async def capture_now():
|
||
"""立即拍照"""
|
||
result = scheduler.capture_now()
|
||
if not result['success']:
|
||
raise HTTPException(status_code=500, detail=result['error'])
|
||
return result
|
||
|
||
|
||
# ============== 分析 API ==============
|
||
|
||
@app.post("/api/analyze/{image_id}")
|
||
async def analyze_image(image_id: int):
|
||
"""分析指定图片"""
|
||
result = scheduler.analyze_now(image_id)
|
||
if not result['success']:
|
||
raise HTTPException(status_code=500, detail=result['error'])
|
||
return result
|
||
|
||
|
||
@app.post("/api/analyze/unanalyzed")
|
||
async def analyze_unanalyzed():
|
||
"""分析所有未分析的图片"""
|
||
results = scheduler.analyze_unanalyzed()
|
||
return {"results": results}
|
||
|
||
|
||
@app.get("/api/persons")
|
||
async def get_persons():
|
||
"""获取人员列表"""
|
||
from person_manager import person_manager
|
||
return {"persons": person_manager.get_persons_list(), "stats": person_manager.get_stats()}
|
||
|
||
|
||
@app.delete("/api/persons/{person_id}")
|
||
async def delete_person(person_id: str):
|
||
"""删除人员"""
|
||
from person_manager import person_manager
|
||
if person_id in person_manager.persons:
|
||
del person_manager.persons[person_id]
|
||
person_manager._save_persons_db()
|
||
return {"success": True}
|
||
raise HTTPException(status_code=404, detail="人员不存在")
|
||
|
||
|
||
@app.post("/api/persons/{person_id}/rename")
|
||
async def rename_person(person_id: str, name: str):
|
||
"""重命名人员"""
|
||
from person_manager import person_manager
|
||
if person_id in person_manager.persons:
|
||
person_manager.persons[person_id]['name'] = name
|
||
person_manager._save_persons_db()
|
||
return {"success": True, "name": name}
|
||
raise HTTPException(status_code=404, detail="人员不存在")
|
||
|
||
|
||
@app.get("/api/persons/{person_id}/face")
|
||
async def get_person_face(person_id: str):
|
||
"""获取人员人脸图片"""
|
||
from person_manager import person_manager
|
||
from pathlib import Path
|
||
|
||
if person_id in person_manager.persons:
|
||
face_path = person_manager.persons[person_id].get('face_path', '')
|
||
if face_path and Path(face_path).exists():
|
||
return FileResponse(face_path)
|
||
|
||
# 检查 faces_dir 中的默认图片
|
||
face_file = person_manager.faces_dir / f"{person_id}.jpg"
|
||
if face_file.exists():
|
||
return FileResponse(str(face_file))
|
||
|
||
raise HTTPException(status_code=404, detail="人员图片不存在")
|
||
|
||
|
||
@app.get("/api/stats/daily")
|
||
async def get_daily_stats(date: str = None):
|
||
"""获取每日统计数据"""
|
||
from datetime import datetime, timedelta
|
||
import re
|
||
|
||
# 默认昨天的数据
|
||
if date is None:
|
||
yesterday = datetime.now() - timedelta(days=1)
|
||
date = yesterday.strftime('%Y-%m-%d')
|
||
|
||
conn = sqlite3.connect(db.db_path)
|
||
conn.row_factory = sqlite3.Row
|
||
|
||
# 获取当天所有图片
|
||
cursor = conn.execute(
|
||
"SELECT id, timestamp, analyzed FROM images WHERE date_folder = ? ORDER BY timestamp",
|
||
(date,)
|
||
)
|
||
images = [dict(row) for row in cursor.fetchall()]
|
||
|
||
# 获取每个时间点的人数
|
||
timeline = []
|
||
for img in images:
|
||
# 获取该图片的事件
|
||
cursor = conn.execute(
|
||
"SELECT event_type, description FROM events WHERE image_id = ?",
|
||
(img['id'],)
|
||
)
|
||
events = cursor.fetchall()
|
||
|
||
# 计算人数 - 从事件描述中提取
|
||
person_count = 0
|
||
|
||
# 方法1: 从"共 X 人"格式提取
|
||
for event in events:
|
||
desc = event['description']
|
||
if '共 ' in desc and ' 人' in desc:
|
||
try:
|
||
count_str = desc.split('共 ')[1].split(' 人')[0]
|
||
person_count = int(count_str)
|
||
break
|
||
except:
|
||
pass
|
||
|
||
# 方法2: 从"当前 X 人"格式提取
|
||
if person_count == 0:
|
||
for event in events:
|
||
desc = event['description']
|
||
if '当前 ' in desc and ' 人' in desc:
|
||
try:
|
||
count_str = desc.split('当前 ')[1].split(' 人')[0]
|
||
person_count = int(count_str)
|
||
break
|
||
except:
|
||
pass
|
||
|
||
# 方法3: 从"当前剩 X 人"格式提取
|
||
if person_count == 0:
|
||
for event in events:
|
||
desc = event['description']
|
||
if '当前剩 ' in desc and ' 人' in desc:
|
||
try:
|
||
count_str = desc.split('当前剩 ')[1].split(' 人')[0]
|
||
person_count = int(count_str)
|
||
break
|
||
except:
|
||
pass
|
||
|
||
# 方法4: 从人物活动事件推断
|
||
if person_count == 0:
|
||
person_events = [e for e in events if '人物活动' in e['event_type'] or '人员进出' in e['event_type']]
|
||
if person_events:
|
||
# 统计 #1, #2, #3 等序号
|
||
max_index = 0
|
||
for e in person_events:
|
||
desc = e['description']
|
||
# 提取所有 #数字 格式
|
||
import re
|
||
matches = re.findall(r'#(\d+)', desc)
|
||
if matches:
|
||
max_index = max(max_index, max([int(m) for m in matches]))
|
||
if max_index > 0:
|
||
person_count = max_index
|
||
|
||
timeline.append({
|
||
'time': img['timestamp'],
|
||
'image_id': img['id'],
|
||
'person_count': person_count,
|
||
'analyzed': img['analyzed']
|
||
})
|
||
|
||
# 统计汇总
|
||
total_images = len(images)
|
||
|
||
cursor = conn.execute(
|
||
"SELECT COUNT(*) FROM events WHERE image_id IN (SELECT id FROM images WHERE date_folder = ?)",
|
||
(date,)
|
||
)
|
||
total_events = cursor.fetchone()[0]
|
||
|
||
# 事件类型统计
|
||
cursor = conn.execute(
|
||
"""SELECT event_type, COUNT(*) as count
|
||
FROM events WHERE image_id IN
|
||
(SELECT id FROM images WHERE date_folder = ?)
|
||
GROUP BY event_type""",
|
||
(date,)
|
||
)
|
||
event_types = [{'type': row[0], 'count': row[1]} for row in cursor.fetchall()]
|
||
|
||
conn.close()
|
||
|
||
return {
|
||
'date': date,
|
||
'timeline': timeline,
|
||
'total_images': total_images,
|
||
'total_events': total_events,
|
||
'event_types': event_types
|
||
}
|
||
|
||
|
||
@app.get("/api/stats/history")
|
||
async def get_history_stats(days: int = 7):
|
||
"""获取历史统计(最近N天)"""
|
||
from datetime import datetime, timedelta
|
||
|
||
conn = db._get_conn()
|
||
|
||
history = []
|
||
|
||
for i in range(days):
|
||
date = (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
|
||
|
||
# 统计当天的数据
|
||
total_images = conn.execute(
|
||
"SELECT COUNT(*) FROM images WHERE date_folder = ?",
|
||
(date,)
|
||
).fetchone()[0]
|
||
|
||
total_events = conn.execute(
|
||
"SELECT COUNT(*) FROM events WHERE image_id IN (SELECT id FROM images WHERE date_folder = ?)",
|
||
(date,)
|
||
).fetchone()[0]
|
||
|
||
# 人数变化统计
|
||
cursor = conn.execute(
|
||
"""SELECT e.event_type, COUNT(*) as count
|
||
FROM events e JOIN images i ON e.image_id = i.id
|
||
WHERE i.date_folder = ? AND e.event_type LIKE '%人员进出%'
|
||
GROUP BY e.event_type""",
|
||
(date,)
|
||
)
|
||
person_changes = sum(row[1] for row in cursor.fetchall())
|
||
|
||
history.append({
|
||
'date': date,
|
||
'total_images': total_images,
|
||
'total_events': total_events,
|
||
'person_changes': person_changes
|
||
})
|
||
|
||
conn.close()
|
||
|
||
return {'history': history, 'days': days}
|
||
|
||
|
||
# ============== 图片 API ==============
|
||
|
||
@app.get("/api/images")
|
||
async def get_images(
|
||
limit: int = 50,
|
||
offset: int = 0,
|
||
analyzed_only: bool = False,
|
||
date_folder: str = None
|
||
):
|
||
"""获取图片列表"""
|
||
# 使用配置中的显示数量
|
||
config_limit = config_mgr.get('display_limit', 20)
|
||
if limit > config_limit:
|
||
limit = config_limit
|
||
|
||
images = db.get_images(limit=limit, offset=offset, analyzed_only=analyzed_only, date_folder=date_folder)
|
||
|
||
# 为每张图片添加事件摘要
|
||
for img in images:
|
||
events = db.get_events_by_image(img['id'])
|
||
img['events_summary'] = ', '.join([e['event_type'] for e in events]) if events else '无'
|
||
img['events_count'] = len(events)
|
||
|
||
return {"images": images, "total": len(images)}
|
||
|
||
|
||
@app.get("/api/images/{image_id}")
|
||
async def get_image(image_id: int):
|
||
"""获取单个图片详情"""
|
||
image = db.get_image_by_id(image_id)
|
||
if not image:
|
||
raise HTTPException(status_code=404, detail="图片不存在")
|
||
|
||
events = db.get_events_by_image(image_id)
|
||
image['events'] = events
|
||
return image
|
||
|
||
|
||
@app.delete("/api/images/{image_id}")
|
||
async def delete_image(image_id: int):
|
||
"""删除图片"""
|
||
success = db.delete_image(image_id)
|
||
if not success:
|
||
raise HTTPException(status_code=404, detail="图片不存在")
|
||
return {"success": True}
|
||
|
||
|
||
@app.get("/api/date-folders")
|
||
async def get_date_folders():
|
||
"""获取日期文件夹列表"""
|
||
stats = db.get_stats()
|
||
return {"folders": stats['date_folders']}
|
||
|
||
|
||
# ============== 事件 API ==============
|
||
|
||
@app.get("/api/events")
|
||
async def get_events(limit: int = 50, offset: int = 0, event_type: str = None):
|
||
"""获取事件列表"""
|
||
config_limit = config_mgr.get('display_limit', 20)
|
||
if limit > config_limit:
|
||
limit = config_limit
|
||
|
||
events = db.get_events(limit=limit, offset=offset, event_type=event_type)
|
||
return {"events": events, "total": len(events)}
|
||
|
||
|
||
@app.get("/api/stats")
|
||
async def get_stats():
|
||
"""获取统计信息"""
|
||
return db.get_stats()
|
||
|
||
|
||
# ============== 启动 ==============
|
||
|
||
def run_web():
|
||
"""启动 Web 服务"""
|
||
import uvicorn
|
||
import signal
|
||
import sys
|
||
|
||
config = uvicorn.Config(app, host=WEB_HOST, port=WEB_PORT, log_level="info")
|
||
server = uvicorn.Server(config)
|
||
|
||
def signal_handler(sig, frame):
|
||
print("\nShutting down...")
|
||
server.should_exit = True
|
||
sys.exit(0)
|
||
|
||
signal.signal(signal.SIGINT, signal_handler)
|
||
|
||
server.run()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
run_web() |