Source code for bibliopixel.util.threads.runnable

from .. import log
import contextlib, functools, threading, time, traceback, queue


[docs]class Runnable: """ Base class for all objects that contain threads - including threads created and started by bp code, and threads started by external libraries. There are three possible thread categories for a Runnable: 1. Runs on the master thread - M 2. A new thread created by us - N 3. A new external thread created by some third-party code - X Case X is tricky because we would like our code to be called at the start of the new thread, and again at the end of that thread, but we can't. Lifecycle - what a Runnable does, and what threads it could be called on * construction: M * start: M * on_start: M * called on the master thread after any new thread has started up * run: MN * callbacks: MNX * join: M * cleanup: M(N?) TODO: right now we run all our cleanups on the new thread, if there is a new thread, otherwise on the master thread. Should we move to doing all the cleanups on the master thread? The way to use a Runnable is like a context manager: with some_runnable() as runnable: add_some_callbacks(runnable) more_stuff_that_runs_on_start() # Depending on the thread category, the `Runnable` isn't guaranteed to # actually "go off" until the end of this block. We're going to call the code inside the context manager `on_start` """ MASTER, NEW, EXTERNAL = 'M', 'N', 'X' category = NEW timeout = 0.1 def __init__(self): self.run_event = threading.Event() self.stop_event = threading.Event() @property def running(self): """ Is this Runnable expected to make any progress from here? The Runnable might still execute a little code after it has stopped running. """ return self.run_event.is_set() and not self.stop_event.is_set()
[docs] def is_alive(self): """ Is this Runnable still executing code? In some cases, such as threads, self.is_alive() might be true for some time after self.running has turned False. """ return self.running
[docs] def start(self): self.run_event.set()
[docs] def stop(self): self.stop_event.set()
[docs] def wait(self): self.stop_event.wait()
[docs] def cleanup(self): """ Cleans up resources after the Runnable. self.cleanup() may not throw an exception. """
[docs] def run(self): try: while self.running: self.run_once() except: log.error('Exception at %s: \n%s', str(self), traceback.format_exc()) finally: self.stop() self.cleanup()
[docs] def run_once(self): """The target code that is repeatedly executed in the run method""" pass
[docs] @contextlib.contextmanager def run_until_stop(self): """ A context manager that starts this Runnable, yields, and then waits for it to finish.""" self.start() try: yield self finally: self.stop() self.wait()
[docs]class LoopThread(Runnable): def __init__(self, daemon=True, **kwds): super().__init__() self.thread = threading.Thread(daemon=daemon, target=self.run, **kwds)
[docs] def start(self): self.thread.start()
[docs] def run(self): super().start() super().run()
[docs] def is_alive(self): return self.thread.is_alive()
[docs]class QueueHandler(LoopThread): def __init__(self, timeout=0.1, send=None, **kwds): super().__init__(**kwds) self.timeout = timeout self.queue = queue.Queue() self.send = send or self.send
[docs] def run_once(self): try: msg = self.queue.get(timeout=self.timeout) except queue.Empty: pass else: self.send(msg)
[docs] def send(self, msg): pass