summary refs log tree commit diff
path: root/metaserver/socketoverudp.py
blob: 14565c8e5ee89807094f757332561bc7e93f2d32 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
from time import time as now
from .pipelayer import PipeLayer, InvalidPacket
from .pipelayer import FLAG_RANGE_START, FLAG_RANGE_STOP
import socket, struct

# Values of the first byte of a datagram that this layer gives a meaning to.
# SocketOverUdp.send_video_data() treats any payload whose first byte lies
# in [SOU_RANGE_START, SOU_RANGE_STOP) as clashing with these values.
SOU_RANGE_START = FLAG_RANGE_START
SOU_MIXED_DATA = FLAG_RANGE_STOP        # header + video payload + packet
SOU_SHUTDOWN = SOU_MIXED_DATA + 1       # first byte of SHUTDOWN_PACKET
SOU_RANGE_STOP = SOU_SHUTDOWN + 1       # exclusive upper bound of the range

# Must stay shorter than 4 characters: SocketOverUdp._decode() only compares
# incoming packets against it when they are too short to hold a 4-byte header.
SHUTDOWN_PACKET = chr(SOU_SHUTDOWN) + '**'

# Seconds the PipeLayer may stay congested before SocketOverUdp._encode()
# gives up on the peer.
CONGESTION_TIMEOUT = 20.0
#CONSOLIDATE_DELAY  = 0.1    (send consolidation is currently disabled)


