""" 摄像头拍照模块 - Windows 兼容 """ import cv2 import datetime import time from pathlib import Path from config import config_mgr class CameraCapture: """摄像头拍照管理""" def __init__(self): self.camera_index = config_mgr.get('camera_index', 0) self.cap = None self._closed = True self._release_time = 0 def open(self): """打开摄像头""" if self.cap is not None and self.cap.isOpened(): return True # 确保之前的摄像头已完全关闭 if not self._closed: self.close() import time time.sleep(0.5) # 等待摄像头完全释放 self.camera_index = config_mgr.get('camera_index', 0) self.cap = cv2.VideoCapture(self.camera_index, cv2.CAP_DSHOW) # Windows DirectShow if not self.cap.isOpened(): raise Exception(f"无法打开摄像头 {self.camera_index}") # 设置分辨率 self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) self._closed = False return self.cap.isOpened() def close(self): """关闭摄像头""" if self.cap is not None: self.cap.release() self.cap = None self._closed = True self._release_time = time.time() def capture(self, save_path=None, retry_count=3): """拍照并保存(每次拍照后自动关闭摄像头) Args: save_path: 保存路径 retry_count: 重试次数 Returns: dict: {'success': bool, 'path': str, 'timestamp': str, 'date_folder': str, 'error': str} """ for attempt in range(retry_count): try: # 每次拍照都重新打开摄像头 self.open() # 等待摄像头稳定(Windows 上需要预热) time.sleep(0.5) # 读取帧(丢弃前几帧确保稳定) for _ in range(5): ret, _ = self.cap.read() if not ret: self.close() if attempt < retry_count - 1: print(f"[Camera] 预热失败,重试 {attempt + 2}/{retry_count}") time.sleep(1) continue return {'success': False, 'error': '摄像头预热失败'} # 拍照 ret, frame = self.cap.read() # 立即关闭摄像头释放资源 self.close() if not ret or frame is None: if attempt < retry_count - 1: print(f"[Camera] 捕获失败,重试 {attempt + 2}/{retry_count}") time.sleep(1) continue return {'success': False, 'error': '无法捕获图像'} # 检查图像是否全黑 if self._is_black_frame(frame): if attempt < retry_count - 1: print(f"[Camera] 全黑图像,重试 {attempt + 2}/{retry_count}") time.sleep(2) continue return {'success': False, 'error': '捕获到全黑图像'} # 成功,保存图片 now = datetime.datetime.now() timestamp = now.strftime('%Y%m%d_%H%M%S') date_str = now.strftime('%Y-%m-%d') if save_path is None: date_folder = config_mgr.get_images_dir(date_str) filename = f"capture_{timestamp}.jpg" save_path = date_folder / filename cv2.imwrite(str(save_path), frame, [cv2.IMWRITE_JPEG_QUALITY, 90]) print(f"[Camera] 拍照成功: {save_path.name}") return { 'success': True, 'path': str(save_path), 'timestamp': timestamp, 'date_folder': save_path.parent.name, 'filename': save_path.name } except Exception as e: self.close() if attempt < retry_count - 1: print(f"[Camera] 异常: {e}, 重试 {attempt + 2}/{retry_count}") time.sleep(2) continue return {'success': False, 'error': str(e)} return {'success': False, 'error': '重试后仍失败'} def _is_black_frame(self, frame): """检测图像是否全黑""" if frame is None: return True # 计算平均亮度 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) mean_brightness = cv2.mean(gray)[0] # 如果平均亮度小于 5,视为全黑 return mean_brightness < 5 def get_camera_info(self): """获取摄像头信息""" try: self.open() width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) fps = int(self.cap.get(cv2.CAP_PROP_FPS)) self.close() return { 'index': self.camera_index, 'width': width, 'height': height, 'fps': fps, 'status': 'ok' } except Exception as e: self.close() return { 'index': self.camera_index, 'status': 'error', 'error': str(e) } def __del__(self): self.close() def list_available_cameras(max_test=5): """列出可用的摄像头""" available = [] for i in range(max_test): cap = cv2.VideoCapture(i, cv2.CAP_DSHOW) if cap.isOpened(): available.append(i) cap.release() return available if __name__ == "__main__": # 测试摄像头 print("检测可用摄像头...") cams = list_available_cameras() print(f"可用摄像头: {cams}") if cams: print("\n测试拍照...") camera = CameraCapture() result = camera.capture() print(f"结果: {result}") camera.close()