summaryrefslogtreecommitdiff
path: root/metaserver/socketoverudp.py
diff options
context:
space:
mode:
Diffstat (limited to 'metaserver/socketoverudp.py')
-rw-r--r--metaserver/socketoverudp.py174
1 files changed, 174 insertions, 0 deletions
diff --git a/metaserver/socketoverudp.py b/metaserver/socketoverudp.py
new file mode 100644
index 0000000..2df4a75
--- /dev/null
+++ b/metaserver/socketoverudp.py
@@ -0,0 +1,174 @@
+from time import time as now
+from pipelayer import PipeLayer, InvalidPacket
+from pipelayer import FLAG_RANGE_START, FLAG_RANGE_STOP
+import socket, struct
+
+SOU_RANGE_START = FLAG_RANGE_START
+SOU_MIXED_DATA = FLAG_RANGE_STOP + 0
+SOU_SHUTDOWN = FLAG_RANGE_STOP + 1
+SOU_RANGE_STOP = FLAG_RANGE_STOP + 2
+
+SHUTDOWN_PACKET = chr(SOU_SHUTDOWN) + '**' # < 4 characters
+
+CONGESTION_TIMEOUT = 20.0
+#CONSOLIDATE_DELAY = 0.1
+
+
+class SocketOverUdp(object):
+ RECV_CAN_RETURN_EMPTY = True
+ PACKETSIZE = 996
+ MIXEDPACKETSIZE = 1080
+
+ def __init__(self, udpsock, initialcrcs):
+ self.udpsock = udpsock
+ self.pl = PipeLayer(initialcrcs)
+ self.congested_since = None
+ #self.consolidate_sends = None
+ #self.encode_delayed_until = now()
+
+ def close(self):
+ try:
+ self.udpsock.send(SHUTDOWN_PACKET)
+ except socket.error:
+ pass
+ self.udpsock.close()
+
+ def _progress(self):
+ if self.pl.settime(now()) == 0.0:
+ self._encode()
+
+ def _encode(self):
+ #if self.consolidate_sends:
+ # if self.pl.cur_time < self.encode_delayed_until:
+ # return False
+ # self.encode_delayed_until = self.pl.cur_time + CONSOLIDATE_DELAY
+ packet = self.pl.encode(self.PACKETSIZE)
+ if packet is not None:
+ #print 'send:', repr(packet)
+ if self.pl.is_congested():
+ if self.congested_since is None:
+ self.congested_since = now()
+ else:
+ if now() > self.congested_since + CONGESTION_TIMEOUT:
+ self.udpsock.send(SHUTDOWN_PACKET)
+ raise socket.error("peer not responding, timing out")
+ else:
+ self.congested_since = None
+ #print repr(packet[:10])
+ #print "out:", len(packet)
+ #print ' ---'
+ self.udpsock.send(packet)
+
+ def _decode(self, packet):
+ try:
+ data = self.pl.decode(packet)
+ #print ' ~~~'
+ return data
+ except InvalidPacket:
+ if len(packet) >= 4:
+ hdr, reserved, size = struct.unpack("!BBH", packet[:4])
+ if hdr == SOU_MIXED_DATA:
+ #print ' ~~~[unmix%d/%d]' % (len(packet[4+size:]),
+ # len(packet))
+ self.udp_over_udp_decoder(packet[4:4+size])
+ return self._decode(packet[4+size:])
+ else:
+ # non-tiny packets with no recognized hdr byte are
+ # assumed to be pure video traffic
+ #print ' ~~~[video]'
+ self.udp_over_udp_decoder(packet)
+ return ''
+ elif packet == SHUTDOWN_PACKET:
+ raise socket.error("received an end-of-connexion packet")
+ else:
+ #print ' ~~~[INVALID%d]' % (len(packet),)
+ return ''
+
+ def fileno(self):
+ self._progress()
+ return self.udpsock.fileno()
+
+ def flush(self):
+ while self.pl.settime(now()) == 0.0:
+ #self.encode_delayed_until = self.pl.cur_time
+ self._encode()
+
+ def recv(self, _ignoredbufsize=None):
+ #print 'recv:'
+ packet = self.udpsock.recv(65535)
+ #print " in:", len(packet), hex(ord(packet[0]))
+ #print repr(packet)
+ self.pl.settime(now())
+ data = self._decode(packet)
+ #print 'which is really', repr(data)
+ self._encode()
+ #if data:
+ # print " IN:", len(data)
+ return data
+
+ def sendall(self, data):
+ #print 'queuing', repr(data)
+ #print ' OUT:', len(data)
+ self.pl.queue(data)
+ #self._progress()
+ return len(data)
+
+ send = sendall
+
+ def send_video_data(self, udpdata):
+ forced_embedded = SOU_RANGE_START <= ord(udpdata[0]) < SOU_RANGE_STOP
+ self.pl.settime(now())
+ packet = self.pl.encode(self.PACKETSIZE) or ''
+ if not forced_embedded and not packet:
+ # no PipeLayer packet, send as plain udp data
+ datagram = udpdata
+ elif len(packet) + len(udpdata) <= self.MIXEDPACKETSIZE:
+ # fits in a single mixed data packet
+ datagram = (struct.pack("!BBH", SOU_MIXED_DATA, 0, len(udpdata))
+ + udpdata + packet)
+ #print ' ---[mix%d/%d]' % (len(packet), len(datagram))
+ else:
+ # two packets needed
+ #print repr(packet[:10])
+ #print "out:", len(packet)
+ #print ' ---'
+ self.udpsock.send(packet)
+ datagram = udpdata
+ #print repr(datagram[:10])
+ #print "out:", len(datagram), hex(ord(datagram[0]))
+ self.udpsock.send(datagram)
+ #self.encode_delayed_until = self.pl.cur_time + CONSOLIDATE_DELAY
+ #if self.consolidate_sends is None:
+ # self.consolidate_sends = True
+ return len(udpdata)
+
+ def udp_over_udp_mixer(self):
+ return UdpOverUdpMixer(self)
+
+ def udp_over_udp_decoder(self, data):
+ pass # method overridden by pclient.py
+
+ def getpeername(self):
+ return self.udpsock.getpeername()
+
+ def getsockname(self):
+ return self.udpsock.getsockname()
+
+ def setsockopt(self, level, opt, value):
+ # note that TCP_NODELAY is set by the bub-n-bros client, not the server
+ #if level == socket.SOL_TCP and opt == socket.TCP_NODELAY:
+ # self.consolidate_sends = not value
+ #else:
+ # ignored
+ pass
+
+ def setblocking(self, _ignored):
+ pass # XXX good enough for common/gamesrv.py
+
+
+class UdpOverUdpMixer(object):
+ def __init__(self, sockoverudp):
+ self.send = sockoverudp.send_video_data
+
+ def setsockopt(self, *args):
+ pass # ignored