import threading, time
from .. util import log
from .. util.threads import compose_events, runnable
class UpdateDriverThread(runnable.LoopThread):
    """Worker thread that pushes color updates to a single driver.

    Three events coordinate the caller with the worker:

    * ``_wait``     -- set by :meth:`update_colors` to request an update and
      cleared by :meth:`run_once` once the driver has been updated.
    * ``_reading``  -- set while a new request may be submitted; cleared while
      a request is pending or in progress.
    * ``_updating`` -- cleared at the start of an update and set when it has
      completed; :meth:`sync` waits on it.

    NOTE(review): presumably ``runnable.LoopThread`` calls :meth:`run_once`
    repeatedly on the thread -- confirm in ``util.threads.runnable``.
    """

    def __init__(self, driver):
        """Create the worker for ``driver``; the thread is not started here."""
        super().__init__()
        self._wait = threading.Event()
        self._reading = threading.Event()
        # Start out ready to accept the first update request.
        self._reading.set()
        self._updating = threading.Event()
        self._driver = driver

    def update_colors(self):
        """Request an update, blocking until any previous one has finished."""
        self._reading.wait()
        self._reading.clear()
        # Wake up run_once().
        self._wait.set()

    def sync(self):
        """Wait for the current update to complete, then sync the driver."""
        self._updating.wait()
        self._driver.sync()

    def sending(self):
        """Return True while an update request is pending or in progress."""
        # Event.isSet() is a deprecated alias (since Python 3.10) of is_set().
        return self._wait.is_set()

    def run_once(self):
        """Wait for an update request and push it to the driver."""
        self._wait.wait()
        self._updating.clear()
        self._driver.update_colors()
        # The order below matters: stop reporting "sending", allow the next
        # request, and only then signal completion to sync().
        self._wait.clear()
        self._reading.set()
        self._updating.set()
class UpdateThread(runnable.LoopThread):
    """Coordinator thread that fans updates out to one worker per driver.

    On construction an :class:`UpdateDriverThread` is created and started for
    every driver and stored on the driver as ``_thread``.  The workers'
    ``_updating`` events are combined through
    ``compose_events.compose_events`` into ``_updated``.

    NOTE(review): presumably ``runnable.LoopThread`` calls :meth:`run_once`
    repeatedly on the thread -- confirm in ``util.threads.runnable``.
    """

    def __init__(self, drivers):
        """Start one worker thread for each entry in ``drivers``."""
        super().__init__()
        self._wait = threading.Event()
        self._reading = threading.Event()
        # Ready to accept the first update request.
        self._reading.set()
        self._drivers = drivers

        for driver in self._drivers:
            worker = UpdateDriverThread(driver)
            worker.start()
            driver._thread = worker

        # Passed as a lazy generator, exactly as compose_events receives it.
        worker_events = (
            driver._thread._updating for driver in self._drivers
        )
        self._updated = compose_events.compose_events(worker_events)

    def update_colors(self):
        """Request an update from every driver worker.

        Blocks until the previous round has been fully synced before handing
        the new request to the workers.
        """
        self._reading.wait()
        for driver in self._drivers:
            driver._thread.update_colors()
        self._reading.clear()
        # Wake up run_once().
        self._wait.set()

    def run_once(self):
        """Wait for a requested update to finish, then sync every driver."""
        self._wait.wait()
        self._updated.wait()
        for driver in self._drivers:
            driver._thread.sync()
        # Reset for the next round and allow a new request.
        self._updated.clear()
        self._wait.clear()
        self._reading.set()
class NoThreading(runnable.Runnable):
    """Update strategy that drives the layout's drivers in the calling thread."""

    def __init__(self, layout):
        """Remember the ``layout`` whose ``drivers`` will be updated."""
        super().__init__()
        self.layout = layout

    def update_colors(self):
        """Update every driver, then sync every driver.

        All drivers receive ``update_colors()`` before any of them is synced.
        """
        for driver in self.layout.drivers:
            driver.update_colors()

        for driver in self.layout.drivers:
            driver.sync()

    def wait_for_update(self):
        """Do nothing: updates here run inline, so there is nothing pending."""
        pass

    def push_to_driver(self):
        """Wait for any update in flight, then send the current state out."""
        self.wait_for_update()
        self.update_colors()
class UseThreading(NoThreading):
    """Update strategy that delegates driver updates to an UpdateThread."""

    def __init__(self, layout):
        """Store ``layout`` and start an UpdateThread over its drivers."""
        # Go through NoThreading.__init__ so the base-class initialisation is
        # not skipped; it also assigns self.layout.
        super().__init__(layout)
        self.update_thread = UpdateThread(layout.drivers)
        self.update_thread.start()

    def update_colors(self):
        """Hand the update request to the background update thread."""
        self.update_thread.update_colors()

    def wait_for_update(self):
        """Spin while every driver thread reports that it is still sending.

        Returns immediately when the layout has no drivers.
        """
        while True:
            sending = [d._thread.sending() for d in self.layout.drivers]
            # all([]) is True, so without the emptiness check an empty driver
            # list would make this loop spin forever.
            # NOTE(review): this stops waiting as soon as *one* driver is no
            # longer sending; confirm whether any() was intended instead.
            if not sending or not all(sending):
                return
            time.sleep(0.000001)

    def wait(self):
        """Delegate to the update thread's ``wait()``."""
        self.update_thread.wait()

    def stop(self):
        """Delegate to the update thread's ``stop()``."""
        self.update_thread.stop()
def UpdateThreading(enable, layout):
    """
    Build the update strategy for a Layout.

    Returns a ``UseThreading`` instance when ``enable`` is truthy and a
    ``NoThreading`` instance otherwise; both are constructed with ``layout``.
    UpdateThreading handles threading - and eventually multiprocessing - for
    Layout.
    """
    if enable:
        return UseThreading(layout)
    return NoThreading(layout)