Source code for bibliopixel.control.extractor
import collections
[docs]class Extractor:
"""
Extractor is a class that extracts and normalizes values from
incoming message dictionaries into ordered dictionaries based on the
`type` key of each message.
"""
def __init__(self, omit=None, normalizers=None, keys_by_type=None,
accept=None, reject=None, auto_omit=True):
"""
Arguments
omit -- A list of keys that will not be extracted.
normalizers -- Some keys also need to be "normalized" -
scaled and offset so they are between 0 and 1, or -1 and 1.
The `normalizers` table maps key names to a function that
normalizes the value of that key.
keys_by_type -- `keys_by_type` is a dictionary from the `type` in an
incoming message to a list of message keys to be extracted
accept -- maps keys to a value or a list of values that are
accepted for that key. A message has to match *all* entries in
`accept` to be accepted.
reject -- map key to a value or a list of values that are not
accepted for that key. A message is rejected if it matches *any*
entry in the reject map.
auto_omit -- if True, omit all keys in `accept` that only have one
possible value.
auto_omit=True, the default, is probably more useful: if you
request data for channel=1, type=note_on then you probably don't
want to see channel=1, type=note_on with each message.
"""
def to_set(x):
if x is None:
return set()
if isinstance(x, (list, tuple)):
return set(x)
return set([x])
def make_match(m):
return m and {k: to_set(v) for k, v in m.items()}
self.accept, self.reject = make_match(accept), make_match(reject)
self.omit = to_set(omit)
if auto_omit and self.accept:
self.omit.update(k for k, v in self.accept.items() if len(v) == 1)
self.normalizers = normalizers or {}
if keys_by_type is None:
self.keys_by_type = None
else:
self.keys_by_type = {}
for k, v in keys_by_type.items():
if isinstance(v, str):
v = [v]
self.keys_by_type[k] = tuple(i for i in v if i not in self.omit)
[docs] def extract(self, msg):
"""Yield an ordered dictionary if msg['type'] is in keys_by_type."""
def normal(key):
v = msg.get(key)
if v is None:
return v
normalizer = self.normalizers.get(key, lambda x: x)
return normalizer(v)
def odict(keys):
return collections.OrderedDict((k, normal(k)) for k in keys)
def match(m):
return (msg.get(k) in v for k, v in m.items()) if m else ()
accept = all(match(self.accept))
reject = any(match(self.reject))
if reject or not accept:
keys = ()
elif self.keys_by_type is None:
keys = [k for k in msg.keys() if k not in self.omit]
else:
keys = self.keys_by_type.get(msg.get('type'))
return odict(keys)