From 27bc8de7c7f8e29ba10324b56fef9fc4e80a9fd6 Mon Sep 17 00:00:00 2001 From: Elena ``of Valhalla'' Grandi Date: Sat, 9 Dec 2023 15:26:02 +0100 Subject: Start recording mqtt data in rss. --- rrd/models.py | 53 ++++++++++++++++++++++++++++++++++++++++--- rrd/mqtt.py | 30 ++++++++++++++++-------- rrd/tests/test_datasources.py | 52 ++++++++++++++++++++++++++++++++++++++++++ rrd/tests/test_django.py | 5 ---- rrd/tests/test_mqtt.py | 24 +++++++++++++++++++- rrd/views.py | 4 +++- 6 files changed, 149 insertions(+), 19 deletions(-) create mode 100644 rrd/tests/test_datasources.py delete mode 100644 rrd/tests/test_django.py (limited to 'rrd') diff --git a/rrd/models.py b/rrd/models.py index 6adc17f..8d995c8 100644 --- a/rrd/models.py +++ b/rrd/models.py @@ -1,7 +1,13 @@ +import logging +import os + import django.contrib.auth.models as amodels +import rrdtool from django.conf import settings from django.db import models +log = logging.getLogger(__name__) + class ModelWithPerms(models.Model): users_read = models.ManyToManyField( @@ -30,29 +36,70 @@ class DataSource(ModelWithPerms): # which is probably too much for a sensible db topic = models.CharField(max_length=512) path = models.FilePathField( - path=settings.RRD_DB_PATH, + path=settings.RRD_DB_PATH.as_posix(), recursive=True, max_length=512, ) - rrd_config = models.TextField() + rrd_config = models.TextField( + default=settings.RRD_DS_CONFIG + ) active = models.BooleanField( default=True, ) + def __str__(self): + return self.topic + + @property + def lastupdate(self): + try: + last = rrdtool.lastupdate(os.path.join( + settings.RRD_DB_PATH, self.path + )) + except rrdtool.OperationalError as e: + log.warning("Failure reading from ds: %s", e) + return (None, None) + else: + return last["date"], last["ds"][self.topic.split("/")[-1]] + + def update(self, ts, value): + rrd_path = os.path.join(settings.RRD_DB_PATH, self.path) + if not os.path.isfile(rrd_path): + rrdtool.create( + os.path.join(settings.RRD_DB_PATH, self.path), + "--no-overwrite", + self.rrd_config.format( + ds_name=self.topic.split("/")[-1] + ).strip().split('\n'), + ) + try: + rrdtool.update( + os.path.join(settings.RRD_DB_PATH, self.path), + str(ts) + ":" + str(value) + ) + except ValueError as e: + log.warning("Could not update ds: %s", e) + class Graph(ModelWithPerms): title = models.CharField(max_length=64) data_sources = models.ManyToManyField(DataSource) path = models.FilePathField( - path=settings.RRD_GRAPH_PATH, + path=settings.RRD_GRAPH_PATH.as_posix(), recursive=True, max_length=512, ) rrd_config = models.TextField() + def __str__(self): + return self.title + class Dashboard(ModelWithPerms): title = models.CharField(max_length=64) graphs = models.ManyToManyField(Graph) data_sources = models.ManyToManyField(DataSource) template = models.TextField() + + def __str__(self): + return self.title diff --git a/rrd/mqtt.py b/rrd/mqtt.py index de679ce..09589c3 100644 --- a/rrd/mqtt.py +++ b/rrd/mqtt.py @@ -1,7 +1,11 @@ +import datetime import logging -import django.conf import paho.mqtt.client as mqtt +import slugify +from django.conf import settings + +from . import models log = logging.getLogger(__name__) @@ -31,13 +35,13 @@ class MQTTClient: def connect(self): try: self.client.connect( - django.conf.settings.MQTT_SERVER, - django.conf.settings.MQTT_PORT, + settings.MQTT_SERVER, + settings.MQTT_PORT, 60, # TODO: make the keepalive configurable ) except OSError as e: - log.debug("Could not connect to MQTT server") - log.debug(e) + log.warning("Could not connect to MQTT server") + log.warning(e) def disconnect(self, reconnect: bool = True): self.reconnect = reconnect @@ -46,7 +50,7 @@ class MQTTClient: def on_connect(self, client, userdata, flags, rc): log.info("Connected to MQTT") self.connected = True - client.subscribe(django.conf.settings.MQTT_TOPIC) + client.subscribe(settings.MQTT_TOPIC + "#") def on_disconnect(self, client, userdata, rc): log.info("Disconnected from MQTT") @@ -55,9 +59,17 @@ class MQTTClient: self.connect() def on_message(self, client, userdata, msg): - print( - "Received msg %s %s", msg.topic, msg.payload.decode() - ) log.debug( "Received msg %s %s", msg.topic, msg.payload.decode() ) + ts = int(datetime.datetime.now().timestamp()) + topic = msg.topic.removeprefix(settings.MQTT_TOPIC) + dss = models.DataSource.objects.filter(topic=topic) + if not dss: + dss = [models.DataSource.objects.create( + topic=topic, + path=slugify.slugify(topic) + ".rrd", + )] + dss[0].save() + for ds in dss: + ds.update(ts, msg.payload.decode()) diff --git a/rrd/tests/test_datasources.py b/rrd/tests/test_datasources.py new file mode 100644 index 0000000..4057b14 --- /dev/null +++ b/rrd/tests/test_datasources.py @@ -0,0 +1,52 @@ +import datetime +import os + +from django.conf import settings +from django.test import TestCase + +from .. import models + + +class TestDataSource(TestCase): + def setUp(self): + self.rrd_file = "test.rrd" + try: + os.remove(os.path.join(settings.RRD_DB_PATH, self.rrd_file)) + except FileNotFoundError: + pass + + def tearDown(self): + try: + os.remove(os.path.join(settings.RRD_DB_PATH, self.rrd_file)) + except FileNotFoundError: + pass + + def test_create_with_defaults(self): + os.makedirs(settings.RRD_DB_PATH, exist_ok=True) + ds = models.DataSource.objects.create( + topic="test", + path=self.rrd_file, + ) + self.assertTrue(ds.active) + self.assertIn("RRA:AVERAGE:0.5:1:2016", ds.rrd_config) + + def test_read_empty_ds(self): + ds = models.DataSource.objects.create( + topic="test", + path=self.rrd_file, + ) + with self.assertLogs() as cm: + self.assertEqual(ds.lastupdate, (None, None)) + self.assertIn("WARNING:rrd.models:Failure", cm.output[0]) + + def test_update(self): + ds = models.DataSource.objects.create( + topic="test", + path=self.rrd_file, + ) + now = datetime.datetime.now() + ts = int(now.timestamp()) + ds.update(ts, 10) + last = ds.lastupdate + self.assertEqual(last[1], 10) + self.assertEqual(last[0].year, now.year) diff --git a/rrd/tests/test_django.py b/rrd/tests/test_django.py deleted file mode 100644 index e0fbb45..0000000 --- a/rrd/tests/test_django.py +++ /dev/null @@ -1,5 +0,0 @@ -from django.test import TestCase - -# Create your tests here. - - diff --git a/rrd/tests/test_mqtt.py b/rrd/tests/test_mqtt.py index 0a7b90d..66ced97 100644 --- a/rrd/tests/test_mqtt.py +++ b/rrd/tests/test_mqtt.py @@ -1,8 +1,10 @@ +import os import time import django.test +from django.conf import settings -from .. import mqtt +from .. import models, mqtt class TestMQTT(django.test.TestCase): @@ -12,9 +14,12 @@ class TestMQTT(django.test.TestCase): time.sleep(0.1) if not self.mqtt.connected: self.skipTest("Could not find an mqtt server") + self.files = [] def tearDown(self): self.mqtt.loop_stop() + for path in self.files: + os.remove(path) def test_disconnect(self): # after disconnecting from the mqtt server, we should @@ -29,3 +34,20 @@ class TestMQTT(django.test.TestCase): time.sleep(2) self.assertFalse(self.mqtt.connected) self.assertFalse(self.mqtt.reconnect) + + def test_receive_data(self): + os.makedirs(settings.RRD_DB_PATH, exist_ok=True) + self.files.append(os.path.join(settings.RRD_DB_PATH, "test.rrd")) + self.mqtt.client.publish( + topic=settings.MQTT_TOPIC + "test", + payload="10", + ) + time.sleep(1) + ds = models.DataSource.objects.get(topic="test") + self.assertEqual(ds.lastupdate[1], 10) + + def test_receive_data_non_existing_ds(self): + pass + + def test_receive_invalid_data(self): + pass diff --git a/rrd/views.py b/rrd/views.py index 5357b77..99195b9 100644 --- a/rrd/views.py +++ b/rrd/views.py @@ -1,7 +1,9 @@ import django.http -from django.shortcuts import render + +# from django.shortcuts import render # Create your views here. + def index(request): return django.http.HttpResponse("Hello, World") -- cgit v1.2.3