import cv2 import threading import numpy as np from typing import Callable, Optional class Camera: def __init__(self, index: int = 0): self.index = index self.cap: Optional[cv2.VideoCapture] = None self.thread: Optional[threading.Thread] = None self.stop_event = threading.Event() def open(self): if self.cap is None: self.cap = cv2.VideoCapture(self.index) if not self.cap.isOpened(): self.cap = None raise RuntimeError("Could not open camera") def close(self): if self.cap is not None: self.cap.release() self.cap = None def take_photo(self) -> np.ndarray: self.open() try: ret, frame = self.cap.read() if not ret: raise RuntimeError("Failed to capture image") return frame finally: self.close() def start_capture(self, callback: Callable[[np.ndarray], None]): if self.thread and self.thread.is_alive(): return self.open() self.stop_event.clear() def loop(): while not self.stop_event.is_set(): ret, frame = self.cap.read() if not ret: break callback(frame) self.thread = threading.Thread(target=loop, daemon=True) self.thread.start() def stop_capture(self): self.stop_event.set() self.close() if self.thread: self.thread.join(timeout=2) self.thread = None