Files
vision-record/person_manager.py

603 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Person Manager - 人员识别与管理模块
功能:
- 人脸检测MediaPipe
- 人脸识别,判断是否为同一个人
- 人员库管理,新人自动添加
- 追踪人员进出记录
"""
import cv2
import numpy as np
import json
import datetime
from pathlib import Path
from config import DATA_DIR
try:
import mediapipe as mp
HAS_MEDIAPIPE = True
except ImportError:
HAS_MEDIAPIPE = False
print("[PersonManager] MediaPipe not installed, using basic detection")
try:
import face_recognition
HAS_FACE_REC = True
except ImportError:
HAS_FACE_REC = False
print("[PersonManager] face_recognition not installed, using basic matching")
class PersonManager:
"""人员识别与管理器"""
def __init__(self):
self.persons_db_path = DATA_DIR / "persons.json"
self.faces_dir = DATA_DIR / "faces"
# 创建目录
self.faces_dir.mkdir(parents=True, exist_ok=True)
# 加载人员库
self.persons = self._load_persons_db()
# 初始化检测器状态
self.face_detector = None
self.mp_face_detection = None
self.cv_face_detector = None
self.has_mediapipe = HAS_MEDIAPIPE
# 从配置读取参数
try:
from config import config_mgr
self.config['mediapipe_min_confidence'] = config_mgr.get('min_detection_confidence', 0.3)
self.config['confirm_frames'] = config_mgr.get('confirm_frames', 3)
except:
pass
# 初始化检测器
self._init_detectors()
# 配置
self.config = {
'face_match_threshold': 0.6, # 人脸匹配阈值
'unknown_person_id': 'unknown', # 未知人员ID
'max_persons': 100, # 最大人员数量
# 方案1: 参数调整
'mediapipe_min_confidence': 0.3, # 降低阈值,更容易检测
'mediapipe_model_selection': 1, # 1: 远距离模型
'haar_scale_factor': 1.05, # Haar更细粒度
'haar_min_neighbors': 2, # 降低邻居要求
# 方案2: 连续性判断
'confirm_frames': 3, # 连续几帧确认
'leave_frames': 2, # 连续几帧消失才算离开
}
# 方案2: 追踪状态(连续判断)
self.tracked_persons = {} # {person_id: {'frames': count, 'confirmed': bool}}
self.prev_persons = [] # 前一帧检测到的人
self.confirmation_buffer = {} # 确认缓冲区
# 统计
self.total_detections = 0
self.known_persons_detected = 0
self.new_persons_added = 0
def _load_persons_db(self):
"""加载人员数据库"""
if self.persons_db_path.exists():
try:
with open(self.persons_db_path, 'r', encoding='utf-8') as f:
return json.load(f)
except:
return {}
return {}
def _save_persons_db(self):
"""保存人员数据库"""
with open(self.persons_db_path, 'w', encoding='utf-8') as f:
json.dump(self.persons, f, ensure_ascii=False, indent=2)
def _init_detectors(self):
"""初始化检测器"""
# MediaPipe 人脸检测方案1: 参数调整)
if self.has_mediapipe:
try:
mp_face_detection = mp.solutions.face_detection
self.face_detector = mp_face_detection.FaceDetection(
model_selection=self.config['mediapipe_model_selection'], # 远距离模型
min_detection_confidence=self.config['mediapipe_min_confidence'] # 降低阈值
)
self.mp_face_detection = mp_face_detection
print(f"[PersonManager] MediaPipe initialized (model={self.config['mediapipe_model_selection']}, conf={self.config['mediapipe_min_confidence']})")
except Exception as e:
print(f"[PersonManager] MediaPipe init failed: {e}")
self.face_detector = None
self.has_mediapipe = False
# OpenCV 人脸检测(备用)
try:
model_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
if Path(model_path).exists():
self.cv_face_detector = cv2.CascadeClassifier(model_path)
print("[PersonManager] OpenCV Haar Cascade initialized (backup)")
except Exception as e:
self.cv_face_detector = None
print(f"[PersonManager] OpenCV detector init failed: {e}")
# 方案3: YOLO 检测(更准确)
self.yolo_detector = None
try:
from ultralytics import YOLO
# 使用轻量级 nano 模型
self.yolo_detector = YOLO('yolov8n.pt') # nano 模型,快速
print("[PersonManager] YOLOv8nano initialized (most accurate)")
except ImportError:
print("[PersonManager] YOLO not installed. Install with: pip install ultralytics")
except Exception as e:
print(f"[PersonManager] YOLO init failed: {e}")
def detect_faces(self, image):
"""检测人脸(优先使用 YOLO其次 MediaPipe最后 Haar
Args:
image: 图片numpy array 或路径)
Returns:
list: [{'bbox': [x,y,w,h], 'confidence': float}]
"""
if isinstance(image, str):
image = cv2.imread(image)
if image is None:
return []
faces = []
# 方案3: YOLO 检测(优先,最准确)
if self.yolo_detector is not None:
try:
results = self.yolo_detector(image, classes=[0], verbose=False) # class 0 = person
for r in results:
for box in r.boxes:
x1, y1, x2, y2 = box.xyxy[0].tolist()
conf = box.conf[0].item()
# 转换为 [x, y, w, h] 格式
faces.append({
'bbox': [int(x1), int(y1), int(x2-x1), int(y2-y1)],
'confidence': conf,
'source': 'yolo'
})
if faces:
print(f"[PersonManager] YOLO detected {len(faces)} persons")
return faces # YOLO 检测成功,直接返回
except Exception as e:
print(f"[PersonManager] YOLO detection failed: {e}")
# 方案1+2: MediaPipe 检测
if self.has_mediapipe and self.face_detector is not None:
rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
results = self.face_detector.process(rgb_image)
if results.detections:
for detection in results.detections:
bboxC = detection.location_data.relative_bounding_box
h, w, _ = image.shape
x = int(bboxC.xmin * w)
y = int(bboxC.ymin * h)
width = int(bboxC.width * w)
height = int(bboxC.height * h)
faces.append({
'bbox': [x, y, width, height],
'confidence': detection.score[0],
'source': 'mediapipe'
})
if faces:
return faces
# 备用: OpenCV Haar 检测
if self.cv_face_detector is not None:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
detections = self.cv_face_detector.detectMultiScale(
gray,
scaleFactor=self.config['haar_scale_factor'],
minNeighbors=self.config['haar_min_neighbors'],
minSize=(30, 30)
)
for (x, y, w, h) in detections:
faces.append({
'bbox': [x, y, w, h],
'confidence': 0.8,
'source': 'opencv'
})
return faces
def extract_face_encoding(self, image, face_bbox):
"""提取人脸特征(用于识别是否为同一个人)
使用 MediaPipe 的人脸关键点作为特征,不依赖 dlib
Args:
image: 图片
face_bbox: [x, y, w, h]
Returns:
numpy array: 人脸特征向量
"""
if isinstance(image, str):
image = cv2.imread(image)
if image is None:
return None
x, y, w, h = face_bbox
# 确保坐标有效
h_img, w_img = image.shape[:2]
x = max(0, min(x, w_img - 1))
y = max(0, min(y, h_img - 1))
w = max(1, min(w, w_img - x))
h = max(1, min(h, h_img - y))
# 提取人脸区域
face_image = image[y:y+h, x:x+w]
# 方法1使用 face_recognition如果安装了
if HAS_FACE_REC:
try:
rgb_face = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
encodings = face_recognition.face_encodings(rgb_face)
if len(encodings) > 0:
return encodings[0]
except:
pass
# 方法2使用 MediaPipe 人脸关键点(推荐)
if self.has_mediapipe:
try:
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1)
rgb_face = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
results = face_mesh.process(rgb_face)
if results.multi_face_landmarks:
# 提取关键点坐标作为特征
landmarks = results.multi_face_landmarks[0]
features = []
for landmark in landmarks.landmark:
features.extend([landmark.x, landmark.y, landmark.z])
face_mesh.close()
return np.array(features)
except:
pass
# 方法3使用颜色直方图最简单备用
face_resized = cv2.resize(face_image, (64, 64))
hsv = cv2.cvtColor(face_resized, cv2.COLOR_BGR2HSV)
hist_h = cv2.calcHist([hsv], [0], None, [16], [0, 180])
hist_s = cv2.calcHist([hsv], [1], None, [16], [0, 256])
hist_v = cv2.calcHist([hsv], [2], None, [16], [0, 256])
feature = np.concatenate([
cv2.normalize(hist_h, hist_h).flatten(),
cv2.normalize(hist_s, hist_s).flatten(),
cv2.normalize(hist_v, hist_v).flatten()
])
return feature
def match_face(self, face_encoding, threshold=None):
"""匹配人脸,找出对应的已知人员
Args:
face_encoding: 人脸特征向量
threshold: 匹配阈值
Returns:
dict: {'person_id': str, 'name': str, 'is_new': bool}
"""
if threshold is None:
threshold = self.config['face_match_threshold']
if face_encoding is None:
return {'person_id': 'unknown', 'name': 'Unknown', 'is_new': False}
best_match = None
best_distance = float('inf')
for person_id, person_data in self.persons.items():
if 'face_encoding' in person_data:
stored_encoding = np.array(person_data['face_encoding'])
if HAS_FACE_REC:
# face_recognition 距离计算
distance = face_recognition.face_distance([stored_encoding], face_encoding)[0]
else:
# 简单特征距离
distance = np.linalg.norm(stored_encoding - face_encoding)
if distance < best_distance:
best_distance = distance
best_match = person_data
if best_match and best_distance < threshold:
self.known_persons_detected += 1
return {
'person_id': best_match.get('person_id'),
'name': best_match.get('name', 'Unknown'),
'is_new': False,
'confidence': 1 - best_distance
}
# 未匹配到,是新人员
return {
'person_id': 'unknown',
'name': 'Unknown',
'is_new': True,
'confidence': 0
}
def add_new_person(self, image, face_bbox, name=None):
"""添加新人员到库
Args:
image: 图片
face_bbox: 人脸位置
name: 人员名称(可选)
Returns:
dict: 新人员信息
"""
if isinstance(image, str):
image = cv2.imread(image)
# 提取特征
face_encoding = self.extract_face_encoding(image, face_bbox)
if face_encoding is None:
return None
# 生成人员ID
person_id = f"person_{len(self.persons) + 1}"
if name is None:
name = f"Person #{len(self.persons) + 1}"
# 保存人脸图片
x, y, w, h = face_bbox
face_image = image[y:y+h, x:x+w]
face_path = self.faces_dir / f"{person_id}.jpg"
cv2.imwrite(str(face_path), face_image)
# 记录到数据库
person_data = {
'person_id': person_id,
'name': name,
'face_encoding': face_encoding.tolist() if isinstance(face_encoding, np.ndarray) else face_encoding,
'face_path': str(face_path),
'first_seen': datetime.datetime.now().isoformat(),
'last_seen': datetime.datetime.now().isoformat(),
'visit_count': 1
}
self.persons[person_id] = person_data
self._save_persons_db()
self.new_persons_added += 1
print(f"[PersonManager] New person added: {person_id} ({name})")
return person_data
def update_person_visit(self, person_id):
"""更新人员访问记录"""
if person_id in self.persons:
self.persons[person_id]['last_seen'] = datetime.datetime.now().isoformat()
self.persons[person_id]['visit_count'] += 1
self._save_persons_db()
def analyze_image(self, image_path, save_new_person=True):
"""分析图片中的人员(带连续性判断)
Args:
image_path: 图片路径
save_new_person: 是否保存新人员
Returns:
dict: {
'faces': list, # 检测到的人脸
'persons': list, # 识别的人员
'new_count': int, # 新人员数量
'known_count': int, # 已知人员数量
'confirmed_change': bool, # 是否有确认的人员变化
}
"""
image = cv2.imread(image_path)
if image is None:
return {'faces': [], 'persons': [], 'error': 'Cannot load image'}
self.total_detections += 1
# 检测人脸
faces = self.detect_faces(image)
current_count = len(faces)
# 方案2: 连续性判断
confirmed_change = False
confirmed_persons = []
# 检查人数变化
prev_count = len(self.prev_persons)
if current_count != prev_count:
# 人数变化,记录到缓冲区
key = f"count_{current_count}"
if key not in self.confirmation_buffer:
self.confirmation_buffer[key] = {'count': 0, 'persons': []}
self.confirmation_buffer[key]['count'] += 1
# 临时识别人员
temp_persons = []
for face in faces:
bbox = face['bbox']
encoding = self.extract_face_encoding(image, bbox)
match_result = self.match_face(encoding)
person_info = {
'person_id': match_result['person_id'] if not match_result['is_new'] else 'unknown',
'name': match_result['name'],
'bbox': bbox,
'is_new': match_result['is_new'],
'confidence': face['confidence'],
'source': face['source']
}
temp_persons.append(person_info)
self.confirmation_buffer[key]['persons'] = temp_persons
# 达到确认帧数
if self.confirmation_buffer[key]['count'] >= self.config['confirm_frames']:
confirmed_change = True
confirmed_persons = temp_persons
print(f"[PersonManager] Confirmed: {prev_count} -> {current_count} persons (after {self.config['confirm_frames']} frames)")
# 清空其他缓冲区
self.confirmation_buffer = {}
# 更新前一帧状态
self.prev_persons = temp_persons
else:
# 人数不变,清空变化缓冲区,维持当前状态
if current_count > 0:
# 识别当前人员
temp_persons = []
for face in faces:
bbox = face['bbox']
encoding = self.extract_face_encoding(image, bbox)
match_result = self.match_face(encoding)
person_info = {
'person_id': match_result['person_id'] if not match_result['is_new'] else 'unknown',
'name': match_result['name'],
'bbox': bbox,
'is_new': match_result['is_new'],
'confidence': face['confidence'],
'source': face['source']
}
temp_persons.append(person_info)
confirmed_persons = temp_persons
self.prev_persons = temp_persons
# 清空变化缓冲区
keys_to_remove = [k for k in self.confirmation_buffer.keys() if not k.endswith(f"_{current_count}")]
for k in keys_to_remove:
del self.confirmation_buffer[k]
# 统计新人和已知人员
new_count = 0
known_count = 0
persons_to_save = []
for person in confirmed_persons:
if person['is_new']:
new_count += 1
# 只有确认后才保存新人
if confirmed_change and save_new_person and len(self.persons) < self.config['max_persons']:
# 找到对应的 face bbox
for face in faces:
if face['bbox'] == person['bbox']:
new_person = self.add_new_person(image, face['bbox'])
if new_person:
person['person_id'] = new_person['person_id']
person['name'] = new_person['name']
persons_to_save.append(person)
break
else:
known_count += 1
self.update_person_visit(person['person_id'])
persons_to_save.append(person)
return {
'faces': faces,
'persons': persons_to_save,
'new_count': new_count,
'known_count': known_count,
'total_count': len(persons_to_save),
'confirmed_change': confirmed_change,
'current_count': current_count,
'prev_count': prev_count,
'detection_source': faces[0]['source'] if faces else 'none'
}
def get_persons_list(self):
"""获取人员列表"""
return [
{
'person_id': p['person_id'],
'name': p['name'],
'visit_count': p['visit_count'],
'first_seen': p['first_seen'],
'last_seen': p['last_seen']
}
for p in self.persons.values()
]
def get_stats(self):
"""获取统计信息"""
return {
'total_persons': len(self.persons),
'total_detections': self.total_detections,
'known_persons_detected': self.known_persons_detected,
'new_persons_added': self.new_persons_added,
'recognition_rate': self.known_persons_detected / max(self.total_detections, 1)
}
def reset(self):
"""重置统计"""
self.total_detections = 0
self.known_persons_detected = 0
self.new_persons_added = 0
# 全局实例
person_manager = PersonManager()
if __name__ == "__main__":
# 测试
import sys
if len(sys.argv) >= 2:
test_image = sys.argv[1]
print(f"[Test] Analyzing: {test_image}")
result = person_manager.analyze_image(test_image)
print(f"[Test] Faces detected: {len(result['faces'])}")
print(f"[Test] Persons: {result['total_count']}")
print(f"[Test] New: {result['new_count']}, Known: {result['known_count']}")
for person in result['persons']:
status = "NEW" if person['is_new'] else "KNOWN"
print(f" - [{status}] {person['name']} (confidence: {person['confidence']:.2f})")
else:
print("Usage: python person_manager.py <image_path>")