Files
vision-record/local_analyzer.py

508 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Local Analyzer - 本地视觉分析(无需大模型)
使用 OpenCV 传统方法进行快速检测
功能:
- 帧间差分:检测运动
- 背景建模:检测前景物体
- 人员识别:检测并识别人物(新人员自动入库)
- 亮度检测:检测光线变化
- 自动判断是否需要调用大模型
"""
import cv2
import numpy as np
from pathlib import Path
import datetime
import sys
import os
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config import config_mgr
try:
from person_manager import person_manager
HAS_PERSON_MANAGER = True
except ImportError:
HAS_PERSON_MANAGER = False
print("[LocalAnalyzer] PersonManager not available")
try:
import mediapipe as mp
HAS_MEDIAPIPE = True
except ImportError:
HAS_MEDIAPIPE = False
class LocalAnalyzer:
    """Local vision analyzer built on classic OpenCV techniques.

    Detects motion (frame differencing), foreground objects (background
    modelling), people (PersonManager and/or Haar cascade), and brightness
    changes, and decides whether the large vision model should be called.
    """

    def __init__(self):
        self.prev_frame = None        # previous blurred grayscale frame
        self.background_model = None  # lazily created MOG2 subtractor
        self.human_cascade = None     # Haar cascade full-body detector
        self.prev_human_count = 0     # person count seen on the previous frame

        # Initialize detectors
        self._init_detectors()

        # Threshold configuration
        self.config = {
            'motion_threshold': 0.05,            # motion area ratio threshold (5%)
            'human_scale_factor': 1.1,           # Haar detectMultiScale scale factor
            'human_min_neighbors': 3,            # Haar detectMultiScale minNeighbors
            'brightness_change_threshold': 30,   # mean-gray-level change threshold
            'trigger_model_threshold': 0.08,     # base threshold for model triggering
            'human_count_change_threshold': 1,   # person-count delta threshold
        }

        # Statistics
        self.frame_count = 0
        self.motion_count = 0
        self.human_count = 0
        self.person_change_count = 0  # number of person-change events recorded

    def _init_detectors(self):
        """Initialize the available detectors (Haar cascade; PersonManager status)."""
        # Haar Cascade full-body detector shipped with OpenCV.
        try:
            cascade_path = cv2.data.haarcascades + 'haarcascade_fullbody.xml'
            if Path(cascade_path).exists():
                self.human_cascade = cv2.CascadeClassifier(cascade_path)
                print("[LocalAnalyzer] Haar Cascade body detector initialized")
        except Exception as e:
            print(f"[LocalAnalyzer] Haar Cascade init failed: {e}")
        # Face detection / identification is delegated to PersonManager.
        if HAS_PERSON_MANAGER:
            print("[LocalAnalyzer] PersonManager available (MediaPipe face detection)")

    def analyze(self, image_path, prev_image_path=None):
        """Analyze a single image.

        Args:
            image_path: path of the current image.
            prev_image_path: path of the previous image (optional).

        Returns:
            dict: {
                'success': bool,
                'events': list,      # low-level events detected locally
                'need_model': bool,  # whether the large model should be called
                'metrics': dict,     # per-detector measurements
                'error': str,        # only present on failure
            }
        """
        try:
            print(f"[LocalAnalyzer] Loading image: {image_path}")
            current_frame = cv2.imread(image_path)
            if current_frame is None:
                return {'success': False, 'error': f'无法加载图片: {image_path}'}
            self.frame_count += 1

            # Grayscale + blur to suppress sensor noise before differencing.
            current_gray = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)
            current_gray = cv2.GaussianBlur(current_gray, (21, 21), 0)

            events = []
            metrics = {}

            # 1. Motion detection (frame differencing)
            motion_result = self._detect_motion(current_gray, prev_image_path)
            metrics['motion_ratio'] = motion_result['ratio']
            if motion_result['detected']:
                events.append({
                    'event_type': '运动检测',
                    'description': f'画面有运动,变化区域 {motion_result["ratio"]:.1%}',
                    'confidence': '',
                    'source': 'local'
                })
                self.motion_count += 1

            # 2. Person detection and identification
            person_result = {'persons': [], 'total_count': 0, 'new_count': 0, 'known_count': 0}
            haar_result = {'count': 0, 'positions': []}
            # Detection-algorithm switches from the config manager.
            use_haar = config_mgr.get('use_haar_cascade', True)
            use_mediapipe = config_mgr.get('use_mediapipe_face', True)

            # Method 1: YOLO body detection + person identification (PersonManager)
            if HAS_PERSON_MANAGER and use_mediapipe:
                print(f"[LocalAnalyzer] Using YOLO + person identification...")
                person_result = person_manager.analyze_image(image_path, save_new_person=True)
                metrics['person_count'] = person_result['total_count']
                metrics['new_persons'] = person_result['new_count']
                metrics['known_persons'] = person_result['known_count']
                metrics['detection_source'] = 'yolo'
                metrics['methods_used'] = person_result.get('methods_used', [])
                metrics['person_indices'] = person_result.get('person_indices', [])

                prev_person_count = self.prev_human_count
                current_count = person_result['current_count']
                person_count_change = current_count - prev_person_count

                # Record events only for changes PersonManager has confirmed.
                if person_result['confirmed_change']:
                    metrics['person_count_change'] = person_count_change
                    # Per-person events (with index and detection method).
                    for person in person_result['persons']:
                        person_index = person.get('person_index', 1)
                        method = person.get('method', 'unknown')
                        if person['is_new']:
                            events.append({
                                'event_type': '人物活动',
                                'description': f'#{person_index} 新人 [{method}]',
                                'confidence': '',
                                'source': 'local',
                                'person_index': person_index
                            })
                            self.human_count += 1
                            self.person_change_count += 1
                        else:
                            events.append({
                                'event_type': '人物活动',
                                'description': f'#{person_index} {person["name"]} [{method}]',
                                'confidence': '',
                                'source': 'local',
                                'person_index': person_index
                            })
                    # Entry/exit events based on the count delta.
                    if person_count_change > 0:
                        indices = person_result.get('person_indices', [])
                        # Joined outside the f-string: same-quote nesting inside
                        # an f-string is a syntax error before Python 3.12.
                        entered = " #".join(map(str, indices[-person_count_change:]))
                        events.append({
                            'event_type': '人员进出',
                            'description': f'#{entered} 进入,当前共 {current_count}',
                            'confidence': '',
                            'source': 'local',
                            'person_indices': indices
                        })
                        self.person_change_count += 1
                    elif person_count_change < 0:
                        events.append({
                            'event_type': '人员进出',
                            'description': f'{abs(person_count_change)} 人离开,当前剩 {current_count}',
                            'confidence': '',
                            'source': 'local'
                        })
                        self.person_change_count += 1
                    self.prev_human_count = current_count
                else:
                    # No confirmed change: only record the current state.
                    metrics['person_count_change'] = 0
                    if current_count > 0:
                        methods = person_result.get('methods_used', [])
                        for i, person in enumerate(person_result['persons']):
                            events.append({
                                'event_type': '人物活动',
                                'description': f'#{person.get("person_index", i+1)} [{methods[i] if i < len(methods) else "unknown"}]',
                                'confidence': '',
                                'source': 'local',
                                'person_index': person.get('person_index', i+1)
                            })

            # Method 2: Haar Cascade body detection (fallback or parallel)
            if use_haar and self.human_cascade is not None:
                print(f"[LocalAnalyzer] Using Haar Cascade body detection...")
                haar_result = self._detect_human(current_frame)
                if not (HAS_PERSON_MANAGER and use_mediapipe):
                    # Haar is the primary counter when identification is off.
                    metrics['human_count'] = haar_result['count']
                    human_count_change = haar_result['count'] - self.prev_human_count
                    metrics['human_count_change'] = human_count_change
                    if human_count_change > 0:
                        events.append({
                            'event_type': '人员进出',
                            'description': f'检测到 {human_count_change} 人进入Haar',
                            'confidence': '',
                            'source': 'local'
                        })
                        self.human_count += human_count_change
                        self.person_change_count += 1
                    elif human_count_change < 0:
                        events.append({
                            'event_type': '人员进出',
                            'description': f'检测到 {abs(human_count_change)} 人离开Haar',
                            'confidence': '',
                            'source': 'local'
                        })
                        self.person_change_count += 1
                    self.prev_human_count = haar_result['count']
                else:
                    # Running in parallel: record Haar's count but do not let it
                    # drive the person-change logic.
                    metrics['haar_count'] = haar_result['count']

            # 3. Brightness-change detection
            brightness_result = self._detect_brightness_change(current_gray, prev_image_path)
            metrics['brightness'] = brightness_result['current']
            metrics['brightness_change'] = brightness_result['change']
            if brightness_result['change_detected']:
                direction = "变亮" if brightness_result['change'] > 0 else "变暗"
                events.append({
                    'event_type': '环境变化',
                    'description': f'光线{direction},变化 {abs(brightness_result["change"])}',
                    'confidence': '',
                    'source': 'local'
                })

            # 4. Background modelling (only once a previous frame exists)
            if self.prev_frame is not None:
                bg_result = self._detect_foreground(current_gray)
                metrics['foreground_ratio'] = bg_result['ratio']
                if bg_result['ratio'] > 0.02:
                    events.append({
                        'event_type': '物体变化',
                        'description': f'前景区域占比 {bg_result["ratio"]:.1%}',
                        'confidence': '',
                        'source': 'local'
                    })

            # Decide whether the large model should take over.
            need_model = self._should_call_model(metrics, events)

            # Keep the current frame for the next call.
            self.prev_frame = current_gray.copy()

            return {
                'success': True,
                'events': events,
                'need_model': need_model,
                'metrics': metrics,
                'frame_count': self.frame_count
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def _detect_motion(self, current_gray, prev_image_path=None):
        """Detect motion via frame differencing.

        Prefers the explicitly supplied previous image; falls back to the
        internally cached previous frame.
        """
        result = {'detected': False, 'ratio': 0}
        if prev_image_path and Path(prev_image_path).exists():
            prev_frame = cv2.imread(prev_image_path)
            if prev_frame is not None:
                # Pre-process exactly like the current frame for a fair diff.
                prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
                prev_gray = cv2.GaussianBlur(prev_gray, (21, 21), 0)
                diff = cv2.absdiff(prev_gray, current_gray)
                thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)[1]
                # Ratio of changed pixels over the whole frame.
                motion_pixels = np.sum(thresh > 0)
                total_pixels = thresh.shape[0] * thresh.shape[1]
                ratio = motion_pixels / total_pixels
                result['ratio'] = ratio
                result['detected'] = ratio > self.config['motion_threshold']
                # Count distinct motion regions.
                contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                result['motion_regions'] = len(contours)
        elif self.prev_frame is not None:
            diff = cv2.absdiff(self.prev_frame, current_gray)
            thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)[1]
            motion_pixels = np.sum(thresh > 0)
            total_pixels = thresh.shape[0] * thresh.shape[1]
            ratio = motion_pixels / total_pixels
            result['ratio'] = ratio
            result['detected'] = ratio > self.config['motion_threshold']
        return result

    def _detect_human(self, frame):
        """Detect human bodies with the Haar cascade; returns count and boxes."""
        result = {'count': 0, 'positions': []}
        try:
            # Haar Cascade is used over HOG for speed (HOG is more accurate
            # but slower).
            if self.human_cascade is not None:
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                bodies = self.human_cascade.detectMultiScale(
                    gray,
                    scaleFactor=self.config['human_scale_factor'],
                    minNeighbors=self.config['human_min_neighbors'],
                    minSize=(30, 30)
                )
                result['count'] = len(bodies)
                # detectMultiScale returns an empty tuple when nothing matches.
                result['positions'] = bodies.tolist() if len(bodies) > 0 else []
        except Exception as e:
            print(f"[LocalAnalyzer] Human detection error: {e}")
        return result

    def _detect_brightness_change(self, current_gray, prev_image_path=None):
        """Detect lighting changes via mean gray level against the previous frame."""
        result = {
            'current': 0,
            'previous': 0,
            'change': 0,
            'change_detected': False
        }
        # Current brightness = mean gray value.
        result['current'] = np.mean(current_gray)
        if prev_image_path and Path(prev_image_path).exists():
            prev_frame = cv2.imread(prev_image_path)
            if prev_frame is not None:
                prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
                result['previous'] = np.mean(prev_gray)
                result['change'] = result['current'] - result['previous']
                result['change_detected'] = abs(result['change']) > self.config['brightness_change_threshold']
        elif self.prev_frame is not None:
            result['previous'] = np.mean(self.prev_frame)
            result['change'] = result['current'] - result['previous']
            result['change_detected'] = abs(result['change']) > self.config['brightness_change_threshold']
        return result

    def _detect_foreground(self, current_gray):
        """Detect foreground regions with MOG2 background modelling."""
        result = {'ratio': 0}
        try:
            # Create the background model on first use.
            if self.background_model is None:
                self.background_model = cv2.createBackgroundSubtractorMOG2(
                    history=100,
                    varThreshold=50,
                    detectShadows=True
                )
            fg_mask = self.background_model.apply(current_gray)
            # Count only definite foreground (> 200); shadows are marked 127.
            foreground_pixels = np.sum(fg_mask > 200)
            total_pixels = fg_mask.shape[0] * fg_mask.shape[1]
            result['ratio'] = foreground_pixels / total_pixels
        except Exception as e:
            print(f"[LocalAnalyzer] Background modeling error: {e}")
        return result

    def _should_call_model(self, metrics, events):
        """Decide whether the large vision model should be called.

        Reads `use_vision_api` and `vision_api_trigger` from the config
        manager; supported trigger modes are 'person_change', 'motion',
        'brightness', and 'always'.
        """
        use_vision_api = config_mgr.get('use_vision_api', False)
        if not use_vision_api:
            print("[LocalAnalyzer] Vision API disabled, skipping model call")
            return False

        trigger_condition = config_mgr.get('vision_api_trigger', 'person_change')

        # Condition 1: person change ('person_change' mode)
        if trigger_condition in ['person_change', 'always']:
            person_count_change = metrics.get('person_count_change', metrics.get('human_count_change', 0))
            if abs(person_count_change) >= self.config['human_count_change_threshold']:
                print(f"[LocalAnalyzer] Person change detected: {person_count_change}, triggering model")
                return True
            # A newly registered person always warrants a model call.
            if metrics.get('new_persons', 0) > 0:
                print(f"[LocalAnalyzer] New person detected: {metrics.get('new_persons', 0)}, triggering model")
                return True

        # Condition 2: large motion ('motion' mode)
        if trigger_condition in ['motion', 'always']:
            if metrics.get('motion_ratio', 0) > self.config['trigger_model_threshold'] * 2:
                print(f"[LocalAnalyzer] Large motion detected: {metrics.get('motion_ratio', 0):.2%}")
                return True

        # Condition 3: brightness change ('brightness' mode)
        if trigger_condition in ['brightness', 'always']:
            if abs(metrics.get('brightness_change', 0)) > self.config['brightness_change_threshold'] * 2:
                print(f"[LocalAnalyzer] Brightness changed: {metrics.get('brightness_change', 0)}")
                return True

        # 'always' mode: call unconditionally.
        if trigger_condition == 'always':
            print("[LocalAnalyzer] Always trigger mode enabled")
            return True

        return False

    def get_stats(self):
        """Return cumulative analysis statistics."""
        return {
            'frames_analyzed': self.frame_count,
            'motion_detected': self.motion_count,
            'human_detected': self.human_count,
            'person_changes': self.person_change_count,
            'motion_rate': self.motion_count / max(self.frame_count, 1),
            'human_rate': self.human_count / max(self.frame_count, 1)
        }

    def reset(self):
        """Reset all per-session state and counters."""
        self.prev_frame = None
        self.background_model = None
        self.prev_human_count = 0
        self.frame_count = 0
        self.motion_count = 0
        self.human_count = 0
        # Previously left stale across resets; cleared for consistency with
        # the other counters.
        self.person_change_count = 0
        print("[LocalAnalyzer] Reset complete")
# Convenience wrapper
def analyze_local(image_path, prev_image_path=None):
    """Run a one-shot local analysis with a fresh LocalAnalyzer instance."""
    return LocalAnalyzer().analyze(image_path, prev_image_path)
if __name__ == "__main__":
# 测试
import sys
if len(sys.argv) >= 2:
test_image = sys.argv[1]
prev_image = sys.argv[2] if len(sys.argv) >= 3 else None
print(f"[Test] Analyzing: {test_image}")
if prev_image:
print(f"[Test] Previous: {prev_image}")
result = analyze_local(test_image, prev_image)
print(f"[Test] Result:")
print(f" - Events: {len(result['events'])}")
print(f" - Need model: {result['need_model']}")
print(f" - Metrics: {result['metrics']}")
for event in result['events']:
print(f" - [{event['source']}] {event['event_type']}: {event['description']}")
else:
print("Usage: python local_analyzer.py <image_path> [prev_image_path]")