1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
from time import time as now
from pipelayer import PipeLayer, InvalidPacket
from pipelayer import FLAG_RANGE_START, FLAG_RANGE_STOP
import socket, struct
SOU_RANGE_START = FLAG_RANGE_START
SOU_MIXED_DATA = FLAG_RANGE_STOP + 0
SOU_SHUTDOWN = FLAG_RANGE_STOP + 1
SOU_RANGE_STOP = FLAG_RANGE_STOP + 2
SHUTDOWN_PACKET = chr(SOU_SHUTDOWN) + '**' # < 4 characters
CONGESTION_TIMEOUT = 20.0
#CONSOLIDATE_DELAY = 0.1
class SocketOverUdp(object):
RECV_CAN_RETURN_EMPTY = True
PACKETSIZE = 996
MIXEDPACKETSIZE = 1080
def __init__(self, udpsock, initialcrcs):
self.udpsock = udpsock
self.pl = PipeLayer(initialcrcs)
self.congested_since = None
#self.consolidate_sends = None
#self.encode_delayed_until = now()
def close(self):
try:
self.udpsock.send(SHUTDOWN_PACKET)
except socket.error:
pass
self.udpsock.close()
def _progress(self):
if self.pl.settime(now()) == 0.0:
self._encode()
def _encode(self):
#if self.consolidate_sends:
# if self.pl.cur_time < self.encode_delayed_until:
# return False
# self.encode_delayed_until = self.pl.cur_time + CONSOLIDATE_DELAY
packet = self.pl.encode(self.PACKETSIZE)
if packet is not None:
#print 'send:', repr(packet)
if self.pl.is_congested():
if self.congested_since is None:
self.congested_since = now()
else:
if now() > self.congested_since + CONGESTION_TIMEOUT:
self.udpsock.send(SHUTDOWN_PACKET)
raise socket.error("peer not responding, timing out")
else:
self.congested_since = None
#print repr(packet[:10])
#print "out:", len(packet)
#print ' ---'
self.udpsock.send(packet)
def _decode(self, packet):
try:
data = self.pl.decode(packet)
#print ' ~~~'
return data
except InvalidPacket:
if len(packet) >= 4:
hdr, reserved, size = struct.unpack("!BBH", packet[:4])
if hdr == SOU_MIXED_DATA:
#print ' ~~~[unmix%d/%d]' % (len(packet[4+size:]),
# len(packet))
self.udp_over_udp_decoder(packet[4:4+size])
return self._decode(packet[4+size:])
else:
# non-tiny packets with no recognized hdr byte are
# assumed to be pure video traffic
#print ' ~~~[video]'
self.udp_over_udp_decoder(packet)
return ''
elif packet == SHUTDOWN_PACKET:
raise socket.error("received an end-of-connexion packet")
else:
#print ' ~~~[INVALID%d]' % (len(packet),)
return ''
def fileno(self):
self._progress()
return self.udpsock.fileno()
def flush(self):
while self.pl.settime(now()) == 0.0:
#self.encode_delayed_until = self.pl.cur_time
self._encode()
def recv(self, _ignoredbufsize=None):
#print 'recv:'
packet = self.udpsock.recv(65535)
#print " in:", len(packet), hex(ord(packet[0]))
#print repr(packet)
self.pl.settime(now())
data = self._decode(packet)
#print 'which is really', repr(data)
self._encode()
#if data:
# print " IN:", len(data)
return data
def sendall(self, data):
#print 'queuing', repr(data)
#print ' OUT:', len(data)
self.pl.queue(data)
#self._progress()
return len(data)
send = sendall
def send_video_data(self, udpdata):
forced_embedded = SOU_RANGE_START <= ord(udpdata[0]) < SOU_RANGE_STOP
self.pl.settime(now())
packet = self.pl.encode(self.PACKETSIZE) or ''
if not forced_embedded and not packet:
# no PipeLayer packet, send as plain udp data
datagram = udpdata
elif len(packet) + len(udpdata) <= self.MIXEDPACKETSIZE:
# fits in a single mixed data packet
datagram = (struct.pack("!BBH", SOU_MIXED_DATA, 0, len(udpdata))
+ udpdata + packet)
#print ' ---[mix%d/%d]' % (len(packet), len(datagram))
else:
# two packets needed
#print repr(packet[:10])
#print "out:", len(packet)
#print ' ---'
self.udpsock.send(packet)
datagram = udpdata
#print repr(datagram[:10])
#print "out:", len(datagram), hex(ord(datagram[0]))
self.udpsock.send(datagram)
#self.encode_delayed_until = self.pl.cur_time + CONSOLIDATE_DELAY
#if self.consolidate_sends is None:
# self.consolidate_sends = True
return len(udpdata)
def udp_over_udp_mixer(self):
return UdpOverUdpMixer(self)
def udp_over_udp_decoder(self, data):
pass # method overridden by pclient.py
def getpeername(self):
return self.udpsock.getpeername()
def getsockname(self):
return self.udpsock.getsockname()
def setsockopt(self, level, opt, value):
# note that TCP_NODELAY is set by the bub-n-bros client, not the server
#if level == socket.SOL_TCP and opt == socket.TCP_NODELAY:
# self.consolidate_sends = not value
#else:
# ignored
pass
def setblocking(self, _ignored):
pass # XXX good enough for common/gamesrv.py
class UdpOverUdpMixer(object):
def __init__(self, sockoverudp):
self.send = sockoverudp.send_video_data
def setsockopt(self, *args):
pass # ignored
|