Source code for bibliopixel.control.serial_gamepad

import argparse, serial, serial.tools.list_ports
from distutils.version import LooseVersion
from . gamepad import BaseGamePad
from .. drivers.return_codes import RETURN_CODES, print_error
from .. util import exception, log
from .. util.util import generate_header

VERSION_ERROR = """
pyserial v{} found, please upgrade to v2.7+!
    pip install pyserial --upgrade
"""

UNABLE_TO_CONNECT_ERROR = """
Unable to connect to the device.
Please check that it is connected and the correct port is selected.
"""

if LooseVersion(serial.VERSION) < LooseVersion('2.7'):
    error = VERSION_ERROR.format(serial.VERSION)
    log.error(error)
    raise ImportError(error)

BTN_MAP = ('A', 'B', 'SELECT', 'START', 'UP', 'DOWN', 'LEFT', 'RIGHT', 'X', 'Y')


[docs]class CMDTYPE: INIT = 0 GET_BTNS = 1 SET_LEDS = 2 SET_MODE = 4
[docs]class SerialPadError(Exception): pass
[docs]class SerialGamePad(BaseGamePad): devices = [] def __init__(self, btn_map=BTN_MAP, dev="", hardwareID="1B4F:9206"): super().__init__() self._map = btn_map self._hardwareID = hardwareID self._com = None self.dev = dev resp = self._connect() if resp != RETURN_CODES.SUCCESS: raise SerialPadError(print_error(resp))
[docs] def cleanup(self): if self._com is not None: log.info("Closing connection to: %s", self.dev) exception.report(self._com.close) self._com = None
[docs] def close(self): from .. util import deprecated deprecated.deprecated('SerialGamePad.close') self.cleanup()
[docs] @staticmethod def find_serial_devices(hardwareID="1B4F:9206"): hardwareID = "(?i)" + hardwareID # forces case insensitive if len(SerialGamePad.devices) == 0: SerialGamePad.devices = [] for port in serial.tools.list_ports.grep(hardwareID): SerialGamePad.devices.append(port[0]) return SerialGamePad.devices
@staticmethod def _printError(error): msg = "Unknown error occured." if error == RETURN_CODES.ERROR_SIZE: msg = "Data packet size incorrect." elif error == RETURN_CODES.ERROR_UNSUPPORTED: msg = "Unsupported configuration attempted." elif error == RETURN_CODES.ERROR_BAD_CMD: msg = "Unsupported protocol command. Check your device version." log.error("%s: %s", error, msg) raise SerialPadError(msg) @staticmethod def _comError(fail=False): error = "There was an unknown error communicating with the device." log.error(error) if fail: raise IOError(error) def _connect(self): try: if(self.dev == ""): SerialGamePad.find_serial_devices(self._hardwareID) if len(SerialGamePad.devices) > 0: self.dev = SerialGamePad.devices[0] log.info("Using COM Port: %s", self.dev) try: self._com = serial.Serial(self.dev, timeout=5) except serial.SerialException: ports = SerialGamePad.find_serial_devices(self._hardwareID) error = "Invalid port specified. No COM ports available." if len(ports) > 0: error = "Invalid port specified. Try using one of: \n" + \ "\n".join(ports) log.info(error) raise SerialPadError(error) packet = generate_header(CMDTYPE.INIT, 0) self._com.write(packet) resp = self._com.read(1) return resp and ord(resp) except serial.SerialException as e: error = UNABLE_TO_CONNECT_ERROR log.exception(e) log.error(error) raise e
[docs] def getKeys(self): bits = 0 try: packet = generate_header(CMDTYPE.GET_BTNS, 0) self._com.write(packet) resp = self._com.read(1) if len(resp) == 0: SerialGamePad._comError() elif ord(resp) != RETURN_CODES.SUCCESS: print_error(ord(resp)) else: resp = self._com.read(2) if len(resp) != 2: SerialGamePad._comError() else: bits = ord(resp[0]) + (ord(resp[1]) << 8) except: log.error("IO Error Communicating With Game Pad!") index = 0 result = {} for m in self._map: result[m] = (bits & (1 << index) > 0) index += 1 return argparse.Namespace(**result)
[docs] def setLights(self, data): temp = [(0, 0, 0)] * len(data.keys()) for key in data: i = self._map.index(key) if i >= 0 and i < len(temp): temp[i] = (data[key]) leds = [] for l in temp: leds.extend(l) packet = generate_header(CMDTYPE.SET_LEDS, len(leds)) packet.extend(leds) self._com.write(packet) resp = self._com.read(1) if len(resp) == 0: SerialGamePad._comError() elif ord(resp) != RETURN_CODES.SUCCESS: print_error(ord(resp))
[docs] def setLightsOff(self, count): data = {} num = 0 for b in self._map: data[b] = (0, 0, 0) num += 1 if num >= count: break self.setLights(data)