# vision-record/local_analyzer.py
"""
Local Analyzer - 本地视觉分析(无需大模型)
使用 OpenCV 传统方法进行快速检测
功能:
- 帧间差分:检测运动
- 背景建模:检测前景物体
- 人体检测:检测人员进出
- 亮度检测:检测光线变化
- 自动判断是否需要调用大模型
"""
import cv2
import numpy as np
from pathlib import Path
import datetime
class LocalAnalyzer:
    """Local visual analyzer.

    Uses classic OpenCV techniques (frame differencing, MOG2 background
    modelling, Haar/HOG human detection, mean-brightness statistics) to
    produce cheap low-level events and to decide whether a frame is
    interesting enough to be sent to a large vision model.
    """

    def __init__(self):
        # Previous blurred grayscale frame, cached for frame differencing.
        self.prev_frame = None
        # MOG2 background subtractor, created lazily on first use.
        self.background_model = None
        # Haar full-body cascade; stays None if the cascade file is missing.
        self.human_cascade = None

        # Initialize the human detectors (HOG + optional Haar cascade).
        self._init_human_detector()

        # Threshold configuration.
        self.config = {
            'motion_threshold': 0.05,            # motion area ratio threshold (5%)
            'human_scale_factor': 1.1,           # cascade scale factor
            'human_min_neighbors': 3,            # cascade min neighbors
            'brightness_change_threshold': 30,   # mean gray-level change threshold
            'trigger_model_threshold': 0.08,     # motion ratio that triggers the big model
        }

        # Running statistics.
        self.frame_count = 0
        self.motion_count = 0
        self.human_count = 0

    def _init_human_detector(self):
        """Initialize the human detectors (HOG plus optional Haar cascade)."""
        try:
            # OpenCV's built-in HOG people detector (more accurate, slower).
            self.hog = cv2.HOGDescriptor()
            self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
            # Haar cascade: faster but slightly less accurate.
            cascade_path = cv2.data.haarcascades + 'haarcascade_fullbody.xml'
            if Path(cascade_path).exists():
                self.human_cascade = cv2.CascadeClassifier(cascade_path)
            print("[LocalAnalyzer] Human detector initialized")
        except Exception as e:
            print(f"[LocalAnalyzer] Human detector init failed: {e}")

    @staticmethod
    def _load_gray(image_path, blur):
        """Load *image_path* as grayscale; optionally Gaussian-blur it.

        Returns None when the path is empty, the file is missing, or the
        image cannot be decoded — callers fall back to the cached frame.
        """
        if not image_path or not Path(image_path).exists():
            return None
        frame = cv2.imread(image_path)
        if frame is None:
            return None
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if blur:
            gray = cv2.GaussianBlur(gray, (21, 21), 0)
        return gray

    def analyze(self, image_path, prev_image_path=None):
        """Analyze a single image.

        Args:
            image_path: path of the current image.
            prev_image_path: path of the previous image (optional).

        Returns:
            dict with keys:
                success (bool)
                events (list): locally detected low-level events
                need_model (bool): whether the big model should be called
                metrics (dict): raw measurements
                frame_count (int): frames analyzed so far
                error (str): only present on failure
        """
        try:
            print(f"[LocalAnalyzer] Loading image: {image_path}")
            current_frame = cv2.imread(image_path)
            if current_frame is None:
                return {'success': False, 'error': f'无法加载图片: {image_path}'}

            self.frame_count += 1

            # Blurred grayscale copy shared by motion / brightness /
            # background checks (blur suppresses sensor noise).
            current_gray = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)
            current_gray = cv2.GaussianBlur(current_gray, (21, 21), 0)

            events = []
            metrics = {}

            # 1. Motion detection (frame differencing).
            motion_result = self._detect_motion(current_gray, prev_image_path)
            metrics['motion_ratio'] = motion_result['ratio']
            if motion_result['detected']:
                events.append({
                    'event_type': '运动检测',
                    'description': f'画面有运动,变化区域 {motion_result["ratio"]:.1%}',
                    'confidence': '',
                    'source': 'local'
                })
                self.motion_count += 1

            # 2. Human detection.
            human_result = self._detect_human(current_frame)
            metrics['human_count'] = human_result['count']
            if human_result['count'] > 0:
                events.append({
                    'event_type': '人物活动',
                    'description': f'检测到 {human_result["count"]} 个人',
                    'confidence': '',
                    'source': 'local'
                })
                self.human_count += 1

            # 3. Brightness change detection.
            brightness_result = self._detect_brightness_change(current_gray, prev_image_path)
            metrics['brightness'] = brightness_result['current']
            metrics['brightness_change'] = brightness_result['change']
            if brightness_result['change_detected']:
                direction = "变亮" if brightness_result['change'] > 0 else "变暗"
                events.append({
                    'event_type': '环境变化',
                    'description': f'光线{direction},变化 {abs(brightness_result["change"])}',
                    'confidence': '',
                    'source': 'local'
                })

            # 4. Background modelling (only once a previous frame exists).
            if self.prev_frame is not None:
                bg_result = self._detect_foreground(current_gray)
                metrics['foreground_ratio'] = bg_result['ratio']
                if bg_result['ratio'] > 0.02:
                    events.append({
                        'event_type': '物体变化',
                        'description': f'前景区域占比 {bg_result["ratio"]:.1%}',
                        'confidence': '',
                        'source': 'local'
                    })

            # Decide whether the heavyweight model is worth calling.
            need_model = self._should_call_model(metrics, events)

            # Cache the current frame for the next call.
            self.prev_frame = current_gray.copy()

            return {
                'success': True,
                'events': events,
                'need_model': need_model,
                'metrics': metrics,
                'frame_count': self.frame_count
            }
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def _detect_motion(self, current_gray, prev_image_path=None):
        """Frame-differencing motion detection.

        Prefers the explicitly supplied previous image; falls back to the
        frame cached from the last analyze() call. Fix vs. original: the
        fallback now also applies when prev_image_path is given but the
        image fails to load (the old elif skipped it in that case).
        """
        result = {'detected': False, 'ratio': 0}

        prev_gray = self._load_gray(prev_image_path, blur=True)
        if prev_gray is None:
            prev_gray = self.prev_frame
        if prev_gray is None:
            # Nothing to compare against yet.
            return result

        # Absolute difference, binarized at gray-level 25.
        diff = cv2.absdiff(prev_gray, current_gray)
        thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)[1]

        # Fraction of pixels that changed.
        motion_pixels = np.sum(thresh > 0)
        total_pixels = thresh.shape[0] * thresh.shape[1]
        ratio = motion_pixels / total_pixels
        result['ratio'] = ratio
        result['detected'] = ratio > self.config['motion_threshold']

        # Number of distinct moving regions (external contours).
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        result['motion_regions'] = len(contours)
        return result

    def _detect_human(self, frame):
        """Detect people in *frame* using the Haar cascade (if available)."""
        result = {'count': 0, 'positions': []}
        try:
            # Option 1: HOG detection (more accurate but slower):
            # regions, _ = self.hog.detectMultiScale(frame, winStride=(8, 8))
            # Option 2: Haar cascade (faster).
            if self.human_cascade is not None:
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                bodies = self.human_cascade.detectMultiScale(
                    gray,
                    scaleFactor=self.config['human_scale_factor'],
                    minNeighbors=self.config['human_min_neighbors'],
                    minSize=(30, 30)
                )
                result['count'] = len(bodies)
                result['positions'] = bodies.tolist() if len(bodies) > 0 else []
        except Exception as e:
            print(f"[LocalAnalyzer] Human detection error: {e}")
        return result

    def _detect_brightness_change(self, current_gray, prev_image_path=None):
        """Compare mean gray level against the previous image / cached frame."""
        result = {
            'current': 0,
            'previous': 0,
            'change': 0,
            'change_detected': False
        }
        # Brightness == mean gray value; cast to float so metrics stay
        # plain Python (np.mean returns a numpy scalar).
        result['current'] = float(np.mean(current_gray))

        # Same fallback chain as _detect_motion (no blur: we only average).
        prev_gray = self._load_gray(prev_image_path, blur=False)
        if prev_gray is None:
            prev_gray = self.prev_frame
        if prev_gray is not None:
            result['previous'] = float(np.mean(prev_gray))
            result['change'] = result['current'] - result['previous']
            result['change_detected'] = (
                abs(result['change']) > self.config['brightness_change_threshold']
            )
        return result

    def _detect_foreground(self, current_gray):
        """Foreground ratio from a lazily-created MOG2 background model."""
        result = {'ratio': 0}
        try:
            if self.background_model is None:
                self.background_model = cv2.createBackgroundSubtractorMOG2(
                    history=100,
                    varThreshold=50,
                    detectShadows=True
                )
            fg_mask = self.background_model.apply(current_gray)
            # Count only confident foreground (> 200 excludes shadow pixels,
            # which MOG2 marks with an intermediate value).
            foreground_pixels = np.sum(fg_mask > 200)
            total_pixels = fg_mask.shape[0] * fg_mask.shape[1]
            result['ratio'] = foreground_pixels / total_pixels
        except Exception as e:
            print(f"[LocalAnalyzer] Background modeling error: {e}")
        return result

    def _should_call_model(self, metrics, events):
        """Decide whether the heavyweight vision model should be invoked."""
        # Condition 1: motion area exceeds the trigger threshold.
        if metrics.get('motion_ratio', 0) > self.config['trigger_model_threshold']:
            return True
        # Condition 2: at least one person detected.
        if metrics.get('human_count', 0) > 0:
            return True
        # Condition 3: large brightness swing (2x the normal threshold).
        if abs(metrics.get('brightness_change', 0)) > self.config['brightness_change_threshold'] * 2:
            return True
        # Condition 4: two or more distinct event types fired.
        event_types = {e['event_type'] for e in events}
        if len(event_types) >= 2:
            return True
        return False

    def get_stats(self):
        """Return cumulative analysis statistics."""
        return {
            'frames_analyzed': self.frame_count,
            'motion_detected': self.motion_count,
            'human_detected': self.human_count,
            # max(..., 1) guards against division by zero before any frame.
            'motion_rate': self.motion_count / max(self.frame_count, 1),
            'human_rate': self.human_count / max(self.frame_count, 1)
        }

    def reset(self):
        """Reset all per-session state and counters."""
        self.prev_frame = None
        self.background_model = None
        self.frame_count = 0
        self.motion_count = 0
        self.human_count = 0
        print("[LocalAnalyzer] Reset complete")
# Convenience function
def analyze_local(image_path, prev_image_path=None):
    """Analyze one image with a fresh, stateless LocalAnalyzer.

    Note: each call builds a new analyzer, so cross-frame state
    (cached previous frame, background model) is not reused here.
    """
    analyzer = LocalAnalyzer()
    return analyzer.analyze(image_path, prev_image_path)
if __name__ == "__main__":
# 测试
import sys
if len(sys.argv) >= 2:
test_image = sys.argv[1]
prev_image = sys.argv[2] if len(sys.argv) >= 3 else None
print(f"[Test] Analyzing: {test_image}")
if prev_image:
print(f"[Test] Previous: {prev_image}")
result = analyze_local(test_image, prev_image)
print(f"[Test] Result:")
print(f" - Events: {len(result['events'])}")
print(f" - Need model: {result['need_model']}")
print(f" - Metrics: {result['metrics']}")
for event in result['events']:
print(f" - [{event['source']}] {event['event_type']}: {event['description']}")
else:
print("Usage: python local_analyzer.py <image_path> [prev_image_path]")