Integrate detection service
This commit is contained in:
0
rpi/hardware/camera/__init__.py
Normal file
0
rpi/hardware/camera/__init__.py
Normal file
60
rpi/hardware/camera/camera.py
Normal file
60
rpi/hardware/camera/camera.py
Normal file
@@ -0,0 +1,60 @@
|
||||
import cv2
|
||||
import threading
|
||||
import numpy as np
|
||||
from typing import Callable, Optional
|
||||
|
||||
|
||||
class Camera:
|
||||
def __init__(self, index: int = 0):
|
||||
self.index = index
|
||||
self.cap: Optional[cv2.VideoCapture] = None
|
||||
self.thread: Optional[threading.Thread] = None
|
||||
self.stop_event = threading.Event()
|
||||
|
||||
def open(self):
|
||||
if self.cap is None:
|
||||
self.cap = cv2.VideoCapture(self.index)
|
||||
if not self.cap.isOpened():
|
||||
self.cap = None
|
||||
raise RuntimeError("Could not open camera")
|
||||
|
||||
def close(self):
|
||||
if self.cap is not None:
|
||||
self.cap.release()
|
||||
self.cap = None
|
||||
|
||||
def take_photo(self) -> np.ndarray:
|
||||
self.open()
|
||||
try:
|
||||
ret, frame = self.cap.read()
|
||||
if not ret:
|
||||
raise RuntimeError("Failed to capture image")
|
||||
return frame
|
||||
finally:
|
||||
self.close()
|
||||
|
||||
def start_capture(self, callback: Callable[[np.ndarray], None]):
|
||||
if self.thread and self.thread.is_alive():
|
||||
return
|
||||
|
||||
self.open()
|
||||
self.stop_event.clear()
|
||||
|
||||
def loop():
|
||||
while not self.stop_event.is_set():
|
||||
ret, frame = self.cap.read()
|
||||
if not ret:
|
||||
break
|
||||
callback(frame)
|
||||
|
||||
self.thread = threading.Thread(target=loop, daemon=True)
|
||||
self.thread.start()
|
||||
|
||||
def stop_capture(self):
|
||||
self.stop_event.set()
|
||||
|
||||
self.close()
|
||||
|
||||
if self.thread:
|
||||
self.thread.join(timeout=2)
|
||||
self.thread = None
|
||||
Reference in New Issue
Block a user