"""Extract an RPG Maker VX Ace ``Game.rgss3a`` archive.

Every archived file is written next to the archive.  ``*.rvdata2`` files
(Ruby Marshal data) are converted to JSON, except ``Data\\Scripts.rvdata2``
whose entries are written as individual ``.rb`` files under ``Scripts/``.
"""

import base64
import json
import math
import struct
import sys
import zlib
from pathlib import Path

# All archive key arithmetic is done modulo 2**32.
_UINT32_MASK = 0xFFFFFFFF


class IterFile:
    """Minimal file-like ``read()`` adapter over an iterator of byte chunks."""

    def __init__(self, iterator):
        self.iterator = iterator
        # bytearray: appending and dropping a prefix are both cheap, unlike
        # repeated ``bytes`` concatenation.
        self.buf = bytearray()

    def read(self, size=-1):
        """Return up to ``size`` bytes; a negative or ``None`` size reads all.

        Fewer bytes (possibly ``b''``) are returned once the iterator is
        exhausted.  The result is always ``bytes``.
        """
        read_all = size is None or size < 0
        try:
            while read_all or len(self.buf) < size:
                self.buf += next(self.iterator)
        except StopIteration:
            pass
        if read_all:
            # Slicing with the negative size would drop bytes from the end.
            size = len(self.buf)
        result = bytes(self.buf[:size])
        del self.buf[:size]
        return result


class Rgss3aReader:
    """Reader for the table of contents and file data of an RGSS3A archive.

    ``fh`` must be a seekable binary stream positioned at the archive start.
    """

    def __init__(self, fh):
        self.fh = fh

    def read_int32(self):
        """Read one unsigned little-endian 32-bit integer from the stream."""
        return struct.unpack('<I', self.fh.read(4))[0]

    def decrypt_bytes(self, size, key):
        """Read ``size`` bytes and XOR them with the 4 repeating key bytes."""
        raw = self.fh.read(size)
        k = struct.pack('<I', key)
        return bytes(b ^ k[i % 4] for i, b in enumerate(raw))

    def iter_chunks(self, offset, size, initial_key):
        """Yield the decrypted file at ``offset`` in chunks of up to 4 bytes.

        The key advances as ``key * 7 + 3`` (mod 2**32) after every chunk.
        The stream is repositioned before each chunk, so several files can be
        consumed lazily and in any order.
        """
        key = initial_key
        for pos in range(0, size, 4):
            self.fh.seek(offset + pos)
            yield self.decrypt_bytes(min(4, size - pos), key)
            key = (key * 7 + 3) & _UINT32_MASK

    def read_toc(self):
        """Parse the archive header and return ``{name: IterFile}``.

        Raises:
            ValueError: if the magic or the format version does not match.
        """
        # Explicit checks instead of ``assert``: the reads must happen even
        # when Python runs with -O.
        magic = self.fh.read(7)
        if magic != b'RGSSAD\0':
            raise ValueError(f'not an RGSSAD archive: {magic!r}')
        version = self.fh.read(1)
        if version != b'\x03':
            raise ValueError(f'unsupported archive version: {version!r}')

        # The TOC key is derived with 32-bit wrap-around.
        key = (self.read_int32() * 9 + 3) & _UINT32_MASK
        toc = {}
        while True:
            offset = self.read_int32() ^ key
            size = self.read_int32() ^ key
            file_key = self.read_int32() ^ key
            name_size = self.read_int32() ^ key
            if offset == 0:  # terminating entry
                break
            name = self.decrypt_bytes(name_size, key).decode('utf-8')
            toc[name] = IterFile(self.iter_chunks(offset, size, file_key))
        return toc


