"""MQTT-controlled NeoPixel ring: clock face, flashlight, rainbow and status modes.

Commands arrive as JSON on ``CMD_FEED`` (``{"cmd": ..., "value": ...}``);
sensor readings arrive as plain payloads below ``SENS_FEED``.
"""
import board
import json
import os
import rtc
import time
import wifi

import adafruit_connection_manager
import adafruit_minimqtt.adafruit_minimqtt as MQTT
import adafruit_ntp
import colours
import neopixel

num_pixels = 12
pixels = neopixel.NeoPixel(board.GP0, num_pixels)

CMD_FEED = "ctrl/gd_clock/cmd"
# May be None when MQTT_SENS_FEED is missing from settings.toml.
SENS_FEED = os.getenv('MQTT_SENS_FEED')

# Bounds and increment used by the "brightness" command.
MIN_BRIGHTNESS = 0.1
MAX_BRIGHTNESS = 1.0
BRIGHTNESS_STEP = 0.1


class Flashlight:
    """Mode that lights every pixel white at half brightness."""

    def start(self, clock):
        """Fill the ring with white and set the brightness to 0.5."""
        print("Starting Flashlight mode")
        pixels.fill((255, 255, 255))
        clock.brightness = 0.5
        # Fixed: was `pixels.brighness`, so the new brightness never reached
        # the strip.
        pixels.brightness = clock.brightness

    def loop(self, clock):
        """Nothing to animate in this mode."""
        pass


flashlight = Flashlight()


class Rainbow:
    """Mode that moves a fading, hue-shifted tail of 8 pixels around the ring."""

    count = 0  # current head position of the tail
    step = 1   # divisor applied to `count` to obtain the pixel index

    def start(self, clock):
        """Blank the ring and set the brightness to 0.1."""
        print("Starting rainbow mode")
        pixels.fill((0, 0, 0))
        clock.brightness = 0.1
        pixels.brightness = clock.brightness

    def loop(self, clock):
        """Advance the head by one position and redraw the tail behind it."""
        self.count += 1
        if self.count >= num_pixels:
            self.count = 0
        head = self.count // self.step
        hue_per_pixel = int(360 / num_pixels)
        for j in range(8):
            # Indices may go negative; they wrap around the ring like normal
            # Python sequence indices. The value halves with each step away
            # from the head (255, 127, 63, ...).
            pixels[head - j] = colours.hsv2rgb(
                hue_per_pixel * (head - j),
                255,
                256 * (1 / 2 ** j) - 1
            )
        # Switch off the pixel just behind the tail.
        pixels[head - 8] = (0, 0, 0)


rainbow = Rainbow()


class StatusRing:
    """Mode that uses pixel 0 as a battery-low indicator."""

    def start(self, clock):
        """Blank the ring and set the brightness to 0.1."""
        pixels.fill((0, 0, 0))
        clock.brightness = 0.1
        pixels.brightness = clock.brightness

    def loop(self, clock):
        """Show red on pixel 0 if any room reports a low battery, else dim blue."""
        if any(clock.batteries.values()):
            pixels[0] = (255, 0, 0)
        else:
            pixels[0] = (0, 0, 32)


status_ring = StatusRing()


class ClockFace:
    """Mode that shows the hour and the 5-minute slot as two pixels."""

    # Offsets added to the RTC time before display.
    # NOTE(review): presumably a local-time correction, since NTP is queried
    # with tz_offset=0 - confirm.
    offset_hours = 1
    offset_min = 0

    def start(self, clock):
        """Set the brightness to 0.1 and blank the ring."""
        print("Starting clock face mode")
        clock.brightness = 0.1
        pixels.brightness = clock.brightness
        pixels.fill((0, 0, 0))

    def loop(self, clock):
        """Redraw the hour (blue) and minute (red) pixels; purple if they coincide."""
        now = time.localtime()
        hour = (now.tm_hour + self.offset_hours) % 12
        minutes = (now.tm_min + self.offset_min) % 60 // 5
        pixels.fill((0, 0, 0))
        if hour == minutes:
            pixels[hour] = (64, 0, 64)
        else:
            pixels[hour] = (0, 0, 64)
            pixels[minutes] = (64, 0, 0)


clock_face = ClockFace()


