Source code for BiblioPixelAnimations.matrix.kimotion

from bibliopixel.animation.matrix import Matrix
try:
    from websocket import create_connection
except ImportError:
    create_connection = None

import threading
import numpy as np

WS_FRAME_WIDTH = 640
WS_FRAME_HEIGHT = 480
WS_FRAME_SIZE = WS_FRAME_WIDTH * WS_FRAME_HEIGHT


[docs]def clamp(v, _min, _max): return max(min(v, _max), _min)
[docs]def lerp(n, low, high): return clamp((n - low) / (high - low), 0.0, 1.0)
[docs]def rebin(a, shape): sh = shape[0], a.shape[0] // shape[0], shape[1], a.shape[1] // shape[1] return a.reshape(sh).mean(-1).mean(1)
[docs]def thread_lock(): e = threading.Event() e.lock = e.clear e.release = e.set e.is_released = e.is_set e.release() return e
[docs]class ws_thread(threading.Thread): def __init__(self, server): super().__init__() self.setDaemon(True) self._stop = threading.Event() self._reading = thread_lock() self.dt = np.dtype(np.uint16) self.dt = self.dt.newbyteorder('<') self._data = [np.zeros(WS_FRAME_SIZE, self.dt), np.zeros(WS_FRAME_SIZE, self.dt)] self._buf = False self.ws = create_connection("ws://{}/".format(server))
[docs] def stop(self): self._stop.set()
[docs] def stopped(self): return self._stop.isSet()
[docs] def get_frame(self): self._reading.lock() d = np.copy(self._data[0 if self._buf else 1]) self._reading.release() return d
[docs] def run(self): while not self.stopped(): d = self.ws.recv() d = np.frombuffer(d, dtype=self.dt) self._reading.wait() self._data[1 if self._buf else 0] = d self._buf = not self._buf self.ws.close()
[docs]class KimotionShader(object): def __init__(self, anim): self.anim = anim self.width = self.anim.width self.height = self.anim.height self.led = self.anim._led
[docs]class SandStorm(KimotionShader): def __init__(self, anim, min_z=440, max_z=1100, near_color=[229, 107, 0], near_z=760, mid_color=[40, 0, 114], far_color=[2, 2, 12], far_z=1100): super().__init__(anim) self.min_z = float(min_z) self.max_z = float(max_z) self.near_color = np.array(near_color) self.near_z = float(near_z) self.mid_color = np.array(mid_color) self.mid_z = ((self.max_z + self.near_z) / 2.0) self.far_color = np.array(far_color) self.far_z = float(far_z) self.z_colors = np.array([self.z_color(z) for z in range(0, int(Kimotion.max_depth) + 1)]).tolist()
[docs] def z_color(self, z): z = float(z) alpha = 1.0 if z <= self.mid_z: ns = lerp(z, self.near_z, self.mid_z) color = (1.0 - ns) * self.near_color + ns * self.mid_color else: # z must be between self.mid_z and FAR_Z fs = lerp(z, self.mid_z, self.far_z) color = (1.0 - fs) * self.mid_color + fs * self.far_color alpha = 1.0 - lerp(z, self.min_z, self.max_z) if z <= -self.min_z: alpha = 0.0 # gl_FragColor = vec4(color, alpha) * texture2D( texture, gl_PointCoord ) return (color * alpha).astype(np.uint8).tolist()
[docs] def render(self, frame): for y in range(self.height): for x in range(self.width): c = self.z_colors[frame[y][x]] self.led.set(x, y, c)
[docs]class Kimotion(Matrix): max_depth = 1200.0 shaders = { "Sandstorm": SandStorm } def __init__(self, layout, server="localhost:1337", mirror=True, crop=True, shader="Sandstorm", **kwargs): super().__init__(layout, **kwargs) self.server = server self.mirror = mirror self.crop = crop self.fw = WS_FRAME_WIDTH self.fh = WS_FRAME_HEIGHT # TODO: Implement something to actually use this # Right now needs 4:3 aspect display self.frame_aspect = (float(WS_FRAME_WIDTH) / float(WS_FRAME_HEIGHT)) self.aspect = (float(self.width) / float(self.height)) self.resize_box = (self.width, self.height) self.crop_box = (0, 0, self.width, self.height) if self.frame_aspect > self.aspect: self.resize_box[0] = int(self.height * self.frame_aspect) half = (self.resize_box[0] - self.width) / 2 self.crop_box[0] = half self.crop_box[2] = self.resize_box[0] - half elif self.frame_aspect < self.aspect: self.resize_box[1] = int(self.width / self.aspect) half = (self.resize_box[1] - self.height) / 2 self.crop_box[1] = half self.crop_box[3] = self.resize_box[1] - half self.shader = Kimotion.shaders[shader](self, **kwargs) self._ws_thread = ws_thread(self.server) self._ws_thread.start() def _exit(self, type, value, traceback): self._ws_thread.stop()
[docs] def step(self, amt=1): d = self._ws_thread.get_frame() d = d.reshape(WS_FRAME_HEIGHT, WS_FRAME_WIDTH) if self.mirror: d = np.fliplr(d) d = rebin(d, (self.height, self.width)).astype(np.uint16) self.shader.render(d)