from struct import pack, unpack, calcsize
import sys

# Optional local configuration: fall back to defaults when the project's
# ``localmsg`` module (or one of its names) is not available.
try:
    from localmsg import PORTS
except ImportError:
    PORTS = {}
try:
    from localmsg import HOSTNAME
except ImportError:
    from socket import gethostname
    HOSTNAME = gethostname()


MSG_WELCOME = b"Welcome to gamesrv.py(3) !\n"
MSG_BROADCAST_PORT = b"*"
MSG_DEF_PLAYFIELD = b"p"
MSG_DEF_KEY = b"k"
MSG_DEF_ICON = b"r"
MSG_DEF_BITMAP = b"m"
MSG_DEF_SAMPLE = b"w"
MSG_DEF_MUSIC = b"z"
MSG_PLAY_MUSIC = b"Z"
MSG_FADEOUT = b"f"
MSG_PLAYER_JOIN = b"+"
MSG_PLAYER_KILL = b"-"
MSG_PLAYER_ICON = b"i"
MSG_PING = b"g"
MSG_PONG = b"G"
MSG_INLINE_FRAME = b"\\"

MSG_PATCH_FILE = MSG_DEF_MUSIC
MSG_ZPATCH_FILE = b"P"
MSG_MD5_FILE = b"M"
MSG_RECORDED = b"\x00"

# NOTE(review): the CMSG_* codes look like the client-originated counterparts
# of the MSG_* codes above -- confirm against the callers.
CMSG_PROTO_VERSION = b"v"
CMSG_KEY = b"k"
CMSG_ADD_PLAYER = b"+"
CMSG_REMOVE_PLAYER = b"-"
CMSG_UDP_PORT = b"<"
CMSG_ENABLE_SOUND = b"s"
CMSG_ENABLE_MUSIC = b"m"
CMSG_PING = b"g"
CMSG_PONG = b"G"
CMSG_DATA_REQUEST = b"M"
CMSG_PLAYER_NAME = b"n"

BROADCAST_MESSAGE = b"game!"  # less than 6 bytes


def message(tp, *values):
    """Encode a message of type *tp* carrying *values* into its wire form.

    Wire layout (network byte order, no padding):

    * 1 unsigned byte: length of the typecode string,
    * the typecode string itself (a ``struct`` format describing the values),
    * 1 byte: the message type *tp*,
    * the values, packed according to the typecode string.

    Each value is encoded as:

    * ``bytes`` -> ``'<len>s'``;
    * ``str``   -> UTF-8 encoded, then handled like ``bytes`` (so
      ``decodemessage`` hands it back as ``bytes``);
    * a number in ``0 <= v < 256`` -> ``'B'`` (unsigned byte);
    * any other number -> ``'l'`` (4-byte signed integer).

    Args:
        tp: the message type, a ``bytes`` object of length 1.
        *values: the payload items.

    Returns:
        The encoded message as ``bytes``.

    Raises:
        AssertionError: if the typecode string would not fit in one byte.
        struct.error: if a value cannot be packed with its typecode
            (for instance an integer outside the 32-bit signed range).
    """
    codes = []
    payload = []
    for v in values:
        if isinstance(v, str):
            # struct's 's' format only accepts bytes; encode first so the
            # declared length matches the number of bytes actually packed.
            v = v.encode()
        if isinstance(v, bytes):
            codes.append('%ds' % len(v))
        elif 0 <= v < 256:
            codes.append('B')
        else:
            codes.append('l')
        payload.append(v)
    typecodes = ''.join(codes)
    # The typecode length is sent as a single unsigned byte.
    assert len(typecodes) < 256
    fmt = ("!B%dsc" % len(typecodes)) + typecodes
    return pack(fmt, len(typecodes), typecodes.encode(), tp, *payload)


def decodemessage(data):
    """Decode the first message found at the start of *data*.

    Args:
        data: a ``bytes`` buffer that may hold zero, one or several
            messages produced by ``message``, possibly truncated.

    Returns:
        ``(values, rest)`` where ``values`` is the tuple
        ``(type, value1, value2, ...)`` and ``rest`` is the unconsumed tail
        of *data*; or ``(None, data)`` when *data* does not yet contain a
        complete message.

    Raises:
        OverflowError: if the header announces a message extending past
            1000000 bytes while the buffer is still incomplete.
        struct.error: if the embedded typecode string is not a valid
            ``struct`` format.
    """
    if not data:
        return None, data

    # First byte is the typecode length; the typecodes follow it.
    header_end = data[0] + 1
    if len(data) < header_end:
        return None, data

    # The message type is always packed as one char in front of the values.
    fmt = b"!c" + data[1:header_end]
    message_end = header_end + calcsize(fmt)
    if len(data) >= message_end:
        return unpack(fmt, data[header_end:message_end]), data[message_end:]
    if message_end > 1000000:
        # Refuse to keep buffering for an absurdly large announced message.
        raise OverflowError("announced message size is too large")
    return None, data