class GD_Clock:
    """Application state plus the MQTT callbacks that drive it."""

    brightness = 0.1
    current_mode = clock_face

    # Maps the "value" of a "mode" command to the mode object to activate.
    MODES = {
        "clock": clock_face,
        "flashlight": flashlight,
        "rainbow": rainbow,
        "status": status_ring,
    }

    def __init__(self):
        # Per-instance dict (room -> battery-low flag) instead of a mutable
        # class attribute shared by every instance.
        self.batteries = {}

    def connected(self, client, userdata, flags, rc):
        """MQTT on_connect callback: subscribe to the command and sensor feeds."""
        print("Connected to MQTT")
        client.subscribe(CMD_FEED + "/#")
        if SENS_FEED:
            client.subscribe(SENS_FEED + "/#")
        else:
            print("MQTT_SENS_FEED not set; sensor feed disabled")

    def disconnected(self, client, topic, message):
        """MQTT on_disconnect callback: wait 10 seconds, then reconnect.

        NOTE(review): the parameter names mirror the on_message signature;
        MiniMQTT documents on_disconnect as (client, userdata, rc). The call
        is positional, so the names are left unchanged.
        """
        time.sleep(10)
        client.connect()

    def parse_cmd(self, topic, message):
        """Handle a JSON command message.

        Supported payloads:
          {"cmd": "mode", "value": "clock" | "flashlight" | "rainbow" | "status"}
          {"cmd": "brightness", "value": "+" | "-" | <number>}

        Malformed or unknown commands are ignored.
        """
        print(f"New cmd on topic {topic}: {message}")
        try:
            payload = json.loads(message)
        except ValueError:
            return
        if not isinstance(payload, dict):
            # Valid JSON but not an object (e.g. a list or a bare number).
            return

        cmd = payload.get("cmd", None)
        value = payload.get("value", None)

        if cmd == "mode":
            # The isinstance check keeps unhashable values (lists, dicts)
            # away from the dict lookup.
            mode = self.MODES.get(value) if isinstance(value, str) else None
            if mode is not None:
                self.current_mode = mode
                self.current_mode.start(self)
        elif cmd == "brightness":
            if value == "+":
                self.brightness = min(
                    MAX_BRIGHTNESS, self.brightness + BRIGHTNESS_STEP
                )
            elif value == "-":
                self.brightness = max(
                    MIN_BRIGHTNESS, self.brightness - BRIGHTNESS_STEP
                )
            else:
                try:
                    bright = float(value)
                except (TypeError, ValueError):
                    # TypeError covers a missing/None or non-scalar value.
                    return
                # Fixed: the clamp was inverted (min(0.1, max(1.0, x))) and
                # always produced 0.1.
                self.brightness = max(
                    MIN_BRIGHTNESS, min(MAX_BRIGHTNESS, bright)
                )
            print("Setting brightness to", self.brightness)
            pixels.brightness = self.brightness

    def parse_sensor(self, topic, message):
        """Record a battery-low flag from a ``<feed>/<room>/<sensor>`` topic.

        Only the ``batt_low`` sensor is used; its payload is parsed as an
        integer and stored as a bool under the room name.
        """
        print(f"New sensor message on topic {topic}: {message}")
        parts = topic.split("/")
        if len(parts) != 3:
            # The wildcard subscription can deliver other topic depths;
            # ignore them instead of failing to unpack.
            return
        _, room, sensor = parts
        if sensor == "batt_low":
            try:
                self.batteries[room] = bool(int(message))
            except ValueError:
                print("Ignoring non-integer batt_low payload")

    def message(self, client, topic, message):
        """MQTT on_message callback: route by topic prefix."""
        if topic.startswith(CMD_FEED):
            self.parse_cmd(topic, message)
        elif SENS_FEED and topic.startswith(SENS_FEED):
            self.parse_sensor(topic, message)


# ****** Main code ********************************************************** #

print("Connecting to WiFi")
try:
    wifi.radio.connect(
        os.getenv('CIRCUITPY_WIFI_SSID'),
        os.getenv('CIRCUITPY_WIFI_PASSWORD')
    )
except TypeError:
    print("Could not find WiFi info. Check your settings.toml file!")
    raise
print("Connected to WiFi")

pool = adafruit_connection_manager.get_radio_socketpool(wifi.radio)

# Set the on-board RTC from NTP so time.localtime() is meaningful.
ntp = adafruit_ntp.NTP(pool, tz_offset=0, cache_seconds=3600)
rtc.RTC().datetime = ntp.datetime

mqtt_client = MQTT.MQTT(
    broker=os.getenv('MQTT_HOST'),
    socket_pool=pool,
)

clock = GD_Clock()

mqtt_client.on_connect = clock.connected
mqtt_client.on_disconnect = clock.disconnected
mqtt_client.on_message = clock.message

mqtt_client.connect()

clock.current_mode.start(clock)

while True:
    # Service MQTT first so a command takes effect on this iteration's redraw.
    mqtt_client.loop(timeout=1)
    clock.current_mode.loop(clock)
    time.sleep(1)