61 lines
1.6 KiB
Python
61 lines
1.6 KiB
Python
import cv2
|
|
import threading
|
|
import numpy as np
|
|
from typing import Callable, Optional
|
|
|
|
|
|
class Camera:
    """Thin wrapper around ``cv2.VideoCapture`` for stills and streaming.

    Two usage modes are offered:

    * ``take_photo()`` grabs one frame and returns it JPEG-encoded.
    * ``start_capture(callback)`` / ``stop_capture()`` run a background
      daemon thread that passes every frame it reads to ``callback``.

    Attributes:
        index: Device index handed to ``cv2.VideoCapture``.
        cap: The open capture handle, or ``None`` while the device is closed.
        thread: The background capture thread, or ``None`` when not streaming.
        stop_event: Set to ask the background loop to exit.
    """

    def __init__(self, index: int = 0):
        """Store the device index; the device itself is opened lazily."""
        self.index = index
        self.cap: Optional[cv2.VideoCapture] = None
        self.thread: Optional[threading.Thread] = None
        self.stop_event = threading.Event()

    def open(self):
        """Open the capture device if it is not already open.

        Raises:
            RuntimeError: If the device reports that it could not be opened.
        """
        if self.cap is not None:
            return

        cap = cv2.VideoCapture(self.index)
        if not cap.isOpened():
            # Release the failed handle instead of just dropping the reference.
            cap.release()
            raise RuntimeError("Could not open camera")
        # Publish the handle only once it is known to be usable.
        self.cap = cap

    def close(self):
        """Release the capture device if it is open; otherwise do nothing."""
        if self.cap is not None:
            self.cap.release()
            self.cap = None

    def take_photo(self) -> bytes:
        """Capture a single frame and return it as JPEG bytes (quality 80).

        The device is opened on demand and closed again afterwards, unless a
        background capture started by ``start_capture`` is running; in that
        case the device is left open so the stream is not torn down.

        Returns:
            The JPEG-encoded frame.

        Raises:
            RuntimeError: If the device cannot be opened, no frame could be
                read, or the frame could not be encoded.
        """
        # Closing the device under a live capture thread would break its loop,
        # so remember whether one is running before touching the device.
        # NOTE(review): reading here while the capture thread also reads is
        # not synchronized - confirm that is acceptable for the backend used.
        streaming = self.thread is not None and self.thread.is_alive()

        self.open()
        try:
            ret, frame = self.cap.read()
            if not ret:
                raise RuntimeError("Failed to capture image")

            ok, encoded = cv2.imencode(
                '.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80]
            )
            if not ok:
                raise RuntimeError("Failed to encode image")
            return encoded.tobytes()
        finally:
            if not streaming:
                self.close()

    def start_capture(self, callback: Callable[[np.ndarray], None]):
        """Start streaming frames to ``callback`` on a background thread.

        Does nothing if a capture thread is already alive. The loop ends when
        ``stop_capture`` is called or when a read from the device fails.

        Args:
            callback: Called with each frame, on the capture thread.

        Raises:
            RuntimeError: If the device cannot be opened.
        """
        if self.thread and self.thread.is_alive():
            return

        self.open()
        self.stop_event.clear()

        # Bind the handle locally so the loop never dereferences ``self.cap``
        # after another thread has reset it to ``None``.
        cap = self.cap

        def loop():
            while not self.stop_event.is_set():
                ret, frame = cap.read()
                if not ret:
                    break
                callback(frame)

        self.thread = threading.Thread(target=loop, daemon=True)
        self.thread.start()

    def stop_capture(self):
        """Stop the background capture (if any) and release the device.

        The capture thread is asked to stop and joined (for up to 2 seconds)
        before the device is released, so the loop is not reading from a
        handle that is being closed. Safe to call when nothing is running,
        and safe to call from inside the frame callback.
        """
        self.stop_event.set()

        worker = self.thread
        # A thread cannot join itself (raises RuntimeError); when called from
        # the callback, the loop exits on its own once the callback returns.
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=2)
        self.thread = None

        # Release only after the worker has been given the chance to exit.
        self.close()
|