Source code for bibliopixel.util.threads.task_thread

import collections, threading
from . import runnable


[docs]class Task(object): def __init__(self, task=None, event=None): self.task = task or (lambda: None) self.event = event or threading.Event()
[docs] def run(self, next_task): """Wait for the event, run the task, trigger the next task.""" self.event.wait() self.task() self.event.clear() next_task.event.set()
[docs]class TaskThread(runnable.LoopThread): def __init__(self, producer_task, consumer_task, daemon=True, **kwds): super().__init__(daemon=daemon, **kwds) self.producer_task = producer_task self.consumer_task = consumer_task
[docs] def produce(self): self.producer_task.run(self.consumer_task)
[docs] def run_once(self): self.consumer_task.run(self.producer_task)