fix: 摄像头每次拍照后立即关闭释放资源,添加全黑检测和重试机制

This commit is contained in:
2026-04-17 20:45:41 +08:00
parent 70a71347bd
commit e75425ac14

163
camera.py
View File

@@ -3,6 +3,7 @@
""" """
import cv2 import cv2
import datetime import datetime
import time
from pathlib import Path from pathlib import Path
from config import config_mgr from config import config_mgr
@@ -13,17 +14,30 @@ class CameraCapture:
def __init__(self): def __init__(self):
self.camera_index = config_mgr.get('camera_index', 0) self.camera_index = config_mgr.get('camera_index', 0)
self.cap = None self.cap = None
self._closed = True
self._release_time = 0
def open(self): def open(self):
"""打开摄像头""" """打开摄像头"""
if self.cap is None or not self.cap.isOpened(): if self.cap is not None and self.cap.isOpened():
self.camera_index = config_mgr.get('camera_index', 0) return True
self.cap = cv2.VideoCapture(self.camera_index, cv2.CAP_DSHOW) # Windows DirectShow
if not self.cap.isOpened(): # 确保之前的摄像头已完全关闭
raise Exception(f"无法打开摄像头 {self.camera_index}") if not self._closed:
# 设置分辨率 self.close()
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) import time
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) time.sleep(0.5) # 等待摄像头完全释放
self.camera_index = config_mgr.get('camera_index', 0)
self.cap = cv2.VideoCapture(self.camera_index, cv2.CAP_DSHOW) # Windows DirectShow
if not self.cap.isOpened():
raise Exception(f"无法打开摄像头 {self.camera_index}")
# 设置分辨率
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
self._closed = False
return self.cap.isOpened() return self.cap.isOpened()
def close(self): def close(self):
@@ -31,49 +45,102 @@ class CameraCapture:
if self.cap is not None: if self.cap is not None:
self.cap.release() self.cap.release()
self.cap = None self.cap = None
self._closed = True
self._release_time = time.time()
def capture(self, save_path=None): def capture(self, save_path=None, retry_count=3):
"""拍照并保存(按日期文件夹组织 """拍照并保存(每次拍照后自动关闭摄像头
Args:
save_path: 保存路径
retry_count: 重试次数
Returns: Returns:
dict: {'success': bool, 'path': str, 'timestamp': str, 'date_folder': str, 'error': str} dict: {'success': bool, 'path': str, 'timestamp': str, 'date_folder': str, 'error': str}
""" """
try: for attempt in range(retry_count):
self.open() try:
# 每次拍照都重新打开摄像头
# 等待摄像头稳定Windows 上需要预热) self.open()
for _ in range(5):
self.cap.read() # 等待摄像头稳定Windows 上需要预热)
time.sleep(0.5)
ret, frame = self.cap.read()
if not ret: # 读取帧(丢弃前几帧确保稳定)
return {'success': False, 'error': '无法捕获图像'} for _ in range(5):
ret, _ = self.cap.read()
# 获取当前日期 if not ret:
now = datetime.datetime.now() self.close()
date_str = now.strftime('%Y-%m-%d') if attempt < retry_count - 1:
timestamp = now.strftime('%Y%m%d_%H%M%S') print(f"[Camera] 预热失败,重试 {attempt + 2}/{retry_count}")
time.sleep(1)
# 获取按日期的保存目录 continue
date_folder = config_mgr.get_images_dir(date_str) return {'success': False, 'error': '摄像头预热失败'}
if save_path is None: # 拍照
filename = f"capture_{timestamp}.jpg" ret, frame = self.cap.read()
save_path = date_folder / filename
# 立即关闭摄像头释放资源
# 保存图片 self.close()
cv2.imwrite(str(save_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
if not ret or frame is None:
return { if attempt < retry_count - 1:
'success': True, print(f"[Camera] 捕获失败,重试 {attempt + 2}/{retry_count}")
'path': str(save_path), time.sleep(1)
'timestamp': timestamp, continue
'date_folder': date_str, return {'success': False, 'error': '无法捕获图像'}
'filename': save_path.name
} # 检查图像是否全黑
if self._is_black_frame(frame):
except Exception as e: if attempt < retry_count - 1:
return {'success': False, 'error': str(e)} print(f"[Camera] 全黑图像,重试 {attempt + 2}/{retry_count}")
time.sleep(2)
continue
return {'success': False, 'error': '捕获到全黑图像'}
# 成功,保存图片
now = datetime.datetime.now()
timestamp = now.strftime('%Y%m%d_%H%M%S')
date_str = now.strftime('%Y-%m-%d')
if save_path is None:
date_folder = config_mgr.get_images_dir(date_str)
filename = f"capture_{timestamp}.jpg"
save_path = date_folder / filename
cv2.imwrite(str(save_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
print(f"[Camera] 拍照成功: {save_path.name}")
return {
'success': True,
'path': str(save_path),
'timestamp': timestamp,
'date_folder': save_path.parent.name,
'filename': save_path.name
}
except Exception as e:
self.close()
if attempt < retry_count - 1:
print(f"[Camera] 异常: {e}, 重试 {attempt + 2}/{retry_count}")
time.sleep(2)
continue
return {'success': False, 'error': str(e)}
return {'success': False, 'error': '重试后仍失败'}
def _is_black_frame(self, frame):
"""检测图像是否全黑"""
if frame is None:
return True
# 计算平均亮度
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
mean_brightness = cv2.mean(gray)[0]
# 如果平均亮度小于 5视为全黑
return mean_brightness < 5
def get_camera_info(self): def get_camera_info(self):
"""获取摄像头信息""" """获取摄像头信息"""
@@ -82,14 +149,16 @@ class CameraCapture:
width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(self.cap.get(cv2.CAP_PROP_FPS)) fps = int(self.cap.get(cv2.CAP_PROP_FPS))
self.close()
return { return {
'index': self.camera_index, 'index': self.camera_index,
'width': width, 'width': width,
'height': height, 'height': height,
'fps': fps, 'fps': fps,
'status': 'opened' 'status': 'ok'
} }
except Exception as e: except Exception as e:
self.close()
return { return {
'index': self.camera_index, 'index': self.camera_index,
'status': 'error', 'status': 'error',