blob: 4ea08529dfb7e867a1706bd13a93841ab0bb15bf (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
import time
import django.test
from .. import mqtt
class TestMQTT(django.test.TestCase):
    """Integration tests for the connection handling of ``mqtt.MQTTClient``.

    The tests need a reachable MQTT server: when the client does not report
    being connected shortly after its loop is started, the test is skipped
    instead of failed.
    """

    # Seconds to wait after loop_start() before checking the connection.
    CONNECT_WAIT = 0.1
    # Seconds to wait after disconnect() before checking the resulting state.
    SETTLE_WAIT = 2

    def setUp(self):
        """Create a client, start its loop, and skip when it is not connected."""
        self.mqtt = mqtt.MQTTClient()
        self.mqtt.loop_start()
        time.sleep(self.CONNECT_WAIT)
        if not self.mqtt.connected:
            # unittest does not run tearDown() when setUp() raises (SkipTest
            # included), so stop the loop here; otherwise whatever
            # loop_start() started would be left running for skipped tests.
            self.mqtt.loop_stop()
            self.skipTest("Could not find an mqtt server")

    def tearDown(self):
        """Stop the client loop that setUp() started."""
        self.mqtt.loop_stop()

    def test_disconnect(self):
        """A plain disconnect() is followed by an automatic reconnection."""
        # after disconnecting from the mqtt server, we should
        # automatically reconnect
        self.mqtt.disconnect()
        time.sleep(self.SETTLE_WAIT)
        self.assertTrue(self.mqtt.connected)

    def test_disconnect_and_stay(self):
        """disconnect(reconnect=False) leaves the client disconnected."""
        # unless we really want to force a disconnection
        self.mqtt.disconnect(reconnect=False)
        time.sleep(self.SETTLE_WAIT)
        self.assertFalse(self.mqtt.connected)
        self.assertFalse(self.mqtt.reconnect)
|