Files
vision-record/person_manager.py

471 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
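Person Manager - person recognition and management module.

Features:
- Face detection (MediaPipe)
- Face recognition: decide whether two faces belong to the same person
- Person library management: new people are added automatically
- Tracking of people's entry/exit records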
"""
import cv2
import numpy as np
import json
import datetime
from pathlib import Path
from config import DATA_DIR
# Optional dependency: MediaPipe supplies the preferred face detector.
# HAS_MEDIAPIPE records whether the import succeeded.
try:
    import mediapipe as mp
except ImportError:
    HAS_MEDIAPIPE = False
    print("[PersonManager] MediaPipe not installed, using basic detection")
else:
    HAS_MEDIAPIPE = True

# Optional dependency: face_recognition supplies face encodings.
# HAS_FACE_REC records whether the import succeeded.
try:
    import face_recognition
except ImportError:
    HAS_FACE_REC = False
    print("[PersonManager] face_recognition not installed, using basic matching")
else:
    HAS_FACE_REC = True
class PersonManager:
    """Detect faces, recognise returning people and maintain a person library.

    The library is a dict keyed by person id. It is persisted as JSON at
    ``DATA_DIR / "persons.json"``; one face crop per person is written to
    ``DATA_DIR / "faces"``.
    """

    def __init__(self):
        """Create the storage directory, load the library and set up a detector."""
        self.persons_db_path = DATA_DIR / "persons.json"
        self.faces_dir = DATA_DIR / "faces"
        # Make sure the directory for face crops exists.
        self.faces_dir.mkdir(parents=True, exist_ok=True)
        # Load the person library.
        self.persons = self._load_persons_db()
        # Set up the face detector.
        self._init_detectors()
        # Settings.
        self.config = {
            'face_match_threshold': 0.6,     # maximum distance accepted as a match
            'unknown_person_id': 'unknown',  # id reported for unidentified faces
            'max_persons': 100,              # library size limit for auto-added people
        }
        # Counters.
        self.total_detections = 0
        self.known_persons_detected = 0
        self.new_persons_added = 0

    def _load_persons_db(self):
        """Load the person library from disk.

        Returns:
            dict: the stored library, or ``{}`` when the file is missing,
            unreadable, not valid JSON, or does not contain a JSON object.
        """
        if not self.persons_db_path.exists():
            return {}
        try:
            with open(self.persons_db_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError) as exc:
            # ValueError covers both JSON syntax errors and undecodable bytes.
            print(f"[PersonManager] Could not read person database: {exc}")
            return {}
        if not isinstance(data, dict):
            # Every other method treats the library as a dict keyed by person id.
            print("[PersonManager] Person database is not a JSON object, ignoring it")
            return {}
        return data

    def _save_persons_db(self):
        """Write the person library to disk.

        The JSON is written to a sibling temporary file and then moved over
        the real file, so a failure while serialising or writing does not
        leave a truncated database behind.
        """
        tmp_path = self.persons_db_path.with_name(self.persons_db_path.name + '.tmp')
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.persons, f, ensure_ascii=False, indent=2)
            tmp_path.replace(self.persons_db_path)
        finally:
            # Only still present when the write or the move failed.
            if tmp_path.exists():
                tmp_path.unlink()

    def _init_detectors(self):
        """Create the face detector: MediaPipe when available, else an OpenCV Haar cascade."""
        # Always defined so detect_faces() can test it whichever backend is active.
        self.cv_face_detector = None

        if HAS_MEDIAPIPE:
            self.mp_face_detection = mp.solutions.face_detection
            self.face_detector = self.mp_face_detection.FaceDetection(
                model_selection=0,  # 0: short-range model, 1: long-range model
                min_detection_confidence=0.5
            )
            print("[PersonManager] MediaPipe face detector initialized")
            return

        # Fallback: OpenCV Haar cascade (frontal faces).
        try:
            model_path = cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
            detector = cv2.CascadeClassifier(model_path)
        except (AttributeError, cv2.error) as exc:
            print(f"[PersonManager] Could not create OpenCV face detector: {exc}")
            detector = None

        # CascadeClassifier does not raise for a missing model file; it is just empty.
        if detector is None or detector.empty():
            print("[PersonManager] No face detector available")
            return
        self.cv_face_detector = detector
        print("[PersonManager] OpenCV face detector initialized")

    @staticmethod
    def _clamp_bbox(bbox, img_w, img_h):
        """Clip an ``[x, y, w, h]`` box to an image of ``img_w`` x ``img_h`` pixels.

        Returns:
            tuple: ``(x, y, w, h)`` as ints, inside the image and at least 1x1.
        """
        x, y, w, h = (int(v) for v in bbox)
        x1 = max(0, min(x, img_w - 1))
        y1 = max(0, min(y, img_h - 1))
        # Clip the far edges as well, so a box starting at a negative
        # coordinate shrinks instead of being shifted.
        x2 = max(x1 + 1, min(x + w, img_w))
        y2 = max(y1 + 1, min(y + h, img_h))
        return x1, y1, x2 - x1, y2 - y1

    def detect_faces(self, image):
        """Detect faces in an image.

        Args:
            image: BGR image (numpy array) or a path to an image file.
        Returns:
            list: ``[{'bbox': [x, y, w, h], 'confidence': float, 'source': str}]``;
            empty when the image cannot be read or no detector is available.
        """
        if isinstance(image, (str, Path)):
            image = cv2.imread(str(image))
        if image is None:
            return []

        faces = []
        mp_detector = getattr(self, 'face_detector', None)
        cv_detector = getattr(self, 'cv_face_detector', None)

        if HAS_MEDIAPIPE and mp_detector is not None:
            # MediaPipe expects RGB and reports boxes relative to the image size.
            img_h, img_w = image.shape[:2]
            rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            results = mp_detector.process(rgb_image)
            for detection in results.detections or []:
                box = detection.location_data.relative_bounding_box
                faces.append({
                    'bbox': [
                        int(box.xmin * img_w),
                        int(box.ymin * img_h),
                        int(box.width * img_w),
                        int(box.height * img_h),
                    ],
                    'confidence': float(detection.score[0]),
                    'source': 'mediapipe'
                })
        elif cv_detector is not None:
            # OpenCV fallback.
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            detections = cv_detector.detectMultiScale(
                gray,
                scaleFactor=1.1,
                minNeighbors=5,
                minSize=(30, 30)
            )
            for (x, y, w, h) in detections:
                faces.append({
                    # Plain ints so the result is JSON-serialisable.
                    'bbox': [int(x), int(y), int(w), int(h)],
                    # This detectMultiScale call yields no score; fixed placeholder.
                    'confidence': 0.8,
                    'source': 'opencv'
                })
        return faces

    def extract_face_encoding(self, image, face_bbox):
        """Compute a feature vector for one face, used to compare people.

        Extractors are tried in order: face_recognition (if installed),
        MediaPipe face-mesh landmarks (if installed), then an HSV colour
        histogram. The vector length depends on which extractor succeeded.

        Args:
            image: BGR image (numpy array) or a path to an image file.
            face_bbox: ``[x, y, w, h]`` of the face within the image.
        Returns:
            numpy array: the feature vector, or ``None`` if the image cannot be read.
        """
        if isinstance(image, (str, Path)):
            image = cv2.imread(str(image))
        if image is None:
            return None

        # Keep the crop inside the image.
        img_h, img_w = image.shape[:2]
        x, y, w, h = self._clamp_bbox(face_bbox, img_w, img_h)
        face_image = image[y:y + h, x:x + w]

        # Method 1: face_recognition, when installed.
        if HAS_FACE_REC:
            try:
                rgb_face = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
                encodings = face_recognition.face_encodings(rgb_face)
                if len(encodings) > 0:
                    return encodings[0]
            except Exception as exc:
                # Best effort: fall through to the next extractor.
                print(f"[PersonManager] face_recognition encoding failed: {exc}")

        # Method 2: MediaPipe face-mesh landmarks.
        if HAS_MEDIAPIPE:
            try:
                rgb_face = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
                # The context manager closes the mesh on every path.
                with mp.solutions.face_mesh.FaceMesh(
                    static_image_mode=True, max_num_faces=1
                ) as face_mesh:
                    results = face_mesh.process(rgb_face)
                if results.multi_face_landmarks:
                    # Flatten the landmark coordinates into one vector.
                    landmarks = results.multi_face_landmarks[0]
                    return np.array([
                        coord
                        for landmark in landmarks.landmark
                        for coord in (landmark.x, landmark.y, landmark.z)
                    ])
            except Exception as exc:
                # Best effort: fall through to the histogram fallback.
                print(f"[PersonManager] MediaPipe landmark extraction failed: {exc}")

        # Method 3: HSV colour histogram (simplest fallback).
        face_resized = cv2.resize(face_image, (64, 64))
        hsv = cv2.cvtColor(face_resized, cv2.COLOR_BGR2HSV)
        hist_h = cv2.calcHist([hsv], [0], None, [16], [0, 180])
        hist_s = cv2.calcHist([hsv], [1], None, [16], [0, 256])
        hist_v = cv2.calcHist([hsv], [2], None, [16], [0, 256])
        return np.concatenate([
            cv2.normalize(hist_h, hist_h).flatten(),
            cv2.normalize(hist_s, hist_s).flatten(),
            cv2.normalize(hist_v, hist_v).flatten()
        ])

    def match_face(self, face_encoding, threshold=None):
        """Find the known person whose stored encoding is closest to ``face_encoding``.

        Args:
            face_encoding: feature vector from ``extract_face_encoding``.
            threshold: maximum Euclidean distance accepted as a match;
                defaults to ``config['face_match_threshold']``.
        Returns:
            dict: ``{'person_id': str, 'name': str, 'is_new': bool, 'confidence': float}``.
            ``is_new`` is True when no stored encoding is close enough.
        """
        if threshold is None:
            threshold = self.config['face_match_threshold']
        unknown_id = self.config.get('unknown_person_id', 'unknown')

        if face_encoding is None:
            return {'person_id': unknown_id, 'name': 'Unknown', 'is_new': False, 'confidence': 0}

        query = np.asarray(face_encoding, dtype=float)
        best_match = None
        best_id = None
        best_distance = float('inf')

        for person_id, person_data in self.persons.items():
            stored = person_data.get('face_encoding')
            if stored is None:
                continue
            stored = np.asarray(stored, dtype=float)
            # Encodings from different extractors differ in length and
            # cannot be compared; skip them instead of raising.
            if stored.shape != query.shape:
                continue
            # Euclidean distance; face_recognition.face_distance computes the
            # same norm, so one code path serves every extractor.
            # NOTE(review): the default 0.6 threshold is the conventional
            # face_recognition value; it is probably not calibrated for the
            # landmark or histogram vectors - confirm.
            distance = float(np.linalg.norm(stored - query))
            if distance < best_distance:
                best_distance = distance
                best_match = person_data
                best_id = person_id

        if best_match is not None and best_distance < threshold:
            self.known_persons_detected += 1
            return {
                'person_id': best_match.get('person_id', best_id),
                'name': best_match.get('name', 'Unknown'),
                'is_new': False,
                'confidence': 1 - best_distance
            }

        # Nothing close enough: treat as a new person.
        return {
            'person_id': unknown_id,
            'name': 'Unknown',
            'is_new': True,
            'confidence': 0
        }

    def add_new_person(self, image, face_bbox, name=None):
        """Add a new person to the library and persist it.

        Args:
            image: BGR image (numpy array) or a path to an image file.
            face_bbox: ``[x, y, w, h]`` of the face within the image.
            name: display name; defaults to ``"Person #<n>"``.
        Returns:
            dict: the stored person record, or ``None`` if the image cannot be read.
        """
        if isinstance(image, (str, Path)):
            image = cv2.imread(str(image))
        if image is None:
            return None

        # Extract the feature vector.
        face_encoding = self.extract_face_encoding(image, face_bbox)
        if face_encoding is None:
            return None

        # Pick the next free id; len + 1 alone can collide with an existing
        # key if the database was edited outside this class.
        index = len(self.persons) + 1
        while f"person_{index}" in self.persons:
            index += 1
        person_id = f"person_{index}"
        if name is None:
            name = f"Person #{index}"

        # Save the face crop, clipped to the image so it is never empty.
        img_h, img_w = image.shape[:2]
        x, y, w, h = self._clamp_bbox(face_bbox, img_w, img_h)
        face_path = self.faces_dir / f"{person_id}.jpg"
        if not cv2.imwrite(str(face_path), image[y:y + h, x:x + w]):
            print(f"[PersonManager] Could not write face image: {face_path}")

        # Record the person in the library.
        now = datetime.datetime.now().isoformat()
        person_data = {
            'person_id': person_id,
            'name': name,
            'face_encoding': face_encoding.tolist() if isinstance(face_encoding, np.ndarray) else face_encoding,
            'face_path': str(face_path),
            'first_seen': now,
            'last_seen': now,
            'visit_count': 1
        }
        self.persons[person_id] = person_data
        self._save_persons_db()
        self.new_persons_added += 1
        print(f"[PersonManager] New person added: {person_id} ({name})")
        return person_data

    def update_person_visit(self, person_id):
        """Stamp ``last_seen`` and bump ``visit_count`` for a known person, then save.

        Unknown ids are ignored.
        """
        person = self.persons.get(person_id)
        if person is None:
            return
        person['last_seen'] = datetime.datetime.now().isoformat()
        # Tolerate records that lack the counter.
        person['visit_count'] = person.get('visit_count', 0) + 1
        self._save_persons_db()

    def analyze_image(self, image_path, save_new_person=True):
        """Detect and identify every face in an image file.

        Args:
            image_path: path to the image file.
            save_new_person: whether unmatched faces are added to the library.
        Returns:
            dict: {
                'faces': list,       # detected faces
                'persons': list,     # one entry per detected face
                'new_count': int,    # faces that matched nobody
                'known_count': int,  # faces matched to a known person
                'total_count': int,  # len(persons)
            }
            plus ``'error'`` (with all counts 0) when the image cannot be loaded.
        """
        image = cv2.imread(str(image_path))
        if image is None:
            return {
                'faces': [],
                'persons': [],
                'new_count': 0,
                'known_count': 0,
                'total_count': 0,
                'error': 'Cannot load image'
            }

        self.total_detections += 1
        unknown_id = self.config['unknown_person_id']

        # Detect faces.
        faces = self.detect_faces(image)
        persons = []
        new_count = 0
        known_count = 0

        for face in faces:
            bbox = face['bbox']
            encoding = self.extract_face_encoding(image, bbox)
            match_result = self.match_face(encoding)

            if not match_result['is_new'] and match_result['person_id'] != unknown_id:
                # Known person.
                known_count += 1
                self.update_person_visit(match_result['person_id'])
                persons.append({
                    'person_id': match_result['person_id'],
                    'name': match_result['name'],
                    'bbox': bbox,
                    'is_new': False,
                    'confidence': match_result['confidence']
                })
                continue

            # Unidentified face: start from a placeholder entry so every
            # detected face is reported exactly once.
            entry = {
                'person_id': unknown_id,
                'name': 'Unknown',
                'bbox': bbox,
                'is_new': match_result['is_new'],
                'confidence': face['confidence']
            }
            if match_result['is_new']:
                new_count += 1
                entry['name'] = 'Unknown (new)'
                if save_new_person and len(self.persons) < self.config['max_persons']:
                    new_person = self.add_new_person(image, bbox)
                    if new_person:
                        entry['person_id'] = new_person['person_id']
                        entry['name'] = new_person['name']
            persons.append(entry)

        return {
            'faces': faces,
            'persons': persons,
            'new_count': new_count,
            'known_count': known_count,
            'total_count': len(persons)
        }

    def get_persons_list(self):
        """Return a summary (id, name, visit count, first/last seen) of every stored person."""
        return [
            {
                'person_id': record.get('person_id', key),
                'name': record.get('name', 'Unknown'),
                'visit_count': record.get('visit_count', 0),
                'first_seen': record.get('first_seen'),
                'last_seen': record.get('last_seen')
            }
            for key, record in self.persons.items()
        ]

    def get_stats(self):
        """Return the library size and the running counters.

        ``recognition_rate`` is matched faces divided by analysed images, so
        it can exceed 1 when one image contains several known faces.
        """
        return {
            'total_persons': len(self.persons),
            'total_detections': self.total_detections,
            'known_persons_detected': self.known_persons_detected,
            'new_persons_added': self.new_persons_added,
            'recognition_rate': self.known_persons_detected / max(self.total_detections, 1)
        }

    def reset(self):
        """Reset the running counters; the person library is left untouched."""
        self.total_detections = 0
        self.known_persons_detected = 0
        self.new_persons_added = 0
# Module-level shared instance, created when the module is imported.
person_manager = PersonManager()

if __name__ == "__main__":
    # Manual smoke test: analyse the image given on the command line.
    import sys

    if len(sys.argv) >= 2:
        test_image = sys.argv[1]
        print(f"[Test] Analyzing: {test_image}")
        result = person_manager.analyze_image(test_image)
        if 'error' in result:
            # analyze_image reports an unreadable image through an 'error'
            # key; stop before printing statistics for it.
            print(f"[Test] Error: {result['error']}")
            sys.exit(1)
        print(f"[Test] Faces detected: {len(result['faces'])}")
        print(f"[Test] Persons: {result['total_count']}")
        print(f"[Test] New: {result['new_count']}, Known: {result['known_count']}")
        for person in result['persons']:
            status = "NEW" if person['is_new'] else "KNOWN"
            print(f"  - [{status}] {person['name']} (confidence: {person['confidence']:.2f})")
    else:
        print("Usage: python person_manager.py <image_path>")