"""
Local Analyzer - local visual analysis (no large model required).

Fast detection using traditional OpenCV methods.

Features:
- Frame differencing: detect motion
- Background modeling: detect foreground objects
- Person recognition: detect and identify people (new people are
  enrolled automatically)
- Brightness detection: detect lighting changes
- Automatically decide whether the large model needs to be called
"""

import datetime
import os
import sys
from pathlib import Path

import cv2
import numpy as np

# Make sibling modules (config, person_manager) importable regardless of
# the current working directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from config import config_mgr

# PersonManager is optional; without it only the Haar cascade path is used.
try:
    from person_manager import person_manager
    HAS_PERSON_MANAGER = True
except ImportError:
    HAS_PERSON_MANAGER = False
    print("[LocalAnalyzer] PersonManager not available")

# MediaPipe is optional; only its availability is recorded here.
try:
    import mediapipe as mp
    HAS_MEDIAPIPE = True
except ImportError:
    HAS_MEDIAPIPE = False


class LocalAnalyzer:
    """Local visual analyzer based on classic OpenCV techniques.

    Keeps a small amount of state between calls to ``analyze``: the blurred
    grayscale version of the last successfully analyzed frame, a lazily
    created MOG2 background model, the last known person count and a few
    running counters exposed through ``get_stats``.
    """

    def __init__(self):
        self.prev_frame = None          # blurred grayscale of the previous frame
        self.background_model = None    # created on first foreground detection
        self.human_cascade = None       # set by _init_detectors() when available
        self.prev_human_count = 0       # person count recorded for the previous frame

        # Initialise the body detector.
        self._init_detectors()

        # Threshold configuration.
        self.config = {
            'motion_threshold': 0.05,           # changed-area ratio that counts as motion (5%)
            'human_scale_factor': 1.1,          # scaleFactor for the Haar cascade
            'human_min_neighbors': 3,           # minNeighbors for the Haar cascade
            'brightness_change_threshold': 30,  # mean-gray delta that counts as a lighting change
            'trigger_model_threshold': 0.08,    # base motion ratio for triggering the large model
            'human_count_change_threshold': 1,  # person-count delta that triggers the large model
        }

        # Running statistics.
        self.frame_count = 0
        self.motion_count = 0
        self.human_count = 0
        self.person_change_count = 0    # number of person-change events recorded

    @staticmethod
    def _make_event(event_type, description, confidence, **extra):
        """Build a locally sourced event dict; ``extra`` keys are appended last."""
        return {
            'event_type': event_type,
            'description': description,
            'confidence': confidence,
            'source': 'local',
            **extra,
        }

    def _init_detectors(self):
        """Load the Haar full-body cascade shipped with OpenCV, if usable.

        ``self.human_cascade`` stays ``None`` when the cascade file is missing,
        fails to load, or loading raises; the reason is printed instead of
        being silently ignored.
        """
        try:
            cascade_path = cv2.data.haarcascades + 'haarcascade_fullbody.xml'
            if Path(cascade_path).exists():
                cascade = cv2.CascadeClassifier(cascade_path)
                # A classifier built from an unreadable file is "empty" and
                # would fail later inside detectMultiScale.
                if cascade.empty():
                    print(f"[LocalAnalyzer] Haar Cascade init failed: could not load {cascade_path}")
                else:
                    self.human_cascade = cascade
                    print("[LocalAnalyzer] Haar Cascade body detector initialized")
            else:
                print(f"[LocalAnalyzer] Haar Cascade init failed: file not found: {cascade_path}")
        except Exception as e:
            print(f"[LocalAnalyzer] Haar Cascade init failed: {e}")

        # Face/person detection itself is delegated to PersonManager.
        if HAS_PERSON_MANAGER:
            print("[LocalAnalyzer] PersonManager available (MediaPipe face detection)")

    def analyze(self, image_path, prev_image_path=None):
        """Analyze a single image.

        Args:
            image_path: Path of the current image.
            prev_image_path: Path of the previous image (optional). When it is
                missing or unreadable, the frame cached from the previous
                successful call is used for comparisons instead.

        Returns:
            dict with the keys
                'success': bool,
                'events': list of event dicts detected locally,
                'need_model': bool, whether the large model should be called,
                'metrics': dict of measured values,
                'frame_count': int (success only),
                'error': str (failure only).
            On failure 'events' is empty, 'need_model' is False and
            'metrics' is empty, so callers can read those keys unconditionally.
        """
        failure = {'success': False, 'events': [], 'need_model': False, 'metrics': {}}

        try:
            print(f"[LocalAnalyzer] Loading image: {image_path}")

            # Load the current image.
            current_frame = cv2.imread(str(image_path))
            if current_frame is None:
                return {**failure, 'error': f'无法加载图片: {image_path}'}

            self.frame_count += 1

            # Blurred grayscale is the common input for the detectors below.
            current_gray = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)
            current_gray = cv2.GaussianBlur(current_gray, (21, 21), 0)

            events = []
            metrics = {}

            # 1. Motion detection (frame differencing).
            motion_result = self._detect_motion(current_gray, prev_image_path)
            metrics['motion_ratio'] = motion_result['ratio']
            if motion_result['detected']:
                events.append(self._make_event(
                    '运动检测',
                    f'画面有运动,变化区域 {motion_result["ratio"]:.1%}',
                    '高',
                ))
                self.motion_count += 1

            # 2. Person detection and identification.
            use_haar = config_mgr.get('use_haar_cascade', True)
            use_mediapipe = config_mgr.get('use_mediapipe_face', True)
            use_person_manager = HAS_PERSON_MANAGER and use_mediapipe

            # Method 1: PersonManager detection + identification.
            if use_person_manager:
                self._identify_persons(image_path, events, metrics)

            # Method 2: Haar cascade, as the primary source when PersonManager
            # is not in use, otherwise only as an additional metric.
            if use_haar and self.human_cascade is not None:
                self._count_bodies_haar(
                    current_frame, events, metrics, primary=not use_person_manager
                )

            # 3. Brightness detection.
            brightness_result = self._detect_brightness_change(current_gray, prev_image_path)
            metrics['brightness'] = brightness_result['current']
            metrics['brightness_change'] = brightness_result['change']
            if brightness_result['change_detected']:
                direction = "变亮" if brightness_result['change'] > 0 else "变暗"
                events.append(self._make_event(
                    '环境变化',
                    f'光线{direction},变化 {abs(brightness_result["change"])} 点',
                    '高',
                ))

            # 4. Background modeling (only once a previous frame is cached).
            if self.prev_frame is not None:
                bg_result = self._detect_foreground(current_gray)
                metrics['foreground_ratio'] = bg_result['ratio']
                if bg_result['ratio'] > 0.02:
                    events.append(self._make_event(
                        '物体变化',
                        f'前景区域占比 {bg_result["ratio"]:.1%}',
                        '中',
                    ))

            # Decide whether the large model is needed.
            need_model = self._should_call_model(metrics, events)

            # Cache the current frame for the next call.
            self.prev_frame = current_gray.copy()

            return {
                'success': True,
                'events': events,
                'need_model': need_model,
                'metrics': metrics,
                'frame_count': self.frame_count,
            }

        except Exception as e:
            # Boundary handler: report any detector failure as a result dict.
            return {**failure, 'error': str(e)}

    def _identify_persons(self, image_path, events, metrics):
        """Run PersonManager on the image and record person events/metrics.

        Appends to ``events`` and fills ``metrics`` in place. The stored
        person count is only updated when PersonManager reports a confirmed
        change.
        """
        print("[LocalAnalyzer] Using YOLO + person identification...")
        person_result = person_manager.analyze_image(image_path, save_new_person=True)

        metrics['person_count'] = person_result['total_count']
        metrics['new_persons'] = person_result['new_count']
        metrics['known_persons'] = person_result['known_count']
        metrics['detection_source'] = 'yolo'
        metrics['methods_used'] = person_result.get('methods_used', [])
        metrics['person_indices'] = person_result.get('person_indices', [])

        current_count = person_result['current_count']
        person_count_change = current_count - self.prev_human_count

        if not person_result['confirmed_change']:
            # No confirmed change: only report the current state, low confidence.
            metrics['person_count_change'] = 0
            if current_count > 0:
                methods = person_result.get('methods_used', [])
                for i, person in enumerate(person_result['persons']):
                    person_index = person.get('person_index', i + 1)
                    method = methods[i] if i < len(methods) else "unknown"
                    events.append(self._make_event(
                        '人物活动',
                        f'#{person_index} [{method}]',
                        '低',
                        person_index=person_index,
                    ))
            return

        # Only confirmed changes are recorded as events.
        metrics['person_count_change'] = person_count_change

        # One activity event per person, tagged with index and method.
        for person in person_result['persons']:
            person_index = person.get('person_index', 1)
            method = person.get('method', 'unknown')

            if person['is_new']:
                description = f'#{person_index} 新人 [{method}]'
                self.human_count += 1
                self.person_change_count += 1
            else:
                description = f'#{person_index} {person["name"]} [{method}]'

            events.append(self._make_event(
                '人物活动', description, '高', person_index=person_index
            ))

        # Entry / exit events derived from the count delta.
        if person_count_change > 0:
            indices = person_result.get('person_indices', [])
            entered = " #".join(map(str, indices[-person_count_change:]))
            events.append(self._make_event(
                '人员进出',
                f'#{entered} 进入,当前共 {current_count} 人',
                '高',
                person_indices=indices,
            ))
            self.person_change_count += 1
        elif person_count_change < 0:
            events.append(self._make_event(
                '人员进出',
                f'{abs(person_count_change)} 人离开,当前剩 {current_count} 人',
                '高',
            ))
            self.person_change_count += 1

        self.prev_human_count = current_count

    def _count_bodies_haar(self, current_frame, events, metrics, primary):
        """Run the Haar cascade and record its result.

        When ``primary`` is true the Haar count drives entry/exit events and
        the stored person count; otherwise it is only stored as
        ``metrics['haar_count']``.
        """
        print("[LocalAnalyzer] Using Haar Cascade body detection...")
        haar_result = self._detect_human(current_frame)

        if not primary:
            # Running alongside PersonManager: record but do not act on it.
            metrics['haar_count'] = haar_result['count']
            return

        metrics['human_count'] = haar_result['count']

        human_count_change = haar_result['count'] - self.prev_human_count
        metrics['human_count_change'] = human_count_change

        if human_count_change > 0:
            events.append(self._make_event(
                '人员进出',
                f'检测到 {human_count_change} 人进入(Haar)',
                '中',
            ))
            self.human_count += human_count_change
            self.person_change_count += 1
        elif human_count_change < 0:
            events.append(self._make_event(
                '人员进出',
                f'检测到 {abs(human_count_change)} 人离开(Haar)',
                '中',
            ))
            self.person_change_count += 1

        self.prev_human_count = haar_result['count']

    def _load_previous_gray(self, prev_image_path, blur):
        """Load the previous image from disk as grayscale.

        Returns ``None`` when no path is given, the file does not exist or it
        cannot be decoded. With ``blur`` true the same 21x21 Gaussian blur used
        for the current frame is applied.
        """
        if not prev_image_path or not Path(prev_image_path).exists():
            return None

        prev_frame = cv2.imread(str(prev_image_path))
        if prev_frame is None:
            return None

        prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
        if blur:
            prev_gray = cv2.GaussianBlur(prev_gray, (21, 21), 0)
        return prev_gray

    def _detect_motion(self, current_gray, prev_image_path=None):
        """Detect motion by frame differencing.

        The previous frame is taken from ``prev_image_path`` when it can be
        loaded, otherwise from the cached frame. If neither is available, or
        the two frames differ in size, no comparison is made.

        Returns:
            dict with 'detected' (bool) and 'ratio' (share of changed pixels);
            'motion_regions' (number of changed contours) is added whenever a
            comparison was made.
        """
        result = {'detected': False, 'ratio': 0}

        prev_gray = self._load_previous_gray(prev_image_path, blur=True)
        if prev_gray is None:
            # Missing or unreadable file: fall back to the cached frame.
            prev_gray = self.prev_frame

        # absdiff requires equally sized inputs; a resolution change must not
        # make the whole analysis fail.
        if prev_gray is None or prev_gray.shape != current_gray.shape:
            return result

        diff = cv2.absdiff(prev_gray, current_gray)
        thresh = cv2.threshold(diff, 25, 255, cv2.THRESH_BINARY)[1]

        # Share of pixels whose difference exceeded the threshold.
        ratio = np.count_nonzero(thresh) / thresh.size

        result['ratio'] = ratio
        result['detected'] = ratio > self.config['motion_threshold']

        # [-2] picks the contour list in both the 2-value and 3-value
        # return conventions of findContours.
        contours = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]
        result['motion_regions'] = len(contours)

        return result

    def _detect_human(self, frame):
        """Detect bodies with the Haar cascade.

        Returns:
            dict with 'count' and 'positions' (list of detection rectangles).
            Both stay empty when no cascade is loaded or detection raises.
        """
        result = {'count': 0, 'positions': []}

        try:
            if self.human_cascade is not None:
                gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                bodies = self.human_cascade.detectMultiScale(
                    gray,
                    scaleFactor=self.config['human_scale_factor'],
                    minNeighbors=self.config['human_min_neighbors'],
                    minSize=(30, 30),
                )

                result['count'] = len(bodies)
                result['positions'] = bodies.tolist() if len(bodies) > 0 else []

        except Exception as e:
            print(f"[LocalAnalyzer] Human detection error: {e}")

        return result

    def _detect_brightness_change(self, current_gray, prev_image_path=None):
        """Detect a change in mean brightness against the previous frame.

        The previous frame is taken from ``prev_image_path`` when it can be
        loaded, otherwise from the cached frame.

        Returns:
            dict with 'current', 'previous', 'change' (plain floats, mean gray
            values) and 'change_detected' (bool).
        """
        result = {
            'current': 0,
            'previous': 0,
            'change': 0,
            'change_detected': False,
        }

        # Current brightness is the mean gray value.
        result['current'] = float(np.mean(current_gray))

        prev_gray = self._load_previous_gray(prev_image_path, blur=False)
        if prev_gray is None:
            # Missing or unreadable file: fall back to the cached frame.
            prev_gray = self.prev_frame

        if prev_gray is not None:
            result['previous'] = float(np.mean(prev_gray))
            result['change'] = result['current'] - result['previous']
            result['change_detected'] = (
                abs(result['change']) > self.config['brightness_change_threshold']
            )

        return result

    def _detect_foreground(self, current_gray):
        """Feed the frame to the MOG2 background model and measure foreground.

        Returns:
            dict with 'ratio', the share of pixels whose mask value is above
            200. It stays 0 when the model raises.
        """
        result = {'ratio': 0}

        try:
            # Create the background model on first use.
            if self.background_model is None:
                self.background_model = cv2.createBackgroundSubtractorMOG2(
                    history=100,
                    varThreshold=50,
                    detectShadows=True,
                )

            fg_mask = self.background_model.apply(current_gray)

            # Count only confident foreground; the > 200 cut-off excludes
            # the lower mask value used for shadows.
            foreground_pixels = np.count_nonzero(fg_mask > 200)
            result['ratio'] = foreground_pixels / fg_mask.size

        except Exception as e:
            print(f"[LocalAnalyzer] Background modeling error: {e}")

        return result

    def _should_call_model(self, metrics, events):
        """Decide whether the large model should be called.

        Reads 'use_vision_api' and 'vision_api_trigger' from ``config_mgr``.
        Trigger modes: 'person_change', 'motion', 'brightness', or 'always'
        (which checks every condition and then triggers regardless).
        ``events`` is accepted for interface compatibility and not inspected.
        """
        # Check whether the large model is enabled at all.
        if not config_mgr.get('use_vision_api', False):
            print("[LocalAnalyzer] Vision API disabled, skipping model call")
            return False

        trigger_condition = config_mgr.get('vision_api_trigger', 'person_change')

        # Condition 1: person change.
        if trigger_condition in ('person_change', 'always'):
            person_count_change = metrics.get(
                'person_count_change', metrics.get('human_count_change', 0)
            )

            if abs(person_count_change) >= self.config['human_count_change_threshold']:
                print(f"[LocalAnalyzer] Person change detected: {person_count_change}, triggering model")
                return True

            # A newly seen person also triggers.
            if metrics.get('new_persons', 0) > 0:
                print(f"[LocalAnalyzer] New person detected: {metrics.get('new_persons', 0)}, triggering model")
                return True

        # Condition 2: large motion.
        if trigger_condition in ('motion', 'always'):
            if metrics.get('motion_ratio', 0) > self.config['trigger_model_threshold'] * 2:
                print(f"[LocalAnalyzer] Large motion detected: {metrics.get('motion_ratio', 0):.2%}")
                return True

        # Condition 3: large brightness change.
        if trigger_condition in ('brightness', 'always'):
            if abs(metrics.get('brightness_change', 0)) > self.config['brightness_change_threshold'] * 2:
                print(f"[LocalAnalyzer] Brightness changed: {metrics.get('brightness_change', 0)}")
                return True

        # 'always' mode triggers even when no specific condition matched.
        if trigger_condition == 'always':
            print("[LocalAnalyzer] Always trigger mode enabled")
            return True

        return False

    def get_stats(self):
        """Return running counters and per-frame rates.

        Rates divide by at least 1 so they are 0 before any frame is analyzed.
        """
        frames = max(self.frame_count, 1)
        return {
            'frames_analyzed': self.frame_count,
            'motion_detected': self.motion_count,
            'human_detected': self.human_count,
            'motion_rate': self.motion_count / frames,
            'human_rate': self.human_count / frames,
        }

    def reset(self):
        """Reset cached frames, the background model and all counters."""
        self.prev_frame = None
        self.background_model = None
        self.prev_human_count = 0
        self.frame_count = 0
        self.motion_count = 0
        self.human_count = 0
        self.person_change_count = 0
        print("[LocalAnalyzer] Reset complete")


