1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
import os
import tempfile
import unittest
from tornado.testing import AsyncTestCase, gen_test
from pyapd import ctl, config, app
class TestCtlServerCommands(AsyncTestCase):
def setUp(self):
super().setUp()
self.tempdir = tempfile.TemporaryDirectory()
test_app = app.App(config.Config('tests/data/test_config.yaml'))
test_app.config.ctl_socket = os.path.join(
self.tempdir.name,
'pyapd.sock'
)
self.ctl_server = ctl.CtlServer(test_app)
@gen_test
def test_ping_command(self):
res = yield self.ctl_server.get_command('{"command": "ping"}')
self.assertIn('pong', res)
@gen_test
def test_add_object_command(self):
self.assertEqual(len(self.ctl_server.app.store.objects), 0)
obj = '''
{
"id": "http://example.org/123456",
"type": "object"
}
'''
res = yield (self.ctl_server.get_command('''
{{
"command": "add_object",
"obj": {obj}
}}
'''.format(obj=obj)
))
self.assertIn('ok', res)
self.assertEqual(len(self.ctl_server.app.store.objects), 1)
class TestCtlClient(unittest.TestCase):
def setUp(self):
self.client = ctl.CtlClient(
config.Config('tests/data/test_config.yaml')
)
def test_cmd_ping(self):
cmd = self.client.ping()
self.assertEqual(cmd, {"command": "ping"})
def test_cmd_add_object(self):
obj = {
"id": "http://example.org/123456",
"type": "object",
}
cmd = self.client.add_object(obj)
self.assertEqual(cmd['command'], 'add_object')
self.assertEqual(cmd['obj'], obj)
|