diff options
author | Diego Roversi <diegor@tiscali.it> | 2019-09-08 18:12:27 +0200 |
---|---|---|
committer | Diego Roversi <diegor@tiscali.it> | 2019-09-08 18:12:27 +0200 |
commit | 1d9925c287b318ec21343e2682b51ab6a36ae8db (patch) | |
tree | 17d1c0ac21eea6f291146520afa8381db4586fb4 /metaserver |
initial commit from cvs 1.6.2
Diffstat (limited to 'metaserver')
-rw-r--r-- | metaserver/.cvsignore | 4 | ||||
-rw-r--r-- | metaserver/__init__.py | 1 | ||||
-rw-r--r-- | metaserver/home.png | bin | 0 -> 1283 bytes | |||
-rw-r--r-- | metaserver/index.html | 51 | ||||
-rw-r--r-- | metaserver/mbub.png | bin | 0 -> 1138 bytes | |||
-rw-r--r-- | metaserver/metaclient.py | 596 | ||||
-rw-r--r-- | metaserver/metaserver.py | 365 | ||||
-rw-r--r-- | metaserver/metastruct.py | 75 | ||||
-rw-r--r-- | metaserver/pipelayer.py | 337 | ||||
-rw-r--r-- | metaserver/socketoverudp.py | 174 |
10 files changed, 1603 insertions, 0 deletions
diff --git a/metaserver/.cvsignore b/metaserver/.cvsignore new file mode 100644 index 0000000..687980a --- /dev/null +++ b/metaserver/.cvsignore @@ -0,0 +1,4 @@ +*.py[co] +build +*.so +*.pyd diff --git a/metaserver/__init__.py b/metaserver/__init__.py new file mode 100644 index 0000000..1bb8bf6 --- /dev/null +++ b/metaserver/__init__.py @@ -0,0 +1 @@ +# empty diff --git a/metaserver/home.png b/metaserver/home.png Binary files differnew file mode 100644 index 0000000..cc7a46e --- /dev/null +++ b/metaserver/home.png diff --git a/metaserver/index.html b/metaserver/index.html new file mode 100644 index 0000000..fdb5583 --- /dev/null +++ b/metaserver/index.html @@ -0,0 +1,51 @@ +<html> +<head><title>The Bub's Brothers Server List</title> +</head> +<body text="#000000" bgcolor="#C0FFC0" link="#0000EE" vlink="#000099" alink="#FF0000"> + +<h1>The Bub's Brothers Server List</h1> +%s +<br> + +<table border=1 cellspacing=1> +<tr><td bgcolor="#FFFFFF"> +<table border=0 cellspacing=6> +<tr> + <td bgcolor="#008000" colspan=2 align="center"><img src="mbub.png"> + <font color="#FFFF00" size=+1><strong> Running servers</strong></font></td> +</tr> + +<tr> + <td> </td> + <td> </td> +</tr> + +<tr> + <td> </td> + <td bgcolor="#FFFFFF"><table border=0 cellspacing=5> +\ +<tr> +<td valign="bottom"><font size=-1>%(stime)s</font></td> +<td valign="bottom"> %(icon)s </td> +<td bgcolor="%(bgcolor)s"><font size=+1>%(hosthtml)s</font> + <br><strong>%(desc)s</strong> (%(extradesc)s)</td> +</tr> +\ + </table></td> +</tr> + +</table> +</td></tr></table> +<font size=-2>%(tbfiles)s</font> +<br><hr><br> +<p><font size=-1> +%(bottommsg)s +<br> +<a href="javascript: location.reload()">Reload</a> this page for an up-to-date version! +</font></p> +%(extrafooter)s +<p><img src="home.png"> I click on the servers but nothing happens - <a href="http://bub-n-bros.sourceforge.net/help.html#meta">why?</a></p> +<p><img src="home.png"> Chat about the Bub's Brothers? Go to our IRC channel <a href="irc://irc.freenode.net:6667/bub-n-bros">#bub-n-bros on irc.freenode.net</a>.</p> +<p><img src="home.png"> <a href="http://bub-n-bros.sourceforge.net">The Bub's Brothers Home Page</a></p> +<p><img src="home.png"> Thanks to <a href="http://ctpug.org.za">CTPUG</a> for hosting the meta-server.</p> +</body></html> diff --git a/metaserver/mbub.png b/metaserver/mbub.png Binary files differnew file mode 100644 index 0000000..d7aaae9 --- /dev/null +++ b/metaserver/mbub.png diff --git a/metaserver/metaclient.py b/metaserver/metaclient.py new file mode 100644 index 0000000..aedb99c --- /dev/null +++ b/metaserver/metaclient.py @@ -0,0 +1,596 @@ +import sys, os, time, random +from select import select +from socket import * +from metastruct import * + +_SERVER = 'ctpug.org.za' + +METASERVER = (_SERVER, 8055) +METASERVER_UDP = (_SERVER, 8055) +METASERVER_URL = 'http://%s:8050/bub-n-bros.html' % (_SERVER,) +VERSION_TAG = 1601 + +def connect(failure=[]): + if len(failure) >= 2: + return None + print >> sys.stderr, 'Connecting to the meta-server %s:%d...' % METASERVER + try: + s = socket(AF_INET, SOCK_STREAM) + s.connect(METASERVER) + except error, e: + print >> sys.stderr, '*** cannot contact meta-server:', str(e) + failure.append(e) + return None + else: + print >> sys.stderr, 'connected.' + return s + +sys.setcheckinterval(4096) + + +def float2str(f): + # don't trust locale issues and write a string with a '.' + s = str(long(f*1000000.0)) + return s[:-6] + '.' + s[-6:] + +def str2float(s): + try: + return float(s) + except: + # locale issues may prevent float() from decoding the string + s = s.strip() + try: + i = s.index('.') + except ValueError: + try: + i = s.index(',') + except ValueError: + i = len(s) + frac = s[i+1:] + return float(s[:i] or '0') + float(frac or '0')/(10**len(frac)) + + +# ____________________________________________________________ +# Game Servers + +class MetaClientSrv(MessageSocket): + + def __init__(self, s, game): + MessageSocket.__init__(self, s) + self.game = game + self.lastwakeup = None + self.synsockets = {} + import gamesrv + gamesrv.addsocket('META', s, self.receive) + self.closed = 0 + + def close(self): + if not self.closed: + self.disconnect() + try: + self.s.shutdown(2) + except: + pass + + def disconnect(self): + import gamesrv + gamesrv.removesocket('META', self.s) + self.closed = 1 + print >> sys.stderr, 'disconnected from the meta-server' + + def send_traceback(self): + if not self.closed: + import traceback, cStringIO, sys + f = cStringIO.StringIO() + print >> f, sys.version + print >> f, 'platform: ', sys.platform + print >> f, 'executable: ', sys.executable + print >> f, 'argv: ', sys.argv + print >> f, 'cwd: ', os.getcwd() + print >> f, 'version tag:', VERSION_TAG + print >> f + traceback.print_exc(file = f) + self.s.sendall(message(MMSG_TRACEBACK, f.getvalue())) + + def msg_wakeup(self, origin, *rest): + if self.lastwakeup is None or time.time()-self.lastwakeup > 4.0: + def fastresponses(wakeup): + sys.setcheckinterval(64) + time.sleep(12.01) + if self.lastwakeup == wakeup: + sys.setcheckinterval(4096) + self.synsockets.clear() + import thread + self.lastwakeup = time.time() + thread.start_new_thread(fastresponses, (self.lastwakeup,)) + + def msg_connect(self, origin, port, *rest): + def connect(origin, port): + host, _ = origin.split(':') + addr = host, port + s = socket(AF_INET, SOCK_STREAM) + print >> sys.stderr, 'backconnecting to', addr + try: + s.connect(addr) + except error, e: + print >> sys.stderr, 'backconnecting:', str(e) + else: + self.game.newclient_threadsafe(s, addr) + import thread + thread.start_new_thread(connect, (origin, port)) + + def msg_udp_conn(self, origin, secret, port, *rest): + def connect(origin, secret, port): + host, _ = origin.split(':') + addr = host, port + s = socket(AF_INET, SOCK_DGRAM) + print >> sys.stderr, 'udp connecting to', addr + s.connect(addr) + mysecret = random.randrange(0, 65536) + packet = ('B' + chr( secret & 0xFF) + chr( secret >> 8) + + chr(mysecret & 0xFF) + chr(mysecret >> 8)) + from socketoverudp import SocketOverUdp + from socketoverudp import SOU_RANGE_START, SOU_RANGE_STOP + for i in range(5): + #print 'sending', repr(packet) + s.send(packet) + iwtd, owtd, ewtd = select([s], [], [], 0.25) + if s in iwtd: + #print 'reading' + try: + inbuf = s.recv(SocketOverUdp.PACKETSIZE) + except error: + inbuf = '' + # try again? + iwtd, owtd, ewtd = select([s], [], [], 0.35) + if s in iwtd: + try: + inbuf = s.recv(SocketOverUdp.PACKETSIZE) + except error: + pass + #print 'got', repr(inbuf) + if (inbuf and + SOU_RANGE_START <= ord(inbuf[0]) < SOU_RANGE_STOP): + break + else: + print >> sys.stderr, 'udp connecting: no answer, giving up' + return + sock = SocketOverUdp(s, (mysecret, secret)) + data = sock._decode(inbuf) + #print 'decoded as', repr(data) + expected = '[bnb c->s]' + packet[3:5] + while len(data) < len(expected) + 2: + #print 'waiting for more' + iwtd, owtd, ewtd = select([sock], [], [], 5.0) + if sock not in iwtd: + print >> sys.stderr, 'udp connecting: timed out' + return + #print 'decoding more' + data += sock.recv() + #print 'now data is', repr(data) + if data[:-2] != expected: + print >> sys.stderr, 'udp connecting: bad data' + return + sock.sendall('[bnb s->c]' + data[-2:]) + sock.flush() + #print 'waiting for the last dot...' + while 1: + iwtd, owtd, ewtd = select([sock], [], [], 5.0) + if sock not in iwtd: + print >> sys.stderr, 'udp connecting: timed out' + return + data = sock.recv(200) + if data: + break + if data != '^': + print >> sys.stderr, 'udp connecting: bad data' + return + #print 'done!' + self.game.newclient_threadsafe(sock, addr) + + import thread + thread.start_new_thread(connect, (origin, secret, port)) + + def msg_ping(self, origin, *rest): + # ping time1 --> pong time2 time1 + self.s.sendall(message(MMSG_ROUTE, origin, + RMSG_PONG, float2str(time.time()), *rest)) + + def msg_sync(self, origin, clientport, time3, time2, time1, *rest): + time4 = time.time() + s = socket(AF_INET, SOCK_STREAM) + s.bind(('', INADDR_ANY)) + _, serverport = s.getsockname() + self.s.sendall(message(MMSG_ROUTE, origin, + RMSG_CONNECT, serverport, clientport)) + #print 'times:', time1, time2, time3, time4 + doubleping = (str2float(time3)-str2float(time1)) + (time4-str2float(time2)) + connecttime = time4 + doubleping / 4.0 + def connect(origin, port, connecttime, s): + host, _ = origin.split(':') + addr = host, port + delay = connecttime - time.time() + #print 'sleep(%r)' % delay + if 0.0 <= delay <= 10.0: + time.sleep(delay) + print >> sys.stderr, 'synconnecting to', addr + try: + s.connect(addr) + except error, e: + print >> sys.stderr, 'synconnecting:', str(e) + else: + self.game.newclient_threadsafe(s, addr) + import thread + thread.start_new_thread(connect, (origin, clientport, connecttime, s)) + + MESSAGES = { + RMSG_CONNECT: msg_connect, + RMSG_WAKEUP: msg_wakeup, + RMSG_PING: msg_ping, + RMSG_SYNC: msg_sync, + RMSG_UDP_CONN:msg_udp_conn, + } + +metaclisrv = None + +def meta_register(game): + global metaclisrv + import gamesrv + info = {} + if game.FnDesc: + info['desc'] = game.FnDesc or '' + info['extradesc'] = game.FnExtraDesc() or '' + + s = gamesrv.opentcpsocket() + hs = gamesrv.openhttpsocket() + port = int(gamesrv.displaysockport(s)) + info['httpport'] = gamesrv.displaysockport(hs) + + if not metaclisrv or metaclisrv.closed: + s = connect() + if not s: + return + metaclisrv = MetaClientSrv(s, game) + metaclisrv.s.sendall(message(MMSG_INFO, encodedict(info)) + + message(MMSG_START, port)) + +def meta_unregister(game): + global metaclisrv + if metaclisrv: + metaclisrv.close() + metaclisrv = None + + +# ____________________________________________________________ +# Game Clients + +class Event: + def __init__(self): + import thread + self.lock = thread.allocate_lock() + self.lock.acquire() + def signal(self): + try: + self.lock.release() + except: + pass + def wait1(self): + self.lock.acquire() + + +class MetaClientCli: + fatalerror = False + + def __init__(self, serverkey, backconnectport): + self.resultsocket = None + self.serverkey = serverkey + self.backconnectport = backconnectport + self.threads = {} + + def run(self): + import thread + print >> sys.stderr, 'Trying to connect to', self.serverkey + self.ev = Event() + self.ev2 = Event() + self.buffer = "" + self.sendlock = thread.allocate_lock() + self.recvlock = thread.allocate_lock() + self.inputmsgqueue = [] + self.gotudpport = None + if not (PORTS.get('CLIENT') or PORTS.get('sendudpto')): + self.s = connect() + thread.start_new_thread(self.acquire_udp_port, ()) + else: + self.s = None + self.ev2.signal() + self.startthread(self.try_udp_connect) + + thread.start_new_thread(self.bipbip, ()) + self.startthread(self.try_direct_connect, 0.75) + self.startthread(self.try_indirect_connect, 1.50) + while self.resultsocket is None: + self.threadsleft() + self.ev.wait1() + self.ev2.wait1() + return self.resultsocket + + def done(self): + sys.setcheckinterval(4096) + + def bipbip(self): + while self.resultsocket is None: + time.sleep(0.31416) + self.ev.signal() + + def startthread(self, fn, sleep=0.0, args=()): + import thread + def bootstrap(fn, atom, sleep, args): + try: + time.sleep(sleep) + if self.resultsocket is None: + fn(*args) + finally: + del self.threads[atom] + self.ev.signal() + atom = object() + self.threads[atom] = time.time() + thread.start_new_thread(bootstrap, (fn, atom, sleep, args)) + + def threadsleft(self): + if self.fatalerror: + sys.exit(1) + now = time.time() + TIMEOUT = 11 + for starttime in self.threads.values(): + if now < starttime + TIMEOUT: + break + else: + if self.threads: + print >> sys.stderr, '*** time out, giving up.' + else: + print >> sys.stderr, '*** failed to connect.' + sys.exit(1) + + def try_direct_connect(self): + host, port = self.serverkey.split(':') + port = int(port) + s = socket(AF_INET, SOCK_STREAM) + try: + s.connect((host, port)) + except error, e: + print >> sys.stderr, 'direct connexion failed:', str(e) + else: + print >> sys.stderr, 'direct connexion accepted.' + self.resultsocket = s + + def try_indirect_connect(self): + import thread, time + if not self.s: + self.s = connect() + if not self.s: + return + self.routemsg(RMSG_WAKEUP) + self.startthread(self.try_backconnect) + self.socketcache = {} + tries = [0.6, 0.81, 1.2, 1.69, 2.6, 3.6, 4.9, 6.23] + for delay in tries: + self.startthread(self.send_ping, delay) + while self.resultsocket is None: + msg = self.inputmsg() + now = time.time() + if self.resultsocket is not None: + break + if msg[0] == RMSG_CONNECT: + # connect serverport clientport + self.startthread(self.try_synconnect, args=msg[1:]) + if msg[0] == RMSG_PONG: + # pong time2 time1 --> sync port time3 time2 time1 + if len(self.socketcache) < len(tries): + s = socket(AF_INET, SOCK_STREAM) + s.bind(('', INADDR_ANY)) + _, port = s.getsockname() + self.socketcache[port] = s + self.routemsg(RMSG_SYNC, port, float2str(now), *msg[2:]) + + def sendmsg(self, data): + self.sendlock.acquire() + try: + self.s.sendall(data) + finally: + self.sendlock.release() + + def routemsg(self, *rest): + self.sendmsg(message(MMSG_ROUTE, self.serverkey, *rest)) + + def _readnextmsg(self, blocking): + self.recvlock.acquire() + try: + while 1: + msg, self.buffer = decodemessage(self.buffer) + if msg is not None: + if msg[0] == RMSG_UDP_ADDR: + if len(msg) > 2: + self.gotudpport = msg[1], int(msg[2]) + continue + if msg[0] == RMSG_NO_HOST and msg[1] == self.serverkey: + print >> sys.stderr, ('*** server %r is not registered' + ' on the meta-server' % (msg[1],)) + self.fatalerror = True + sys.exit() + self.inputmsgqueue.append(msg) + return + iwtd, owtd, ewtd = select([self.s], [], [], 0) + if not iwtd: + if self.inputmsgqueue or not blocking: + return + data = self.s.recv(2048) + if not data: + print >> sys.stderr, 'disconnected from the meta-server' + sys.exit() + self.buffer += data + finally: + self.recvlock.release() + + def inputmsg(self): + self._readnextmsg(blocking=True) + return self.inputmsgqueue.pop(0) + + def try_backconnect(self): + s1 = socket(AF_INET, SOCK_STREAM) + s1.bind(('', self.backconnectport or INADDR_ANY)) + s1.listen(1) + _, port = s1.getsockname() + self.routemsg(RMSG_CONNECT, port) + print >> sys.stderr, 'listening for backward connection' + iwtd, owtd, ewtd = select([s1], [], [], 7.5) + if s1 in iwtd: + s, addr = s1.accept() + print >> sys.stderr, 'accepted backward connection from', addr + self.resultsocket = s + + def send_ping(self): + sys.stderr.write('. ') + self.routemsg(RMSG_PING, float2str(time.time())) + + def try_synconnect(self, origin, remoteport, localport, *rest): + sys.stderr.write('+ ') + s = self.socketcache[localport] + remotehost, _ = origin.split(':') + remoteaddr = remotehost, remoteport + try: + s.connect(remoteaddr) + except error, e: + print >> sys.stderr, 'SYN connect failed:', str(e) + return + print >> sys.stderr, ('simultaneous SYN connect succeeded with %s:%d' % + remoteaddr) + self.resultsocket = s + + def try_udp_connect(self): + for i in range(3): # three attempts + self.attempt_udp_connect() + if self.resultsocket is not None: + break + + def attempt_udp_connect(self): + if '*udpsock*' in PORTS: + s, (host, port) = PORTS['*udpsock*'] + else: + s = socket(AF_INET, SOCK_DGRAM) + s.bind(('', PORTS.get('CLIENT', INADDR_ANY))) + host, port = s.getsockname() + if 'sendudpto' in PORTS: + host = PORTS['sendudpto'] + secret = originalsecret = random.randrange(0, 65536) + self.routemsg(RMSG_UDP_CONN, secret, port) + secret = 'B' + chr(secret & 0xFF) + chr(secret >> 8) + while True: + iwtd, owtd, ewtd = select([s], [], [], 2.94) + if s not in iwtd: + return + packet, addr = s.recvfrom(200) + if packet.startswith(secret) and len(packet) == 5: + break + s.connect(addr) + #print 'got', repr(packet) + remotesecret = ord(packet[3]) | (ord(packet[4]) << 8) + secret = random.randrange(0, 65536) + secret = chr(secret & 0xFF) + chr(secret >> 8) + packet = '[bnb c->s]' + packet[3:5] + secret + for name in ('*udpsock*', 'CLIENT'): + if name in PORTS: + del PORTS[name] + from socketoverudp import SocketOverUdp + sock = SocketOverUdp(s, (originalsecret, remotesecret)) + #print 'sending', repr(packet) + sock.sendall(packet) + sock.flush() + data = '' + expected = '[bnb s->c]' + secret + while len(data) < len(expected): + #print 'waiting' + iwtd, owtd, ewtd = select([sock], [], [], 2.5) + if sock not in iwtd: + print >> sys.stderr, 'socket-over-udp timed out' + return + #print 'we get:' + data += sock.recv() + #print repr(data) + if data != expected: + print >> sys.stderr, 'bad udp data from', addr + else: + sock.sendall('^') + sock.flush() + print 'udp connexion handshake succeeded' + self.resultsocket = sock + + def acquire_udp_port(self): + try: + s = socket(AF_INET, SOCK_DGRAM) + s.bind(('', INADDR_ANY)) + randomdata = hex(random.randrange(0, sys.maxint)) + for i in range(5): + s.sendto(randomdata, METASERVER_UDP) + time.sleep(0.05) + self.sendmsg(message(MMSG_UDP_ADDR, randomdata)) + time.sleep(0.05) + self._readnextmsg(blocking=False) + if self.gotudpport: + PORTS['*udpsock*'] = s, self.gotudpport + udphost, udpport = self.gotudpport + print >> sys.stderr, ('udp port %d is visible from ' + 'outside on %s:%d' % ( + s.getsockname()[1], udphost, udpport)) + self.startthread(self.try_udp_connect) + break + finally: + self.ev2.signal() + + +def meta_connect(serverkey, backconnectport=None): + global METASERVER + if PORTS.get('SSH_RELAY'): + METASERVER = PORTS['SSH_RELAY'] + c = MetaClientCli(serverkey, backconnectport) + s = c.run() + c.done() + return s + +def print_server_list(): + s = connect() + if s is not None: + s.sendall(message(MMSG_LIST)) + buffer = "" + while decodemessage(buffer)[0] is None: + buffer += s.recv(8192) + s.close() + msg = decodemessage(buffer)[0] + assert msg[0] == RMSG_LIST + entries = decodedict(msg[1]) + if not entries: + print >> sys.stderr, 'No registered server.' + else: + print + print ' %-25s | %-30s | %s' % ( + 'server', 'game', 'players') + print '-'*27+'+'+'-'*32+'+'+'-'*11 + for key, value in entries.items(): + if ':' in key: + try: + addr, _, _ = gethostbyaddr(key[:key.index(':')]) + except: + pass + else: + addr = '%-27s' % (addr,) + if len(addr) < 28: addr += '|' + addr = '%-60s' % (addr,) + if len(addr) < 61: addr += '|' + print addr + value = decodedict(value) + print ' %-25s | %-30s | %s' % ( + key, value.get('desc', '<no description>'), + value.get('extradesc', '')) + print + +if __name__ == '__main__': + print_server_list() diff --git a/metaserver/metaserver.py b/metaserver/metaserver.py new file mode 100644 index 0000000..cb4ea19 --- /dev/null +++ b/metaserver/metaserver.py @@ -0,0 +1,365 @@ +from socket import * +import os, sys, time, random +from select import select +from cStringIO import StringIO +from weakref import WeakValueDictionary +from metastruct import * +from common import httpserver, stdlog + +if __name__ == '__main__': + os.chdir(os.path.dirname(sys.argv[0]) or os.curdir) + +META_SERVER_HTTP_PORT = 8050 +META_SERVER_PORT = 8055 +META_SERVER_UDP_PORT = 8055 +IMAGE_DIR = "../bubbob/doc/images" +ICONS = [open(os.path.join(IMAGE_DIR, s), 'rb').read() + for s in os.listdir(IMAGE_DIR) if s.endswith('.png')] +assert ICONS, "you need to run ../bubbob/doc/bonus-doc.py" +MAX_SERVERS = 50 +MAX_CONNEXIONS = 60 + + +serversockets = {} + +class MetaServer: + + def __init__(self, port=META_SERVER_PORT, udpport=META_SERVER_UDP_PORT): + s = socket(AF_INET, SOCK_STREAM) + s.bind(('', port)) + s.listen(5) + self.parentsock = s + self.ServersDict = {} + self.ServersList = [] + serversockets[s] = self.clientconnect, sys.exit + self.udpsock = socket(AF_INET, SOCK_DGRAM) + self.udpsock.bind(('', udpport)) + serversockets[self.udpsock] = self.udp_message, None + self.udpdata = [] + + def detach(self): + pid = os.fork() + if pid: + print pid + os._exit(0) + # in the child process + os.setsid() + logfile = stdlog.LogFile(limitsize=131072) + if logfile: + print >> logfile + print "Logging to", logfile.filename + fd = logfile.f.fileno() + try: + # detach from parent + os.dup2(fd, 1) + os.dup2(fd, 2) + os.dup2(fd, 0) + except OSError: + pass + logfile.close() + # record pid + f = open('pid', 'w') + print >> f, os.getpid() + f.close() + + def clientconnect(self): + s, addr = self.parentsock.accept() + Connexion(s, addr) + + def publish(self, server): + key = server.serverkey + if key in self.ServersDict: + current = self.ServersDict[key] + if current is server: + return + self.ServersList.remove(current) + elif len(self.ServersDict) >= MAX_SERVERS: + raise OverflowError + self.ServersList.append(server) + self.ServersDict[key] = server + print '+', key + + def unpublish(self, server): + key = server.serverkey + if key in self.ServersDict: + current = self.ServersDict[key] + if current is server: + del self.ServersDict[key] + self.ServersList.remove(server) + print '-', key + + def makelist(self): + items = {} + for srv in self.ServersList: + items[srv.serverkey] = encodedict(srv.serverinfo) + return encodedict(items) + + def getserver(self, key): + return self.ServersDict[key] + + def udp_message(self): + data, addr = self.udpsock.recvfrom(32) + self.udpdata.append((data, addr)) + if len(self.udpdata) > 50: + del self.udpdata[0] + + +class Connexion(MessageSocket): + + def __init__(self, s, addr): + MessageSocket.__init__(self, s) + self.serverinfo = { + 'time': int(time.time()), + 'icon': random.choice(ICONS), + 'iconformat': 'png', + } + self.addr = addr + self.key = '%s:%d' % addr + self.serverkey = None + print '[', self.key + self.backlinks = WeakValueDictionary() + if len(serversockets) >= MAX_CONNEXIONS: + self.disconnect() + raise OverflowError + serversockets[s] = self.receive, self.disconnect + + def disconnect(self): + metaserver.unpublish(self) + try: + del serversockets[self.s] + except KeyError: + pass + print ']', self.key + + def msg_serverinfo(self, info, *rest): + print '|', self.key + if len(info) > 15000: + raise OverflowError + info = decodedict(info) + self.serverinfo.update(info) + + def msg_startserver(self, port, *rest): + serverkey = '%s:%d' % (self.addr[0], port) + if self.serverkey and self.serverkey != serverkey: + metaserver.unpublish(self) + self.serverkey = serverkey + metaserver.publish(self) + + def msg_stopserver(self, *rest): + metaserver.unpublish(self) + + def msg_list(self, *rest): + self.s.sendall(message(RMSG_LIST, metaserver.makelist())) + + def msg_route(self, targetkey, *rest): + try: + target = metaserver.getserver(targetkey) + except KeyError: + try: + target = self.backlinks[targetkey] + except KeyError: + self.s.sendall(message(RMSG_NO_HOST, targetkey)) + return + target.route(self, *rest) + + def route(self, origin, msgcode, *rest): + self.backlinks[origin.key] = origin + self.s.sendall(message(msgcode, origin.key, *rest)) + + def msg_traceback(self, tb, *rest): + f = stdlog.LogFile('tb-%s.log' % (self.addr[0],)) + if f: + print >> f, tb + f.close() + + def msg_udp_addr(self, pattern, *rest): + for data, addr in metaserver.udpdata: + if data == pattern: + try: + host, port = addr + port = int(port) + except ValueError: + continue + self.s.sendall(message(RMSG_UDP_ADDR, host, port)) + return + else: + self.s.sendall(message(RMSG_UDP_ADDR)) + + MESSAGES = { + MMSG_INFO: msg_serverinfo, + MMSG_START: msg_startserver, + MMSG_STOP: msg_stopserver, + MMSG_LIST: msg_list, + MMSG_ROUTE: msg_route, + MMSG_TRACEBACK: msg_traceback, + MMSG_UDP_ADDR: msg_udp_addr, + } + + +# ____________________________________________________________ + +import htmlentitydefs +text_to_html = {} +for key, value in htmlentitydefs.entitydefs.items(): + text_to_html[value] = '&' + key + ';' +for i in range(32): + text_to_html[chr(i)] = '?' +def htmlquote(s, getter=text_to_html.get): + lst = [getter(c, c) for c in s if ' ' <= c < '\x7F'] + return ''.join(lst) +def txtfilter(s, maxlen=200): + s = str(s)[:maxlen] + l = [c for c in s if c in "!$*,-.0123456789:@ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "^_`abcdefghijklmnopqrstuvwxyz{|}"] + return ''.join(l) + +f = open('index.html', 'r') +HEADER, ROW, FOOTER = f.read().split('\\') +f.close() + +def makehosthtml(srv, bottommsg, join): + info = srv.serverinfo + hostname, port = srv.serverkey.split(':') + try: + fullhostname = gethostbyaddr(hostname)[0] + except: + fullhostname = hostname + url = None + if join: + url = "http://%s/join.html?host=%s&port=%s&httpport=%s&m=%s" % ( + join, hostname, port, info.get('httpport') or 'off', time.time()) + else: + try: + httpport = int(info.get('httpport')) + except: + pass + else: + url = "http://%s:%s/" % (hostname, httpport) + javamsg = """ +<p>Click on a server above to join the game. This only works if: +<ul><li>your browser understands Java; + <li>the server is not behind a firewall; + <li>you don't mind not hearing the nice background music. +</ul></p> +<p>Alternatively, install the +<a href="http://bub-n-bros.sourceforge.net/download.html">Python version</a> +of the client, which can cope with all of the above problems.</p> +<br>""" + if javamsg not in bottommsg: + bottommsg.append(javamsg) + result = '<strong>%s</strong>:%s' % (fullhostname, port) + if url: + result = '<a href="%s">%s</a>' % (url, result) + return result + +def indexloader(headers, join=[], head=[], **options): + if join: + join = join[0] + data = [HEADER % (head and head[0] or '')] + bottommsg = [] + if metaserver.ServersList: + counter = 0 + for srv in metaserver.ServersList: + info = srv.serverinfo + icon = '<img src="ico?key=%s">' % srv.serverkey + bgcolor = ('#C0D0D0', '#E0D0A8')[counter&1] + hosthtml = makehosthtml(srv, bottommsg, join) + desc = htmlquote(info.get('desc')) or '' + extradesc = htmlquote(info.get('extradesc')) or '' + if isinstance(info.get('time'), int): + stime = time.strftime('%a %b %d<br>%H:%M GMT', + time.gmtime(info['time'])) + else: + stime = '' + data.append(ROW % locals()) + counter += 1 + else: + data.append('''<tr><td bgcolor="#FFFFFF"> + Sorry, there is no registered server at the moment. + </td></tr>''') + if join: + extrafooter = '''<p><img src="home.png"> + <a href="http://%s/?time=%s">Back to local games</a></p>''' % ( + join, time.time()) + else: + extrafooter = '' + bottommsg = '\n'.join(bottommsg) + tbfiles = [s for s in os.listdir('.') if s.startswith('tb-')] + if tbfiles: + tbfiles = len(tbfiles) + else: + tbfiles = '' + data.append(FOOTER % locals()) + return StringIO(''.join(data)), 'text/html' + +def icoloader(key, **options): + srv = metaserver.getserver(key[0]) + iconformat = txtfilter(srv.serverinfo['iconformat'], 32) + return StringIO(srv.serverinfo['icon']), 'image/' + iconformat + +httpserver.register('', indexloader) +httpserver.register('index.html', indexloader) +httpserver.register('bub-n-bros.html', indexloader) +httpserver.register('ico', icoloader) +httpserver.register('mbub.png', httpserver.fileloader('mbub.png', 'image/png')) +httpserver.register('home.png', httpserver.fileloader('home.png', 'image/png')) + +def openhttpsocket(port = META_SERVER_HTTP_PORT): + from BaseHTTPServer import HTTPServer + class ServerClass(HTTPServer): + def get_request(self): + sock, addr = self.socket.accept() + sock.settimeout(5.0) + return sock, addr + HandlerClass = httpserver.MiniHandler + server_address = ('', port) + httpd = ServerClass(server_address, HandlerClass) + s = httpd.socket + serversockets[s] = httpd.handle_request, None + + +# ____________________________________________________________ + +def mainloop(): + while 1: + iwtd = serversockets.keys() + iwtd, owtd, ewtd = select(iwtd, [], iwtd) + #print iwtd, owtd, ewtd, serversockets + for s in iwtd: + if s in serversockets: + input, close = serversockets[s] + try: + input() + except: + import traceback + print "-"*60 + traceback.print_exc() + print "-"*60 + for s in ewtd: + if s in serversockets: + input, close = serversockets[s] + try: + close() + except: + import traceback + print "-"*60 + traceback.print_exc() + print "-"*60 + + +if __name__ == '__main__': + metaserver = MetaServer() + if sys.argv[1:2] == ['-f']: + metaserver.detach() + try: + openhttpsocket() + print 'listening to client port tcp %d / http %d / udp %d.' % ( + META_SERVER_PORT, + META_SERVER_HTTP_PORT, + META_SERVER_UDP_PORT) + mainloop() + finally: + if metaserver.ServersList: + print '*** servers still connected, waiting 5 seconds' + time.sleep(5) + print '*** leaving at', time.ctime() diff --git a/metaserver/metastruct.py b/metaserver/metastruct.py new file mode 100644 index 0000000..13240cc --- /dev/null +++ b/metaserver/metastruct.py @@ -0,0 +1,75 @@ +import sys, os +LOCALDIR = __file__ +LOCALDIR = os.path.abspath(os.path.dirname(LOCALDIR)) +sys.path.insert(0, os.path.dirname(LOCALDIR)) + +from common.msgstruct import * +from socket import error + +MMSG_INFO = 'I' +MMSG_START = '+' +MMSG_STOP = '-' +MMSG_LIST = 'L' +MMSG_ROUTE = 'R' +MMSG_TRACEBACK= 'T' +MMSG_UDP_ADDR = 'U' + +RMSG_WAKEUP = 'w' +RMSG_PING = 'p' +RMSG_PONG = 'o' +RMSG_SYNC = 'y' +RMSG_CONNECT = 'c' +RMSG_LIST = 'l' +RMSG_UDP_ADDR = 'u' +RMSG_UDP_CONN = 'd' +RMSG_NO_HOST = '?' + + +def encodedict(dict): + data = [] + for key, value in dict.items(): + data.append(message('#', key, value)) + return ''.join(data) + +def encodelist(list): + return message('[', *list) + +def decodedict(buffer): + result = {} + while 1: + msg, buffer = decodemessage(buffer) + if msg is None or len(msg) < 3 or msg[0] != '#': + break + result[msg[1]] = msg[2] + return result + +def decodelist(buffer): + msg, buffer = decodemessage(buffer) + assert msg[0] == '[' + return list(msg[1:]) + + +class MessageSocket: + + def __init__(self, s): + self.s = s + self.buffer = "" + + def receive(self): + try: + data = self.s.recv(2048) + except error: + data = '' + if not data: + self.disconnect() + return + self.buffer += data + while 1: + msg, self.buffer = decodemessage(self.buffer) + if msg is None: + break + if msg[0] not in self.MESSAGES: + print >> sys.stderr, 'unknown message %r' % (msg[0],) + else: + fn = self.MESSAGES[msg[0]] + fn(self, *msg[1:]) diff --git a/metaserver/pipelayer.py b/metaserver/pipelayer.py new file mode 100644 index 0000000..9baf78a --- /dev/null +++ b/metaserver/pipelayer.py @@ -0,0 +1,337 @@ +#import os +import struct +from collections import deque +from zlib import crc32 + + +class InvalidPacket(Exception): + pass + + +FLAG_NAK1 = 0xE0 +FLAG_NAK = 0xE1 +FLAG_REG = 0xE2 +FLAG_CFRM = 0xE3 + +FLAG_RANGE_START = 0xE0 +FLAG_RANGE_STOP = 0xE4 + +max_old_packets = 200 # must be <= 256 + + +class PipeLayer(object): + timeout = 1 + headersize = 4 + + def __init__(self, initialcrcs=(0, 0)): + #self.localid = os.urandom(4) + #self.remoteid = None + self.cur_time = 0 + self.out_queue = deque() + self.out_nextseqid = 0 + self.out_nextrepeattime = None + self.in_nextseqid = 0 + self.in_outoforder = {} + self.out_oldpackets = deque() + self.out_flags = FLAG_REG + self.out_resend = 0 + self.out_resend_skip = False + self.in_crc, self.out_crc = initialcrcs + + def queue(self, data): + if data: + self.out_queue.appendleft(data) + + def queue_size(self): + total = 0 + for data in self.out_queue: + total += len(data) + return total + + def in_sync(self): + return not self.out_queue and self.out_nextrepeattime is None + + def settime(self, curtime): + self.cur_time = curtime + if self.out_queue: + if len(self.out_oldpackets) < max_old_packets: + return 0 # more data to send now + if self.out_nextrepeattime is not None: + return max(0, self.out_nextrepeattime - curtime) + else: + return None + + def is_congested(self): + return len(self.out_oldpackets) >= max_old_packets + + def encode(self, maxlength): + #print ' '*self._dump_indent, '--- OUTQ', self.out_resend, self.out_queue + if len(self.out_oldpackets) >= max_old_packets: + # congestion, stalling + payload = 0 + else: + payload = maxlength - 8 + if payload <= 0: + raise ValueError("encode(): buffer too small") + if (self.out_nextrepeattime is not None and + self.out_nextrepeattime <= self.cur_time): + # no ACK received so far, send a packet (possibly empty) + if not self.out_queue: + payload = 0 + else: + if not self.out_queue: # no more data to send + return None + if payload == 0: # congestion + return None + # prepare a packet + seqid = self.out_nextseqid + flags = self.out_flags + self.out_flags = FLAG_REG # clear out the flags for the next time + #if flags in (FLAG_NAK, FLAG_NAK1): + # print 'out_flags NAK', hex(flags) + if payload > 0: + self.out_nextseqid = (seqid + 1) & 0xFFFF + data = self.out_queue.pop() + packetlength = len(data) + if self.out_resend > 0: + if packetlength > payload + 4: + raise ValueError("XXX need constant buffer size for now") + self.out_resend -= 1 + if self.out_resend_skip: + if self.out_resend > 0: + self.out_queue.pop() + self.out_resend -= 1 + self.out_nextseqid = (seqid + 2) & 0xFFFF + self.out_resend_skip = False + packetpayload = data + else: + packet = [] + while packetlength <= payload: + packet.append(data) + if not self.out_queue: + break + data = self.out_queue.pop() + packetlength += len(data) + else: + rest = len(data) + payload - packetlength + packet.append(data[:rest]) + self.out_queue.append(data[rest:]) + packetpayload = ''.join(packet) + self.out_crc = crc32(packetpayload, self.out_crc) + packetpayload += struct.pack("!I", self.out_crc & 0xffffffff) + self.out_oldpackets.appendleft(packetpayload) + #print ' '*self._dump_indent, '--- OLDPK', self.out_oldpackets + else: + # a pure ACK packet, no payload + if self.out_oldpackets and flags == FLAG_REG: + flags = FLAG_CFRM + packetpayload = '' + packet = struct.pack("!BBH", flags, + self.in_nextseqid & 0xFF, + seqid) + packetpayload + if self.out_oldpackets: + self.out_nextrepeattime = self.cur_time + self.timeout + else: + self.out_nextrepeattime = None + #self.dump('OUT', packet) + return packet + + def decode(self, rawdata): + if len(rawdata) < 4: + raise InvalidPacket + #print ' '*self._dump_indent, '------ out %d (+%d) in %d' % (self.out_nextseqid, self.out_resend, self.in_nextseqid) + #self.dump('IN ', rawdata) + in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4]) + if not (FLAG_RANGE_START <= in_flags < FLAG_RANGE_STOP): + raise InvalidPacket + in_diff = (in_seqid - self.in_nextseqid ) & 0xFFFF + ack_diff = (self.out_nextseqid + self.out_resend - ack_seqid) & 0xFF + if in_diff >= max_old_packets: + return '' # invalid, but can occur as a late repetition + if ack_diff != len(self.out_oldpackets): + # forget all acknowledged packets + if ack_diff > len(self.out_oldpackets): + return '' # invalid, but can occur with packet reordering + while len(self.out_oldpackets) > ack_diff: + #print ' '*self._dump_indent, '--- POP', repr(self.out_oldpackets[-1]) + self.out_oldpackets.pop() + if self.out_oldpackets: + self.out_nextrepeattime = self.cur_time + self.timeout + else: + self.out_nextrepeattime = None # all packets ACKed + if in_flags == FLAG_NAK or in_flags == FLAG_NAK1: + #print 'recv NAK', hex(in_flags) + # this is a NAK: resend the old packets as far as they've not + # also been ACK'ed in the meantime (can occur with reordering) + while self.out_resend < len(self.out_oldpackets): + self.out_queue.append(self.out_oldpackets[self.out_resend]) + self.out_resend += 1 + self.out_nextseqid = (self.out_nextseqid - 1) & 0xFFFF + #print ' '*self._dump_indent, '--- REP', self.out_nextseqid, repr(self.out_queue[-1]) + self.out_resend_skip = in_flags == FLAG_NAK1 + elif in_flags == FLAG_CFRM: + # this is a CFRM: request for confirmation + self.out_nextrepeattime = self.cur_time + # receive this packet's payload if it is the next in the sequence + if in_diff == 0: + if len(rawdata) > 8: + #print ' '*self._dump_indent, 'RECV ', self.in_nextseqid, repr(rawdata[4:]) + payload = rawdata[4:-4] + crc, = struct.unpack("!I", rawdata[-4:]) + if crc != (crc32(payload, self.in_crc) & 0xffffffff): + self.bad_crc() + return '' # bad crc! drop packet + self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF + self.in_crc = crc + result = [payload] + while self.in_nextseqid in self.in_outoforder: + rawdata = self.in_outoforder.pop(self.in_nextseqid) + payload = rawdata[4:-4] + crc, = struct.unpack("!I", rawdata[-4:]) + if crc != (crc32(payload, self.in_crc) & 0xffffffff): + # bad crc! clear all out-of-order packets + self.bad_crc() + break + self.in_nextseqid = (self.in_nextseqid + 1) & 0xFFFF + self.in_crc = crc + result.append(payload) + return ''.join(result) + else: + # we missed at least one intermediate packet: send a NAK + if len(rawdata) > 4: + self.in_outoforder[in_seqid] = rawdata + if ((self.in_nextseqid + 1) & 0xFFFF) in self.in_outoforder: + self.out_flags = FLAG_NAK1 + else: + self.out_flags = FLAG_NAK + self.out_nextrepeattime = self.cur_time + return '' + + def bad_crc(self): + import sys + print >> sys.stderr, "warning: bad crc on udp connexion" + self.in_outoforder.clear() + self.out_flags = FLAG_NAK + self.out_nextrepeattime = self.cur_time + + _dump_indent = 0 + def dump(self, dir, rawdata): + in_flags, ack_seqid, in_seqid = struct.unpack("!BBH", rawdata[:4]) + print ' ' * self._dump_indent, dir, + if in_flags == FLAG_NAK: + print 'NAK', + elif in_flags == FLAG_NAK1: + print 'NAK1', + elif in_flags == FLAG_CFRM: + print 'CFRM', + #print ack_seqid, in_seqid, '(%d bytes)' % (len(rawdata)-4,) + print ack_seqid, in_seqid, repr(rawdata[4:]) + + +def pipe_over_udp(udpsock, send_fd=-1, recv_fd=-1, + timeout=1.0, inactivity_timeout=None): + """Example: send all data showing up in send_fd over the given UDP + socket, and write incoming data into recv_fd. The send_fd and + recv_fd are plain file descriptors. When an EOF is read from + send_fd, this function returns (after making sure that all data was + received by the remote side). + """ + import os + from select import select + from time import time + p = PipeLayer() + p.timeout = timeout + iwtdlist = [udpsock] + if send_fd >= 0: + iwtdlist.append(send_fd) + running = True + while running or not p.in_sync(): + delay = delay1 = p.settime(time()) + if delay is None: + delay = inactivity_timeout + iwtd, owtd, ewtd = select(iwtdlist, [], [], delay) + if iwtd: + if send_fd in iwtd: + data = os.read(send_fd, 1500 - p.headersize) + if not data: + # EOF + iwtdlist.remove(send_fd) + running = False + else: + #print 'queue', len(data) + p.queue(data) + if udpsock in iwtd: + packet = udpsock.recv(65535) + #print 'decode', len(packet) + p.settime(time()) + data = p.decode(packet) + i = 0 + while i < len(data): + i += os.write(recv_fd, data[i:]) + elif delay1 is None: + break # long inactivity + p.settime(time()) + packet = p.encode(1500) + if packet: + #print 'send', len(packet) + #if os.urandom(1) >= '\x08': # emulate packet losses + udpsock.send(packet) + + +class PipeOverUdp(object): + + def __init__(self, udpsock, timeout=1.0): + import thread, os + self.os = os + self.sendpipe = os.pipe() + self.recvpipe = os.pipe() + thread.start_new_thread(pipe_over_udp, (udpsock, + self.sendpipe[0], + self.recvpipe[1], + timeout)) + + def __del__(self): + os = self.os + if self.sendpipe: + os.close(self.sendpipe[0]) + os.close(self.sendpipe[1]) + self.sendpipe = None + if self.recvpipe: + os.close(self.recvpipe[0]) + os.close(self.recvpipe[1]) + self.recvpipe = None + + close = __del__ + + def send(self, data): + if not self.sendpipe: + raise IOError("I/O operation on a closed PipeOverUdp") + return self.os.write(self.sendpipe[1], data) + + def sendall(self, data): + i = 0 + while i < len(data): + i += self.send(data[i:]) + + def recv(self, bufsize): + if not self.recvpipe: + raise IOError("I/O operation on a closed PipeOverUdp") + return self.os.read(self.recvpipe[0], bufsize) + + def recvall(self, bufsize): + buf = [] + while bufsize > 0: + data = self.recv(bufsize) + buf.append(data) + bufsize -= len(data) + return ''.join(buf) + + def fileno(self): + if not self.recvpipe: + raise IOError("I/O operation on a closed PipeOverUdp") + return self.recvpipe[0] + + def ofileno(self): + if not self.sendpipe: + raise IOError("I/O operation on a closed PipeOverUdp") + return self.sendpipe[1] diff --git a/metaserver/socketoverudp.py b/metaserver/socketoverudp.py new file mode 100644 index 0000000..2df4a75 --- /dev/null +++ b/metaserver/socketoverudp.py @@ -0,0 +1,174 @@ +from time import time as now +from pipelayer import PipeLayer, InvalidPacket +from pipelayer import FLAG_RANGE_START, FLAG_RANGE_STOP +import socket, struct + +SOU_RANGE_START = FLAG_RANGE_START +SOU_MIXED_DATA = FLAG_RANGE_STOP + 0 +SOU_SHUTDOWN = FLAG_RANGE_STOP + 1 +SOU_RANGE_STOP = FLAG_RANGE_STOP + 2 + +SHUTDOWN_PACKET = chr(SOU_SHUTDOWN) + '**' # < 4 characters + +CONGESTION_TIMEOUT = 20.0 +#CONSOLIDATE_DELAY = 0.1 + + +class SocketOverUdp(object): + RECV_CAN_RETURN_EMPTY = True + PACKETSIZE = 996 + MIXEDPACKETSIZE = 1080 + + def __init__(self, udpsock, initialcrcs): + self.udpsock = udpsock + self.pl = PipeLayer(initialcrcs) + self.congested_since = None + #self.consolidate_sends = None + #self.encode_delayed_until = now() + + def close(self): + try: + self.udpsock.send(SHUTDOWN_PACKET) + except socket.error: + pass + self.udpsock.close() + + def _progress(self): + if self.pl.settime(now()) == 0.0: + self._encode() + + def _encode(self): + #if self.consolidate_sends: + # if self.pl.cur_time < self.encode_delayed_until: + # return False + # self.encode_delayed_until = self.pl.cur_time + CONSOLIDATE_DELAY + packet = self.pl.encode(self.PACKETSIZE) + if packet is not None: + #print 'send:', repr(packet) + if self.pl.is_congested(): + if self.congested_since is None: + self.congested_since = now() + else: + if now() > self.congested_since + CONGESTION_TIMEOUT: + self.udpsock.send(SHUTDOWN_PACKET) + raise socket.error("peer not responding, timing out") + else: + self.congested_since = None + #print repr(packet[:10]) + #print "out:", len(packet) + #print ' ---' + self.udpsock.send(packet) + + def _decode(self, packet): + try: + data = self.pl.decode(packet) + #print ' ~~~' + return data + except InvalidPacket: + if len(packet) >= 4: + hdr, reserved, size = struct.unpack("!BBH", packet[:4]) + if hdr == SOU_MIXED_DATA: + #print ' ~~~[unmix%d/%d]' % (len(packet[4+size:]), + # len(packet)) + self.udp_over_udp_decoder(packet[4:4+size]) + return self._decode(packet[4+size:]) + else: + # non-tiny packets with no recognized hdr byte are + # assumed to be pure video traffic + #print ' ~~~[video]' + self.udp_over_udp_decoder(packet) + return '' + elif packet == SHUTDOWN_PACKET: + raise socket.error("received an end-of-connexion packet") + else: + #print ' ~~~[INVALID%d]' % (len(packet),) + return '' + + def fileno(self): + self._progress() + return self.udpsock.fileno() + + def flush(self): + while self.pl.settime(now()) == 0.0: + #self.encode_delayed_until = self.pl.cur_time + self._encode() + + def recv(self, _ignoredbufsize=None): + #print 'recv:' + packet = self.udpsock.recv(65535) + #print " in:", len(packet), hex(ord(packet[0])) + #print repr(packet) + self.pl.settime(now()) + data = self._decode(packet) + #print 'which is really', repr(data) + self._encode() + #if data: + # print " IN:", len(data) + return data + + def sendall(self, data): + #print 'queuing', repr(data) + #print ' OUT:', len(data) + self.pl.queue(data) + #self._progress() + return len(data) + + send = sendall + + def send_video_data(self, udpdata): + forced_embedded = SOU_RANGE_START <= ord(udpdata[0]) < SOU_RANGE_STOP + self.pl.settime(now()) + packet = self.pl.encode(self.PACKETSIZE) or '' + if not forced_embedded and not packet: + # no PipeLayer packet, send as plain udp data + datagram = udpdata + elif len(packet) + len(udpdata) <= self.MIXEDPACKETSIZE: + # fits in a single mixed data packet + datagram = (struct.pack("!BBH", SOU_MIXED_DATA, 0, len(udpdata)) + + udpdata + packet) + #print ' ---[mix%d/%d]' % (len(packet), len(datagram)) + else: + # two packets needed + #print repr(packet[:10]) + #print "out:", len(packet) + #print ' ---' + self.udpsock.send(packet) + datagram = udpdata + #print repr(datagram[:10]) + #print "out:", len(datagram), hex(ord(datagram[0])) + self.udpsock.send(datagram) + #self.encode_delayed_until = self.pl.cur_time + CONSOLIDATE_DELAY + #if self.consolidate_sends is None: + # self.consolidate_sends = True + return len(udpdata) + + def udp_over_udp_mixer(self): + return UdpOverUdpMixer(self) + + def udp_over_udp_decoder(self, data): + pass # method overridden by pclient.py + + def getpeername(self): + return self.udpsock.getpeername() + + def getsockname(self): + return self.udpsock.getsockname() + + def setsockopt(self, level, opt, value): + # note that TCP_NODELAY is set by the bub-n-bros client, not the server + #if level == socket.SOL_TCP and opt == socket.TCP_NODELAY: + # self.consolidate_sends = not value + #else: + # ignored + pass + + def setblocking(self, _ignored): + pass # XXX good enough for common/gamesrv.py + + +class UdpOverUdpMixer(object): + def __init__(self, sockoverudp): + self.send = sockoverudp.send_video_data + + def setsockopt(self, *args): + pass # ignored |