class SocketOverUdp(object):
    """Socket-like object that carries a PipeLayer stream over a UDP socket.

    Data passed to send()/sendall() is queued in a PipeLayer and leaves as
    datagrams on `udpsock`; datagrams read by recv() are run back through
    the PipeLayer.  Extra "video" datagrams can share the same UDP socket:
    send_video_data() sends them either as-is or wrapped, together with a
    PipeLayer packet, in a SOU_MIXED_DATA datagram that _decode() splits
    again on the receiving side.
    """
    RECV_CAN_RETURN_EMPTY = True
    PACKETSIZE = 996          # size argument given to PipeLayer.encode()
    # largest len(PipeLayer packet) + len(video payload) that is still
    # sent as one mixed datagram (the 4-byte header is not counted)
    MIXEDPACKETSIZE = 1080

    # header of a SOU_MIXED_DATA datagram: type byte, reserved byte,
    # 16-bit network-order length of the embedded video payload
    _MIXED_HEADER = "!BBH"
    _MIXED_HEADER_SIZE = 4

    def __init__(self, udpsock, initialcrcs):
        """Wrap `udpsock`; `initialcrcs` is passed on to PipeLayer as-is."""
        self.udpsock = udpsock
        self.pl = PipeLayer(initialcrcs)
        # time at which _encode() first saw the PipeLayer congested,
        # or None while it is not congested
        self.congested_since = None

    def close(self):
        """Send SHUTDOWN_PACKET (best effort), then close the UDP socket."""
        try:
            self.udpsock.send(SHUTDOWN_PACKET)
        except socket.error:
            pass    # the notification is only a courtesy
        finally:
            # close even if send() failed in some unexpected way
            self.udpsock.close()

    def _progress(self):
        """Call _encode() if PipeLayer.settime() returns 0.0.

        NOTE(review): 0.0 presumably means "a packet is due now" --
        confirm against pipelayer.py.
        """
        if self.pl.settime(now()) == 0.0:
            self._encode()

    def _encode(self):
        """Send the next PipeLayer packet, if the PipeLayer has one.

        Raises socket.error (after sending SHUTDOWN_PACKET) once the
        PipeLayer has been congested for more than CONGESTION_TIMEOUT
        seconds.
        """
        packet = self.pl.encode(self.PACKETSIZE)
        if packet is None:
            return
        if self.pl.is_congested():
            current = now()
            if self.congested_since is None:
                self.congested_since = current
            elif current > self.congested_since + CONGESTION_TIMEOUT:
                self.udpsock.send(SHUTDOWN_PACKET)
                raise socket.error("peer not responding, timing out")
        else:
            self.congested_since = None
        self.udpsock.send(packet)

    def _decode(self, packet):
        """Return the stream data carried by one received datagram.

        Whatever the PipeLayer rejects with InvalidPacket is interpreted
        here: SOU_MIXED_DATA datagrams are split into a video payload
        (given to udp_over_udp_decoder()) and a remainder that is decoded
        in turn; other datagrams of at least 4 bytes are handed to
        udp_over_udp_decoder() whole; anything shorter is dropped.
        Returns '' when the datagram carried no stream data.

        Raises socket.error when SHUTDOWN_PACKET is received.
        """
        # Iterative on purpose: the datagram comes from the network, and
        # one made of many back-to-back mixed headers must not be able to
        # exhaust the interpreter's recursion limit.
        while True:
            try:
                return self.pl.decode(packet)
            except InvalidPacket:
                pass
            if len(packet) < self._MIXED_HEADER_SIZE:
                if packet == SHUTDOWN_PACKET:
                    raise socket.error("received an end-of-connexion packet")
                return ''    # too short to be anything we know: ignore
            hdr, _reserved, size = struct.unpack(
                self._MIXED_HEADER, packet[:self._MIXED_HEADER_SIZE])
            if hdr != SOU_MIXED_DATA:
                # non-tiny packets with no recognized hdr byte are
                # assumed to be pure video traffic
                self.udp_over_udp_decoder(packet)
                return ''
            payload_end = self._MIXED_HEADER_SIZE + size
            self.udp_over_udp_decoder(
                packet[self._MIXED_HEADER_SIZE:payload_end])
            # what follows the video payload is decoded like a datagram
            packet = packet[payload_end:]

    def fileno(self):
        """Return the UDP socket's file descriptor, after a _progress()."""
        self._progress()
        return self.udpsock.fileno()

    def flush(self):
        """Call _encode() for as long as PipeLayer.settime() returns 0.0."""
        while self.pl.settime(now()) == 0.0:
            self._encode()

    def recv(self, _ignoredbufsize=None):
        """Read one datagram and return the stream data it carried.

        The buffer size argument is ignored (up to 65535 bytes are always
        read).  The result is '' when the datagram held no stream data.
        Receiving also triggers an _encode().
        """
        packet = self.udpsock.recv(65535)
        self.pl.settime(now())
        data = self._decode(packet)
        self._encode()
        return data

    def sendall(self, data):
        """Queue `data` in the PipeLayer and return len(data).

        Nothing is written to the UDP socket here; packets go out from
        _encode() and send_video_data().
        """
        self.pl.queue(data)
        return len(data)

    send = sendall

    def send_video_data(self, udpdata):
        """Send one video datagram, piggy-backing a PipeLayer packet if any.

        When the PipeLayer has a packet ready and it fits together with
        `udpdata` in MIXEDPACKETSIZE bytes, both travel in a single
        SOU_MIXED_DATA datagram; otherwise the PipeLayer packet is sent
        first, on its own.  Returns len(udpdata).
        """
        # Some payloads cannot be sent as plain datagrams because the
        # receiving _decode() would not recognise them as video data:
        # those shorter than the 4-byte header (ignored there as too
        # short) and those whose first byte lies in the reserved header
        # range.  These are always wrapped in a mixed-data header.
        forced_embedded = (
            len(udpdata) < self._MIXED_HEADER_SIZE or
            SOU_RANGE_START <= ord(udpdata[0]) < SOU_RANGE_STOP)
        self.pl.settime(now())
        packet = self.pl.encode(self.PACKETSIZE) or ''
        if packet and len(packet) + len(udpdata) > self.MIXEDPACKETSIZE:
            # too large to share one datagram: the PipeLayer packet
            # goes out first and the video payload follows separately
            self.udpsock.send(packet)
            packet = ''
        if packet or forced_embedded:
            header = struct.pack(self._MIXED_HEADER,
                                 SOU_MIXED_DATA, 0, len(udpdata))
            datagram = header + udpdata + packet
        else:
            # no PipeLayer packet, send as plain udp data
            datagram = udpdata
        self.udpsock.send(datagram)
        return len(udpdata)

    def udp_over_udp_mixer(self):
        """Return a UdpOverUdpMixer bound to this object."""
        return UdpOverUdpMixer(self)

    def udp_over_udp_decoder(self, data):
        """Hook called by _decode() with each received video payload."""
        pass    # method overridden by pclient.py

    def getpeername(self):
        """Return the peer address of the UDP socket."""
        return self.udpsock.getpeername()

    def getsockname(self):
        """Return the local address of the UDP socket."""
        return self.udpsock.getsockname()

    def setsockopt(self, level, opt, value):
        """Accept and ignore any socket option.

        TCP_NODELAY (set by the bub-n-bros client, not the server) used
        to toggle a send-consolidation feature that is currently disabled.
        """
        pass

    def setblocking(self, _ignored):
        """Do nothing."""
        pass   # XXX good enough for common/gamesrv.py


class UdpOverUdpMixer(object):
    """Tiny socket stand-in that sends through a SocketOverUdp.

    Its send attribute is the send_video_data method of the object given
    to the constructor; socket options are accepted and discarded.
    """

    def __init__(self, sockoverudp):
        # bound once here, so send(data) is sockoverudp.send_video_data(data)
        video_sender = sockoverudp.send_video_data
        self.send = video_sender

    def setsockopt(self, *args):
        """Ignore the request, whatever the arguments."""
        return None