Source code for bibliopixel.util.udp

"""
A server that queues and sends UDP requests for a specific port on a
separate thread.
"""

import queue, socket
from .. util.threads import runnable
from .. util import log


[docs]class Sender(runnable.Runnable): def __init__(self, address): """ :param str address: a pair (ip_address, port) to pass to socket.connect """ super().__init__() self.address = address
[docs] def send(self, msg): with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.settimeout(self.timeout) sock.connect(self.address) sock.send(msg)
# sock.sendto(msg, self.address)
[docs]class QueuedSender(runnable.QueueHandler): """ The UPD protocol is stateless but not necessarily thread-safe. ``QueuedSender`` uses a queue to send all UDP messages to one address from a new thread. """ def __init__(self, address, **kwds): """ :param str address: a pair (ip_address, port) to pass to socket.connect """ super().__init__(**kwds) self.sender = Sender(address)
[docs] def send(self, msg): self.sender.send(msg)
[docs]def sender(address, use_queue=True, **kwds): """ :param str address: a pair (ip_address, port) to pass to socket.connect :param bool use_queue: if True, run the connection in a different thread with a queue """ return QueuedSender(address, **kwds) if use_queue else Sender(address)
[docs]class Receiver(runnable.LoopThread): """ Receive UDP messages in a thread """ def __init__(self, address, bufsize=0x1000, receive=None, **kwds): super().__init__(**kwds) self.address = address self.bufsize = bufsize self.receive = receive or self.receive self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(self.address) def __str__(self): return 'udp.Receiver(%s, %s)' % (self.address[0], hex(self.address[1]))
[docs] def run_once(self): try: data, addr = self.socket.recvfrom(self.bufsize) except OSError as e: if e.errno != 9: raise if self.running: log.error('Someone else closed the socket') super().stop() return if data: self.receive(data)
[docs] def stop(self): super().stop() try: self.socket.close() except Exception as e: log.error('Exception in socket.close: %s', e)
[docs]class QueuedReceiver(Receiver): """ Receive UDP messages in a thread and put them on a queue. """ def __init__(self, *args, **kwds): self.queue = queue.Queue() super().__init__(*args, **kwds)
[docs] def receive(self, msg): self.queue.put(msg)