aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--README.rst6
-rw-r--r--kerbana/__init__.py16
-rw-r--r--kerbana/mqtt.py9
-rw-r--r--tests/test_mqtt.py32
5 files changed, 59 insertions, 5 deletions
diff --git a/.gitignore b/.gitignore
index 8a37618..50132af 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,5 +2,6 @@
.coverage
kerbana.toml
+kerbana_tests.toml
.*.sw?
diff --git a/README.rst b/README.rst
index de03a71..406f785 100644
--- a/README.rst
+++ b/README.rst
@@ -10,6 +10,12 @@ Installation
Contributing
------------
+Running Tests
+-------------
+
+To run the MQTT integration tests, create a file ``kerbana_tests.toml``
+in the base directory with the variable ``MQTT_SERVER`` pointing to a
+valid mqtt server you can use.
License
-------
diff --git a/kerbana/__init__.py b/kerbana/__init__.py
index 8d47d35..7e30c2d 100644
--- a/kerbana/__init__.py
+++ b/kerbana/__init__.py
@@ -44,9 +44,23 @@ def create_app(test_config: Optional[config.Config] = None):
app.config.from_prefixed_env(prefix="KERBANA_")
else:
app.config.from_object(test_config)
+ try:
+ app.config.from_file(
+ os.path.join(
+ os.path.dirname(os.path.abspath(__file__)),
+ "..",
+ "kerbana_tests.toml",
+ ),
+ load=toml.load,
+ )
+ except FileNotFoundError:
+ app.logger.debug("File kerbana_tests.toml not found.")
mqtt_client = mqtt.MQTTClient(app)
- mqtt_client.connect()
+
+ if test_config is None:
+ # If we're running tests, do not start the MQTTClient
+ mqtt_client.connect()
@app.route('/')
def root():
diff --git a/kerbana/mqtt.py b/kerbana/mqtt.py
index 9747de3..3177287 100644
--- a/kerbana/mqtt.py
+++ b/kerbana/mqtt.py
@@ -26,21 +26,22 @@ class MQTTClient:
self.client.loop_start()
def disconnect(self, reconnect: bool = True):
+ self.reconnect = reconnect
self.client.loop_stop()
self.client.disconnect()
- self.reconnect = reconnect
def on_connect(self, client, userdata, flags, rc):
- self.app.logger.warning("Connected to MQTT")
+ self.app.logger.info("Connected to MQTT")
self.connected = True
client.subscribe(self.app.config["MQTT_TOPIC"])
- def on_disconnect(self, client, userdata, flags, rc):
+ def on_disconnect(self, client, userdata, rc):
+ self.app.logger.info("Disconnected from MQTT")
self.connected = False
if self.reconnect:
self.connect()
def on_message(self, client, userdata, msg):
- self.app.logger.warning(
+ self.app.logger.debug(
"Received msg %s %s", msg.topic, msg.payload.decode()
)
diff --git a/tests/test_mqtt.py b/tests/test_mqtt.py
new file mode 100644
index 0000000..20c1cf2
--- /dev/null
+++ b/tests/test_mqtt.py
@@ -0,0 +1,32 @@
+import time
+import unittest
+
+from kerbana import config, create_app, mqtt
+
+
+class TestMQTT(unittest.TestCase):
+ def setUp(self):
+ test_config = config.TestConfig()
+ self.app = create_app(test_config)
+ self.mqtt = mqtt.MQTTClient(self.app)
+ self.mqtt.connect()
+ time.sleep(0.1)
+ if not self.mqtt.connected:
+ self.skipTest("Could not find an mqtt server")
+
+ def tearDown(self):
+ self.mqtt.disconnect(reconnect=False)
+
+ def test_disconnect(self):
+ # after disconnecting from the mqtt server, we should
+ # automatically reconnect
+ self.mqtt.disconnect()
+ time.sleep(2)
+ self.assertTrue(self.mqtt.connected)
+
+ def test_disconnect_and_stay(self):
+ # unless we really want to force a disconnection
+ self.mqtt.disconnect(reconnect=False)
+ time.sleep(2)
+ self.assertFalse(self.mqtt.connected)
+ self.assertFalse(self.mqtt.reconnect)