349 lines
13 KiB
Python
349 lines
13 KiB
Python
"""
|
||
Local Analyzer - 本地视觉分析(无需大模型)
|
||
使用 OpenCV 传统方法进行快速检测
|
||
|
||
功能:
|
||
- 帧间差分:检测运动
|
||
- 背景建模:检测前景物体
|
||
- 人体检测:检测人员进出
|
||
- 亮度检测:检测光线变化
|
||
- 自动判断是否需要调用大模型
|
||
"""
|
||
import cv2
|
||
import numpy as np
|
||
from pathlib import Path
|
||
import datetime
|
||
|
||
|
||
class LocalAnalyzer:
|
||
"""本地视觉分析器"""
|
||
|
||
def __init__(self):
|
||
self.prev_frame = None
|
||
self.background_model = None
|
||
self.human_cascade = None
|
||
|
||
# 初始化人体检测器
|
||
self._init_human_detector()
|
||
|
||
# 阈值配置
|
||
self.config = {
|
||
'motion_threshold': 0.05, # 运动面积阈值(5%)
|
||
'human_scale_factor': 1.1, # 人体检测缩放因子
|
||
'human_min_neighbors': 3, # 人体检测最小邻居数
|
||
'brightness_change_threshold': 30, # 亮度变化阈值
|
||
'trigger_model_threshold': 0.08, # 触发大模型的阈值
|
||
}
|
||
|
||
# 统计
|
||
self.frame_count = 0
|
||
self.motion_count = 0
|
||
self.human_count = 0
|
||
|
||
def _init_human_detector(self):
|
||
"""初始化人体检测器"""
|
||
try:
|
||
# 使用 OpenCV 内置的 HOG 人体检测器
|
||
self.hog = cv2.HOGDescriptor()
|
||
self.hog.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
|
||
|
||
# 或者使用 Haar Cascade(更快但精度稍低)
|
||
cascade_path = cv2.data.haarcascades + 'haarcascade_fullbody.xml'
|
||
if Path(cascade_path).exists():
|
||
self.human_cascade = cv2.CascadeClassifier(cascade_path)
|
||
|
||
print("[LocalAnalyzer] Human detector initialized")
|
||
except Exception as e:
|
||
print(f"[LocalAnalyzer] Human detector init failed: {e}")
|
||
|
||
def analyze(self, image_path, prev_image_path=None):
|
||
"""
|
||
分析单张图片
|
||
|
||
Args:
|
||
image_path: 当前图片路径
|
||
prev_image_path: 前一张图片路径(可选)
|
||
|
||
Returns:
|
||
dict: {
|
||
'success': bool,
|
||
'events': list, # 本地检测到的初级事件
|
||
'need_model': bool, # 是否需要大模型分析
|
||
'metrics': dict, # 各项指标
|
||
'error': str
|
||
}
|
||
"""
|
||
try:
|
||
# 加载当前图片
|
||
current_frame = cv2.imread(image_path)
|
||
if current_frame is None:
|
||
return {'success': False, 'error': f'无法加载图片: {image_path}'}
|
||
|
||
self.frame_count += 1
|
||
|
||
# 转为灰度图
|
||
current_gray = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)
|
||
current_gray = cv2.GaussianBlur(current_gray, (21, 21), 0)
|
||
|
||
events = []
|
||
metrics = {}
|
||
|
||
# 1. 运动检测(帧间差分)
|
||
motion_result = self._detect_motion(current_gray, prev_image_path)
|
||
metrics['motion_ratio'] = motion_result['ratio']
|
||
if motion_result['detected']:
|
||
events.append({
|
||
'event_type': '运动检测',
|
||
'description': f'画面有运动,变化区域 {motion_result["ratio"]:.1%}',
|
||
'confidence': '高',
|
||
'source': 'local'
|
||
})
|
||
self.motion_count += 1
|
||
|
||
# 2. 人体检测
|
||
human_result = self._detect_human(current_frame)
|
||
metrics['human_count'] = human_result['count']
|
||
if human_result['count'] > 0:
|
||
events.append({
|
||
'event_type': '人物活动',
|
||
'description': f'检测到 {human_result["count"]} 个人',
|
||
'confidence': '中',
|
||
'source': 'local'
|
||
})
|
||
self.human_count += 1
|
||
|
||
# 3. 亮度检测
|
||
brightness_result = self._detect_brightness_change(current_gray, prev_image_path)
|
||
metrics['brightness'] = brightness_result['current']
|
||
metrics['brightness_change'] = brightness_result['change']
|
||
if brightness_result['change_detected']:
|
||
direction = "变亮" if brightness_result['change'] > 0 else "变暗"
|
||
events.append({
|
||
'event_type': '环境变化',
|
||
'description': f'光线{direction},变化 {abs(brightness_result["change"])} 点',
|
||
'confidence': '高',
|
||
'source': 'local'
|
||
})
|
||
|
||
# 4. 背景建模(如果有足够帧数)
|
||
if self.prev_frame is not None:
|
||
bg_result = self._detect_foreground(current_gray)
|
||
metrics['foreground_ratio'] = bg_result['ratio']
|
||
if bg_result['ratio'] > 0.02:
|
||
events.append({
|
||
'event_type': '物体变化',
|
||
'description': f'前景区域占比 {bg_result["ratio"]:.1%}',
|
||
'confidence': '中',
|
||
'source': 'local'
|
||
})
|
||
|
||
# 判断是否需要大模型分析
|
||
need_model = self._should_call_model(metrics, events)
|
||
|
||
# 保存当前帧供下次使用
|
||
self.prev_frame = current_gray.copy()
|
||
|
||
return {
|
||
'success': True,
|
||
'events': events,
|
||
'need_model': need_model,
|
||
'metrics': metrics,
|
||
'frame_count': self.frame_count
|
||
}
|
||
|
||
except Exception as e:
|
||
return {'success': False, 'error': str(e)}
|
||
|
||
def _detect_motion(self, current_gray, prev_image_path=None):
|
||
"""帧间差分检测运动"""
|
||
result = {'detected': False, 'ratio': 0}
|
||
|
||
# 如果有前一张图片,使用它
|
||
if prev_image_path and Path(prev_image_path).exists():
|
||
prev_frame = cv2.imread(prev_image_path)
|
||
if prev_frame is not None:
|
||
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
|
||
prev_gray = cv2.GaussianBlur(prev_gray, (21, 21), 0)
|
||
|
||
# 计算差分
|
||
diff = cv2.absdiff(prev_gray, current_gray)
|
||
thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)[1]
|
||
|
||
# 计算变化比例
|
||
motion_pixels = np.sum(thresh > 0)
|
||
total_pixels = thresh.shape[0] * thresh.shape[1]
|
||
ratio = motion_pixels / total_pixels
|
||
|
||
result['ratio'] = ratio
|
||
result['detected'] = ratio > self.config['motion_threshold']
|
||
|
||
# 找到运动区域轮廓
|
||
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||
result['motion_regions'] = len(contours)
|
||
|
||
# 或者使用保存的前一帧
|
||
elif self.prev_frame is not None:
|
||
diff = cv2.absdiff(self.prev_frame, current_gray)
|
||
thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)[1]
|
||
|
||
motion_pixels = np.sum(thresh > 0)
|
||
total_pixels = thresh.shape[0] * thresh.shape[1]
|
||
ratio = motion_pixels / total_pixels
|
||
|
||
result['ratio'] = ratio
|
||
result['detected'] = ratio > self.config['motion_threshold']
|
||
|
||
return result
|
||
|
||
def _detect_human(self, frame):
|
||
"""检测人体"""
|
||
result = {'count': 0, 'positions': []}
|
||
|
||
try:
|
||
# 方法1:HOG 检测(更准确但慢)
|
||
# regions, _ = self.hog.detectMultiScale(frame, winStride=(8,8))
|
||
|
||
# 方法2:Haar Cascade(更快)
|
||
if self.human_cascade is not None:
|
||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||
bodies = self.human_cascade.detectMultiScale(
|
||
gray,
|
||
scaleFactor=self.config['human_scale_factor'],
|
||
minNeighbors=self.config['human_min_neighbors'],
|
||
minSize=(30, 30)
|
||
)
|
||
|
||
result['count'] = len(bodies)
|
||
result['positions'] = bodies.tolist() if len(bodies) > 0 else []
|
||
|
||
except Exception as e:
|
||
print(f"[LocalAnalyzer] Human detection error: {e}")
|
||
|
||
return result
|
||
|
||
def _detect_brightness_change(self, current_gray, prev_image_path=None):
|
||
"""检测亮度变化"""
|
||
result = {
|
||
'current': 0,
|
||
'previous': 0,
|
||
'change': 0,
|
||
'change_detected': False
|
||
}
|
||
|
||
# 计算当前亮度(平均灰度值)
|
||
result['current'] = np.mean(current_gray)
|
||
|
||
# 如果有前一张图片
|
||
if prev_image_path and Path(prev_image_path).exists():
|
||
prev_frame = cv2.imread(prev_image_path)
|
||
if prev_frame is not None:
|
||
prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
|
||
result['previous'] = np.mean(prev_gray)
|
||
result['change'] = result['current'] - result['previous']
|
||
result['change_detected'] = abs(result['change']) > self.config['brightness_change_threshold']
|
||
|
||
elif self.prev_frame is not None:
|
||
result['previous'] = np.mean(self.prev_frame)
|
||
result['change'] = result['current'] - result['previous']
|
||
result['change_detected'] = abs(result['change']) > self.config['brightness_change_threshold']
|
||
|
||
return result
|
||
|
||
def _detect_foreground(self, current_gray):
|
||
"""背景建模检测前景"""
|
||
result = {'ratio': 0}
|
||
|
||
try:
|
||
# 初始化背景模型
|
||
if self.background_model is None:
|
||
self.background_model = cv2.createBackgroundSubtractorMOG2(
|
||
history=100,
|
||
varThreshold=50,
|
||
detectShadows=True
|
||
)
|
||
|
||
# 应用背景建模
|
||
fg_mask = self.background_model.apply(current_gray)
|
||
|
||
# 计算前景比例
|
||
foreground_pixels = np.sum(fg_mask > 200) # 只统计确定的前景(排除阴影)
|
||
total_pixels = fg_mask.shape[0] * fg_mask.shape[1]
|
||
result['ratio'] = foreground_pixels / total_pixels
|
||
|
||
except Exception as e:
|
||
print(f"[LocalAnalyzer] Background modeling error: {e}")
|
||
|
||
return result
|
||
|
||
def _should_call_model(self, metrics, events):
|
||
"""判断是否需要调用大模型"""
|
||
|
||
# 条件1:运动面积超过阈值
|
||
if metrics.get('motion_ratio', 0) > self.config['trigger_model_threshold']:
|
||
return True
|
||
|
||
# 条件2:检测到人
|
||
if metrics.get('human_count', 0) > 0:
|
||
return True
|
||
|
||
# 条件3:亮度大幅变化
|
||
if abs(metrics.get('brightness_change', 0)) > self.config['brightness_change_threshold'] * 2:
|
||
return True
|
||
|
||
# 条件4:有多个事件类型
|
||
event_types = set(e['event_type'] for e in events)
|
||
if len(event_types) >= 2:
|
||
return True
|
||
|
||
return False
|
||
|
||
def get_stats(self):
|
||
"""获取统计信息"""
|
||
return {
|
||
'frames_analyzed': self.frame_count,
|
||
'motion_detected': self.motion_count,
|
||
'human_detected': self.human_count,
|
||
'motion_rate': self.motion_count / max(self.frame_count, 1),
|
||
'human_rate': self.human_count / max(self.frame_count, 1)
|
||
}
|
||
|
||
def reset(self):
|
||
"""重置状态"""
|
||
self.prev_frame = None
|
||
self.background_model = None
|
||
self.frame_count = 0
|
||
self.motion_count = 0
|
||
self.human_count = 0
|
||
print("[LocalAnalyzer] Reset complete")
|
||
|
||
|
||
# 便捷函数
|
||
def analyze_local(image_path, prev_image_path=None):
|
||
"""本地分析便捷函数"""
|
||
analyzer = LocalAnalyzer()
|
||
return analyzer.analyze(image_path, prev_image_path)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# 测试
|
||
import sys
|
||
|
||
if len(sys.argv) >= 2:
|
||
test_image = sys.argv[1]
|
||
prev_image = sys.argv[2] if len(sys.argv) >= 3 else None
|
||
|
||
print(f"[Test] Analyzing: {test_image}")
|
||
if prev_image:
|
||
print(f"[Test] Previous: {prev_image}")
|
||
|
||
result = analyze_local(test_image, prev_image)
|
||
|
||
print(f"[Test] Result:")
|
||
print(f" - Events: {len(result['events'])}")
|
||
print(f" - Need model: {result['need_model']}")
|
||
print(f" - Metrics: {result['metrics']}")
|
||
|
||
for event in result['events']:
|
||
print(f" - [{event['source']}] {event['event_type']}: {event['description']}")
|
||
else:
|
||
print("Usage: python local_analyzer.py <image_path> [prev_image_path]") |