Source code for bibliopixel.layout.update_threading

import threading, time
from .. util import log
from .. util.threads import compose_events, runnable


class UpdateDriverThread(runnable.LoopThread):
    """Worker thread that pushes color updates to a single driver.

    Three ``threading.Event`` objects coordinate with the caller:

    * ``_wait`` -- set by :meth:`update_colors` to request an update and
      cleared by :meth:`run_once` once the driver has been updated.
    * ``_reading`` -- starts out set; cleared when an update is requested
      and set again by :meth:`run_once` after the driver update.
    * ``_updating`` -- cleared when :meth:`run_once` begins an update and
      set when it finishes; :meth:`sync` blocks on it.
    """

    def __init__(self, driver):
        """
        :param driver: object whose ``update_colors()`` and ``sync()``
            methods are called by this thread.
        """
        super().__init__()
        self._wait = threading.Event()
        self._reading = threading.Event()
        # Start "readable" so the first update_colors() does not block.
        self._reading.set()
        self._updating = threading.Event()
        self._driver = driver

    def update_colors(self):
        """Request a driver update, blocking until the previous one is done."""
        self._reading.wait()
        self._reading.clear()
        self._wait.set()

    def sync(self):
        """Block until the pending update has finished, then sync the driver."""
        self._updating.wait()
        self._driver.sync()

    def sending(self):
        """Return True while an update has been requested but not completed."""
        # is_set() replaces the deprecated camelCase alias isSet().
        return self._wait.is_set()

    def run_once(self):
        """Perform one requested driver update.

        Presumably invoked repeatedly by ``runnable.LoopThread`` -- confirm
        against that base class.
        """
        self._wait.wait()
        self._updating.clear()
        self._driver.update_colors()
        self._wait.clear()
        self._reading.set()
        self._updating.set()
class UpdateThread(runnable.LoopThread):
    """Thread that coordinates one ``UpdateDriverThread`` per driver.

    On construction a worker thread is created and started for every driver
    and stored on the driver as ``_thread``.  The workers' ``_updating``
    events are combined through ``compose_events.compose_events`` into
    ``self._updated``.
    """

    def __init__(self, drivers):
        """
        :param drivers: iterable of driver objects; each one gets a
            ``_thread`` attribute holding its started worker thread.
        """
        super().__init__()
        self._wait = threading.Event()
        self._reading = threading.Event()
        # Start "readable" so the first update_colors() does not block.
        self._reading.set()
        self._drivers = drivers

        for driver in self._drivers:
            worker = UpdateDriverThread(driver)
            worker.start()
            driver._thread = worker

        # Generator over each worker's "update finished" event.
        worker_events = (
            driver._thread._updating for driver in self._drivers)
        self._updated = compose_events.compose_events(worker_events)

    def update_colors(self):
        """Ask every driver worker to update, then wake this thread."""
        self._reading.wait()
        for driver in self._drivers:
            driver._thread.update_colors()
        self._reading.clear()
        self._wait.set()

    def run_once(self):
        """Wait for a request and for the workers, then sync every driver."""
        self._wait.wait()
        self._updated.wait()
        for driver in self._drivers:
            driver._thread.sync()
        self._updated.clear()
        self._wait.clear()
        self._reading.set()
class NoThreading(runnable.Runnable):
    """Update strategy that drives every layout driver on the calling thread."""

    def __init__(self, layout):
        """
        :param layout: object exposing a ``drivers`` iterable.
        """
        super().__init__()
        self.layout = layout

    def update_colors(self):
        """Update every driver's colors, then sync every driver.

        All ``update_colors()`` calls complete before the first ``sync()``.
        """
        for driver in self.layout.drivers:
            driver.update_colors()

        for driver in self.layout.drivers:
            driver.sync()

    def wait_for_update(self):
        """Do nothing: updates here are synchronous, so nothing is pending."""

    def push_to_driver(self):
        """Push the current pixel state to the driver"""
        self.wait_for_update()
        self.update_colors()
class UseThreading(NoThreading):
    """Update strategy that pushes driver updates from an ``UpdateThread``."""

    def __init__(self, layout):
        """
        Create and start the ``UpdateThread`` for ``layout.drivers``.

        :param layout: object exposing a ``drivers`` iterable.
        """
        # NOTE(review): the base-class __init__ is not called here; this
        # looks deliberate since wait()/stop() delegate to the update
        # thread -- confirm against runnable.Runnable.
        self.layout = layout
        self.update_thread = UpdateThread(layout.drivers)
        self.update_thread.start()

    def update_colors(self):
        """Hand the color update over to the update thread."""
        self.update_thread.update_colors()

    def wait_for_update(self):
        """Poll while every driver thread is still sending.

        Returns immediately when the layout has no drivers: ``all()`` of an
        empty iterable is True, which previously made this loop spin forever.
        """
        while True:
            drivers = self.layout.drivers
            if not drivers:
                return
            if not all(d._thread.sending() for d in drivers):
                return
            time.sleep(0.000001)

    def wait(self):
        """Delegate to the update thread's ``wait()``."""
        self.update_thread.wait()

    def stop(self):
        """Delegate to the update thread's ``stop()``."""
        self.update_thread.stop()
def UpdateThreading(enable, layout):
    """
    Build the update strategy for a Layout.

    UpdateThreading handles threading - and eventually multiprocessing - for
    Layout.

    :param enable: if truthy, return a ``UseThreading``; otherwise a
        ``NoThreading``.
    :param layout: the layout passed to the chosen strategy's constructor.
    """
    if enable:
        return UseThreading(layout)
    return NoThreading(layout)