Source code for bibliopixel.control.artnet

import collections, copy, queue
from . import control
from .. util import artnet_message, log, offset_range, server_cache, udp

QUEUE_TIMEOUT = 0.1


[docs]class ArtNet(control.ExtractedControl): def __init__(self, *args, ip_address='', port=artnet_message.UDP_PORT, offset=0, **kwds): super().__init__(*args, **kwds) self.address = ip_address, port self.offset = offset_range.DMXChannel.make(offset) def _convert(self, msg): msg = artnet_message.bytes_to_message(msg) if not msg: return data = bytes(self.offset.read_from(msg.data)) msg = collections.OrderedDict(( ('type', 'dmx'), ('net', msg.net), ('subUni', msg.subUni), ('data', data))) return super()._convert(msg) def _make_thread(self): return udp.Receiver(self.address, receive=self.receive)