import board import json import os import time import wifi import adafruit_connection_manager import adafruit_minimqtt.adafruit_minimqtt as MQTT import colours import neopixel num_pixels = 12 pixels = neopixel.NeoPixel(board.GP0, num_pixels) CMD_FEED = "ctrl/gd_clock/cmd" SENS_FEED = os.getenv('MQTT_SENS_FEED') class Flashlight: def start(self, clock): print("Starting Flashlight mode") pixels.fill((255, 255, 255)) clock.brightness = 0.5 pixels.brighness = clock.brightness def loop(self, clock): pass flashlight = Flashlight() class Rainbow: count = 0 step = 1 def start(self, clock): print("Starting rainbow mode") pixels.fill((0, 0, 0)) clock.brightness = 0.1 pixels.brightness = clock.brightness def loop(self, clock): self.count += 1 if self.count >= num_pixels: self.count = 0 i = self.count // self.step for j in range(8): pixels[i - j] = colours.hsv2rgb( int(360 / num_pixels) * (i - j), 255, 256 * (1/2 ** j) - 1 ) pixels[i - 8] = (0, 0, 0) rainbow = Rainbow() class GD_Clock: brightness = 0.1 current_mode = rainbow def connected(self, client, userdata, flags, rc): print("Connected to MQTT") client.subscribe(CMD_FEED + "/#") client.subscribe(SENS_FEED + "/#") def disconnected(self, client, topic, message): time.sleep(10) client.connect() def parse_cmd(self, topic, message): print(f"New cmd on topic {topic}: {message}") try: payload = json.loads(message) except ValueError: return None cmd = payload.get("cmd", None) value = payload.get("value", None) kwargs = payload.get("kwargs", {}) if cmd == "mode": if value == "flashlight": self.current_mode = flashlight self.current_mode.start(self) elif value == "rainbow": self.current_mode = rainbow self.current_mode.start(self) elif cmd == "brightness": if value == "+": self.brightness = min(1.0, self.brightness + 0.1) elif value == "-": self.brightness = max(0.1, self.brightness - 0.1) else: try: bright = float(value) except ValueError: return None self.brightness = min(0.1, max(1.0, bright)) print("Setting brightness to", self.brightness) pixels.brightness = self.brightness def parse_sensor(self, topic, message): print(f"New sensor message on topic {topic}: {message}") pass def message(self, client, topic, message): if topic.startswith(CMD_FEED): self.parse_cmd(topic, message) elif topic.startswith(SENS_FEED): self.parse_sensor(topic, message) # ****** Main code ********************************************************** # print("Connecting to WiFi") try: wifi.radio.connect( os.getenv('CIRCUITPY_WIFI_SSID'), os.getenv('CIRCUITPY_WIFI_PASSWORD') ) except TypeError: print("Could not find WiFi info. Check your settings.toml file!") raise print("Connected to WiFi") pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio) mqtt_client = MQTT.MQTT( broker=os.getenv('MQTT_HOST'), socket_pool=pool, ) clock = GD_Clock() mqtt_client.on_connect = clock.connected mqtt_client.on_disconnect = clock.disconnected mqtt_client.on_message = clock.message mqtt_client.connect() clock.current_mode.start(clock) while True: mqtt_client.loop(timeout=1) clock.current_mode.loop(clock) time.sleep(1)