import datetime
import logging

import paho.mqtt.client as mqtt
import slugify
from django.conf import settings

from . import models

log = logging.getLogger(__name__)


class MQTTClient:
    """Wrapper around a paho ``mqtt.Client`` that feeds messages to DataSources.

    The broker address is read from ``settings.MQTT_SERVER`` and
    ``settings.MQTT_PORT``; the client subscribes to everything below
    ``settings.MQTT_TOPIC``. Each received message is forwarded to the
    ``models.DataSource`` rows whose ``topic`` matches; a row is created
    when none exists yet.
    """

    def __init__(self):
        # Whether on_disconnect should try to connect again.
        self.reconnect = True
        # Updated by the on_connect / on_disconnect callbacks.
        self.connected = False
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message

    def loop_forever(self):
        """Connect, then hand control to the client's ``loop_forever``."""
        self.connect()
        self.client.loop_forever()

    def loop_start(self):
        """Connect, then call the client's ``loop_start``."""
        self.connect()
        self.client.loop_start()

    def loop_stop(self):
        """Disconnect without reconnecting, then call ``loop_stop``."""
        self.disconnect(reconnect=False)
        self.client.loop_stop()

    def connect(self):
        """Connect to the configured broker.

        The keepalive is taken from the optional ``settings.MQTT_KEEPALIVE``
        and defaults to 60 when that setting is absent. An ``OSError`` raised
        by the connection attempt is logged as a warning and not re-raised.
        """
        keepalive = getattr(settings, "MQTT_KEEPALIVE", 60)
        try:
            self.client.connect(
                settings.MQTT_SERVER,
                settings.MQTT_PORT,
                keepalive,
            )
        except OSError as e:
            log.warning("Could not connect to MQTT server")
            log.warning(e)

    def disconnect(self, reconnect: bool = True):
        """Disconnect from the broker.

        Args:
            reconnect: Stored on the instance; when true, ``on_disconnect``
                calls ``connect`` again.
        """
        self.reconnect = reconnect
        self.client.disconnect()

    def on_connect(self, client, userdata, flags, rc):
        """Handle the broker's CONNACK and subscribe on success.

        A non-zero ``rc`` means the broker refused the connection; in that
        case the client is not marked as connected and no subscription is
        attempted.
        """
        if rc != 0:
            self.connected = False
            log.warning(
                "MQTT connection refused: %s", mqtt.connack_string(rc)
            )
            return
        log.info("Connected to MQTT")
        self.connected = True
        client.subscribe(settings.MQTT_TOPIC + "#")

    def on_disconnect(self, client, userdata, rc):
        """Mark the client as disconnected and reconnect if enabled."""
        log.info("Disconnected from MQTT")
        self.connected = False
        if self.reconnect:
            self.connect()

    def on_message(self, client, userdata, msg):
        """Forward a received message to the matching DataSource rows.

        The payload is decoded once with the default codec; a payload that
        cannot be decoded is logged and dropped instead of raising out of
        the callback. The topic is looked up with the ``settings.MQTT_TOPIC``
        prefix removed, and a DataSource is created (with a slugified
        ``.rrd`` path) when no row matches.
        """
        try:
            payload = msg.payload.decode()
        except UnicodeDecodeError:
            log.warning("Ignoring undecodable payload on %s", msg.topic)
            return

        log.debug("Received msg %s %s", msg.topic, payload)
        ts = int(datetime.datetime.now().timestamp())
        topic = msg.topic.removeprefix(settings.MQTT_TOPIC)

        dss = models.DataSource.objects.filter(topic=topic)
        if not dss:
            # First message for this topic: create its DataSource on the fly.
            dss = (
                models.DataSource.objects.create(
                    topic=topic,
                    path=slugify.slugify(topic) + ".rrd",
                ),
            )
        for ds in dss:
            ds.update(ts, payload)