class RubyMarshalReader:
    # https://docs.ruby-lang.org/en/2.2.0/marshal_rdoc.html
    """Decode one Ruby Marshal stream into JSON-serialisable Python values."""

    def __init__(self, fh):
        self.fh = fh
        self.symbols = []  # symbol table, target of ';' links
        self.objects = []  # object table, target of '@' links

    def check_version(self):
        """Consume the 2-byte version header.

        Returns ``False`` at end of input and ``True`` for a supported
        version (major 4, minor <= 8).

        Raises:
            ValueError: for a truncated or unsupported version header.
        """
        version = self.fh.read(2)
        if not version:
            return False
        if len(version) < 2 or version[0] != 4 or version[1] > 8:
            raise ValueError(f'unsupported marshal version: {version!r}')
        return True

    def read_bytes(self, size):
        """Read ``size`` raw bytes and wrap them as a base64 ``Bytes`` dict."""
        b = self.fh.read(size)
        return {
            'type': 'Bytes',
            'value': base64.b64encode(b).decode('ascii'),
        }

    def read_long(self):
        """Decode a Marshal variable-length integer."""
        first = struct.unpack('b', self.fh.read(1))[0]
        if first == 0:
            return 0
        if first > 4:
            return first - 5
        if first < -4:
            return first + 5
        # |first| little-endian payload bytes follow; negative values are
        # stored in two's complement of that width.
        width = abs(first)
        value = int.from_bytes(self.fh.read(width), 'little')
        if first < 0:
            value -= 1 << (8 * width)
        return value

    def read_symbol(self):
        """Read the next value and require it to be a symbol (or symlink).

        Raises:
            ValueError: if the next value is not a symbol.
        """
        typ, value = self._read_obj()
        if typ != b':':
            raise ValueError(f'expected symbol, got type {typ!r}')
        return value

    def read_obj(self):
        """Read and return the next value."""
        _, obj = self._read_obj()
        return obj

    def read_multidict(self):
        """Read a counted sequence of ``(key, value)`` pairs as a list."""
        # can have non-string keys
        result = []
        size = self.read_long()
        for _ in range(size):
            key = self.read_obj()
            result.append((key, self.read_obj()))
        return result

    def read_dict(self):
        """Read a counted sequence of symbol-keyed pairs as a dict."""
        result = {}
        size = self.read_long()
        for _ in range(size):
            key = self.read_symbol()
            result[key] = self.read_obj()
        return result

    def _read_wrapped(self):
        """Read the object inside a wrapper record ('I', 'e', 'C').

        Returns ``(typ, obj, slot)`` where ``slot`` is the object-table index
        the wrapped object registered under, or ``None`` if it registered
        nothing (immediates, symbols, links).
        """
        slot = len(self.objects)
        typ, obj = self._read_obj()
        if len(self.objects) == slot:
            slot = None
        return typ, obj, slot

    def _rebind(self, slot, value):
        """Make later '@' links to ``slot`` resolve to the wrapped result."""
        if slot is not None:
            self.objects[slot] = value

    def _read_complex_obj(self, typ):
        """Decode the body of a non-immediate record of type ``typ``.

        Raises:
            ValueError: for an unknown type byte or a malformed bignum sign.
        """
        if typ == b'I':
            inner_typ, obj, slot = self._read_wrapped()
            data = self.read_dict()
            if inner_typ == b'"' and data == {'E': True}:
                # UTF-8 flagged string: decode directly, falling back to
                # zlib-compressed text.
                b = base64.b64decode(obj['value'])
                try:
                    result = b.decode()
                except UnicodeDecodeError:
                    result = zlib.decompress(b).decode()
            else:
                result = {
                    'type': 'Instance',
                    'instance': obj,
                    'vars': data,
                }
            self._rebind(slot, result)
            return result
        elif typ == b'e':
            # The module name (a symbol) comes first, then the object.
            mod = self.read_symbol()
            _, obj, slot = self._read_wrapped()
            result = {
                'type': 'Extended',
                'obj': obj,
                'mod': mod,
            }
            self._rebind(slot, result)
            return result
        elif typ == b'C':
            name = self.read_symbol()
            _, obj, slot = self._read_wrapped()
            result = {
                'type': 'Class',
                'name': name,
                'data': obj,
            }
            self._rebind(slot, result)
            return result
        elif typ == b'[':
            size = self.read_long()
            return [self.read_obj() for _ in range(size)]
        elif typ == b'l':
            sign = self.fh.read(1)
            if sign not in (b'+', b'-'):
                raise ValueError(f'bad bignum sign: {sign!r}')
            # The length is given in 16-bit words.
            size = self.read_long()
            value = int.from_bytes(self.fh.read(size * 2), 'little')
            return -value if sign == b'-' else value
        elif typ == b'"':
            size = self.read_long()
            return self.read_bytes(size)
        elif typ == b'f':
            size = self.read_long()
            s = self.fh.read(size)
            if s == b'inf':
                return math.inf
            elif s == b'-inf':
                return -math.inf
            elif s == b'nan':
                return math.nan
            else:
                # Anything after a NUL byte is ignored.
                return float(s.split(b'\0', 1)[0].decode())
        elif typ in (b'c', b'm', b'M'):
            size = self.read_long()
            return {
                'type': {
                    b'c': 'ClassRef',
                    b'm': 'ModuleRef',
                    b'M': 'ClassRef/ModuleRef',
                }[typ],
                'data': self.read_bytes(size),
            }
        elif typ in (b'{', b'}'):
            items = self.read_multidict()
            # Only '}' hashes carry a default value after the pairs.
            default = self.read_obj() if typ == b'}' else None
            return {
                'type': 'Dict',
                'items': items,
                'default': default,
            }
        elif typ in (b'd', b'U'):
            name = self.read_symbol()
            return {
                'type': {b'd': 'Data', b'U': 'User Marshal'}[typ],
                'name': name,
                'data': self.read_obj(),
            }
        elif typ in (b'o', b'S'):
            name = self.read_symbol()
            return {
                'type': {b'o': 'Object', b'S': 'Struct'}[typ],
                'class': name,
                'data': self.read_dict(),
            }
        elif typ == b'/':
            size = self.read_long()
            pattern = self.fh.read(size).decode()
            return {
                'type': 'regex',
                'value': pattern,
                'opts': self.fh.read(1)[0],
            }
        elif typ == b'u':
            name = self.read_symbol()
            size = self.read_long()
            return {
                'type': 'User Defined',
                'name': name,
                'data': self.read_bytes(size),
            }
        else:
            raise ValueError(typ)

    def _read_obj(self):
        """Read the next record and return ``(type_byte, value)``.

        Symlinks (';') are reported with type ``b':'`` so callers can treat
        them as symbols.
        """
        typ = self.fh.read(1)
        if typ == b'T':
            value = True
        elif typ == b'F':
            value = False
        elif typ == b'0':
            value = None
        elif typ == b'i':
            value = self.read_long()
        elif typ == b':':
            size = self.read_long()
            value = self.fh.read(size).decode()
            self.symbols.append(value)
        elif typ == b';':
            return b':', self.symbols[self.read_long()]
        elif typ == b'@':
            value = self.objects[self.read_long()]
        elif typ in (b'I', b'e', b'C'):
            # Wrapper records do not get an object-table entry of their own;
            # only the object they wrap does.
            value = self._read_complex_obj(typ)
        else:
            # Reserve the slot before descending so nested objects are
            # numbered after their container.
            slot = len(self.objects)
            self.objects.append(None)
            value = self._read_complex_obj(typ)
            self.objects[slot] = value
        return typ, value

    @classmethod
    def read_all(cls, fh):
        """Read consecutive Marshal streams from ``fh`` until end of input.

        Each stream gets a fresh reader, so symbol and object tables are not
        shared between streams.
        """
        objs = []
        while True:
            r = cls(fh)
            if not r.check_version():
                break
            objs.append(r.read_obj())
        return objs


