86 lines
2.3 KiB
Python
86 lines
2.3 KiB
Python
import threading
|
|
import time
|
|
from typing import Callable
|
|
|
|
|
|
class Clock:
    """Countdown clock with a per-stop increment, driven by a worker thread.

    The remaining time is stored in milliseconds and is reduced by one second
    per tick while the clock is running.  Pausing a running clock with
    :meth:`stop` adds the configured increment.  Every tick is reported
    through ``on_update`` (called with the ``MM:SS`` string) and the end of
    the countdown through ``on_terminated``; both callbacks are invoked on the
    worker thread.

    Args:
        value: Initial time in seconds.
        increment: Seconds added each time a running clock is stopped.
    """

    # Length of one tick in milliseconds.
    _TICK_MS = 1000

    def __init__(self, value: int, increment: int) -> None:
        self.millis = value * 1000
        self.increment = increment * 1000

        # Set while the clock is counting down; cleared while it is paused.
        self._run_event = threading.Event()
        # Set once the countdown is over (expired or explicitly terminated).
        self._terminate_event = threading.Event()
        # Guards ``millis``.  Not reentrant: never call ``clock_to_str``
        # while holding it.
        self._lock = threading.Lock()

        self.on_update: Callable[[str], None] = lambda _: None
        self.on_terminated: Callable[[], None] = lambda: None

        self.thread = threading.Thread(
            target=self._update_clock,
            daemon=True,
        )

    def _compute_seconds(self) -> int:
        """Return the seconds component (0-59) of the remaining time."""
        return (self.millis // 1000) % 60

    def _compute_minutes(self) -> int:
        """Return the whole minutes of the remaining time."""
        return self.millis // 60000

    def _add_increment(self) -> None:
        """Add the configured increment; the caller must hold ``_lock``."""
        self.millis += self.increment

    def _update_clock(self) -> None:
        """Worker loop: tick once per second until expiry or termination.

        Note: a tick that is already in progress when :meth:`stop` is called
        is still charged when it completes.
        """
        while not self._terminate_event.is_set() and self.millis > 0:
            # Block here while the clock is paused.
            self._run_event.wait()
            # Sleep for one tick, but wake up at once if terminated so the
            # worker never lingers (and never charges time) after shutdown.
            if self._terminate_event.wait(timeout=self._TICK_MS / 1000):
                break
            with self._lock:
                self.millis = max(self.millis - self._TICK_MS, 0)
            # Outside the lock: clock_to_str() acquires it again.
            self._notify_update()

        # Mark the clock as finished for both exit reasons so that
        # is_running() turns False and start() becomes a no-op.
        self._terminate_event.set()
        self._notify_terminated()

    def _notify_update(self) -> None:
        """Pass the current ``MM:SS`` string to the update callback."""
        self.on_update(self.clock_to_str())

    def _notify_terminated(self) -> None:
        """Invoke the termination callback."""
        self.on_terminated()

    def set_on_update(self, callback: Callable[[str], None]) -> None:
        """Register the callback that receives the ``MM:SS`` string per tick."""
        self.on_update = callback

    def set_on_terminated(self, callback: Callable[[], None]) -> None:
        """Register the callback invoked when the countdown is over."""
        self.on_terminated = callback

    def is_running(self) -> bool:
        """Return True while the clock is started, not paused, not finished."""
        return self._run_event.is_set() and not self._terminate_event.is_set()

    def start(self) -> None:
        """Start or resume the countdown.

        Does nothing once the clock has finished.  The worker thread is
        started only on the first call: ``Thread.start()`` raises
        ``RuntimeError`` when invoked a second time, which the previous
        ``is_alive()`` check allowed after the worker had exited.
        """
        if self._terminate_event.is_set():
            return
        # ``ident`` is None only for a thread that was never started.
        if self.thread.ident is None:
            self.thread.start()
        self._run_event.set()

    def stop(self) -> None:
        """Pause a running clock and credit the increment.

        Has no effect when the clock is not running.
        """
        if self.is_running():
            with self._lock:
                self._add_increment()
            self._run_event.clear()

    def terminate(self) -> None:
        """End the countdown for good and let the worker thread exit.

        ``on_terminated`` is invoked by the worker as it exits.
        """
        self._terminate_event.set()
        # Release a worker that is blocked waiting for a paused clock; it
        # re-checks the terminate flag immediately and leaves the loop.
        self._run_event.set()

    def clock_to_str(self) -> str:
        """Return the remaining time formatted as zero-padded ``MM:SS``."""
        with self._lock:
            minutes = self._compute_minutes()
            seconds = self._compute_seconds()
        return f"{minutes:02d}:{seconds:02d}"
|
|
|
|
if __name__ == "__main__":
    # Demo: run a 10-minute clock without increment and print every tick.
    def display(content: str) -> None:
        """Print the formatted remaining time."""
        print(content)

    clock = Clock(600, 0)
    # Register the observer before starting so no update can be missed.
    clock.set_on_update(display)
    clock.start()

    # Wait for the countdown thread instead of busy-spinning, which pinned a
    # CPU core.  The short join timeout keeps Ctrl+C responsive.
    while clock.thread.is_alive():
        clock.thread.join(timeout=0.5)
|