import http.server
from http.server import SimpleHTTPRequestHandler
import urllib.parse, cgi, html.entities
import sys, os, time
from io import BytesIO, StringIO
class Translator:
"""For use with format strings.
'formatstring % translator' will evaluate all %(xxx)s expressions
found in the format string in the given globals/locals.
Multiline expressions are assumed to be one or several complete
statements; they are executed and whatever they print is inserted back
into the format string."""
def __init__(self, globals, locals):
self.globals = globals
self.locals = locals
def __getitem__(self, expr):
# DEBUG: print(f"expr = {expr}",file=sys.stderr)
if '\n' in expr:
if not expr.endswith('\n'):
expr += '\n'
prevstdout = sys.stdout
try:
sys.stdout = f = StringIO()
exec(expr, self.globals, self.locals)
finally:
sys.stdout = prevstdout
return f.getvalue()
else:
return eval(expr, self.globals, self.locals)
class TranslatorIO:
"Lazy version of Translator."
def __init__(self, fmt, d):
self.gen = self.generate(fmt, d)
def read(self, ignored=None):
for text in self.gen:
if text:
return text
return ''
def close(self):
self.gen = ()
def generate(self, fmt, d):
t = Translator(d, d)
for data in fmt.split('\x0c'):
# DEBUG: print(f"data={data}",file=sys.stderr) # only for debug
yield (data % t).encode()
# HTML quoting
text_to_html = {}
for key, value in list(html.entities.entitydefs.items()):
text_to_html[value] = '&' + key + ';'
def htmlquote(s):
return ''.join([text_to_html.get(c, c) for c in s]).encode()
# HTTP Request Handler
pathloaders = {}
def canonicalpath(url):
if url.startswith('/'):
url = url[1:]
return url.lower()
def register(url, loader):
pathloaders[canonicalpath(url)] = loader
def is_registered(url):
return canonicalpath(url) in pathloaders
def load(filename, mimetype=None, locals=None):
if mimetype and mimetype.startswith('text/'):
mode = 'r'
else:
mode = 'rb'
f = open(filename, mode)
if locals is not None:
data = f.read()
f.close()
#data = data.replace('%"', '%%"')
d = globals().copy()
d.update(locals)
f = TranslatorIO(data, d)
return f, mimetype
def fileloader(filename, mimetype=None):
def loader(**options):
return load(filename, mimetype)
return loader
class HTTPRequestError(Exception):
pass
class MiniHandler(SimpleHTTPRequestHandler):
def send_head(self, query=''):
addr, host, path, query1, fragment = urllib.parse.urlsplit(self.path)
path = canonicalpath(path)
if path not in pathloaders:
if path + '/' in pathloaders:
return self.redirect(path + '/')
self.send_error(404)
return None
kwds = {}
for q in [query1, query]:
if q:
kwds.update(cgi.parse_qs(q))
loader = pathloaders[path]
try:
hdr = self.headers
hdr['remote host'] = self.client_address[0]
f, ctype = loader(headers=hdr, **kwds)
except IOError as e:
print( e, file=sys.stderr )
self.send_error(404, b"I/O error: " + str(e).encode())
return None
except HTTPRequestError as e:
print( e, file=sys.stderr )
self.send_error(500, str(e).encode())
return None
except:
f = StringIO()
import traceback
traceback.print_exc(file=sys.stderr)
traceback.print_exc(file=f)
data = htmlquote(f.getvalue())
data = data.replace(b'\n', b'
\n')
self.send_error(500)
return BytesIO(b'
'+data+b'
') if ctype is None: ctype = self.guess_type(self.translate_path(self.path)) elif f is None: return self.redirect(ctype) self.send_response(200) self.send_header("Content-type", ctype) self.end_headers() return f def redirect(self, url): self.send_response(302) self.send_header("Content-type", 'text/html') self.send_header("Location", url) self.end_headers() return BytesIO(b''' Please click here to continue. ''' % url.encode()) def do_POST(self): print("do_POST", sys.stderr) try: nbytes = int(self.headers.getheader('content-length')) except: nbytes = 0 query = self.rfile.read(nbytes).strip() f = self.send_head(query) if f: self.copyfile(f, self.wfile) f.close() def parse_request(self): if self.raw_requestline == '': return False else: return SimpleHTTPRequestHandler.parse_request(self) def address_string(self): """Override to avoid DNS lookups""" return "%s:%d" % self.client_address def finish(self): SimpleHTTPRequestHandler.finish(self) self.connection.close() while actions_when_finished: actions_when_finished.pop(0)() actions_when_finished = [] def my_host(): from . import gamesrv port = gamesrv.socketports[gamesrv.openhttpsocket()] return '127.0.0.1:%d' % port