summaryrefslogtreecommitdiff
path: root/metaserver
diff options
context:
space:
mode:
authorDiego Roversi <diegor@tiscali.it>2019-09-08 18:12:27 +0200
committerDiego Roversi <diegor@tiscali.it>2019-09-08 18:12:27 +0200
commit1d9925c287b318ec21343e2682b51ab6a36ae8db (patch)
tree17d1c0ac21eea6f291146520afa8381db4586fb4 /metaserver
initial commit from cvs 1.6.2
Diffstat (limited to 'metaserver')
-rw-r--r--metaserver/.cvsignore4
-rw-r--r--metaserver/__init__.py1
-rw-r--r--metaserver/home.pngbin0 -> 1283 bytes
-rw-r--r--metaserver/index.html51
-rw-r--r--metaserver/mbub.pngbin0 -> 1138 bytes
-rw-r--r--metaserver/metaclient.py596
-rw-r--r--metaserver/metaserver.py365
-rw-r--r--metaserver/metastruct.py75
-rw-r--r--metaserver/pipelayer.py337
-rw-r--r--metaserver/socketoverudp.py174
10 files changed, 1603 insertions, 0 deletions
diff --git a/metaserver/.cvsignore b/metaserver/.cvsignore
new file mode 100644
index 0000000..687980a
--- /dev/null
+++ b/metaserver/.cvsignore
@@ -0,0 +1,4 @@
+*.py[co]
+build
+*.so
+*.pyd
diff --git a/metaserver/__init__.py b/metaserver/__init__.py
new file mode 100644
index 0000000..1bb8bf6
--- /dev/null
+++ b/metaserver/__init__.py
@@ -0,0 +1 @@
+# empty
diff --git a/metaserver/home.png b/metaserver/home.png
new file mode 100644
index 0000000..cc7a46e
--- /dev/null
+++ b/metaserver/home.png
Binary files differ
diff --git a/metaserver/index.html b/metaserver/index.html
new file mode 100644
index 0000000..fdb5583
--- /dev/null
+++ b/metaserver/index.html
@@ -0,0 +1,51 @@
+<html>
+<head><title>The Bub's Brothers Server List</title>
+</head>
+<body text="#000000" bgcolor="#C0FFC0" link="#0000EE" vlink="#000099" alink="#FF0000">
+
+<h1>The Bub's Brothers Server List</h1>
+%s
+<br>
+
+<table border=1 cellspacing=1>
+<tr><td bgcolor="#FFFFFF">
+<table border=0 cellspacing=6>
+<tr>
+ <td bgcolor="#008000" colspan=2 align="center"><img src="mbub.png">
+ <font color="#FFFF00" size=+1><strong>&nbsp;&nbsp;&nbsp;Running servers</strong></font></td>
+</tr>
+
+<tr>
+ <td>&nbsp;</td>
+ <td>&nbsp;</td>
+</tr>
+
+<tr>
+ <td>&nbsp;</td>
+ <td bgcolor="#FFFFFF"><table border=0 cellspacing=5>
+\
+<tr>
+<td valign="bottom"><font size=-1>%(stime)s</font></td>
+<td valign="bottom">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;%(icon)s&nbsp;&nbsp;&nbsp;</td>
+<td bgcolor="%(bgcolor)s"><font size=+1>%(hosthtml)s</font>
+ <br><strong>%(desc)s</strong> (%(extradesc)s)</td>
+</tr>
+\
+ </table></td>
+</tr>
+
+</table>
+</td></tr></table>
+<font size=-2>%(tbfiles)s</font>
+<br><hr><br>
+<p><font size=-1>
+%(bottommsg)s
+<br>
+<a href="javascript: location.reload()">Reload</a> this page for an up-to-date version!
+</font></p>
+%(extrafooter)s
+<p><img src="home.png"> I click on the servers but nothing happens - <a href="http://bub-n-bros.sourceforge.net/help.html#meta">why?</a></p>
+<p><img src="home.png"> Chat about the Bub's Brothers? Go to our IRC channel <a href="irc://irc.freenode.net:6667/bub-n-bros">#bub-n-bros on irc.freenode.net</a>.</p>
+<p><img src="home.png"> <a href="http://bub-n-bros.sourceforge.net">The Bub's Brothers Home Page</a></p>
+<p><img src="home.png"> Thanks to <a href="http://ctpug.org.za">CTPUG</a> for hosting the meta-server.</p>
+</body></html>
diff --git a/metaserver/mbub.png b/metaserver/mbub.png
new file mode 100644
index 0000000..d7aaae9
--- /dev/null
+++ b/metaserver/mbub.png
Binary files differ
diff --git a/metaserver/metaclient.py b/metaserver/metaclient.py
new file mode 100644
index 0000000..aedb99c
--- /dev/null
+++ b/metaserver/metaclient.py
@@ -0,0 +1,596 @@
+import sys, os, time, random
+from select import select
+from socket import *
+from metastruct import *
+
+_SERVER = 'ctpug.org.za'
+
+METASERVER = (_SERVER, 8055)
+METASERVER_UDP = (_SERVER, 8055)
+METASERVER_URL = 'http://%s:8050/bub-n-bros.html' % (_SERVER,)
+VERSION_TAG = 1601
+
+def connect(failure=[]):
+ if len(failure) >= 2:
+ return None
+ print >> sys.stderr, 'Connecting to the meta-server %s:%d...' % METASERVER
+ try:
+ s = socket(AF_INET, SOCK_STREAM)
+ s.connect(METASERVER)
+ except error, e:
+ print >> sys.stderr, '*** cannot contact meta-server:', str(e)
+ failure.append(e)
+ return None
+ else:
+ print >> sys.stderr, 'connected.'
+ return s
+
+sys.setcheckinterval(4096)
+
+
+def float2str(f):
+ # don't trust locale issues and write a string with a '.'
+ s = str(long(f*1000000.0))
+ return s[:-6] + '.' + s[-6:]
+
+def str2float(s):
+ try:
+ return float(s)
+ except:
+ # locale issues may prevent float() from decoding the string
+ s = s.strip()
+ try:
+ i = s.index('.')
+ except ValueError:
+ try:
+ i = s.index(',')
+ except ValueError:
+ i = len(s)
+ frac = s[i+1:]
+ return float(s[:i] or '0') + float(frac or '0')/(10**len(frac))
+
+
+# ____________________________________________________________
+# Game Servers
+
+class MetaClientSrv(MessageSocket):
+
+ def __init__(self, s, game):
+ MessageSocket.__init__(self, s)
+ self.game = game
+ self.lastwakeup = None
+ self.synsockets = {}
+ import gamesrv
+ gamesrv.addsocket('META', s, self.receive)
+ self.closed = 0
+
+ def close(self):
+ if not self.closed:
+ self.disconnect()
+ try:
+ self.s.shutdown(2)
+ except:
+ pass
+
+ def disconnect(self):
+ import gamesrv
+ gamesrv.removesocket('META', self.s)
+ self.closed = 1
+ print >> sys.stderr, 'disconnected from the meta-server'
+
+ def send_traceback(self):
+ if not self.closed:
+ import traceback, cStringIO, sys
+ f = cStringIO.StringIO()
+ print >> f, sys.version
+ print >> f, 'platform: ', sys.platform
+ print >> f, 'executable: ', sys.executable
+ print >> f, 'argv: ', sys.argv
+ print >> f, 'cwd: ', os.getcwd()
+ print >> f, 'version tag:', VERSION_TAG
+ print >> f
+ traceback.print_exc(file = f)
+ self.s.sendall(message(MMSG_TRACEBACK, f.getvalue()))
+
+ def msg_wakeup(self, origin, *rest):
+ if self.lastwakeup is None or time.time()-self.lastwakeup > 4.0:
+ def fastresponses(wakeup):
+ sys.setcheckinterval(64)
+ time.sleep(12.01)
+ if self.lastwakeup == wakeup:
+ sys.setcheckinterval(4096)
+ self.synsockets.clear()
+ import thread
+ self.lastwakeup = time.time()
+ thread.start_new_thread(fastresponses, (self.lastwakeup,))
+
+ def msg_connect(self, origin, port, *rest):
+ def connect(origin, port):
+ host, _ = origin.split(':')
+ addr = host, port
+ s = socket(AF_INET, SOCK_STREAM)
+ print >> sys.stderr, 'backconnecting to', addr
+ try:
+ s.connect(addr)
+ except error, e:
+ print >> sys.stderr, 'backconnecting:', str(e)
+ else:
+ self.game.newclient_threadsafe(s, addr)
+ import thread
+ thread.start_new_thread(connect, (origin, port))
+
+ def msg_udp_conn(self, origin, secret, port, *rest):
+ def connect(origin, secret, port):
+ host, _ = origin.split(':')
+ addr = host, port
+ s = socket(AF_INET, SOCK_DGRAM)
+ print >> sys.stderr, 'udp connecting to', addr
+ s.connect(addr)
+ mysecret = random.randrange(0, 65536)
+ packet = ('B' + chr( secret & 0xFF) + chr( secret >> 8)
+ + chr(mysecret & 0xFF) + chr(mysecret >> 8))
+ from socketoverudp import SocketOverUdp
+ from socketoverudp import SOU_RANGE_START, SOU_RANGE_STOP
+ for i in range(5):
+ #print 'sending', repr(packet)
+ s.send(packet)
+ iwtd, owtd, ewtd = select([s], [], [], 0.25)
+ if s in iwtd:
+ #print 'reading'
+ try:
+ inbuf = s.recv(SocketOverUdp.PACKETSIZE)
+ except error:
+ inbuf = ''
+ # try again?
+ iwtd, owtd, ewtd = select([s], [], [], 0.35)
+ if s in iwtd:
+ try:
+ inbuf = s.recv(SocketOverUdp.PACKETSIZE)
+ except error:
+ pass
+ #print 'got', repr(inbuf)
+ if (inbuf and
+ SOU_RANGE_START <= ord(inbuf[0]) < SOU_RANGE_STOP):
+ break
+ else:
+ print >> sys.stderr, 'udp connecting: no answer, giving up'
+ return
+ sock = SocketOverUdp(s, (mysecret, secret))
+ data = sock._decode(inbuf)
+ #print 'decoded as', repr(data)
+ expected = '[bnb c->s]' + packet[3:5]
+ while len(data) < len(expected) + 2:
+ #print 'waiting for more'
+ iwtd, owtd, ewtd = select([sock], [], [], 5.0)
+ if sock not in iwtd:
+ print >> sys.stderr, 'udp connecting: timed out'
+ return
+ #print 'decoding more'
+ data += sock.recv()
+ #print 'now data is', repr(data)
+ if data[:-2] != expected:
+ print >> sys.stderr, 'udp connecting: bad data'
+ return
+ sock.sendall('[bnb s->c]' + data[-2:])
+ sock.flush()
+ #print 'waiting for the last dot...'
+ while 1:
+ iwtd, owtd, ewtd = select([sock], [], [], 5.0)
+ if sock not in iwtd:
+ print >> sys.stderr, 'udp connecting: timed out'
+ return
+ data = sock.recv(200)
+ if data:
+ break
+ if data != '^':
+ print >> sys.stderr, 'udp connecting: bad data'
+ return
+ #print 'done!'
+ self.game.newclient_threadsafe(sock, addr)
+
+ import thread
+ thread.start_new_thread(connect, (origin, secret, port))
+
+ def msg_ping(self, origin, *rest):
+ # ping time1 --> pong time2 time1
+ self.s.sendall(message(MMSG_ROUTE, origin,
+ RMSG_PONG, float2str(time.time()), *rest))
+
+ def msg_sync(self, origin, clientport, time3, time2, time1, *rest):
+ time4 = time.time()
+ s = socket(AF_INET, SOCK_STREAM)
+ s.bind(('', INADDR_ANY))
+ _, serverport = s.getsockname()
+ self.s.sendall(message(MMSG_ROUTE, origin,
+ RMSG_CONNECT, serverport, clientport))
+ #print 'times:', time1, time2, time3, time4
+ doubleping = (str2float(time3)-str2float(time1)) + (time4-str2float(time2))
+ connecttime = time4 + doubleping / 4.0
+ def connect(origin, port, connecttime, s):
+ host, _ = origin.split(':')
+ addr = host, port
+ delay = connecttime - time.time()
+ #print 'sleep(%r)' % delay
+ if 0.0 <= delay <= 10.0:
+ time.sleep(delay)
+ print >> sys.stderr, 'synconnecting to', addr
+ try:
+ s.connect(addr)
+ except error, e:
+ print >> sys.stderr, 'synconnecting:', str(e)
+ else:
+ self.game.newclient_threadsafe(s, addr)
+ import thread
+ thread.start_new_thread(connect, (origin, clientport, connecttime, s))
+
+ MESSAGES = {
+ RMSG_CONNECT: msg_connect,
+ RMSG_WAKEUP: msg_wakeup,
+ RMSG_PING: msg_ping,
+ RMSG_SYNC: msg_sync,
+ RMSG_UDP_CONN:msg_udp_conn,
+ }
+
+metaclisrv = None
+
+def meta_register(game):
+ global metaclisrv
+ import gamesrv
+ info = {}
+ if game.FnDesc:
+ info['desc'] = game.FnDesc or ''
+ info['extradesc'] = game.FnExtraDesc() or ''
+
+ s = gamesrv.opentcpsocket()
+ hs = gamesrv.openhttpsocket()
+ port = int(gamesrv.displaysockport(s))
+ info['httpport'] = gamesrv.displaysockport(hs)
+
+ if not metaclisrv or metaclisrv.closed:
+ s = connect()
+ if not s:
+ return
+ metaclisrv = MetaClientSrv(s, game)
+ metaclisrv.s.sendall(message(MMSG_INFO, encodedict(info)) +
+ message(MMSG_START, port))
+
+def meta_unregister(game):
+ global metaclisrv
+ if metaclisrv:
+ metaclisrv.close()
+ metaclisrv = None
+
+
+# ____________________________________________________________
+# Game Clients
+
+class Event:
+ def __init__(self):
+ import thread
+ self.lock = thread.allocate_lock()
+ self.lock.acquire()
+ def signal(self):
+ try:
+ self.lock.release()
+ except:
+ pass
+ def wait1(self):
+ self.lock.acquire()
+
+
+class MetaClientCli:
+ fatalerror = False
+
+ def __init__(self, serverkey, backconnectport):
+ self.resultsocket = None
+ self.serverkey = serverkey
+ self.backconnectport = backconnectport
+ self.threads = {}
+
+ def run(self):
+ import thread
+ print >> sys.stderr, 'Trying to connect to', self.serverkey
+ self.ev = Event()
+ self.ev2 = Event()
+ self.buffer = ""
+ self.sendlock = thread.allocate_lock()
+ self.recvlock = thread.allocate_lock()
+ self.inputmsgqueue = []
+ self.gotudpport = None
+ if not (PORTS.get('CLIENT') or PORTS.get('sendudpto')):
+ self.s = connect()
+ thread.start_new_thread(self.acquire_udp_port, ())
+ else:
+ self.s = None
+ self.ev2.signal()
+ self.startthread(self.try_udp_connect)
+
+ thread.start_new_thread(self.bipbip, ())
+ self.startthread(self.try_direct_connect, 0.75)
+ self.startthread(self.try_indirect_connect, 1.50)
+ while self.resultsocket is None:
+ self.threadsleft()
+ self.ev.wait1()
+ self.ev2.wait1()
+ return self.resultsocket
+
+ def done(self):
+ sys.setcheckinterval(4096)
+
+ def bipbip(self):
+ while self.resultsocket is None:
+ time.sleep(0.31416)
+ self.ev.signal()
+
+ def startthread(self, fn, sleep=0.0, args=()):
+ import thread
+ def bootstrap(fn, atom, sleep, args):
+ try:
+ time.sleep(sleep)
+ if self.resultsocket is None:
+ fn(*args)
+ finally:
+ del self.threads[atom]
+ self.ev.signal()
+ atom = object()
+ self.threads[atom] = time.time()
+ thread.start_new_thread(bootstrap, (fn, atom, sleep, args))
+
+ def threadsleft(self):
+ if self.fatalerror:
+ sys.exit(1)
+ now = time.time()
+ TIMEOUT = 11
+ for starttime in self.threads.values():
+ if now < starttime + TIMEOUT:
+ break
+ else:
+ if self.threads:
+ print >> sys.stderr, '*** time out, giving up.'
+ else:
+ print >> sys.stderr, '*** failed to connect.'
+ sys.exit(1)
+
+ def try_direct_connect(self):
+ host, port = self.serverkey.split(':')
+ port = int(port)
+ s = socket(AF_INET, SOCK_STREAM)
+ try:
+ s.connect((host, port))
+ except error, e:
+ print >> sys.stderr, 'direct connexion failed:', str(e)
+ else:
+ print >> sys.stderr, 'direct connexion accepted.'
+ self.resultsocket = s
+
+ def try_indirect_connect(self):
+ import thread, time
+ if not self.s:
+ self.s = connect()
+ if not self.s:
+ return
+ self.routemsg(RMSG_WAKEUP)
+ self.startthread(self.try_backconnect)
+ self.socketcache = {}
+ tries = [0.6, 0.81, 1.2, 1.69, 2.6, 3.6, 4.9, 6.23]
+ for delay in tries:
+ self.startthread(self.send_ping, delay)
+ while self.resultsocket is None:
+ msg = self.inputmsg()
+ now = time.time()
+ if self.resultsocket is not None:
+ break
+ if msg[0] == RMSG_CONNECT:
+ # connect serverport clientport
+ self.startthread(self.try_synconnect, args=msg[1:])
+ if msg[0] == RMSG_PONG:
+ # pong time2 time1 --> sync port time3 time2 time1
+ if len(self.socketcache) < len(tries):
+ s = socket(AF_INET, SOCK_STREAM)
+ s.bind(('', INADDR_ANY))
+ _, port = s.getsockname()
+ self.socketcache[port] = s
+ self.routemsg(RMSG_SYNC, port, float2str(now), *msg[2:])
+
+ def sendmsg(self, data):
+ self.sendlock.acquire()
+ try:
+ self.s.sendall(data)
+ finally:
+ self.sendlock.release()
+
+ def routemsg(self, *rest):
+ self.sendmsg(message(MMSG_ROUTE, self.serverkey, *rest))
+
+ def _readnextmsg(self, blocking):
+ self.recvlock.acquire()
+ try:
+ while 1:
+ msg, self.buffer = decodemessage(self.buffer)
+ if msg is not None:
+ if msg[0] == RMSG_UDP_ADDR:
+ if len(msg) > 2:
+ self.gotudpport = msg[1], int(msg[2])
+ continue
+ if msg[0] == RMSG_NO_HOST and msg[1] == self.serverkey:
+ print >> sys.stderr, ('*** server %r is not registered'
+ ' on the meta-server' % (msg[1],))
+ self.fatalerror = True
+ sys.exit()
+ self.inputmsgqueue.append(msg)
+ return
+ iwtd, owtd, ewtd = select([self.s], [], [], 0)
+ if not iwtd:
+ if self.inputmsgqueue or not blocking:
+ return
+ data = self.s.recv(2048)
+ if not data:
+ print >> sys.stderr, 'disconnected from the meta-server'
+ sys.exit()
+ self.buffer += data
+ finally:
+ self.recvlock.release()
+
+ def inputmsg(self):
+ self._readnextmsg(blocking=True)
+ return self.inputmsgqueue.pop(0)
+
+ def try_backconnect(self):
+ s1 = socket(AF_INET, SOCK_STREAM)
+ s1.bind(('', self.backconnectport or INADDR_ANY))
+ s1.listen(1)
+ _, port = s1.getsockname()
+ self.routemsg(RMSG_CONNECT, port)
+ print >> sys.stderr, 'listening for backward connection'
+ iwtd, owtd, ewtd = select([s1], [], [], 7.5)
+ if s1 in iwtd:
+ s, addr = s1.accept()
+ print >> sys.stderr, 'accepted backward connection from', addr
+ self.resultsocket = s
+
+ def send_ping(self):
+ sys.stderr.write('. ')
+ self.routemsg(RMSG_PING, float2str(time.time()))
+
+ def try_synconnect(self, origin, remoteport, localport, *rest):
+ sys.stderr.write('+ ')
+ s = self.socketcache[localport]
+ remotehost, _ = origin.split(':')
+ remoteaddr = remotehost, remoteport
+ try:
+ s.connect(remoteaddr)
+ except error, e:
+ print >> sys.stderr, 'SYN connect failed:', str(e)
+ return
+ print >> sys.stderr, ('simultaneous SYN connect succeeded with %s:%d' %
+ remoteaddr)
+ self.resultsocket = s
+
+ def try_udp_connect(self):
+ for i in range(3): # three attempts
+ self.attempt_udp_connect()
+ if self.resultsocket is not None:
+ break
+
+ def attempt_udp_connect(self):
+ if '*udpsock*' in PORTS:
+ s, (host, port) = PORTS['*udpsock*']
+ else:
+ s = socket(AF_INET, SOCK_DGRAM)
+ s.bind(('', PORTS.get('CLIENT', INADDR_ANY)))
+ host, port = s.getsockname()
+ if 'sendudpto' in PORTS:
+ host = PORTS['sendudpto']
+ secret = originalsecret = random.randrange(0, 65536)
+ self.routemsg(RMSG_UDP_CONN, secret, port)
+ secret = 'B' + chr(secret & 0xFF) + chr(secret >> 8)
+ while True:
+ iwtd, owtd, ewtd = select([s], [], [], 2.94)
+ if s not in iwtd:
+ return
+ packet, addr = s.recvfrom(200)
+ if packet.startswith(secret) and len(packet) == 5:
+ break
+ s.connect(addr)
+ #print 'got', repr(packet)
+ remotesecret = ord(packet[3]) | (ord(packet[4]) << 8)
+ secret = random.randrange(0, 65536)
+ secret = chr(secret & 0xFF) + chr(secret >> 8)
+ packet = '[bnb c->s]' + packet[3:5] + secret
+ for name in ('*udpsock*', 'CLIENT'):
+ if name in PORTS:
+ del PORTS[name]
+ from socketoverudp import SocketOverUdp
+ sock = SocketOverUdp(s, (originalsecret, remotesecret))
+ #print 'sending', repr(packet)
+ sock.sendall(packet)
+ sock.flush()
+ data = ''
+ expected = '[bnb s->c]' + secret
+ while len(data) < len(expected):
+ #print 'waiting'
+ iwtd, owtd, ewtd = select([sock], [], [], 2.5)
+ if sock not in iwtd:
+ print >> sys.stderr, 'socket-over-udp timed out'
+ return
+ #print 'we get:'
+ data += sock.recv()
+ #print repr(data)
+ if data != expected:
+ print >> sys.stderr, 'bad udp data from', addr
+ else:
+ sock.sendall('^')
+ sock.flush()
+ print 'udp connexion handshake succeeded'
+ self.resultsocket = sock
+
+ def acquire_udp_port(self):
+ try:
+ s = socket(AF_INET, SOCK_DGRAM)
+ s.bind(('', INADDR_ANY))
+ randomdata = hex(random.randrange(0, sys.maxint))
+ for i in range(5):
+ s.sendto(randomdata, METASERVER_UDP)
+ time.sleep(0.05)
+ self.sendmsg(message(MMSG_UDP_ADDR, randomdata))
+ time.sleep(0.05)
+ self._readnextmsg(blocking=False)
+ if self.gotudpport:
+ PORTS['*udpsock*'] = s, self.gotudpport
+ udphost, udpport = self.gotudpport
+ print >> sys.stderr, ('udp port %d is visible from '
+ 'outside on %s:%d' % (
+ s.getsockname()[1], udphost, udpport))
+ self.startthread(self.try_udp_connect)
+ break
+ finally:
+ self.ev2.signal()
+
+
+def meta_connect(serverkey, backconnectport=None):
+ global METASERVER
+ if PORTS.get('SSH_RELAY'):
+ METASERVER = PORTS['SSH_RELAY']
+ c = MetaClientCli(serverkey, backconnectport)
+ s = c.run()
+ c.done()
+ return s
+
+def print_server_list():
+ s = connect()
+ if s is not None:
+ s.sendall(message(MMSG_LIST))
+ buffer = ""
+ while decodemessage(buffer)[0] is None:
+ buffer += s.recv(8192)
+ s.close()
+ msg = decodemessage(buffer)[0]
+ assert msg[0] == RMSG_LIST
+ entries = decodedict(msg[1])
+ if not entries:
+ print >> sys.stderr, 'No registered server.'
+ else:
+ print
+ print ' %-25s | %-30s | %s' % (
+ 'server', 'game', 'players')
+ print '-'*27+'+'+'-'*32+'+'+'-'*11
+ for key, value in entries.items():
+ if ':' in key:
+ try:
+ addr, _, _ = gethostbyaddr(key[:key.index(':')])
+ except:
+ pass
+ else:
+ addr = '%-27s' % (addr,)
+ if len(addr) < 28: addr += '|'
+ addr = '%-60s' % (addr,)
+ if len(addr) < 61: addr += '|'
+ print addr
+ value = decodedict(value)
+ print ' %-25s | %-30s | %s' % (
+ key, value.get('desc', '<no description>'),
+ value.get('extradesc', ''))
+ print
+
+if __name__ == '__main__':
+ print_server_list()
diff --git a/metaserver/metaserver.py b/metaserver/metaserver.py
new file mode 100644
index 0000000..cb4ea19
--- /dev/null
+++ b/metaserver/metaserver.py
@@ -0,0 +1,365 @@
+from socket import *
+import os, sys, time, random
+from select import select
+from cStringIO import StringIO
+from weakref import WeakValueDictionary
+from metastruct import *
+from common import httpserver, stdlog
+
+if __name__ == '__main__':
+ os.chdir(os.path.dirname(sys.argv[0]) or os.curdir)
+
+META_SERVER_HTTP_PORT = 8050
+META_SERVER_PORT = 8055
+META_SERVER_UDP_PORT = 8055
+IMAGE_DIR = "../bubbob/doc/images"
+ICONS = [open(os.path.join(IMAGE_DIR, s), 'rb').read()
+ for s in os.listdir(IMAGE_DIR) if s.endswith('.png')]
+assert ICONS, "you need to run ../bubbob/doc/bonus-doc.py"
+MAX_SERVERS = 50
+MAX_CONNEXIONS = 60
+
+
+serversockets = {}
+
+class MetaServer:
+
+ def __init__(self, port=META_SERVER_PORT, udpport=META_SERVER_UDP_PORT):
+ s = socket(AF_INET, SOCK_STREAM)
+ s.bind(('', port))
+ s.listen(5)
+ self.parentsock = s
+ self.ServersDict = {}
+ self.ServersList = []
+ serversockets[s] = self.clientconnect, sys.exit
+ self.udpsock = socket(AF_INET, SOCK_DGRAM)
+ self.udpsock.bind(('', udpport))
+ serversockets[self.udpsock] = self.udp_message, None
+ self.udpdata = []
+
+ def detach(self):
+ pid = os.fork()
+ if pid:
+ print pid
+ os._exit(0)
+ # in the child process
+ os.setsid()
+ logfile = stdlog.LogFile(limitsize=131072)
+ if logfile:
+ print >> logfile
+ print "Logging to", logfile.filename
+ fd = logfile.f.fileno()
+ try:
+ # detach from parent
+ os.dup2(fd, 1)
+ os.dup2(fd, 2)
+ os.dup2(fd, 0)
+ except OSError:
+ pass
+ logfile.close()
+ # record pid
+ f = open('pid', 'w')
+ print >> f, os.getpid()
+ f.close()
+
+ def clientconnect(self):
+ s, addr = self.parentsock.accept()
+ Connexion(s, addr)
+
+ def publish(self, server):
+ key = server.serverkey
+ if key in self.ServersDict:
+ current = self.ServersDict[key]
+ if current is server:
+ return
+ self.ServersList.remove(current)
+ elif len(self.ServersDict) >= MAX_SERVERS:
+ raise OverflowError
+ self.ServersList.append(server)
+ self.ServersDict[key] = server
+ print '+', key
+
+ def unpublish(self, server):
+ key = server.serverkey
+ if key in self.ServersDict:
+ current = self.ServersDict[key]
+ if current is server:
+ del self.ServersDict[key]
+ self.ServersList.remove(server)
+ print '-', key
+
+ def makelist(self):
+ items = {}
+ for srv in self.ServersList:
+ items[srv.serverkey] = encodedict(srv.serverinfo)
+ return encodedict(items)
+
+ def getserver(self, key):
+ return self.ServersDict[key]
+
+ def udp_message(self):
+ data, addr = self.udpsock.recvfrom(32)
+ self.udpdata.append((data, addr))
+ if len(self.udpdata) > 50:
+ del self.udpdata[0]
+
+
+class Connexion(MessageSocket):
+
+ def __init__(self, s, addr):
+ MessageSocket.__init__(self, s)
+ self.serverinfo = {
+ 'time': int(time.time()),
+ 'icon': random.choice(ICONS),
+ 'iconformat': 'png',
+ }
+ self.addr = addr
+ self.key = '%s:%d' % addr
+ self.serverkey = None
+ print '[', self.key
+ self.backlinks = WeakValueDictionary()
+ if len(serversockets) >= MAX_CONNEXIONS:
+ self.disconnect()
+ raise OverflowError
+ serversockets[s] = self.receive, self.disconnect
+
+ def disconnect(self):
+ metaserver.unpublish(self)
+ try:
+ del serversockets[self.s]
+ except KeyError:
+ pass
+ print ']', self.key
+
+ def msg_serverinfo(self, info, *rest):
+ print '|', self.key
+ if len(info) > 15000:
+ raise OverflowError
+ info = decodedict(info)
+ self.serverinfo.update(info)
+
+ def msg_startserver(self, port, *rest):
+ serverkey = '%s:%d' % (self.addr[0], port)
+ if self.serverkey and self.serverkey != serverkey:
+ metaserver.unpublish(self)
+ self.serverkey = serverkey
+ metaserver.publish(self)
+
+ def msg_stopserver(self, *rest):
+ metaserver.unpublish(self)
+
+ def msg_list(self, *rest):
+ self.s.sendall(message(RMSG_LIST, metaserver.makelist()))
+
+ def msg_route(self, targetkey, *rest):
+ try:
+ target = metaserver.getserver(targetkey)
+ except KeyError:
+ try:
+ target = self.backlinks[targetkey]
+ except KeyError:
+ self.s.sendall(message(RMSG_NO_HOST, targetkey))
+ return
+ target.route(self, *rest)
+
+ def route(self, origin, msgcode, *rest):
+ self.backlinks[origin.key] = origin
+ self.s.sendall(message(msgcode, origin.key, *rest))
+
+ def msg_traceback(self, tb, *rest):
+ f = stdlog.LogFile('tb-%s.log' % (self.addr[0],))
+ if f:
+ print >> f, tb
+ f.close()
+
+ def msg_udp_addr(self, pattern, *rest):
+ for data, addr in metaserver.udpdata:
+ if data == pattern:
+ try:
+ host, port = addr
+ port = int(port)
+ except ValueError:
+ continue
+ self.s.sendall(message(RMSG_UDP_ADDR, host, port))
+ return
+ else:
+ self.s.sendall(message(RMSG_UDP_ADDR))
+
+ MESSAGES = {
+ MMSG_INFO: msg_serverinfo,
+ MMSG_START: msg_startserver,
+ MMSG_STOP: msg_stopserver,
+ MMSG_LIST: msg_list,
+ MMSG_ROUTE: msg_route,
+ MMSG_TRACEBACK: msg_traceback,
+ MMSG_UDP_ADDR: msg_udp_addr,
+ }
+
+
+# ____________________________________________________________
+
+import htmlentitydefs
+text_to_html = {}
+for key, value in htmlentitydefs.entitydefs.items():
+ text_to_html[value] = '&' + key + ';'
+for i in range(32):
+ text_to_html[chr(i)] = '?'
+def htmlquote(s, getter=text_to_html.get):
+ lst = [getter(c, c) for c in s if ' ' <= c < '\x7F']
+ return ''.join(lst)
+def txtfilter(s, maxlen=200):
+ s = str(s)[:maxlen]
+ l = [c for c in s if c in "!$*,-.0123456789:@ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+ "^_`abcdefghijklmnopqrstuvwxyz{|}"]
+ return ''.join(l)
+
+f = open('index.html', 'r')
+HEADER, ROW, FOOTER = f.read().split('\\')
+f.close()
+
+def makehosthtml(srv, bottommsg, join):
+ info = srv.serverinfo
+ hostname, port = srv.serverkey.split(':')
+ try:
+ fullhostname = gethostbyaddr(hostname)[0]
+ except:
+ fullhostname = hostname
+ url = None
+ if join:
+ url = "http://%s/join.html?host=%s&port=%s&httpport=%s&m=%s" % (
+ join, hostname, port, info.get('httpport') or 'off', time.time())
+ else:
+ try:
+ httpport = int(info.get('httpport'))
+ except:
+ pass
+ else:
+ url = "http://%s:%s/" % (hostname, httpport)
+ javamsg = """
+<p>Click on a server above to join the game. This only works if:
+<ul><li>your browser understands Java;
+ <li>the server is not behind a firewall;
+ <li>you don't mind not hearing the nice background music.
+</ul></p>
+<p>Alternatively, install the
+<a href="http://bub-n-bros.sourceforge.net/download.html">Python version</a>
+of the client, which can cope with all of the above problems.</p>
+<br>"""
+ if javamsg not in bottommsg:
+ bottommsg.append(javamsg)
+ result = '<strong>%s</strong>:%s' % (fullhostname, port)
+ if url:
+ result = '<a href="%s">%s</a>' % (url, result)
+ return result
+
+def indexloader(headers, join=[], head=[], **options):
+ if join:
+ join = join[0]
+ data = [HEADER % (head and head[0] or '')]
+ bottommsg = []
+ if metaserver.ServersList:
+ counter = 0
+ for srv in metaserver.ServersList:
+ info = srv.serverinfo
+ icon = '<img src="ico?key=%s">' % srv.serverkey
+ bgcolor = ('#C0D0D0', '#E0D0A8')[counter&1]
+ hosthtml = makehosthtml(srv, bottommsg, join)
+ desc = htmlquote(info.get('desc')) or ''
+ extradesc = htmlquote(info.get('extradesc')) or ''
+ if isinstance(info.get('time'), int):
+ stime = time.strftime('%a %b %d<br>%H:%M GMT',
+ time.gmtime(info['time']))
+ else:
+ stime = ''
+ data.append(ROW % locals())
+ counter += 1
+ else:
+ data.append('''<tr><td bgcolor="#FFFFFF">
+ Sorry, there is no registered server at the moment.
+ </td></tr>''')
+ if join:
+ extrafooter = '''<p><img src="home.png">
+ <a href="http://%s/?time=%s">Back to local games</a></p>''' % (
+ join, time.time())
+ else:
+ extrafooter = ''
+ bottommsg = '\n'.join(bottommsg)
+ tbfiles = [s for s in os.listdir('.') if s.startswith('tb-')]
+ if tbfiles:
+ tbfiles = len(tbfiles)
+ else:
+ tbfiles = ''
+ data.append(FOOTER % locals())
+ return StringIO(''.join(data)), 'text/html'
+
+def icoloader(key, **options):
+ srv = metaserver.getserver(key[0])
+ iconformat = txtfilter(srv.serverinfo['iconformat'], 32)
+ return StringIO(srv.serverinfo['icon']), 'image/' + iconformat
+
+httpserver.register('', indexloader)
+httpserver.register('index.html', indexloader)
+httpserver.register('bub-n-bros.html', indexloader)
+httpserver.register('ico', icoloader)
+httpserver.register('mbub.png', httpserver.fileloader('mbub.png', 'image/png'))
+httpserver.register('home.png', httpserver.fileloader('home.png', 'image/png'))
+
+def openhttpsocket(port = META_SERVER_HTTP_PORT):
+ from BaseHTTPServer import HTTPServer
+ class ServerClass(HTTPServer):
+ def get_request(self):
+ sock, addr = self.socket.accept()
+ sock.settimeout(5.0)
+ return sock, addr
+ HandlerClass = httpserver.MiniHandler
+ server_address = ('', port)
+ httpd = ServerClass(server_address, HandlerClass)
+ s = httpd.socket
+ serversockets[s] = httpd.handle_request, None
+
+
+# ____________________________________________________________
+
+def mainloop():
+ while 1:
+ iwtd = serversockets.keys()
+ iwtd, owtd, ewtd = select(iwtd, [], iwtd)
+ #print iwtd, owtd, ewtd, serversockets
+ for s in iwtd:
+ if s in serversockets:
+ input, close = serversockets[s]
+ try:
+ input()
+ except:
+ import traceback
+ print "-"*60
+ traceback.print_exc()
+ print "-"*60
+ for s in ewtd:
+ if s in serversockets:
+ input, close = serversockets[s]
+ try:
+ close()
+ except:
+ import traceback
+ print "-"*60
+ traceback.print_exc()
+ print "-"*60
+
+
+if __name__ == '__main__':
+ metaserver = MetaServer()
+ if sys.argv[1:2] == ['-f']:
+ metaserver.detach()
+ try:
+ openhttpsocket()
+ print 'listening to client port tcp %d / http %d / udp %d.' % (
+ META_SERVER_PORT,
+ META_SERVER_HTTP_PORT,
+ META_SERVER_UDP_PORT)
+ mainloop()
+ finally:
+ if metaserver.ServersList:
+ print '*** servers still connected, waiting 5 seconds'
+ time.sleep(5)
+ print '*** leaving at', time.ctime()
diff --git a/metaserver/metastruct.py b/metaserver/metastruct.py
new file mode 100644
index 0000000..13240cc
--- /dev/null
+++ b/metaserver/metastruct.py
@@ -0,0 +1,75 @@
+import sys, os
+LOCALDIR = __file__
+LOCALDIR = os.path.abspath(os.path.dirname(LOCALDIR))
+sys.path.insert(0, os.path.dirname(LOCALDIR))
+
+from common.msgstruct import *
+from socket import error
+
+MMSG_INFO = 'I'
+MMSG_START = '+'
+MMSG_STOP = '-'
+MMSG_LIST = 'L'
+MMSG_ROUTE = 'R'
+MMSG_TRACEBACK= 'T'
+MMSG_UDP_ADDR = 'U'
+
+RMSG_WAKEUP = 'w'
+RMSG_PING = 'p'
+RMSG_PONG = 'o'
+RMSG_SYNC = 'y'
+RMSG_CONNECT = 'c'
+RMSG_LIST = 'l'
+RMSG_UDP_ADDR = 'u'
+RMSG_UDP_CONN = 'd'
+RMSG_NO_HOST = '?'
+
+
+def encodedict(dict):
+ data = []
+ for key, value in dict.items():
+ data.append(message('#', key, value))
+ return ''.join(data)
+
+def encodelist(list):
+ return message('[', *list)
+
+def decodedict(buffer):
+ result = {}
+ while 1:
+ msg, buffer = decodemessage(buffer)
+ if msg is None or len(msg) < 3 or msg[0] != '#':
+ break
+ result[msg[1]] = msg[2]
+ return result
+
+def decodelist(buffer):
+ msg, buffer = decodemessage(buffer)
+ assert msg[0] == '['
+ return list(msg[1:])
+
+
+class MessageSocket:
+
+ def __init__(self, s):
+ self.s = s
+ self.buffer = ""
+
+ def receive(self):
+ try:
+ data = self.s.recv(2048)
+ except error:
+ data = ''
+ if not data:
+ self.disconnect()
+ return
+ self.buffer += data
+ while 1:
+ msg, self.buffer = decodemessage(self.buffer)
+ if msg is None:
+ break
+ if msg[0] not in self.MESSAGES:
+ print >> sys.stderr, 'unknown message %r' % (msg[0],)
+ else:
+ fn = self.MESSAGES[msg[0]]
+ fn(self, *msg[1:])
diff --git a/metaserver/pipelayer.py b/metaserver/pipelayer.py
new file mode 100644
index 0000000..9baf78a
--- /dev/null
+++ b/metaserver/pipelayer.py
@@ -0,0 +1,337 @@
+#import os
+import struct
+from collections import deque
+from zlib import crc32
+
+
+class InvalidPacket(Exception):
+ pass
+
+
+FLAG_NAK1 = 0xE0
+FLAG_NAK = 0xE1
+FLAG_REG = 0xE2
+FLAG_CFRM = 0xE3
+
+FLAG_RANGE_START = 0xE0
+FLAG_RANGE_STOP = 0xE4
+
+max_old_packets = 200 # must be <= 256
+
+
+class PipeLayer(object):
+ timeout = 1
+ headersize = 4
+
+ def __init__(self, initialcrcs=(0, 0)):
+ #self.localid = os.urandom(4)
+ #self.remoteid = None
+ self.cur_time = 0
+ self.out_queue = deque()
+ self.out_nextseqid = 0
+ self.out_nextrepeattime = None
+ self.in_nextseqid = 0
+ self.in_outoforder = {}
+ self.out_oldpackets = deque()
+ self.out_flags = FLAG_REG
+ self.out_resend = 0
+ self.out_resend_skip = False
+ self.in_crc, self.out_crc = initialcrcs
+
+ def queue(self, data):
+ if data:
+ self.out_queue.appendleft(data)
+
+ def queue_size(self):
+ total = 0
+ for data in self.out_queue:
+ total += len(data)
+ return total
+
+ def in_sync(self):
+ return not self.out_queue and self.out_nextrepeattime is None
+
+ def settime(self, curtime):
+ self.cur_time = curtime
+ if self.out_queue:
+ if len(self.out_oldpackets) < max_old_packets:
+ return 0 # more data to send now
+ if self.out_nextrepeattime is not None:
+ return max(0, self.out_nextrepeattime - curtime)
+ else:
+ return None
+
+ def is_congested(self):
+ return len(self.out_oldpackets) >= max_old_packets
+
+ def encode(self, maxlength):
+ #print ' '*self._dump_indent, '--- OUTQ', self.out_resend, self.out_queue
+ if len(self.out_oldpackets) >= max_old_packets:
+ # congestion, stalling
+ payload = 0
+ else:
+ payload = maxlength - 8
+ if payload <= 0:
+ raise ValueError("encode(): buffer too small")
+ if (self.out_nextrepeattime is not None and
+ self.out_nextrepeattime <= self.cur_time):
+ # no ACK received so far, send a packet (possibly empty)
+ if not self.out_queue:
+ payload = 0
+ else:
+ if not self.out_queue: # no more data to send
+ return None
+ if payload == 0: # congestion
+ return None
+ # prepare a packet
+ seqid = self.out_nextseqid
+ flags = self.out_flags
+ self.out_flags = FLAG_REG # clear out the flags for the next time
+ #if flags in (FLAG_NAK, FLAG_NAK1):
+ # print 'out_flags NAK', hex(flags)
+ if payload > 0:
+ self.out_nextseqid = (seqid + 1) & 0xFFFF
+ data = self.out_queue.pop()
+ packetlength = len(data)
+ if self.out_resend > 0:
+ if packetlength > payload + 4:
+ raise ValueError("XXX need constant buffer size for now")
+ self.out_resend -= 1
+ if self.out_resend_skip:
+ if self.out_resend > 0:
+ self.out_queue.pop()
+ self.out_resend -= 1
+ self.out_nextseqid = (seqid + 2) & 0xFFFF
+ self.out_resend_skip = False
+ packetpayload = data
+ else:
+ packet = []
+ while packetlength <= payload:
+ packet.append(data)
+ if not self.out_queue:
+ break
+ data = self.out_queue.pop()
+ packetlength += len(data)
+ else:
+ rest = len(data) + payload - packetlength
+ packet.append(data[:rest])
+ self.out_queue.append(data[rest:])
+ packetpayload = ''.join(packet)
+ self.out_crc = crc32(packetpayload, self.out_crc)
+ packetpayload += struct.pack("!I", self.out_crc & 0xffffffff)
+ self.out_oldpackets.appendleft(packetpayload)
+ #print ' '*self._dump_indent, '--- OLDPK', self.out_oldpackets
+ else:
+ # a pure ACK packet, no payload
+ if self.out_oldpackets and flags == FLAG_REG:
+ flags = FLAG_CFRM
+ packetpayload = ''
+ packet = struct.pack("!BBH", flags,
+ self.in_nextseqid & 0xFF,
+ seqid) + packetpayload
+ if self.out_oldpackets:
+ self.out_nextrepeattime = self.cur_time + self.timeout
+ else:
+ self.out_nextrepeattime = None
+ #self.dump('OUT', packet)
+ return packet
+
+ def decode(self, rawdata):
+ if len(rawdata) < 4:
+ raise InvalidPacket
+ #print ' '*self._dump_indent, '------ out %d (+%d) in %d' % (self.out_nextseqid, self.out_resend, self.in_nextseqid)
+ #self.dump('IN ', rawdata)
+ in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4])
+ if not (FLAG_RANGE_START <= in_flags < FLAG_RANGE_STOP):
+ raise InvalidPacket
+ in_diff = (in_seqid - self.in_nextseqid ) & 0xFFFF
+ ack_diff = (self.out_nextseqid + self.out_resend - ack_seqid) & 0xFF
+ if in_diff >= max_old_packets:
+ return '' # invalid, but can occur as a late repetition
+ if ack_diff != len(self.out_oldpackets):
+ # forget all acknowledged packets
+ if ack_diff > len(self.out_oldpackets):
+ return '' # invalid, but can occur with packet reordering
+ while len(self.out_oldpackets) > ack_diff:
+ #print ' '*self._dump_indent, '--- POP', repr(self.out_oldpackets[-1])
+ self.out_oldpackets.pop()
+ if self.out_oldpackets:
+ self.out_nextrepeattime = self.cur_time + self.timeout
+ else:
+ self.out_nextrepeattime = None # all packets ACKed
+ if in_flags == FLAG_NAK or in_flags == FLAG_NAK1:
+ #print 'recv NAK', hex(in_flags)
+ # this is a NAK: resend the old packets as far as they've not
+ # also been ACK'ed in the meantime (can occur with reordering)
+ while self.out_resend < len(self.out_oldpackets):
+ self.out_queue.append(self.out_oldpackets[self.out_resend])
+ self.out_resend += 1
+ self.out_nextseqid = (self.out_nextseqid - 1) & 0xFFFF
+ #print ' '*self._dump_indent, '--- REP', self.out_nextseqid, repr(self.out_queue[-1])
+ self.out_resend_skip = in_flags == FLAG_NAK1
+ elif in_flags == FLAG_CFRM:
+ # this is a CFRM: request for confirmation
+ self.out_nextrepeattime = self.cur_time
+ # receive this packet's payload if it is the next in the sequence
+ if in_diff == 0:
+ if len(rawdata) > 8:
+ #print ' '*self._dump_indent, 'RECV ', self.in_nextseqid, repr(rawdata[4:])
+ payload = rawdata[4:-4]
+ crc, = struct.unpack("!I", rawdata[-4:])
+ if crc != (crc32(payload, self.in_crc) & 0xffffffff):
+ self.bad_crc()
+ return '' # bad crc! drop packet
+ self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF
+ self.in_crc = crc
+ result = [payload]
+ while self.in_nextseqid in self.in_outoforder:
+ rawdata = self.in_outoforder.pop(self.in_nextseqid)
+ payload = rawdata[4:-4]
+ crc, = struct.unpack("!I", rawdata[-4:])
+ if crc != (crc32(payload, self.in_crc) & 0xffffffff):
+ # bad crc! clear all out-of-order packets
+ self.bad_crc()
+ break
+ self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF
+ self.in_crc = crc
+ result.append(payload)
+ return ''.join(result)
+ else:
+ # we missed at least one intermediate packet: send a NAK
+ if len(rawdata) > 4:
+ self.in_outoforder[in_seqid] = rawdata
+ if ((self.in_nextseqid + 1) & 0xFFFF) in self.in_outoforder:
+ self.out_flags = FLAG_NAK1
+ else:
+ self.out_flags = FLAG_NAK
+ self.out_nextrepeattime = self.cur_time
+ return ''
+
+ def bad_crc(self):
+ import sys
+ print >> sys.stderr, "warning: bad crc on udp connexion"
+ self.in_outoforder.clear()
+ self.out_flags = FLAG_NAK
+ self.out_nextrepeattime = self.cur_time
+
+ _dump_indent = 0
+ def dump(self, dir, rawdata):
+ in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4])
+ print ' ' * self._dump_indent, dir,
+ if in_flags == FLAG_NAK:
+ print 'NAK',
+ elif in_flags == FLAG_NAK1:
+ print 'NAK1',
+ elif in_flags == FLAG_CFRM:
+ print 'CFRM',
+ #print ack_seqid, in_seqid, '(%d bytes)' % (len(rawdata)-4,)
+ print ack_seqid, in_seqid, repr(rawdata[4:])
+
+
+def pipe_over_udp(udpsock, send_fd=-1, recv_fd=-1,
+ timeout=1.0, inactivity_timeout=None):
+ """Example: send all data showing up in send_fd over the given UDP
+ socket, and write incoming data into recv_fd. The send_fd and
+ recv_fd are plain file descriptors. When an EOF is read from
+ send_fd, this function returns (after making sure that all data was
+ received by the remote side).
+ """
+ import os
+ from select import select
+ from time import time
+ p = PipeLayer()
+ p.timeout = timeout
+ iwtdlist = [udpsock]
+ if send_fd >= 0:
+ iwtdlist.append(send_fd)
+ running = True
+ while running or not p.in_sync():
+ delay = delay1 = p.settime(time())
+ if delay is None:
+ delay = inactivity_timeout
+ iwtd, owtd, ewtd = select(iwtdlist, [], [], delay)
+ if iwtd:
+ if send_fd in iwtd:
+ data = os.read(send_fd, 1500 - p.headersize)
+ if not data:
+ # EOF
+ iwtdlist.remove(send_fd)
+ running = False
+ else:
+ #print 'queue', len(data)
+ p.queue(data)
+ if udpsock in iwtd:
+ packet = udpsock.recv(65535)
+ #print 'decode', len(packet)
+ p.settime(time())
+ data = p.decode(packet)
+ i = 0
+ while i < len(data):
+ i += os.write(recv_fd, data[i:])
+ elif delay1 is None:
+ break # long inactivity
+ p.settime(time())
+ packet = p.encode(1500)
+ if packet:
+ #print 'send', len(packet)
+ #if os.urandom(1) >= '\x08': # emulate packet losses
+ udpsock.send(packet)
+
+
+class PipeOverUdp(object):
+
+ def __init__(self, udpsock, timeout=1.0):
+ import thread, os
+ self.os = os
+ self.sendpipe = os.pipe()
+ self.recvpipe = os.pipe()
+ thread.start_new_thread(pipe_over_udp, (udpsock,
+ self.sendpipe[0],
+ self.recvpipe[1],
+ timeout))
+
+ def __del__(self):
+ os = self.os
+ if self.sendpipe:
+ os.close(self.sendpipe[0])
+ os.close(self.sendpipe[1])
+ self.sendpipe = None
+ if self.recvpipe:
+ os.close(self.recvpipe[0])
+ os.close(self.recvpipe[1])
+ self.recvpipe = None
+
+ close = __del__
+
+ def send(self, data):
+ if not self.sendpipe:
+ raise IOError("I/O operation on a closed PipeOverUdp")
+ return self.os.write(self.sendpipe[1], data)
+
+ def sendall(self, data):
+ i = 0
+ while i < len(data):
+ i += self.send(data[i:])
+
+ def recv(self, bufsize):
+ if not self.recvpipe:
+ raise IOError("I/O operation on a closed PipeOverUdp")
+ return self.os.read(self.recvpipe[0], bufsize)
+
+ def recvall(self, bufsize):
+ buf = []
+ while bufsize > 0:
+ data = self.recv(bufsize)
+ buf.append(data)
+ bufsize -= len(data)
+ return ''.join(buf)
+
+ def fileno(self):
+ if not self.recvpipe:
+ raise IOError("I/O operation on a closed PipeOverUdp")
+ return self.recvpipe[0]
+
+ def ofileno(self):
+ if not self.sendpipe:
+ raise IOError("I/O operation on a closed PipeOverUdp")
+ return self.sendpipe[1]
diff --git a/metaserver/socketoverudp.py b/metaserver/socketoverudp.py
new file mode 100644
index 0000000..2df4a75
--- /dev/null
+++ b/metaserver/socketoverudp.py
@@ -0,0 +1,174 @@
+from time import time as now
+from pipelayer import PipeLayer, InvalidPacket
+from pipelayer import FLAG_RANGE_START, FLAG_RANGE_STOP
+import socket, struct
+
+SOU_RANGE_START = FLAG_RANGE_START
+SOU_MIXED_DATA = FLAG_RANGE_STOP + 0
+SOU_SHUTDOWN = FLAG_RANGE_STOP + 1
+SOU_RANGE_STOP = FLAG_RANGE_STOP + 2
+
+SHUTDOWN_PACKET = chr(SOU_SHUTDOWN) + '**' # < 4 characters
+
+CONGESTION_TIMEOUT = 20.0
+#CONSOLIDATE_DELAY = 0.1
+
+
+class SocketOverUdp(object):
+ RECV_CAN_RETURN_EMPTY = True
+ PACKETSIZE = 996
+ MIXEDPACKETSIZE = 1080
+
+ def __init__(self, udpsock, initialcrcs):
+ self.udpsock = udpsock
+ self.pl = PipeLayer(initialcrcs)
+ self.congested_since = None
+ #self.consolidate_sends = None
+ #self.encode_delayed_until = now()
+
+ def close(self):
+ try:
+ self.udpsock.send(SHUTDOWN_PACKET)
+ except socket.error:
+ pass
+ self.udpsock.close()
+
+ def _progress(self):
+ if self.pl.settime(now()) == 0.0:
+ self._encode()
+
+ def _encode(self):
+ #if self.consolidate_sends:
+ # if self.pl.cur_time < self.encode_delayed_until:
+ # return False
+ # self.encode_delayed_until = self.pl.cur_time + CONSOLIDATE_DELAY
+ packet = self.pl.encode(self.PACKETSIZE)
+ if packet is not None:
+ #print 'send:', repr(packet)
+ if self.pl.is_congested():
+ if self.congested_since is None:
+ self.congested_since = now()
+ else:
+ if now() > self.congested_since + CONGESTION_TIMEOUT:
+ self.udpsock.send(SHUTDOWN_PACKET)
+ raise socket.error("peer not responding, timing out")
+ else:
+ self.congested_since = None
+ #print repr(packet[:10])
+ #print "out:", len(packet)
+ #print ' ---'
+ self.udpsock.send(packet)
+
+ def _decode(self, packet):
+ try:
+ data = self.pl.decode(packet)
+ #print ' ~~~'
+ return data
+ except InvalidPacket:
+ if len(packet) >= 4:
+ hdr, reserved, size = struct.unpack("!BBH", packet[:4])
+ if hdr == SOU_MIXED_DATA:
+ #print ' ~~~[unmix%d/%d]' % (len(packet[4+size:]),
+ # len(packet))
+ self.udp_over_udp_decoder(packet[4:4+size])
+ return self._decode(packet[4+size:])
+ else:
+ # non-tiny packets with no recognized hdr byte are
+ # assumed to be pure video traffic
+ #print ' ~~~[video]'
+ self.udp_over_udp_decoder(packet)
+ return ''
+ elif packet == SHUTDOWN_PACKET:
+ raise socket.error("received an end-of-connexion packet")
+ else:
+ #print ' ~~~[INVALID%d]' % (len(packet),)
+ return ''
+
+ def fileno(self):
+ self._progress()
+ return self.udpsock.fileno()
+
+ def flush(self):
+ while self.pl.settime(now()) == 0.0:
+ #self.encode_delayed_until = self.pl.cur_time
+ self._encode()
+
+ def recv(self, _ignoredbufsize=None):
+ #print 'recv:'
+ packet = self.udpsock.recv(65535)
+ #print " in:", len(packet), hex(ord(packet[0]))
+ #print repr(packet)
+ self.pl.settime(now())
+ data = self._decode(packet)
+ #print 'which is really', repr(data)
+ self._encode()
+ #if data:
+ # print " IN:", len(data)
+ return data
+
+ def sendall(self, data):
+ #print 'queuing', repr(data)
+ #print ' OUT:', len(data)
+ self.pl.queue(data)
+ #self._progress()
+ return len(data)
+
+ send = sendall
+
+ def send_video_data(self, udpdata):
+ forced_embedded = SOU_RANGE_START <= ord(udpdata[0]) < SOU_RANGE_STOP
+ self.pl.settime(now())
+ packet = self.pl.encode(self.PACKETSIZE) or ''
+ if not forced_embedded and not packet:
+ # no PipeLayer packet, send as plain udp data
+ datagram = udpdata
+ elif len(packet) + len(udpdata) <= self.MIXEDPACKETSIZE:
+ # fits in a single mixed data packet
+ datagram = (struct.pack("!BBH", SOU_MIXED_DATA, 0, len(udpdata))
+ + udpdata + packet)
+ #print ' ---[mix%d/%d]' % (len(packet), len(datagram))
+ else:
+ # two packets needed
+ #print repr(packet[:10])
+ #print "out:", len(packet)
+ #print ' ---'
+ self.udpsock.send(packet)
+ datagram = udpdata
+ #print repr(datagram[:10])
+ #print "out:", len(datagram), hex(ord(datagram[0]))
+ self.udpsock.send(datagram)
+ #self.encode_delayed_until = self.pl.cur_time + CONSOLIDATE_DELAY
+ #if self.consolidate_sends is None:
+ # self.consolidate_sends = True
+ return len(udpdata)
+
+ def udp_over_udp_mixer(self):
+ return UdpOverUdpMixer(self)
+
+ def udp_over_udp_decoder(self, data):
+ pass # method overridden by pclient.py
+
+ def getpeername(self):
+ return self.udpsock.getpeername()
+
+ def getsockname(self):
+ return self.udpsock.getsockname()
+
+ def setsockopt(self, level, opt, value):
+ # note that TCP_NODELAY is set by the bub-n-bros client, not the server
+ #if level == socket.SOL_TCP and opt == socket.TCP_NODELAY:
+ # self.consolidate_sends = not value
+ #else:
+ # ignored
+ pass
+
+ def setblocking(self, _ignored):
+ pass # XXX good enough for common/gamesrv.py
+
+
+class UdpOverUdpMixer(object):
+ def __init__(self, sockoverudp):
+ self.send = sockoverudp.send_video_data
+
+ def setsockopt(self, *args):
+ pass # ignored