Files
vision-record/person_manager.py

676 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Person Manager - 人员识别与管理模块
功能:
- 人脸检测MediaPipe
- 人脸识别,判断是否为同一个人
- 人员库管理,新人自动添加
- 追踪人员进出记录
"""
import cv2
import numpy as np
import json
import datetime
from pathlib import Path
from config import DATA_DIR
# Optional dependency: MediaPipe (face-mesh landmarks). The flag lets the
# rest of the module degrade gracefully when the package is missing.
try:
    import mediapipe as mp
    HAS_MEDIAPIPE = True
except ImportError:
    HAS_MEDIAPIPE = False
    print("[PersonManager] MediaPipe not installed, using basic detection")

# Optional dependency: face_recognition (dlib based face encodings).
try:
    import face_recognition
    HAS_FACE_REC = True
except ImportError:
    HAS_FACE_REC = False
    print("[PersonManager] face_recognition not installed, using basic matching")
class PersonManager:
    """Detect people in images, identify them and keep a persistent person database.

    Pipeline implemented by :meth:`analyze_image`:

    1. YOLO finds person bounding boxes.
    2. The top part of each box is turned into a feature vector
       (face_recognition, then MediaPipe face-mesh landmarks, then an HSV
       colour histogram as a last resort) and matched against stored vectors.
    3. A change in the number of people is accepted only after it has been
       seen for ``confirm_frames`` frames; new people are stored at that point.

    The database is kept in ``DATA_DIR/persons.json`` and face crops in
    ``DATA_DIR/faces``.
    """

    # Fraction of a person box (measured from the top) treated as the face area.
    FACE_REGION_RATIO = 0.4

    # Log text printed by identify_person for each encoding method.
    _METHOD_MESSAGES = {
        'face_recognition': "Using face_recognition",
        'mediapipe': "Using MediaPipe landmarks",
        'color_histogram': "Using color histogram (backup)",
    }

    def __init__(self):
        """Create directories, load configuration and the person DB, set up detectors."""
        self.persons_db_path = DATA_DIR / "persons.json"
        self.faces_dir = DATA_DIR / "faces"
        self.faces_dir.mkdir(parents=True, exist_ok=True)

        # Defaults first, overrides second: the overrides need self.config to exist.
        self.config = {
            'face_match_threshold': 0.6,
            'unknown_person_id': 'unknown',
            'max_persons': 100,
            # YOLO detection parameters
            'yolo_min_confidence': 0.3,
            # Minimum detection confidence handed to MediaPipe FaceMesh
            'mediapipe_min_confidence': 0.3,
            'confirm_frames': 3,
            'leave_frames': 2,
        }
        self._apply_config_overrides()

        self.persons = self._load_persons_db()

        # Detector handles; _init_detectors fills in the ones that are available.
        self.face_detector = None
        self.mp_face_detection = None
        self.cv_face_detector = None
        self.yolo_detector = None
        self.has_mediapipe = HAS_MEDIAPIPE
        self._init_detectors()

        # Tracking state used for the multi-frame confirmation.
        self.tracked_persons = {}
        self.prev_persons = []
        self.confirmation_buffer = {}

        # Statistics
        self.total_detections = 0
        self.known_persons_detected = 0
        self.new_persons_added = 0

    def _apply_config_overrides(self):
        """Overlay values from ``config.config_mgr`` (if present) onto ``self.config``."""
        try:
            from config import config_mgr
        except ImportError:
            # No configuration manager available: keep the built-in defaults.
            return
        # (key in self.config, key in config_mgr)
        overrides = (
            ('mediapipe_min_confidence', 'min_detection_confidence'),
            ('yolo_min_confidence', 'yolo_min_confidence'),
            ('face_match_threshold', 'face_match_threshold'),
            ('confirm_frames', 'confirm_frames'),
            ('leave_frames', 'leave_frames'),
        )
        for local_key, remote_key in overrides:
            try:
                self.config[local_key] = config_mgr.get(remote_key, self.config[local_key])
            except Exception as e:
                # Best effort: a broken setting must not prevent start-up.
                print(f"[PersonManager] Could not read config '{remote_key}': {e}")

    def _load_persons_db(self):
        """Load the person database.

        Returns:
            dict: mapping person_id -> person record; empty when the file is
            missing, unreadable or does not contain a JSON object.
        """
        if not self.persons_db_path.exists():
            return {}
        try:
            with open(self.persons_db_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and decoding errors.
            print(f"[PersonManager] Failed to load persons DB: {e}")
            return {}
        if not isinstance(data, dict):
            print("[PersonManager] Persons DB is not a JSON object, ignoring it")
            return {}
        return data

    def _save_persons_db(self):
        """Write the person database to disk.

        The data is written to a temporary file first and then renamed over
        the real file, so an interrupted write cannot truncate the database.
        """
        tmp_path = self.persons_db_path.with_name(self.persons_db_path.name + '.tmp')
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(self.persons, f, ensure_ascii=False, indent=2)
        tmp_path.replace(self.persons_db_path)

    def _init_detectors(self):
        """Initialise the detectors; each failure is logged and leaves the handle None."""
        # MediaPipe face detection is currently unused (YOLO does the detection).
        self.face_detector = None
        self.mp_face_detection = None

        # OpenCV Haar cascade (backup)
        self.cv_face_detector = None
        try:
            model_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
            if Path(model_path).exists():
                self.cv_face_detector = cv2.CascadeClassifier(model_path)
                print("[PersonManager] OpenCV Haar Cascade initialized (backup)")
        except Exception as e:
            print(f"[PersonManager] OpenCV detector init failed: {e}")

        # YOLO (primary detector)
        self.yolo_detector = None
        try:
            from ultralytics import YOLO
            self.yolo_detector = YOLO('yolov8n.pt')
            print("[PersonManager] YOLOv8nano initialized (primary detector)")
        except ImportError:
            print("[PersonManager] YOLO not installed. Install with: pip install ultralytics")
        except Exception as e:
            print(f"[PersonManager] YOLO init failed: {e}")

    def detect_persons_yolo(self, image):
        """Detect people with YOLO (presence only, no identity).

        Args:
            image: image passed to the YOLO model.

        Returns:
            list: ``[{'bbox': [x, y, w, h], 'confidence': float, 'source': 'yolo'}]``;
            empty when YOLO is unavailable or detection fails.
        """
        persons = []
        if self.yolo_detector is None:
            return persons
        min_conf = self.config.get('yolo_min_confidence', 0.3)
        try:
            # class 0 = person
            results = self.yolo_detector(image, classes=[0], verbose=False)
            for r in results:
                for box in r.boxes:
                    x1, y1, x2, y2 = box.xyxy[0].tolist()
                    conf = box.conf[0].item()
                    # Confidence filter
                    if conf < min_conf:
                        continue
                    persons.append({
                        'bbox': [int(x1), int(y1), int(x2 - x1), int(y2 - y1)],
                        'confidence': conf,
                        'source': 'yolo',
                    })
            if persons:
                print(f"[PersonManager] YOLO detected {len(persons)} persons (conf > {min_conf})")
        except Exception as e:
            print(f"[PersonManager] YOLO detection failed: {e}")
        return persons

    @staticmethod
    def _clamp_bbox(image_shape, bbox):
        """Clamp ``[x, y, w, h]`` so that it addresses at least one pixel inside the image."""
        img_h, img_w = image_shape[:2]
        x, y, w, h = (int(v) for v in bbox)
        x = max(0, min(x, img_w - 1))
        y = max(0, min(y, img_h - 1))
        w = max(1, min(w, img_w - x))
        h = max(1, min(h, img_h - y))
        return x, y, w, h

    @staticmethod
    def _face_region_bbox(image_shape, person_bbox, ratio=0.4):
        """Return the top ``ratio`` of a person box, intersected with the image.

        Returns:
            list | None: ``[x, y, w, h]`` or None when nothing of the region
            lies inside the image.
        """
        img_h, img_w = image_shape[:2]
        x, y, w, h = (int(v) for v in person_bbox)
        left, top = max(0, x), max(0, y)
        right = min(img_w, x + w)
        bottom = min(img_h, y + int(h * ratio))
        if right <= left or bottom <= top:
            return None
        return [left, top, right - left, bottom - top]

    def _next_person_index(self):
        """Return the smallest N >= len(persons) + 1 for which ``person_N`` is unused."""
        index = len(self.persons) + 1
        # len + 1 can already be taken when records were removed or edited by hand.
        while f"person_{index}" in self.persons:
            index += 1
        return index

    def _compute_encoding(self, face_image, tag=""):
        """Turn a face crop (BGR) into a feature vector.

        Tries face_recognition, then MediaPipe face-mesh landmarks, then an
        HSV colour histogram. Failures of one stage are logged and the next
        stage is tried.

        Args:
            face_image: BGR crop.
            tag: text inserted into log messages (e.g. ``"#1 "``).

        Returns:
            tuple: ``(encoding, method)``; ``(None, 'no_face')`` when every
            stage failed or the crop is empty.
        """
        if face_image is None or face_image.size == 0:
            return None, 'no_face'

        # Stage 1: face_recognition (most accurate)
        if HAS_FACE_REC:
            try:
                rgb_face = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
                encodings = face_recognition.face_encodings(rgb_face)
                if len(encodings) > 0:
                    return encodings[0], 'face_recognition'
            except Exception as e:
                print(f"[PersonManager] {tag}face_recognition failed: {e}")

        # Stage 2: MediaPipe face-mesh landmarks
        if self.has_mediapipe:
            try:
                import mediapipe as mp_local
                rgb_face = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
                # The context manager releases the graph even if process() raises.
                with mp_local.solutions.face_mesh.FaceMesh(
                    static_image_mode=True,
                    max_num_faces=1,
                    min_detection_confidence=self.config.get('mediapipe_min_confidence', 0.3),
                ) as face_mesh:
                    results = face_mesh.process(rgb_face)
                    if results.multi_face_landmarks:
                        features = []
                        for landmark in results.multi_face_landmarks[0].landmark:
                            features.extend([landmark.x, landmark.y, landmark.z])
                        return np.array(features), 'mediapipe'
            except Exception as e:
                print(f"[PersonManager] {tag}MediaPipe failed: {e}")

        # Stage 3: HSV colour histogram (backup)
        try:
            face_resized = cv2.resize(face_image, (64, 64))
            hsv = cv2.cvtColor(face_resized, cv2.COLOR_BGR2HSV)
            hist_h = cv2.calcHist([hsv], [0], None, [16], [0, 180])
            hist_s = cv2.calcHist([hsv], [1], None, [16], [0, 256])
            hist_v = cv2.calcHist([hsv], [2], None, [16], [0, 256])
            encoding = np.concatenate([
                cv2.normalize(hist_h, hist_h).flatten(),
                cv2.normalize(hist_s, hist_s).flatten(),
                cv2.normalize(hist_v, hist_v).flatten(),
            ])
            return encoding, 'color_histogram'
        except Exception as e:
            print(f"[PersonManager] {tag}Histogram failed: {e}")
        return None, 'no_face'

    def _identify(self, image, person_bbox, person_index):
        """Identify one detection and also hand back what is needed to store it.

        Returns:
            tuple: ``(identity, encoding, face_region)`` where ``identity`` is
            the dict documented on :meth:`identify_person`; ``encoding`` and
            ``face_region`` are None when no feature vector could be computed.
        """
        region = self._face_region_bbox(image.shape, person_bbox, self.FACE_REGION_RATIO)
        if region is None:
            return {
                'person_id': f"person_{person_index}",
                'name': f"Person #{person_index}",
                'is_new': True,
                'confidence': 0.5,
                'method': 'yolo_only',
            }, None, None

        fx, fy, fw, fh = region
        face_region = image[fy:fy + fh, fx:fx + fw]
        encoding, method_used = self._compute_encoding(face_region, tag=f"#{person_index} ")
        if encoding is None:
            # Nothing usable could be extracted: return the default identity.
            return {
                'person_id': f"unknown_{person_index}",
                'name': f"Person #{person_index}",
                'is_new': True,
                'confidence': 0.3,
                'method': 'no_face',
            }, None, None

        print(f"[PersonManager] #{person_index} {self._METHOD_MESSAGES[method_used]}")
        match_result = self.match_face(encoding)
        match_result['method'] = method_used
        return match_result, encoding, face_region

    def identify_person(self, image, person_bbox, person_index):
        """Identify a detected person (face_recognition / MediaPipe / colour histogram).

        Args:
            image: BGR image.
            person_bbox: person box ``[x, y, w, h]``; its top 40% is used as face area.
            person_index: 1-based index of the detection, used in names and logs.

        Returns:
            dict: ``{'person_id': str, 'name': str, 'is_new': bool,
            'confidence': float, 'method': str}``
        """
        identity, _encoding, _face_region = self._identify(image, person_bbox, person_index)
        return identity

    def extract_face_encoding(self, image, face_bbox):
        """Extract the feature vector used to decide whether two faces are the same person.

        Args:
            image: BGR image or a path to an image file.
            face_bbox: ``[x, y, w, h]``; clamped to the image.

        Returns:
            numpy array with the feature vector, or None when the image cannot
            be loaded or no features could be computed.
        """
        if isinstance(image, str):
            image = cv2.imread(image)
        if image is None:
            return None
        x, y, w, h = self._clamp_bbox(image.shape, face_bbox)
        encoding, _method = self._compute_encoding(image[y:y + h, x:x + w])
        return encoding

    def match_face(self, face_encoding, threshold=None):
        """Find the known person whose stored encoding is closest to ``face_encoding``.

        Stored encodings with a different length than the query were produced
        by a different encoding method and are skipped.

        Args:
            face_encoding: feature vector (array-like) or None.
            threshold: maximum Euclidean distance for a match; defaults to
                ``config['face_match_threshold']``.

        Returns:
            dict: ``{'person_id': str, 'name': str, 'is_new': bool, 'confidence': float}``
            (``confidence`` is omitted when ``face_encoding`` is None).
        """
        if threshold is None:
            threshold = self.config.get('face_match_threshold', 0.6)
        if face_encoding is None:
            return {'person_id': 'unknown', 'name': 'Unknown', 'is_new': False}

        query = np.asarray(face_encoding, dtype=float)
        best_match = None
        best_distance = float('inf')
        for person_data in self.persons.values():
            stored = person_data.get('face_encoding')
            if stored is None:
                continue
            stored = np.asarray(stored, dtype=float)
            if stored.shape != query.shape:
                # Vectors from different methods are not comparable.
                continue
            # Euclidean distance; this is also what face_recognition.face_distance computes.
            distance = float(np.linalg.norm(stored - query))
            if distance < best_distance:
                best_distance = distance
                best_match = person_data

        if best_match and best_distance < threshold:
            self.known_persons_detected += 1
            return {
                'person_id': best_match.get('person_id'),
                'name': best_match.get('name', 'Unknown'),
                'is_new': False,
                'confidence': 1 - best_distance,
            }

        # No match: this is a new person.
        return {
            'person_id': 'unknown',
            'name': 'Unknown',
            'is_new': True,
            'confidence': 0,
        }

    def _store_person(self, person_id, name, encoding, face_path):
        """Create the DB record for a person, persist the DB and bump the counter."""
        now = datetime.datetime.now().isoformat()
        person_data = {
            'person_id': person_id,
            'name': name,
            'face_encoding': encoding.tolist() if isinstance(encoding, np.ndarray) else encoding,
            'face_path': face_path,
            'first_seen': now,
            'last_seen': now,
            'visit_count': 1,
        }
        self.persons[person_id] = person_data
        self._save_persons_db()
        self.new_persons_added += 1
        return person_data

    def add_new_person(self, image, face_bbox, name=None):
        """Add a new person to the database.

        Args:
            image: BGR image or a path to an image file.
            face_bbox: face position ``[x, y, w, h]``.
            name: optional display name; defaults to ``"Person #N"``.

        Returns:
            dict: the new person record, or None when the image cannot be
            loaded or no features could be extracted.
        """
        if isinstance(image, str):
            image = cv2.imread(image)
        if image is None:
            return None
        face_encoding = self.extract_face_encoding(image, face_bbox)
        if face_encoding is None:
            return None

        index = self._next_person_index()
        person_id = f"person_{index}"
        if name is None:
            name = f"Person #{index}"

        # Save the face crop, using the same clamped box the encoding was computed from.
        x, y, w, h = self._clamp_bbox(image.shape, face_bbox)
        face_path = self.faces_dir / f"{person_id}.jpg"
        cv2.imwrite(str(face_path), image[y:y + h, x:x + w])

        person_data = self._store_person(person_id, name, face_encoding, str(face_path))
        print(f"[PersonManager] New person added: {person_id} ({name})")
        return person_data

    def update_person_visit(self, person_id):
        """Update ``last_seen`` and increment ``visit_count`` of a known person."""
        if person_id in self.persons:
            record = self.persons[person_id]
            record['last_seen'] = datetime.datetime.now().isoformat()
            record['visit_count'] = record.get('visit_count', 0) + 1
            self._save_persons_db()

    def _store_new_persons(self, identified_persons, face_data):
        """Store every newly seen person of a confirmed frame.

        Args:
            identified_persons: result dicts; ``person_id`` and ``name`` of
                stored persons are updated in place.
            face_data: ``(encoding, face_region)`` per entry of ``identified_persons``.
        """
        for person, (encoding, face_region) in zip(identified_persons, face_data):
            if not person['is_new'] or encoding is None:
                continue
            if len(self.persons) >= self.config['max_persons']:
                break
            index = self._next_person_index()
            person['person_id'] = f"person_{index}"
            person['name'] = f"Person #{index}"
            # Saves the feature vector together with the face crop.
            self.add_new_person_with_encoding(
                person['person_id'], encoding, person['name'], face_region
            )

    def analyze_image(self, image_path, save_new_person=True):
        """Analyze the people in an image.

        Steps:
            1. YOLO detects people.
            2. face_recognition / MediaPipe / colour histogram identify each one.
            3. A changed head count is confirmed over several frames.

        Args:
            image_path: path of the image file.
            save_new_person: store new people once a change is confirmed.

        Returns:
            dict with ``persons`` (identified people incl. index),
            ``new_count``, ``known_count``, ``total_count``,
            ``confirmed_change``, ``current_count``, ``prev_count``,
            ``person_indices``, ``methods_used`` and ``detection_source``.
            When the image cannot be loaded the dict additionally has
            ``error`` and all counts are zero.
        """
        image = cv2.imread(image_path)
        if image is None:
            return {
                'persons': [],
                'error': 'Cannot load image',
                'new_count': 0,
                'known_count': 0,
                'total_count': 0,
                'confirmed_change': False,
                'current_count': 0,
                'prev_count': len(self.prev_persons),
                'person_indices': [],
                'methods_used': [],
                'detection_source': 'yolo',
            }
        self.total_detections += 1

        # Step 1: YOLO person detection
        detected_persons = self.detect_persons_yolo(image)
        current_count = len(detected_persons)

        # Step 2: identify every detection. Encodings and crops are kept
        # beside the result dicts so that exactly the vector used for matching
        # is the one stored later.
        identified_persons = []
        face_data = []
        for person_index, person in enumerate(detected_persons, start=1):
            identity, encoding, face_region = self._identify(image, person['bbox'], person_index)
            face_data.append((encoding, face_region))
            identified_persons.append({
                'person_id': identity['person_id'],
                'name': identity['name'],
                'person_index': person_index,
                'bbox': person['bbox'],
                'is_new': identity['is_new'],
                'confidence': identity.get('confidence', person['confidence']),
                'method': identity.get('method', 'unknown'),
                'yolo_confidence': person['confidence'],
                'source': 'yolo',
            })

        # Step 3: multi-frame confirmation
        confirmed_change = False
        prev_count = len(self.prev_persons)
        if current_count != prev_count:
            # Head count changed: count how often this new value has been seen.
            key = f"count_{current_count}"
            if key not in self.confirmation_buffer:
                self.confirmation_buffer[key] = {'count': 0, 'persons': []}
            self.confirmation_buffer[key]['count'] += 1
            self.confirmation_buffer[key]['persons'] = identified_persons

            if self.confirmation_buffer[key]['count'] >= self.config['confirm_frames']:
                confirmed_change = True
                print(f"[PersonManager] Confirmed change: {prev_count} -> {current_count} (after {self.config['confirm_frames']} frames)")
                if save_new_person:
                    self._store_new_persons(identified_persons, face_data)
                # Drop the other candidates and accept the new state.
                self.confirmation_buffer = {key: self.confirmation_buffer[key]}
                self.prev_persons = identified_persons
        else:
            # Head count unchanged: keep the state and forget pending changes.
            self.prev_persons = identified_persons
            stale_keys = [k for k in self.confirmation_buffer if k != f"count_{current_count}"]
            for k in stale_keys:
                del self.confirmation_buffer[k]

        # Statistics
        new_count = sum(1 for p in identified_persons if p['is_new'])
        known_count = current_count - new_count
        return {
            'persons': identified_persons,
            'new_count': new_count,
            'known_count': known_count,
            'total_count': current_count,
            'confirmed_change': confirmed_change,
            'current_count': current_count,
            'prev_count': prev_count,
            'person_indices': [p['person_index'] for p in identified_persons],
            'methods_used': [p['method'] for p in identified_persons],
            'detection_source': 'yolo',
        }

    def add_new_person_with_encoding(self, person_id, encoding, name=None, face_image=None):
        """Store a new person whose encoding has already been computed.

        Args:
            person_id: ID of the person; an existing record with this ID is replaced.
            encoding: feature vector.
            name: display name; defaults to ``person_id``.
            face_image: optional face crop to save next to the record.

        Returns:
            dict: the stored person record.
        """
        if name is None:
            name = person_id

        # Save the face crop
        face_path = ''
        if face_image is not None and face_image.size > 0:
            face_path = str(self.faces_dir / f"{person_id}.jpg")
            if cv2.imwrite(face_path, face_image):
                print(f"[PersonManager] Face image saved: {face_path}")
            else:
                # Do not record a path to a file that was never written.
                print(f"[PersonManager] Face image could not be saved: {face_path}")
                face_path = ''

        person_data = self._store_person(person_id, name, encoding, face_path)
        print(f"[PersonManager] New person saved: {person_id} ({name})")
        return person_data

    def get_persons_list(self):
        """Return the stored people without their feature vectors."""
        return [
            {
                'person_id': p['person_id'],
                'name': p['name'],
                'visit_count': p['visit_count'],
                'first_seen': p['first_seen'],  # ISO timestamp
                'last_seen': p['last_seen'],  # ISO timestamp
                'face_path': p.get('face_path', ''),  # path of the face crop
            }
            for p in self.persons.values()
        ]

    def get_stats(self):
        """Return counters and the ratio of known-person matches to analyzed images."""
        return {
            'total_persons': len(self.persons),
            'total_detections': self.total_detections,
            'known_persons_detected': self.known_persons_detected,
            'new_persons_added': self.new_persons_added,
            'recognition_rate': self.known_persons_detected / max(self.total_detections, 1),
        }

    def reset(self):
        """Reset the statistics counters (the person database is not touched)."""
        self.total_detections = 0
        self.known_persons_detected = 0
        self.new_persons_added = 0
# Module-level singleton shared by everything that imports this module.
person_manager = PersonManager()

if __name__ == "__main__":
    # Manual smoke test: analyze one image and print a summary.
    import sys

    if len(sys.argv) >= 2:
        test_image = sys.argv[1]
        print(f"[Test] Analyzing: {test_image}")
        result = person_manager.analyze_image(test_image)
        if 'error' in result:
            # analyze_image reports an unreadable image through the 'error' key.
            print(f"[Test] Error: {result['error']}")
            sys.exit(1)
        # analyze_image returns no 'faces' key; the detections are in 'persons'.
        print(f"[Test] Faces detected: {len(result['persons'])}")
        print(f"[Test] Persons: {result['total_count']}")
        print(f"[Test] New: {result['new_count']}, Known: {result['known_count']}")
        for person in result['persons']:
            status = "NEW" if person['is_new'] else "KNOWN"
            print(f" - [{status}] {person['name']} (confidence: {person['confidence']:.2f})")
    else:
        print("Usage: python person_manager.py <image_path>")