Source code for bibliopixel.drivers.network_udp

import socket, sys, time, os

from . driver_base import DriverBase
from .. util import log, util
from .. drivers.return_codes import RETURN_CODES


[docs]class CMDTYPE: SETUP_DATA = 1 # reserved for future use PIXEL_DATA = 2 BRIGHTNESS = 3
[docs]class NetworkUDP(DriverBase): """Driver for communicating with another device on the network via UDP To be used with either BiblioPixelAnimations.receivers.GenericNetworkReceiver or :py:class:`.network_receiver.NetworkReceiver` Provides the same parameters of :py:class:`.driver_base.DriverBase` as well as those below: :param str host: Network hostname or IP address of receiver. :param int port: Network port of receiver :param bool broadcast: If True, broadcast to subnet instead of specific host """ def __init__(self, num=0, width=0, height=0, host="localhost", broadcast=False, port=3142, broadcast_interface='', **kwds): super().__init__(num, width, height, **kwds) self._host = host self._port = port self._sock = None self._broadcast = broadcast self._broadcast_interface = broadcast_interface def _generateHeader(self, cmd, size): packet = bytearray() packet.append(cmd) packet.append(size & 0xFF) packet.append(size >> 8) return packet def _connect(self): try: self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) if self._broadcast: self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) return self._sock except socket.gaierror: error = "Unable to connect to or resolve host: {}".format( self._host) log.error(error) raise IOError(error) def _compute_packet(self): self._render() count = self.bufByteCount() self._packet = util.generate_header(CMDTYPE.PIXEL_DATA, count) self._packet.extend(self._buf) # Push new data to strand def _send_packet(self): try: s = self._connect() s.sendto(self._packet, (self._host, self._port)) s.close() except Exception as e: log.exception(e) error = "Problem communicating with network receiver!" log.error(error) raise IOError(error)