"""
Web 后端 - FastAPI
"""
import sqlite3
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from pathlib import Path
import uvicorn
from config import WEB_PORT, WEB_HOST, config_mgr, DATA_DIR
from database import db
from scheduler import scheduler
from camera import list_available_cameras, CameraCapture
# 创建应用
app = FastAPI(title="视觉记录系统")
# 静态文件目录
STATIC_DIR = Path(__file__).parent / "static"
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
# 图片目录 - 使用配置中的目录
@app.get("/images/{filename}")
async def get_image(filename: str):
"""获取图片(需要在实际目录中查找)"""
# 从数据库查找图片路径
images = db.get_images(limit=1000)
for img in images:
if img['path'].endswith(filename):
return FileResponse(img['path'])
raise HTTPException(status_code=404, detail="图片不存在")
# ============== 页面路由 ==============
@app.get("/", response_class=HTMLResponse)
async def index():
"""主页"""
# 添加时间戳参数防止缓存
index_file = STATIC_DIR / "index.html"
if index_file.exists():
content = index_file.read_text(encoding='utf-8')
# 在引用静态文件时添加版本参数
content = content.replace('/static/style.css', '/static/style.css?v=1.1')
content = content.replace('/static/app.js', '/static/app.js?v=1.1')
return HTMLResponse(content=content)
return HTMLResponse("
请创建 index.html
")
# ============== 配置 API ==============
@app.get("/api/config")
async def get_config():
"""获取所有配置"""
return config_mgr.get_all()
@app.post("/api/config")
async def update_config(data: dict):
"""更新配置"""
config_mgr.update(data)
return {"success": True, "config": config_mgr.get_all()}
@app.get("/api/config/images-dir")
async def get_images_dir():
"""获取图片保存目录"""
return {
"images_dir": config_mgr.get('images_dir'),
"today_folder": str(config_mgr.get_images_dir())
}
@app.post("/api/config/images-dir")
async def set_images_dir(path: str):
"""设置图片保存目录"""
from pathlib import Path
try:
Path(path).mkdir(parents=True, exist_ok=True)
config_mgr.set('images_dir', path)
return {"success": True, "images_dir": path}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
# ============== 状态 API ==============
@app.get("/api/status")
async def get_status():
"""获取系统状态"""
stats = db.get_stats()
scheduler_status = scheduler.get_status()
config = config_mgr.get_all()
return {
"scheduler": scheduler_status,
"stats": stats,
"config": config
}
@app.get("/api/cameras")
async def get_cameras():
"""获取可用摄像头"""
cameras = list_available_cameras()
details = []
for cam_index in cameras:
cam = CameraCapture()
cam.camera_index = cam_index
details.append(cam.get_camera_info())
cam.close()
return {"cameras": details}
# ============== 调度控制 API ==============
@app.post("/api/scheduler/start")
async def start_scheduler():
"""启动定时拍照"""
result = scheduler.start()
if not result['success']:
raise HTTPException(status_code=400, detail=result['error'])
return result
@app.post("/api/scheduler/stop")
async def stop_scheduler():
"""停止定时拍照"""
return scheduler.stop()
@app.post("/api/scheduler/interval")
async def set_interval(interval: int):
"""设置拍照间隔"""
if interval < 10:
raise HTTPException(status_code=400, detail="间隔不能小于10秒")
return scheduler.set_interval(interval)
@app.post("/api/capture")
async def capture_now():
"""立即拍照"""
result = scheduler.capture_now()
if not result['success']:
raise HTTPException(status_code=500, detail=result['error'])
return result
# ============== 分析 API ==============
@app.post("/api/analyze/{image_id}")
async def analyze_image(image_id: int):
"""分析指定图片"""
result = scheduler.analyze_now(image_id)
if not result['success']:
raise HTTPException(status_code=500, detail=result['error'])
return result
@app.post("/api/analyze/unanalyzed")
async def analyze_unanalyzed():
"""分析所有未分析的图片"""
results = scheduler.analyze_unanalyzed()
return {"results": results}
@app.get("/api/persons")
async def get_persons():
"""获取人员列表"""
from person_manager import person_manager
return {"persons": person_manager.get_persons_list(), "stats": person_manager.get_stats()}
@app.delete("/api/persons/{person_id}")
async def delete_person(person_id: str):
"""删除人员"""
from person_manager import person_manager
if person_id in person_manager.persons:
del person_manager.persons[person_id]
person_manager._save_persons_db()
return {"success": True}
raise HTTPException(status_code=404, detail="人员不存在")
@app.post("/api/persons/{person_id}/rename")
async def rename_person(person_id: str, name: str):
"""重命名人员"""
from person_manager import person_manager
if person_id in person_manager.persons:
person_manager.persons[person_id]['name'] = name
person_manager._save_persons_db()
return {"success": True, "name": name}
raise HTTPException(status_code=404, detail="人员不存在")
@app.get("/api/persons/{person_id}/face")
async def get_person_face(person_id: str):
"""获取人员人脸图片"""
from person_manager import person_manager
from pathlib import Path
if person_id in person_manager.persons:
face_path = person_manager.persons[person_id].get('face_path', '')
if face_path and Path(face_path).exists():
return FileResponse(face_path)
# 检查 faces_dir 中的默认图片
face_file = person_manager.faces_dir / f"{person_id}.jpg"
if face_file.exists():
return FileResponse(str(face_file))
raise HTTPException(status_code=404, detail="人员图片不存在")
@app.get("/api/stats/daily")
async def get_daily_stats(date: str = None):
"""获取每日统计数据"""
from datetime import datetime, timedelta
import re
# 默认昨天的数据
if date is None:
yesterday = datetime.now() - timedelta(days=1)
date = yesterday.strftime('%Y-%m-%d')
conn = sqlite3.connect(db.db_path)
conn.row_factory = sqlite3.Row
# 获取当天所有图片
cursor = conn.execute(
"SELECT id, timestamp, analyzed FROM images WHERE date_folder = ? ORDER BY timestamp",
(date,)
)
images = [dict(row) for row in cursor.fetchall()]
# 获取每个时间点的人数
timeline = []
for img in images:
# 获取该图片的事件
cursor = conn.execute(
"SELECT event_type, description FROM events WHERE image_id = ?",
(img['id'],)
)
events = cursor.fetchall()
# 计算人数 - 从事件描述中提取
person_count = 0
# 方法1: 从"共 X 人"格式提取
for event in events:
desc = event['description']
if '共 ' in desc and ' 人' in desc:
try:
count_str = desc.split('共 ')[1].split(' 人')[0]
person_count = int(count_str)
break
except:
pass
# 方法2: 从"当前 X 人"格式提取
if person_count == 0:
for event in events:
desc = event['description']
if '当前 ' in desc and ' 人' in desc:
try:
count_str = desc.split('当前 ')[1].split(' 人')[0]
person_count = int(count_str)
break
except:
pass
# 方法3: 从"当前剩 X 人"格式提取
if person_count == 0:
for event in events:
desc = event['description']
if '当前剩 ' in desc and ' 人' in desc:
try:
count_str = desc.split('当前剩 ')[1].split(' 人')[0]
person_count = int(count_str)
break
except:
pass
# 方法4: 从人物活动事件推断
if person_count == 0:
person_events = [e for e in events if '人物活动' in e['event_type'] or '人员进出' in e['event_type']]
if person_events:
# 统计 #1, #2, #3 等序号
max_index = 0
for e in person_events:
desc = e['description']
# 提取所有 #数字 格式
import re
matches = re.findall(r'#(\d+)', desc)
if matches:
max_index = max(max_index, max([int(m) for m in matches]))
if max_index > 0:
person_count = max_index
timeline.append({
'time': img['timestamp'],
'image_id': img['id'],
'person_count': person_count,
'analyzed': img['analyzed']
})
# 统计汇总
total_images = len(images)
cursor = conn.execute(
"SELECT COUNT(*) FROM events WHERE image_id IN (SELECT id FROM images WHERE date_folder = ?)",
(date,)
)
total_events = cursor.fetchone()[0]
# 事件类型统计
cursor = conn.execute(
"""SELECT event_type, COUNT(*) as count
FROM events WHERE image_id IN
(SELECT id FROM images WHERE date_folder = ?)
GROUP BY event_type""",
(date,)
)
event_types = [{'type': row[0], 'count': row[1]} for row in cursor.fetchall()]
conn.close()
return {
'date': date,
'timeline': timeline,
'total_images': total_images,
'total_events': total_events,
'event_types': event_types
}
@app.get("/api/stats/history")
async def get_history_stats(days: int = 7):
"""获取历史统计(最近N天)"""
from datetime import datetime, timedelta
conn = db._get_conn()
history = []
for i in range(days):
date = (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
# 统计当天的数据
total_images = conn.execute(
"SELECT COUNT(*) FROM images WHERE date_folder = ?",
(date,)
).fetchone()[0]
total_events = conn.execute(
"SELECT COUNT(*) FROM events WHERE image_id IN (SELECT id FROM images WHERE date_folder = ?)",
(date,)
).fetchone()[0]
# 人数变化统计
cursor = conn.execute(
"""SELECT e.event_type, COUNT(*) as count
FROM events e JOIN images i ON e.image_id = i.id
WHERE i.date_folder = ? AND e.event_type LIKE '%人员进出%'
GROUP BY e.event_type""",
(date,)
)
person_changes = sum(row[1] for row in cursor.fetchall())
history.append({
'date': date,
'total_images': total_images,
'total_events': total_events,
'person_changes': person_changes
})
conn.close()
return {'history': history, 'days': days}
# ============== 图片 API ==============
@app.get("/api/images")
async def get_images(
limit: int = 50,
offset: int = 0,
analyzed_only: bool = False,
date_folder: str = None
):
"""获取图片列表"""
# 使用配置中的显示数量
config_limit = config_mgr.get('display_limit', 20)
if limit > config_limit:
limit = config_limit
images = db.get_images(limit=limit, offset=offset, analyzed_only=analyzed_only, date_folder=date_folder)
# 为每张图片添加事件摘要
for img in images:
events = db.get_events_by_image(img['id'])
img['events_summary'] = ', '.join([e['event_type'] for e in events]) if events else '无'
img['events_count'] = len(events)
return {"images": images, "total": len(images)}
@app.get("/api/images/{image_id}")
async def get_image(image_id: int):
"""获取单个图片详情"""
image = db.get_image_by_id(image_id)
if not image:
raise HTTPException(status_code=404, detail="图片不存在")
events = db.get_events_by_image(image_id)
image['events'] = events
return image
@app.delete("/api/images/{image_id}")
async def delete_image(image_id: int):
"""删除图片"""
success = db.delete_image(image_id)
if not success:
raise HTTPException(status_code=404, detail="图片不存在")
return {"success": True}
@app.get("/api/date-folders")
async def get_date_folders():
"""获取日期文件夹列表"""
stats = db.get_stats()
return {"folders": stats['date_folders']}
# ============== 事件 API ==============
@app.get("/api/events")
async def get_events(limit: int = 50, offset: int = 0, event_type: str = None):
"""获取事件列表"""
config_limit = config_mgr.get('display_limit', 20)
if limit > config_limit:
limit = config_limit
events = db.get_events(limit=limit, offset=offset, event_type=event_type)
return {"events": events, "total": len(events)}
@app.get("/api/stats")
async def get_stats():
"""获取统计信息"""
return db.get_stats()
# ============== 启动 ==============
def run_web():
"""启动 Web 服务"""
import uvicorn
import signal
import sys
config = uvicorn.Config(app, host=WEB_HOST, port=WEB_PORT, log_level="info")
server = uvicorn.Server(config)
def signal_handler(sig, frame):
print("\nShutting down...")
server.should_exit = True
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
server.run()
if __name__ == "__main__":
run_web()