feat: YOLO人体检测 + 三方法人员识别 + 前端人物序号显示

This commit is contained in:
2026-04-16 23:48:40 +08:00
parent 548bb76efc
commit 57437de02d
4 changed files with 302 additions and 196 deletions

View File

@@ -132,33 +132,35 @@ class LocalAnalyzer:
use_haar = config_mgr.get('use_haar_cascade', True)
use_mediapipe = config_mgr.get('use_mediapipe_face', True)
# 方法1MediaPipe检测 + 人员识别(优先)
# 方法1YOLO检测 + 人员识别
if HAS_PERSON_MANAGER and use_mediapipe:
print(f"[LocalAnalyzer] Using PersonManager for detection...")
print(f"[LocalAnalyzer] Using YOLO + person identification...")
person_result = person_manager.analyze_image(image_path, save_new_person=True)
metrics['person_count'] = person_result['total_count']
metrics['new_persons'] = person_result['new_count']
metrics['known_persons'] = person_result['known_count']
metrics['detection_source'] = person_result.get('detection_source', 'unknown')
metrics['detection_source'] = 'yolo'
metrics['methods_used'] = person_result.get('methods_used', [])
metrics['person_indices'] = person_result.get('person_indices', [])
prev_person_count = self.prev_human_count
current_count = person_result['current_count']
person_count_change = current_count - prev_person_count
# 只有确认的变化才记录
# 只有确认的变化才记录事件
if person_result['confirmed_change']:
metrics['person_count_change'] = person_count_change
# 记录人员事件(带序号)
# 记录人员事件(带序号和方法
for person in person_result['persons']:
person_index = person.get('person_index', 1)
detection_source = person.get('source', 'unknown')
method = person.get('method', 'unknown')
if person['is_new']:
events.append({
'event_type': '人物活动',
'description': f'#{person_index} 新人: {person["name"]} [{detection_source}],当前共 {current_count}',
'description': f'#{person_index} 新人 [{method}]',
'confidence': '',
'source': 'local',
'person_index': person_index
@@ -168,21 +170,21 @@ class LocalAnalyzer:
else:
events.append({
'event_type': '人物活动',
'description': f'#{person_index} 已知人员: {person["name"]} [{detection_source}]',
'description': f'#{person_index} {person["name"]} [{method}]',
'confidence': '',
'source': 'local',
'person_index': person_index
})
# 检测人员进出(带序号)
# 检测人员进出
if person_count_change > 0:
# 列出新进入的人员序号
new_indices = [p.get('person_index', i+1) for i, p in enumerate(person_result['persons'][-person_count_change:])]
indices = person_result.get('person_indices', [])
events.append({
'event_type': '人员进出',
'description': f'#{", #".join(map(str, new_indices))} 进入,当前共 {current_count} [{person_result.get("detection_source", "")}]',
'description': f'#{" #".join(map(str, indices[-person_count_change:]))} 进入,当前共 {current_count}',
'confidence': '',
'source': 'local'
'source': 'local',
'person_indices': indices
})
self.person_change_count += 1
elif person_count_change < 0:
@@ -196,15 +198,19 @@ class LocalAnalyzer:
self.prev_human_count = current_count
else:
# 没有确认变化,只记录当前状态
# 没有确认变化,只记录当前状态
metrics['person_count_change'] = 0
if current_count > 0:
events.append({
'event_type': '人物活动',
'description': f'检测到 {current_count} 人(状态稳定)',
'confidence': '',
'source': 'local'
})
indices = person_result.get('person_indices', [])
methods = person_result.get('methods_used', [])
for i, person in enumerate(person_result['persons']):
events.append({
'event_type': '人物活动',
'description': f'#{person.get("person_index", i+1)} [{methods[i] if i < len(methods) else "unknown"}]',
'confidence': '',
'source': 'local',
'person_index': person.get('person_index', i+1)
})
# 方法2Haar Cascade 人体检测(备用或并行)
if use_haar and self.human_cascade is not None:

View File

@@ -140,89 +140,146 @@ class PersonManager:
except Exception as e:
print(f"[PersonManager] YOLO init failed: {e}")
def detect_faces(self, image):
"""检测人脸(优先使用 YOLO其次 MediaPipe最后 Haar
def detect_persons_yolo(self, image):
"""YOLO 人体检测(只检测是否有人
Args:
image: 图片numpy array 或路径)
Returns:
list: [{'bbox': [x,y,w,h], 'confidence': float}]
"""
if isinstance(image, str):
image = cv2.imread(image)
persons = []
if image is None:
return []
if self.yolo_detector is None:
return persons
faces = []
# 方案3: YOLO 检测(优先,最准确)
if self.yolo_detector is not None:
try:
results = self.yolo_detector(image, classes=[0], verbose=False) # class 0 = person
for r in results:
for box in r.boxes:
x1, y1, x2, y2 = box.xyxy[0].tolist()
conf = box.conf[0].item()
# 转换为 [x, y, w, h] 格式
faces.append({
'bbox': [int(x1), int(y1), int(x2-x1), int(y2-y1)],
'confidence': conf,
'source': 'yolo'
})
if faces:
print(f"[PersonManager] YOLO detected {len(faces)} persons")
return faces # YOLO 检测成功,直接返回
except Exception as e:
print(f"[PersonManager] YOLO detection failed: {e}")
# 方案1+2: MediaPipe 检测
if self.has_mediapipe and self.face_detector is not None:
rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
results = self.face_detector.process(rgb_image)
try:
results = self.yolo_detector(image, classes=[0], verbose=False) # class 0 = person
if results.detections:
for detection in results.detections:
bboxC = detection.location_data.relative_bounding_box
h, w, _ = image.shape
for r in results:
for box in r.boxes:
x1, y1, x2, y2 = box.xyxy[0].tolist()
conf = box.conf[0].item()
x = int(bboxC.xmin * w)
y = int(bboxC.ymin * h)
width = int(bboxC.width * w)
height = int(bboxC.height * h)
# 置信度过滤
min_conf = self.config.get('mediapipe_min_confidence', 0.3)
if conf < min_conf:
continue
faces.append({
'bbox': [x, y, width, height],
'confidence': detection.score[0],
'source': 'mediapipe'
persons.append({
'bbox': [int(x1), int(y1), int(x2-x1), int(y2-y1)],
'confidence': conf,
'source': 'yolo'
})
if faces:
return faces
# 备用: OpenCV Haar 检测
if self.cv_face_detector is not None:
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
detections = self.cv_face_detector.detectMultiScale(
gray,
scaleFactor=self.config['haar_scale_factor'],
minNeighbors=self.config['haar_min_neighbors'],
minSize=(30, 30)
)
for (x, y, w, h) in detections:
faces.append({
'bbox': [x, y, w, h],
'confidence': 0.8,
'source': 'opencv'
})
if persons:
print(f"[PersonManager] YOLO detected {len(persons)} persons (conf > {min_conf})")
except Exception as e:
print(f"[PersonManager] YOLO detection failed: {e}")
return faces
return persons
def identify_person(self, image, person_bbox, person_index):
"""识别具体人(使用 face_recognition/MediaPipe/颜色直方图)
Args:
image: 图片
person_bbox: 人体 bbox
person_index: 人员序号
Returns:
dict: {'person_id': str, 'name': str, 'is_new': bool, 'confidence': float}
"""
x, y, w, h = person_bbox
# 从人体 bbox 中提取人脸区域(通常在上方)
face_region_y = y
face_region_h = int(h * 0.4) # 人脸约占人体高度的 40%
face_region = image[face_region_y:face_region_y+face_region_h, x:x+w]
if face_region.size == 0:
return {
'person_id': f"person_{person_index}",
'name': f"Person #{person_index}",
'is_new': True,
'confidence': 0.5,
'method': 'yolo_only'
}
# 方法1: face_recognition最准确
encoding = None
method_used = 'unknown'
if HAS_FACE_REC:
try:
rgb_face = cv2.cvtColor(face_region, cv2.COLOR_BGR2RGB)
encodings = face_recognition.face_encodings(rgb_face)
if len(encodings) > 0:
encoding = encodings[0]
method_used = 'face_recognition'
print(f"[PersonManager] #{person_index} Using face_recognition")
except Exception as e:
print(f"[PersonManager] #{person_index} face_recognition failed: {e}")
# 方法2: MediaPipe 人脸关键点
if encoding is None and self.has_mediapipe:
try:
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
static_image_mode=True,
max_num_faces=1,
min_detection_confidence=self.config.get('mediapipe_min_confidence', 0.3)
)
rgb_face = cv2.cvtColor(face_region, cv2.COLOR_BGR2RGB)
results = face_mesh.process(rgb_face)
if results.multi_face_landmarks:
landmarks = results.multi_face_landmarks[0]
features = []
for landmark in landmarks.landmark:
features.extend([landmark.x, landmark.y, landmark.z])
encoding = np.array(features)
method_used = 'mediapipe'
print(f"[PersonManager] #{person_index} Using MediaPipe landmarks")
face_mesh.close()
except Exception as e:
print(f"[PersonManager] #{person_index} MediaPipe failed: {e}")
# 方法3: 颜色直方图(备用)
if encoding is None:
try:
face_resized = cv2.resize(face_region, (64, 64))
hsv = cv2.cvtColor(face_resized, cv2.COLOR_BGR2HSV)
hist_h = cv2.calcHist([hsv], [0], None, [16], [0, 180])
hist_s = cv2.calcHist([hsv], [1], None, [16], [0, 256])
hist_v = cv2.calcHist([hsv], [2], None, [16], [0, 256])
encoding = np.concatenate([
cv2.normalize(hist_h, hist_h).flatten(),
cv2.normalize(hist_s, hist_s).flatten(),
cv2.normalize(hist_v, hist_v).flatten()
])
method_used = 'color_histogram'
print(f"[PersonManager] #{person_index} Using color histogram (backup)")
except Exception as e:
print(f"[PersonManager] #{person_index} Histogram failed: {e}")
# 匹配人员库
if encoding is not None:
match_result = self.match_face(encoding)
match_result['method'] = method_used
return match_result
# 无法识别,返回默认
return {
'person_id': f"unknown_{person_index}",
'name': f"Person #{person_index}",
'is_new': True,
'confidence': 0.3,
'method': 'no_face'
}
def extract_face_encoding(self, image, face_bbox):
"""提取人脸特征(用于识别是否为同一个人)
@@ -412,7 +469,12 @@ class PersonManager:
self._save_persons_db()
def analyze_image(self, image_path, save_new_person=True):
"""分析图片中的人员(带连续性判断)
"""分析图片中的人员
流程:
1. YOLO 检测人体(是否有人)
2. face_recognition/MediaPipe/颜色直方图 识别具体人
3. 连续帧判断确认
Args:
image_path: 图片路径
@@ -420,146 +482,138 @@ class PersonManager:
Returns:
dict: {
'faces': list, # 检测到的人脸
'persons': list, # 识别的人员
'new_count': int, # 人员数量
'known_count': int, # 已知人员数量
'confirmed_change': bool, # 是否有确认的人员变化
'persons': list, # 识别的人员(带序号)
'confirmed_change': bool,
'person_indices': list, # 人员序号列表
}
"""
image = cv2.imread(image_path)
if image is None:
return {'faces': [], 'persons': [], 'error': 'Cannot load image'}
return {'persons': [], 'error': 'Cannot load image'}
self.total_detections += 1
# 检测人
faces = self.detect_faces(image)
current_count = len(faces)
# Step 1: YOLO 检测人
detected_persons = self.detect_persons_yolo(image)
current_count = len(detected_persons)
# 方案2: 连续性判断
# Step 2: 识别每个检测到的人
identified_persons = []
for idx, person in enumerate(detected_persons):
person_index = idx + 1 # 序号从 1 开始
# 使用 face_recognition/MediaPipe/颜色直方图 识别
identity = self.identify_person(image, person['bbox'], person_index)
identified_persons.append({
'person_id': identity['person_id'],
'name': identity['name'],
'person_index': person_index,
'bbox': person['bbox'],
'is_new': identity['is_new'],
'confidence': identity.get('confidence', person['confidence']),
'method': identity.get('method', 'unknown'),
'yolo_confidence': person['confidence'],
'source': 'yolo'
})
# Step 3: 连续帧判断
confirmed_change = False
confirmed_persons = []
# 为每个检测到的人分配序号
person_index = self.prev_human_count # 从当前人数开始
# 检查人数变化
prev_count = len(self.prev_persons)
if current_count != prev_count:
# 人数变化,记录到缓冲区
# 人数变化
key = f"count_{current_count}"
if key not in self.confirmation_buffer:
self.confirmation_buffer[key] = {'count': 0, 'persons': []}
self.confirmation_buffer[key]['count'] += 1
# 临时识别人员并分配序号
temp_persons = []
for idx, face in enumerate(faces):
bbox = face['bbox']
encoding = self.extract_face_encoding(image, bbox)
match_result = self.match_face(encoding)
# 分配人员序号
person_index_display = idx + 1 # 序号从1开始
person_info = {
'person_id': match_result['person_id'] if not match_result['is_new'] else f"unknown_{person_index_display}",
'name': match_result['name'] if not match_result['is_new'] else f"Person #{person_index_display}",
'person_index': person_index_display, # 显示序号
'bbox': bbox,
'is_new': match_result['is_new'],
'confidence': face['confidence'],
'source': face['source']
}
temp_persons.append(person_info)
self.confirmation_buffer[key]['persons'] = temp_persons
self.confirmation_buffer[key]['persons'] = identified_persons
# 达到确认帧数
if self.confirmation_buffer[key]['count'] >= self.config['confirm_frames']:
confirmed_change = True
confirmed_persons = temp_persons
print(f"[PersonManager] Confirmed: {prev_count} -> {current_count} persons (after {self.config['confirm_frames']} frames)")
for p in confirmed_persons:
print(f" - {p['name']} (#{p['person_index']}) [{p['source']}]")
print(f"[PersonManager] Confirmed change: {prev_count} -> {current_count} (after {self.config['confirm_frames']} frames)")
# 清空其他缓冲区
self.confirmation_buffer = {}
# 更新前一帧状态
self.prev_persons = temp_persons
# 保存新人员
if save_new_person and confirmed_change:
for person in identified_persons:
if person['is_new'] and len(self.persons) < self.config['max_persons']:
# 保存人脸特征
x, y, w, h = person['bbox']
face_region = image[y:y+int(h*0.4), x:x+w]
if face_region.size > 0:
encoding = self.extract_face_encoding(image, person['bbox'])
if encoding is not None:
person_id = f"person_{len(self.persons) + 1}"
person['person_id'] = person_id
person['name'] = f"Person #{len(self.persons) + 1}"
# 保存到人员库
self.add_new_person_with_encoding(person_id, encoding, person['name'])
# 清空缓冲区,更新状态
self.confirmation_buffer = {key: self.confirmation_buffer[key]}
self.prev_persons = identified_persons
else:
# 人数不变,清空变化缓冲区,维持当前状态
if current_count > 0:
# 识别当前人员并分配序号
temp_persons = []
for idx, face in enumerate(faces):
bbox = face['bbox']
encoding = self.extract_face_encoding(image, bbox)
match_result = self.match_face(encoding)
person_index_display = idx + 1
person_info = {
'person_id': match_result['person_id'] if not match_result['is_new'] else f"unknown_{person_index_display}",
'name': match_result['name'] if not match_result['is_new'] else f"Person #{person_index_display}",
'person_index': person_index_display,
'bbox': bbox,
'is_new': match_result['is_new'],
'confidence': face['confidence'],
'source': face['source']
}
temp_persons.append(person_info)
confirmed_persons = temp_persons
self.prev_persons = temp_persons
# 人数不变,维持状态
self.prev_persons = identified_persons
# 清空变化缓冲区
keys_to_remove = [k for k in self.confirmation_buffer.keys() if not k.endswith(f"_{current_count}")]
# 清空其他变化缓冲区
keys_to_remove = [k for k in self.confirmation_buffer.keys() if k != f"count_{current_count}"]
for k in keys_to_remove:
del self.confirmation_buffer[k]
# 统计新人和已知人员
new_count = 0
known_count = 0
persons_to_save = []
for person in confirmed_persons:
if person['is_new']:
new_count += 1
# 只有确认后才保存新人
if confirmed_change and save_new_person and len(self.persons) < self.config['max_persons']:
# 找到对应的 face bbox
for face in faces:
if face['bbox'] == person['bbox']:
new_person = self.add_new_person(image, face['bbox'])
if new_person:
person['person_id'] = new_person['person_id']
person['name'] = new_person['name']
persons_to_save.append(person)
break
else:
known_count += 1
self.update_person_visit(person['person_id'])
persons_to_save.append(person)
# 统计
new_count = sum(1 for p in identified_persons if p['is_new'])
known_count = current_count - new_count
return {
'faces': faces,
'persons': persons_to_save,
'persons': identified_persons,
'new_count': new_count,
'known_count': known_count,
'total_count': len(persons_to_save),
'total_count': current_count,
'confirmed_change': confirmed_change,
'current_count': current_count,
'prev_count': prev_count,
'detection_source': faces[0]['source'] if faces else 'none'
'person_indices': [p['person_index'] for p in identified_persons],
'methods_used': [p['method'] for p in identified_persons],
'detection_source': 'yolo'
}
def add_new_person_with_encoding(self, person_id, encoding, name=None):
"""保存新人员到库(已有 encoding
Args:
person_id: 人员ID
encoding: 特征向量
name: 名称
Returns:
dict: 人员信息
"""
if name is None:
name = person_id
person_data = {
'person_id': person_id,
'name': name,
'face_encoding': encoding.tolist() if isinstance(encoding, np.ndarray) else encoding,
'first_seen': datetime.datetime.now().isoformat(),
'last_seen': datetime.datetime.now().isoformat(),
'visit_count': 1
}
self.persons[person_id] = person_data
self._save_persons_db()
self.new_persons_added += 1
print(f"[PersonManager] New person saved: {person_id} ({name})")
return person_data
def get_persons_list(self):
"""获取人员列表"""
return [

View File

@@ -184,9 +184,28 @@ function renderImages(images) {
var status = img.analyzed ? 'Analyzed' : 'Unanalyzed';
var events = img.events_summary || 'No events';
// Check for person indices in events
var personIndices = [];
if (img.events && img.events.length > 0) {
img.events.forEach(function(event) {
var match = event.description.match(/#(\d+)/g);
if (match) {
match.forEach(function(m) {
if (personIndices.indexOf(m) === -1) {
personIndices.push(m);
}
});
}
});
}
var indicesDisplay = personIndices.length > 0 ?
'<span class="image-person-indices">' + personIndices.slice(0, 3).join(' ') + '</span>' : '';
item.innerHTML = '<span class="image-number">#' + img.id + '</span>' +
'<span class="image-time">' + time + '</span>' +
'<span class="image-status">' + status + '</span>' +
indicesDisplay +
'<span class="image-events-summary">' + events + '</span>';
list.appendChild(item);
@@ -226,7 +245,19 @@ function openImageModal(imageId) {
if (localEvents.length > 0) {
var localSection = document.createElement('div');
localSection.className = 'modal-events-section';
localSection.innerHTML = '<h4 class="section-title local">Local Analysis (' + localEvents.length + ') </h4>';
// 显示人员序号
var personIndices = [];
localEvents.forEach(function(event) {
if (event.person_index) {
personIndices.push('#' + event.person_index);
}
});
var indicesDisplay = personIndices.length > 0 ?
' <span class="person-indices">[' + personIndices.join(', ') + ']</span>' : '';
localSection.innerHTML = '<h4 class="section-title local">Local Analysis (' + localEvents.length + ')' + indicesDisplay + '</h4>';
localEvents.forEach(function(event) {
var div = document.createElement('div');

View File

@@ -255,6 +255,21 @@ button {
flex: 1;
}
.image-person-indices {
background: #667eea;
color: white;
padding: 2px 8px;
border-radius: 3px;
font-size: 12px;
font-weight: bold;
margin-right: 10px;
}
.person-indices {
color: #667eea;
font-weight: bold;
}
/* 事件列表 */
.events-list {
max-height: 500px;