feat: 人员识别与管理模块 - MediaPipe人脸检测、人脸识别、人员库管理

This commit is contained in:
2026-04-16 18:47:15 +08:00
parent 32f98ce4f3
commit 55c6ad88c8
7 changed files with 769 additions and 45 deletions

View File

@@ -5,7 +5,7 @@ Local Analyzer - 本地视觉分析(无需大模型)
功能:
- 帧间差分:检测运动
- 背景建模:检测前景物体
- 人体检测:检测人员进出
- 人员识别:检测并识别人物(新人员自动入库)
- 亮度检测:检测光线变化
- 自动判断是否需要调用大模型
"""
@@ -13,6 +13,18 @@ import cv2
import numpy as np
from pathlib import Path
import datetime
import sys
import os
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
try:
from person_manager import person_manager
HAS_PERSON_MANAGER = True
except ImportError:
HAS_PERSON_MANAGER = False
print("[LocalAnalyzer] PersonManager not available")
class LocalAnalyzer:
@@ -104,40 +116,90 @@ class LocalAnalyzer:
})
self.motion_count += 1
# 2. 人检测
human_result = self._detect_human(current_frame)
metrics['human_count'] = human_result['count']
# 2. 人检测与识别
person_result = {'persons': [], 'total_count': 0, 'new_count': 0, 'known_count': 0}
# 记录人数变化
human_count_change = human_result['count'] - self.prev_human_count
metrics['human_count_change'] = human_count_change
if human_count_change > 0:
events.append({
'event_type': '人物活动',
'description': f'检测到 {human_count_change} 人进入,当前共 {human_result["count"]}',
'confidence': '',
'source': 'local'
})
self.human_count += human_count_change
elif human_count_change < 0:
events.append({
'event_type': '人物活动',
'description': f'检测到 {abs(human_count_change)} 人离开,当前剩 {human_result["count"]}',
'confidence': '',
'source': 'local'
})
elif human_result['count'] > 0:
# 人数没变但有人
events.append({
'event_type': '人物活动',
'description': f'检测到 {human_result["count"]} 个人(无变化)',
'confidence': '',
'source': 'local'
})
# 更新前一帧人数(在 should_call_model 中更新)
# self.prev_human_count = human_result['count']
if HAS_PERSON_MANAGER:
print(f"[LocalAnalyzer] Using PersonManager for face detection...")
person_result = person_manager.analyze_image(image_path, save_new_person=True)
metrics['person_count'] = person_result['total_count']
metrics['new_persons'] = person_result['new_count']
metrics['known_persons'] = person_result['known_count']
# 记录人员变化
prev_person_count = self.prev_human_count # 用之前的变量名
person_count_change = person_result['total_count'] - prev_person_count
metrics['person_count_change'] = person_count_change
for person in person_result['persons']:
if person['is_new']:
events.append({
'event_type': '人物活动',
'description': f'新人出现: {person["name"]},当前共 {person_result["total_count"]}',
'confidence': '',
'source': 'local'
})
self.human_count += 1
else:
events.append({
'event_type': '人物活动',
'description': f'已知人员: {person["name"]},已访问 {person_manager.persons.get(person["person_id"], {}).get("visit_count", 1)}',
'confidence': '',
'source': 'local'
})
# 检测人员进出
if person_count_change > 0:
events.append({
'event_type': '人员进出',
'description': f'检测到 {person_count_change} 人进入,当前共 {person_result["total_count"]}',
'confidence': '',
'source': 'local'
})
elif person_count_change < 0:
events.append({
'event_type': '人员进出',
'description': f'检测到 {abs(person_count_change)} 人离开,当前剩 {person_result["total_count"]}',
'confidence': '',
'source': 'local'
})
# 更新前一帧人数
self.prev_human_count = person_result['total_count']
else:
# 使用传统人体检测(备用)
human_result = self._detect_human(current_frame)
metrics['human_count'] = human_result['count']
human_count_change = human_result['count'] - self.prev_human_count
metrics['human_count_change'] = human_count_change
if human_count_change > 0:
events.append({
'event_type': '人物活动',
'description': f'检测到 {human_count_change} 人进入,当前共 {human_result["count"]}',
'confidence': '',
'source': 'local'
})
self.human_count += human_count_change
elif human_count_change < 0:
events.append({
'event_type': '人物活动',
'description': f'检测到 {abs(human_count_change)} 人离开,当前剩 {human_result["count"]}',
'confidence': '',
'source': 'local'
})
elif human_result['count'] > 0:
events.append({
'event_type': '人物活动',
'description': f'检测到 {human_result["count"]} 个人(无变化)',
'confidence': '',
'source': 'local'
})
self.prev_human_count = human_result['count']
# 3. 亮度检测
brightness_result = self._detect_brightness_change(current_gray, prev_image_path)
@@ -306,23 +368,25 @@ class LocalAnalyzer:
"""判断是否需要调用大模型"""
# 条件1人数变化最重要
current_human_count = metrics.get('human_count', 0)
human_count_change = abs(current_human_count - self.prev_human_count)
current_person_count = metrics.get('person_count', metrics.get('human_count', 0))
person_count_change = metrics.get('person_count_change', metrics.get('human_count_change', 0))
# 更新前一帧人数
self.prev_human_count = current_human_count
if human_count_change >= self.config['human_count_change_threshold']:
print(f"[LocalAnalyzer] Human count changed: {self.prev_human_count} -> {current_human_count}, triggering model")
# 更新前一帧人数(如果还没更新)
if abs(person_count_change) >= self.config['human_count_change_threshold']:
print(f"[LocalAnalyzer] Person count changed: {current_person_count - person_count_change} -> {current_person_count}, triggering model")
return True
# 条件2运动面积超过阈值(排除有人但不动的情况)
# 只有在没有人变化时才用这个条件
# 条件2检测到新人
if metrics.get('new_persons', 0) > 0:
print(f"[LocalAnalyzer] New person detected: {metrics.get('new_persons', 0)}, triggering model")
return True
# 条件3运动面积超过阈值排除有人但不动的情况
if metrics.get('motion_ratio', 0) > self.config['trigger_model_threshold'] * 2:
print(f"[LocalAnalyzer] Large motion detected: {metrics.get('motion_ratio', 0):.2%}")
return True
# 条件3:亮度大幅变化(灯开关等)
# 条件4:亮度大幅变化(灯开关等)
if abs(metrics.get('brightness_change', 0)) > self.config['brightness_change_threshold'] * 2:
print(f"[LocalAnalyzer] Brightness changed: {metrics.get('brightness_change', 0)}")
return True

448
person_manager.py Normal file
View File

@@ -0,0 +1,448 @@
"""
Person Manager - 人员识别与管理模块
功能:
- 人脸检测MediaPipe
- 人脸识别,判断是否为同一个人
- 人员库管理,新人自动添加
- 追踪人员进出记录
"""
import cv2
import numpy as np
import json
import datetime
from pathlib import Path
from config import DATA_DIR
try:
import mediapipe as mp
HAS_MEDIAPIPE = True
except ImportError:
HAS_MEDIAPIPE = False
print("[PersonManager] MediaPipe not installed, using basic detection")
try:
import face_recognition
HAS_FACE_REC = True
except ImportError:
HAS_FACE_REC = False
print("[PersonManager] face_recognition not installed, using basic matching")
class PersonManager:
"""人员识别与管理器"""
def __init__(self):
self.persons_db_path = DATA_DIR / "persons.json"
self.faces_dir = DATA_DIR / "faces"
# 创建目录
self.faces_dir.mkdir(parents=True, exist_ok=True)
# 加载人员库
self.persons = self._load_persons_db()
# 初始化检测器
self._init_detectors()
# 配置
self.config = {
'face_match_threshold': 0.6, # 人脸匹配阈值
'unknown_person_id': 'unknown', # 未知人员ID
'max_persons': 100, # 最大人员数量
}
# 统计
self.total_detections = 0
self.known_persons_detected = 0
self.new_persons_added = 0
def _load_persons_db(self):
"""加载人员数据库"""
if self.persons_db_path.exists():
try:
with open(self.persons_db_path, 'r', encoding='utf-8') as f:
return json.load(f)
except:
return {}
return {}
def _save_persons_db(self):
"""保存人员数据库"""
with open(self.persons_db_path, 'w', encoding='utf-8') as f:
json.dump(self.persons, f, ensure_ascii=False, indent=2)
def _init_detectors(self):
"""初始化检测器"""
# MediaPipe 人脸检测
if HAS_MEDIAPIPE:
self.mp_face_detection = mp.solutions.face_detection
self.face_detector = self.mp_face_detection.FaceDetection(
model_selection=0, # 0: 短距离1: 远距离
min_detection_confidence=0.5
)
print("[PersonManager] MediaPipe face detector initialized")
else:
# OpenCV DNN 人脸检测
try:
model_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
self.cv_face_detector = cv2.CascadeClassifier(model_path)
print("[PersonManager] OpenCV face detector initialized")
except:
self.cv_face_detector = None
print("[PersonManager] No face detector available")
def detect_faces(self, image):
"""检测人脸
Args:
image: 图片numpy array 或路径)
Returns:
list: [{'bbox': [x,y,w,h], 'confidence': float}]
"""
if isinstance(image, str):
image = cv2.imread(image)
if image is None:
return []
faces = []
# MediaPipe 检测
if HAS_MEDIAPIPE and hasattr(self, 'face_detector'):
rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
results = self.face_detector.process(rgb_image)
if results.detections:
for detection in results.detections:
bboxC = detection.location_data.relative_bounding_box
h, w, _ = image.shape
x = int(bboxC.xmin * w)
y = int(bboxC.ymin * h)
width = int(bboxC.width * w)
height = int(bboxC.height * h)
faces.append({
'bbox': [x, y, width, height],
'confidence': detection.score[0],
'source': 'mediapipe'
})
# OpenCV 检测(备用)
elif self.cv_face_detector is not None:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
detections = self.cv_face_detector.detectMultiScale(
gray,
scaleFactor=1.1,
minNeighbors=5,
minSize=(30, 30)
)
for (x, y, w, h) in detections:
faces.append({
'bbox': [x, y, w, h],
'confidence': 0.8,
'source': 'opencv'
})
return faces
def extract_face_encoding(self, image, face_bbox):
"""提取人脸特征
Args:
image: 图片
face_bbox: [x, y, w, h]
Returns:
numpy array: 人脸特征向量
"""
if isinstance(image, str):
image = cv2.imread(image)
if image is None:
return None
x, y, w, h = face_bbox
# 确保坐标有效
h_img, w_img = image.shape[:2]
x = max(0, min(x, w_img - 1))
y = max(0, min(y, h_img - 1))
w = max(1, min(w, w_img - x))
h = max(1, min(h, h_img - y))
# 提取人脸区域
face_image = image[y:y+h, x:x+w]
if HAS_FACE_REC:
# 使用 face_recognition 库
rgb_face = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
encodings = face_recognition.face_encodings(rgb_face)
if len(encodings) > 0:
return encodings[0]
# 简单特征:使用颜色直方图作为特征
# 将人脸缩放到固定大小
face_resized = cv2.resize(face_image, (64, 64))
# 计算 HSV 直方图
hsv = cv2.cvtColor(face_resized, cv2.COLOR_BGR2HSV)
hist_h = cv2.calcHist([hsv], [0], None, [16], [0, 180])
hist_s = cv2.calcHist([hsv], [1], None, [16], [0, 256])
hist_v = cv2.calcHist([hsv], [2], None, [16], [0, 256])
# 合并特征
feature = np.concatenate([
cv2.normalize(hist_h, hist_h).flatten(),
cv2.normalize(hist_s, hist_s).flatten(),
cv2.normalize(hist_v, hist_v).flatten()
])
return feature
def match_face(self, face_encoding, threshold=None):
"""匹配人脸,找出对应的已知人员
Args:
face_encoding: 人脸特征向量
threshold: 匹配阈值
Returns:
dict: {'person_id': str, 'name': str, 'is_new': bool}
"""
if threshold is None:
threshold = self.config['face_match_threshold']
if face_encoding is None:
return {'person_id': 'unknown', 'name': 'Unknown', 'is_new': False}
best_match = None
best_distance = float('inf')
for person_id, person_data in self.persons.items():
if 'face_encoding' in person_data:
stored_encoding = np.array(person_data['face_encoding'])
if HAS_FACE_REC:
# face_recognition 距离计算
distance = face_recognition.face_distance([stored_encoding], face_encoding)[0]
else:
# 简单特征距离
distance = np.linalg.norm(stored_encoding - face_encoding)
if distance < best_distance:
best_distance = distance
best_match = person_data
if best_match and best_distance < threshold:
self.known_persons_detected += 1
return {
'person_id': best_match.get('person_id'),
'name': best_match.get('name', 'Unknown'),
'is_new': False,
'confidence': 1 - best_distance
}
# 未匹配到,是新人员
return {
'person_id': 'unknown',
'name': 'Unknown',
'is_new': True,
'confidence': 0
}
def add_new_person(self, image, face_bbox, name=None):
"""添加新人员到库
Args:
image: 图片
face_bbox: 人脸位置
name: 人员名称(可选)
Returns:
dict: 新人员信息
"""
if isinstance(image, str):
image = cv2.imread(image)
# 提取特征
face_encoding = self.extract_face_encoding(image, face_bbox)
if face_encoding is None:
return None
# 生成人员ID
person_id = f"person_{len(self.persons) + 1}"
if name is None:
name = f"Person #{len(self.persons) + 1}"
# 保存人脸图片
x, y, w, h = face_bbox
face_image = image[y:y+h, x:x+w]
face_path = self.faces_dir / f"{person_id}.jpg"
cv2.imwrite(str(face_path), face_image)
# 记录到数据库
person_data = {
'person_id': person_id,
'name': name,
'face_encoding': face_encoding.tolist() if isinstance(face_encoding, np.ndarray) else face_encoding,
'face_path': str(face_path),
'first_seen': datetime.datetime.now().isoformat(),
'last_seen': datetime.datetime.now().isoformat(),
'visit_count': 1
}
self.persons[person_id] = person_data
self._save_persons_db()
self.new_persons_added += 1
print(f"[PersonManager] New person added: {person_id} ({name})")
return person_data
def update_person_visit(self, person_id):
"""更新人员访问记录"""
if person_id in self.persons:
self.persons[person_id]['last_seen'] = datetime.datetime.now().isoformat()
self.persons[person_id]['visit_count'] += 1
self._save_persons_db()
def analyze_image(self, image_path, save_new_person=True):
"""分析图片中的人员
Args:
image_path: 图片路径
save_new_person: 是否保存新人员
Returns:
dict: {
'faces': list, # 检测到的人脸
'persons': list, # 识别的人员
'new_count': int, # 新人员数量
'known_count': int, # 已知人员数量
}
"""
image = cv2.imread(image_path)
if image is None:
return {'faces': [], 'persons': [], 'error': 'Cannot load image'}
self.total_detections += 1
# 检测人脸
faces = self.detect_faces(image)
persons = []
new_count = 0
known_count = 0
for face in faces:
bbox = face['bbox']
# 提取特征
encoding = self.extract_face_encoding(image, bbox)
# 匹配
match_result = self.match_face(encoding)
if match_result['is_new']:
# 新人员
new_count += 1
if save_new_person and len(self.persons) < self.config['max_persons']:
new_person = self.add_new_person(image, bbox)
if new_person:
persons.append({
'person_id': new_person['person_id'],
'name': new_person['name'],
'bbox': bbox,
'is_new': True,
'confidence': face['confidence']
})
else:
persons.append({
'person_id': 'unknown',
'name': 'Unknown (new)',
'bbox': bbox,
'is_new': True,
'confidence': face['confidence']
})
else:
# 已知人员
known_count += 1
self.update_person_visit(match_result['person_id'])
persons.append({
'person_id': match_result['person_id'],
'name': match_result['name'],
'bbox': bbox,
'is_new': False,
'confidence': match_result['confidence']
})
return {
'faces': faces,
'persons': persons,
'new_count': new_count,
'known_count': known_count,
'total_count': len(persons)
}
def get_persons_list(self):
"""获取人员列表"""
return [
{
'person_id': p['person_id'],
'name': p['name'],
'visit_count': p['visit_count'],
'first_seen': p['first_seen'],
'last_seen': p['last_seen']
}
for p in self.persons.values()
]
def get_stats(self):
"""获取统计信息"""
return {
'total_persons': len(self.persons),
'total_detections': self.total_detections,
'known_persons_detected': self.known_persons_detected,
'new_persons_added': self.new_persons_added,
'recognition_rate': self.known_persons_detected / max(self.total_detections, 1)
}
def reset(self):
"""重置统计"""
self.total_detections = 0
self.known_persons_detected = 0
self.new_persons_added = 0
# 全局实例
person_manager = PersonManager()
if __name__ == "__main__":
# 测试
import sys
if len(sys.argv) >= 2:
test_image = sys.argv[1]
print(f"[Test] Analyzing: {test_image}")
result = person_manager.analyze_image(test_image)
print(f"[Test] Faces detected: {len(result['faces'])}")
print(f"[Test] Persons: {result['total_count']}")
print(f"[Test] New: {result['new_count']}, Known: {result['known_count']}")
for person in result['persons']:
status = "NEW" if person['is_new'] else "KNOWN"
print(f" - [{status}] {person['name']} (confidence: {person['confidence']:.2f})")
else:
print("Usage: python person_manager.py <image_path>")

View File

@@ -1,4 +1,9 @@
opencv-python>=4.8.0
fastapi>=0.100.0
uvicorn>=0.23.0
requests>=2.31.0
requests>=2.31.0
numpy>=1.20.0
# Optional: More accurate face detection and recognition
# mediapipe>=0.10.0
# face-recognition>=1.7.0 (requires dlib, may need manual install on Windows)

View File

@@ -165,6 +165,35 @@ async def analyze_unanalyzed():
return {"results": results}
@app.get("/api/persons")
async def get_persons():
"""获取人员列表"""
from person_manager import person_manager
return {"persons": person_manager.get_persons_list(), "stats": person_manager.get_stats()}
@app.delete("/api/persons/{person_id}")
async def delete_person(person_id: str):
"""删除人员"""
from person_manager import person_manager
if person_id in person_manager.persons:
del person_manager.persons[person_id]
person_manager._save_persons_db()
return {"success": True}
raise HTTPException(status_code=404, detail="人员不存在")
@app.post("/api/persons/{person_id}/rename")
async def rename_person(person_id: str, name: str):
"""重命名人员"""
from person_manager import person_manager
if person_id in person_manager.persons:
person_manager.persons[person_id]['name'] = name
person_manager._save_persons_db()
return {"success": True, "name": name}
raise HTTPException(status_code=404, detail="人员不存在")
# ============== 图片 API ==============
@app.get("/api/images")

View File

@@ -405,6 +405,96 @@ function closeSettingsModal() {
document.getElementById('settings-modal').classList.remove('active');
}
// Persons Management
function openPersonsModal() {
loadPersonsList();
document.getElementById('persons-modal').classList.add('active');
}
function closePersonsModal() {
document.getElementById('persons-modal').classList.remove('active');
}
function loadPersonsList() {
fetch(API_BASE + '/api/persons')
.then(function(res) { return res.json(); })
.then(function(data) {
var statsDiv = document.getElementById('persons-stats');
var listDiv = document.getElementById('persons-list');
// 统计信息
statsDiv.innerHTML = '<div class="stats-summary">' +
'<span>Total: ' + data.stats.total_persons + '</span>' +
'<span>Detected: ' + data.stats.total_detections + '</span>' +
'<span>Known: ' + data.stats.known_persons_detected + '</span>' +
'<span>New: ' + data.stats.new_persons_added + '</span>' +
'</div>';
// 人员列表
if (data.persons.length === 0) {
listDiv.innerHTML = '<p style="color:#888;text-align:center;">No persons recorded yet</p>';
return;
}
listDiv.innerHTML = '';
data.persons.forEach(function(person) {
var item = document.createElement('div');
item.className = 'person-item';
var firstSeen = new Date(person.first_seen).toLocaleDateString();
var lastSeen = new Date(person.last_seen).toLocaleDateString();
item.innerHTML = '<div class="person-info">' +
'<span class="person-name">' + person.name + '</span>' +
'<span class="person-id">' + person.person_id + '</span>' +
'</div>' +
'<div class="person-stats">' +
'<span>Visits: ' + person.visit_count + '</span>' +
'<span>First: ' + firstSeen + '</span>' +
'<span>Last: ' + lastSeen + '</span>' +
'</div>' +
'<div class="person-actions">' +
'<button onclick="renamePerson(\'' + person.person_id + '\')" class="btn-small">Rename</button>' +
'<button onclick="deletePerson(\'' + person.person_id + '\')" class="btn-small btn-danger">Delete</button>' +
'</div>';
listDiv.appendChild(item);
});
})
.catch(function(e) { console.error('Load persons failed:', e); });
}
function renamePerson(personId) {
var newName = prompt('Enter new name:');
if (!newName) return;
fetch(API_BASE + '/api/persons/' + personId + '/rename?name=' + encodeURIComponent(newName), {
method: 'POST'
})
.then(function(res) { return res.json(); })
.then(function(data) {
if (data.success) {
showToast('Renamed to ' + newName, 1500);
loadPersonsList();
}
})
.catch(function(e) { showToast('Error: ' + e.message, 3000); });
}
function deletePerson(personId) {
if (!confirm('Delete this person?')) return;
fetch(API_BASE + '/api/persons/' + personId, {method: 'DELETE'})
.then(function(res) { return res.json(); })
.then(function(data) {
if (data.success) {
showToast('Person deleted', 1500);
loadPersonsList();
}
})
.catch(function(e) { showToast('Error: ' + e.message, 3000); });
}
function loadSettingsForm() {
fetch(API_BASE + '/api/config')
.then(function(res) { return res.json(); })

View File

@@ -29,6 +29,11 @@
</div>
</div>
<div class="panel-section">
<h3>👥 人员库</h3>
<button onclick="openPersonsModal()" class="btn-settings">查看人员库</button>
</div>
<div class="panel-section">
<h3>⚙️ 系统设置</h3>
<button onclick="openSettingsModal()" class="btn-settings">🔧 设置参数</button>
@@ -155,6 +160,22 @@
</div>
</div>
<!-- 人员库模态框 -->
<div class="modal" id="persons-modal" onclick="closeModalOnBackground(event, 'persons-modal')">
<div class="modal-content settings-modal-content" onclick="event.stopPropagation()">
<span class="modal-close" onclick="closePersonsModal()">×</span>
<h2>👥 人员库管理</h2>
<div class="persons-stats" id="persons-stats">
<!-- 统计信息 -->
</div>
<div class="persons-list" id="persons-list">
<!-- 人员列表 -->
</div>
</div>
</div>
<!-- 图片详情模态框 -->
<div class="modal" id="image-modal" onclick="closeModalOnBackground(event, 'image-modal')">
<div class="modal-content" onclick="event.stopPropagation()">

View File

@@ -582,6 +582,73 @@ button {
}
/* 响应式 */
.settings-actions {
margin-top: 20px;
display: flex;
gap: 10px;
justify-content: center;
}
/* 人员库样式 */
.persons-stats {
margin-bottom: 20px;
}
.stats-summary {
display: flex;
gap: 20px;
justify-content: center;
padding: 15px;
background: #f5f5f5;
border-radius: 8px;
}
.stats-summary span {
color: #555;
}
.persons-list {
max-height: 400px;
overflow-y: auto;
}
.person-item {
background: #f9f9f9;
padding: 15px;
margin-bottom: 10px;
border-radius: 8px;
border-left: 4px solid #2196F3;
}
.person-info {
display: flex;
justify-content: space-between;
margin-bottom: 8px;
}
.person-name {
font-weight: bold;
font-size: 16px;
}
.person-id {
color: #888;
font-size: 12px;
}
.person-stats {
display: flex;
gap: 15px;
color: #555;
font-size: 13px;
margin-bottom: 10px;
}
.person-actions {
display: flex;
gap: 10px;
}
@media (max-width: 768px) {
.control-panel {
flex-direction: column;