# Convenience wrapper
def analyze_local(image_path, prev_image_path=None):
    """Analyze one image with a freshly constructed ``LocalAnalyzer``.

    A new analyzer is built on every call, so nothing is carried over
    between calls; pass ``prev_image_path`` to compare against an earlier
    frame.

    Args:
        image_path: Path of the image to analyze.
        prev_image_path: Path of the previous image (optional).

    Returns:
        The result dict produced by ``LocalAnalyzer.analyze``.
    """
    return LocalAnalyzer().analyze(image_path, prev_image_path)


if __name__ == "__main__":
    # Manual smoke test: analyze one image, optionally against a previous one.
    import sys

    if len(sys.argv) >= 2:
        test_image = sys.argv[1]
        prev_image = sys.argv[2] if len(sys.argv) >= 3 else None

        print(f"[Test] Analyzing: {test_image}")
        if prev_image:
            print(f"[Test] Previous: {prev_image}")

        result = analyze_local(test_image, prev_image)

        # A failed analysis may carry only 'success' and 'error'; report it
        # instead of failing on the missing result keys.
        if not result.get('success'):
            print(f"[Test] Failed: {result.get('error')}")
            sys.exit(1)

        print("[Test] Result:")
        print(f" - Events: {len(result['events'])}")
        print(f" - Need model: {result['need_model']}")
        print(f" - Metrics: {result['metrics']}")

        for event in result['events']:
            print(f" - [{event['source']}] {event['event_type']}: {event['description']}")
    else:
        print("Usage: python local_analyzer.py <image_path> [prev_image_path]")