def log(path, i, total):
    """Print ``path`` prefixed with the integer percentage ``i / total``."""
    progress = i * 100 // total
    print(f'[{progress: 3}%] {path}')


def extract_scripts(root, scripts, i, total):
    """Write every non-empty script in ``scripts`` to a ``.rb`` file in ``root``.

    ``scripts`` is an iterable of ``(id, title, content)`` triples; ``i`` and
    ``total`` are only used for progress output.
    """
    root.mkdir(parents=True, exist_ok=True)
    for j, (script_id, title, content) in enumerate(scripts):
        if not content:
            continue
        path = root / f'{j:06}_{script_id}_{title.replace("/", "_")}.rb'
        log(path, i, total)
        # Fixed encoding and no newline translation: the output must not
        # depend on the platform defaults.
        with path.open('w', encoding='utf-8', newline='') as fh:
            fh.write(content)


def main(argv=None):
    """Extract ``Game.rgss3a`` found in the directory given as first argument."""
    args = sys.argv[1:] if argv is None else list(argv)
    if not args:
        sys.exit(f'usage: {sys.argv[0]} GAME_DIRECTORY')
    root = Path(args[0])
    with open(root / 'Game.rgss3a', 'rb') as fh:
        reader = Rgss3aReader(fh)
        toc = reader.read_toc()
        total = len(toc)
        for i, (name, fi) in enumerate(toc.items()):
            path = root / name.replace('\\', '/')
            path.parent.mkdir(parents=True, exist_ok=True)
            if path.suffix == '.rvdata2':
                objs = RubyMarshalReader.read_all(fi)
                if name == 'Data\\Scripts.rvdata2':
                    if len(objs) != 1:
                        raise ValueError(
                            f'expected 1 object in {name}, got {len(objs)}'
                        )
                    extract_scripts(root / 'Scripts', objs[0], i, total)
                else:
                    json_path = path.with_suffix('.json')
                    log(json_path, i, total)
                    with json_path.open('w', encoding='utf-8') as fo:
                        json.dump(objs, fo, indent=2)
            else:
                log(path, i, total)
                with path.open('wb') as fo:
                    while chunk := fi.read(1 << 16):
                        fo.write(chunk)


if __name__ == '__main__':
    main()