From ef6bd01d5dddc184ca27462648713925839030fe Mon Sep 17 00:00:00 2001 From: Elena ``of Valhalla'' Grandi Date: Sun, 23 Jul 2023 11:25:05 +0200 Subject: Test connecting and disconnecting from MQTT --- tests/test_mqtt.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 tests/test_mqtt.py (limited to 'tests/test_mqtt.py') diff --git a/tests/test_mqtt.py b/tests/test_mqtt.py new file mode 100644 index 0000000..20c1cf2 --- /dev/null +++ b/tests/test_mqtt.py @@ -0,0 +1,32 @@ +import time +import unittest + +from kerbana import config, create_app, mqtt + + +class TestMQTT(unittest.TestCase): + def setUp(self): + test_config = config.TestConfig() + self.app = create_app(test_config) + self.mqtt = mqtt.MQTTClient(self.app) + self.mqtt.connect() + time.sleep(0.1) + if not self.mqtt.connected: + self.skipTest("Could not find an mqtt server") + + def tearDown(self): + self.mqtt.disconnect(reconnect=False) + + def test_disconnect(self): + # after disconnecting from the mqtt server, we should + # automatically reconnect + self.mqtt.disconnect() + time.sleep(2) + self.assertTrue(self.mqtt.connected) + + def test_disconnect_and_stay(self): + # unless we really want to force a disconnection + self.mqtt.disconnect(reconnect=False) + time.sleep(2) + self.assertFalse(self.mqtt.connected) + self.assertFalse(self.mqtt.reconnect) -- cgit v1.2.3