Crusader_Decomp/tools/extract_eusecode_flx.py

2302 lines
102 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Extractor for Crusader's EUSECODE.FLX container.
Current validated layout:
- 0x80-byte header area
- little-endian entry count at file offset 0x54
- entry table begins at 0x80
- each entry is 8 bytes: <u32 data_offset, u32 declared_size>
The exact semantics of the payload records are still under RE, so the extractor dumps
all non-zero entries and emits human-readable sidecars (.strings.txt and index files)
to support the next decoding pass.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import pathlib
import struct
from dataclasses import asdict, dataclass
# Hard-coded absolute Windows locations of the EUSECODE.FLX container and of
# the directory that receives extracted artifacts.
# NOTE(review): presumably these are only command-line defaults -- the argument
# parser is not visible in this part of the file; confirm before relying on it.
DEFAULT_INPUT = pathlib.Path(r"k:\ghidra\Crusader_Decomp\USECODE\EUSECODE.FLX")
DEFAULT_OUTPUT = pathlib.Path(r"k:\ghidra\Crusader_Decomp\USECODE\EUSECODE_extracted")
@dataclass(frozen=True)
class CandidateEntry:
    """Immutable record for one row of the FLX entry table.

    Per the module docstring each table row is 8 bytes holding
    ``<u32 data_offset, u32 declared_size>``.
    """

    # File offset of the table row itself (not of the payload it points to).
    table_offset: int
    # Payload offset stored in the row.
    data_offset: int
    # Payload size as declared by the row.
    declared_size: int
@dataclass
class ExtractedChunk:
    """Mutable per-entry record describing one extracted FLX payload.

    The first group of fields has no defaults and must be supplied by the
    caller. The trailing ``class_*`` / code-base / event-table fields default
    to ``None`` and are filled in later by ``annotate_class_layout``.
    """

    # --- identity and location inside the container ---
    index: int
    table_offset: int
    object_index: int | None  # recomputed from table_offset by annotate_class_layout
    data_offset: int
    declared_size: int
    next_offset: int | None
    extracted_size: int
    overlap_with_next: bool

    # --- content heuristics ---
    text_like: bool
    printable_ratio: float
    zero_ratio: float
    preview: str

    # --- paths of the files written for this chunk ---
    raw_path: str  # read back by annotate_class_layout / build_class_event_rows
    strings_path: str
    text_path: str | None

    # --- descriptor summary ---
    primary_label: str | None
    field_names: list[str]
    field_tags: list[str]

    # --- class-layout annotations (populated by annotate_class_layout) ---
    class_id: int | None = None
    class_name_hint: str | None = None
    raw_code_base_u32: int | None = None
    code_base_minus_one: int | None = None
    conservative_event_count: int | None = None
    event_table_end: int | None = None
    class_parse_status: str | None = None
@dataclass(frozen=True)
class ClassEventRow:
    """Immutable view of one event-table slot of a parsed class object.

    Instances are produced by ``derive_class_event_rows``. The ``derived_*``
    fields are ``None`` whenever no body window could be derived for the slot.
    """

    # --- owning chunk / class ---
    entry_index: int
    object_index: int
    class_id: int
    class_name_hint: str

    # --- slot identity ---
    slot: int
    event_name_hint: str | None

    # --- raw values read from the 6-byte event-table row ---
    raw_event_entry_word: int
    raw_code_offset: int

    # --- derived body window inside the chunk's raw bytes ---
    derived_body_start: int | None
    derived_body_end: int | None
    derived_body_length: int | None
@dataclass(frozen=True)
class FlxTable:
    """Immutable description of a parsed FLX entry table."""

    # Number of entries the table holds.
    entry_count: int
    # File offset at which the table starts.
    table_offset: int
    # File offset at which the table ends.
    table_end: int
    # The table rows themselves.
    entries: list[CandidateEntry]
@dataclass(frozen=True)
class FamilyArtifactSpec:
    """Immutable recipe for one per-family decompile artifact.

    Consumed by ``write_family_decompile_artifact``.
    """

    # File-name stem; the writer emits ``<stem>.md`` and ``<stem>.tsv``.
    output_stem: str
    # Heading text of the generated markdown document.
    title: str
    # Primary labels of the chunks that belong to the family.
    labels: tuple[str, ...]
@dataclass(frozen=True)
class RepeatedFamilyRowExpectation:
    """Immutable expected values for one class/slot event row.

    The fields mirror the raw and derived columns of ``ClassEventRow`` plus the
    expected repeated-template status string for that row.
    """

    # --- which row is being pinned ---
    class_name: str
    slot: int

    # --- expected raw event-table values ---
    raw_event_entry_word: int
    raw_code_offset: int

    # --- expected derived body window ---
    derived_body_start: int
    derived_body_end: int
    derived_body_length: int

    # --- expected "<family>/shared-slot-0xNN/<suffix>" status ---
    repeated_template_status: str
def read_u32_le(data: bytes, offset: int) -> int:
    """Decode the unsigned little-endian 32-bit integer stored at ``offset``.

    Raises ``struct.error`` when ``data`` does not hold four bytes there.
    """
    (value,) = struct.unpack_from("<I", data, offset)
    return value
def read_u16_le(data: bytes, offset: int) -> int:
    """Decode the unsigned little-endian 16-bit integer stored at ``offset``.

    Raises ``struct.error`` when ``data`` does not hold two bytes there.
    """
    (value,) = struct.unpack_from("<H", data, offset)
    return value
def ascii_preview(data: bytes, limit: int = 64) -> str:
    """Render the first ``limit`` bytes of ``data`` as a one-line preview.

    Bytes in the range 0x20..0x7E are shown as their ASCII character; every
    other byte is shown as ``"."``.
    """
    return "".join(
        chr(value) if 0x20 <= value <= 0x7E else "."
        for value in data[:limit]
    )
def printable_ratio(data: bytes) -> float:
    """Return the fraction of bytes in ``data`` that count as printable.

    Printable means tab, line feed, carriage return, or anything in the range
    0x20..0x7E. Empty input yields ``0.0``.
    """
    total = len(data)
    if total == 0:
        return 0.0
    whitespace_controls = (0x09, 0x0A, 0x0D)
    hits = 0
    for value in data:
        if value in whitespace_controls or 0x20 <= value <= 0x7E:
            hits += 1
    return hits / total
def zero_ratio(data: bytes) -> float:
    """Return the fraction of bytes in ``data`` equal to zero (``0.0`` if empty)."""
    return data.count(0) / len(data) if data else 0.0
def iter_printable_runs(data: bytes, min_len: int = 4) -> list[str]:
    """Collect the maximal runs of printable bytes found in ``data``.

    A byte is printable when it is tab, line feed, carriage return, or lies in
    0x20..0x7E. Runs are returned in order of appearance, decoded as latin-1,
    and only when they are at least ``min_len`` bytes long.
    """
    found: list[str] = []
    # Start of the run currently being scanned: one past the last
    # non-printable byte seen so far.
    run_start = 0
    for position, value in enumerate(data):
        if value in (0x09, 0x0A, 0x0D) or 0x20 <= value <= 0x7E:
            continue
        # A non-printable byte terminates the current run.
        if position - run_start >= min_len:
            found.append(data[run_start:position].decode("latin-1"))
        run_start = position + 1
    # The final run has no terminating byte, so flush it explicitly.
    if len(data) - run_start >= min_len:
        found.append(data[run_start:].decode("latin-1"))
    return found
def summarize_descriptor(strings: list[str]) -> tuple[str | None, list[str]]:
    """Split extracted strings into one primary label and a field-name list.

    Upper-case strings containing at least one letter are tallied as label
    candidates. Otherwise, strings that start with a lower-case character and
    are alphanumeric once underscores are removed are collected as field names,
    de-duplicated in first-seen order.

    Returns:
        ``(primary_label, field_names)`` where ``primary_label`` is the most
        frequent label candidate (ties resolved by ascending string order) or
        ``None`` when there is no candidate.
    """
    tallies: dict[str, int] = {}
    ordered_fields: list[str] = []
    known_fields: set[str] = set()

    for text in strings:
        if text.isupper() and any(ch.isalpha() for ch in text):
            tallies[text] = tallies.get(text, 0) + 1
        elif (
            text
            and text[0].islower()
            and text.replace("_", "").isalnum()
            and text not in known_fields
        ):
            known_fields.add(text)
            ordered_fields.append(text)

    if not tallies:
        return None, ordered_fields
    # Highest count wins; among equal counts the smallest string wins.
    winner = min(tallies, key=lambda name: (-tallies[name], name))
    return winner, ordered_fields
def extract_field_tag_records(data: bytes, field_names: list[str]) -> list[str]:
    """Build ``"TT:HHLL->name"`` tags from the 3 bytes preceding each field name.

    For every occurrence of each field name in ``data`` (matched as latin-1
    bytes) that has at least three bytes in front of it, a tag is formed from
    those three bytes rendered as two-digit upper-case hex.

    Occurrences closer than three bytes to the start of ``data`` cannot yield
    a tag and are skipped. The previous implementation stopped scanning at the
    first such occurrence and so missed every later, valid occurrence of the
    same name.

    Args:
        data: Raw chunk bytes to scan.
        field_names: Field names to look for; empty names are ignored.

    Returns:
        The distinct tags, sorted ascending.
    """
    tags: set[str] = set()
    for field_name in field_names:
        needle = field_name.encode("latin-1")
        if not needle:
            # An empty needle matches at every offset and carries no name.
            continue
        pos = data.find(needle)
        while pos != -1:
            if pos >= 3:
                tags.add(
                    f"{data[pos - 3]:02X}:{data[pos - 2]:02X}{data[pos - 1]:02X}->{field_name}"
                )
            pos = data.find(needle, pos + 1)
    return sorted(tags)
def has_referent_field(chunk: ExtractedChunk) -> bool:
    """Tell whether ``chunk`` mentions a ``referent`` field.

    True when ``"referent"`` is one of the chunk's field names, or when any of
    its field tags ends with ``"->referent"``.
    """
    tag_suffix = "->referent"
    return "referent" in chunk.field_names or any(
        tag.endswith(tag_suffix) for tag in chunk.field_tags
    )
def get_event_evidence(chunk: ExtractedChunk) -> list[str]:
    """List the markers that tie ``chunk`` to events.

    A ``"field:<name>"`` marker is produced for every field name containing
    ``"event"`` (case-insensitive), followed by a ``"tag:<tag>"`` marker for
    every field tag containing ``"->event"`` (case-insensitive). Duplicates
    are dropped while keeping first-seen order.
    """
    field_markers = [
        f"field:{name}" for name in chunk.field_names if "event" in name.lower()
    ]
    tag_markers = [
        f"tag:{tag}" for tag in chunk.field_tags if "->event" in tag.lower()
    ]
    # dict.fromkeys keeps insertion order, giving an order-preserving dedupe.
    return list(dict.fromkeys(field_markers + tag_markers))
def chunk_role(chunk: ExtractedChunk) -> str:
    """Classify ``chunk`` into one of four coarse roles.

    Checked in order: ``"referent-anchor"`` for the JELYHACK / JELYH2 labels,
    ``"event-bearing"`` when any event evidence exists, ``"referent-neighbor"``
    when a referent field is present, otherwise ``"neighbor"``.
    """
    anchor_labels = ("JELYHACK", "JELYH2")
    if chunk.primary_label in anchor_labels:
        return "referent-anchor"
    if get_event_evidence(chunk):
        return "event-bearing"
    return "referent-neighbor" if has_referent_field(chunk) else "neighbor"
def has_event_trigger_field(chunk: ExtractedChunk) -> bool:
    """Tell whether ``chunk`` carries an ``eventTrigger`` field.

    True when a field name equals ``"eventtrigger"`` ignoring case, or when a
    field tag contains ``"->eventtrigger"`` ignoring case.
    """
    for name in chunk.field_names:
        if name.lower() == "eventtrigger":
            return True
    for tag in chunk.field_tags:
        if "->eventtrigger" in tag.lower():
            return True
    return False
def event_tag_kind(chunk: ExtractedChunk) -> str:
    """Name the kind of event tag found on ``chunk``.

    Returns ``"eventTrigger"`` when any field tag contains ``"->eventtrigger"``,
    else ``"event"`` when any field tag ends with ``"->event"`` (both
    comparisons ignore case), else the empty string.
    """
    lowered_tags = [tag.lower() for tag in chunk.field_tags]
    # The trigger check runs first so it wins when both kinds are present.
    if any("->eventtrigger" in tag for tag in lowered_tags):
        return "eventTrigger"
    if any(tag.endswith("->event") for tag in lowered_tags):
        return "event"
    return ""
def classify_event_family(chunk: ExtractedChunk) -> str:
    """Assign ``chunk`` to an event family, or ``""`` when it has no event tag.

    Chunks with an ``eventTrigger`` tag are ``"callback-eventtrigger"``.
    Chunks with a plain ``event`` tag are refined, in order, by primary label
    (``EVENT`` hub, ``*_BOOT`` core), by an exact ``referent``/``event`` field
    list, by environmental field names, and by the ``typeNpc`` field, falling
    back to ``"specialized-event"``.
    """
    kind = event_tag_kind(chunk)
    if kind == "eventTrigger":
        return "callback-eventtrigger"
    if kind != "event":
        return ""

    label = chunk.primary_label
    names = chunk.field_names
    if label == "EVENT":
        return "event-hub"
    if label and label.endswith("_BOOT"):
        return "boot-event-core"
    if names == ["referent", "event"]:
        return "minimal-event-core"
    environmental_fields = ("flame", "flame2", "fire", "fire2", "steam", "steam2")
    if any(candidate in names for candidate in environmental_fields):
        return "environmental-event"
    if "typeNpc" in names:
        return "npc-trigger"
    return "specialized-event"
def readable_role(chunk: ExtractedChunk) -> str:
    """Return the short role word used in human-readable signatures.

    Checked in order: ``"hub"`` (event-hub family), ``"anchor"``
    (referent-anchor role), ``"attach"`` (plain event tag), ``"callback"``
    (eventTrigger tag), ``"neighbor"`` (has a referent field), else ``"data"``.
    """
    if classify_event_family(chunk) == "event-hub":
        return "hub"
    if chunk_role(chunk) == "referent-anchor":
        return "anchor"
    kind = event_tag_kind(chunk)
    if kind == "event":
        return "attach"
    if kind == "eventTrigger":
        return "callback"
    return "neighbor" if has_referent_field(chunk) else "data"
def clean_token(value: str) -> str:
    """Trim ``value`` and collapse every whitespace run to a single space."""
    words = value.split()
    return " ".join(words)
def readable_subject(chunk: ExtractedChunk) -> str:
    """Format ``chunk`` as ``label(field1,field2,...)``.

    The label falls back to ``chunk_<index>`` when the chunk has no primary
    label, and the field list falls back to ``?`` when it is empty. Label and
    field names are whitespace-normalised with ``clean_token``.
    """
    raw_label = chunk.primary_label or f"chunk_{chunk.index}"
    cleaned_fields = [clean_token(name) for name in chunk.field_names]
    field_list = ",".join(cleaned_fields) or "?"
    return f"{clean_token(raw_label)}({field_list})"
def readable_signature(chunk: ExtractedChunk) -> str:
    """Return ``"<role> <subject>"`` for ``chunk``."""
    role = readable_role(chunk)
    subject = readable_subject(chunk)
    return f"{role} {subject}"
# Static table of VM stage annotations. Every row is a dict of strings with the
# keys stage_address, ir_name, opcode_or_lane, payload_shape and evidence; the
# evidence text records the disassembly observation behind the row.
# NOTE(review): no function in this part of the file reads this table --
# presumably a writer further down emits it; confirm before changing the keys.
VERIFIED_VM_IR_ROWS: tuple[dict[str, str], ...] = (
{
"stage_address": "000d:0988",
"ir_name": "APPEND_UNIQUE_INLINE",
"opcode_or_lane": "opcode 0x18 (implied sibling)",
"payload_shape": "inline referent-chain payload",
"evidence": "0x19/0x1a/0x1b compares in 000d:0988 imply 0x18 as append-unique inline sibling",
},
{
"stage_address": "000d:0988",
"ir_name": "APPEND_UNIQUE_INDIRECT",
"opcode_or_lane": "opcode 0x19",
"payload_shape": "indirect/string-like referent-chain payload",
"evidence": "[BP-0x32] == 0x19 path in 000d:0988 with indirect-mode flag",
},
{
"stage_address": "000d:0988",
"ir_name": "REMOVE_MATCHING_INDIRECT",
"opcode_or_lane": "opcode 0x1a",
"payload_shape": "indirect/string-like referent-chain payload",
"evidence": "[BP-0x32] == 0x1a path in 000d:0988 reaches entity_vm_referent_chain_remove_matching_from",
},
{
"stage_address": "000d:0988",
"ir_name": "REMOVE_MATCHING_INLINE",
"opcode_or_lane": "opcode 0x1b",
"payload_shape": "inline referent-chain payload",
"evidence": "[BP-0x32] == 0x1b path in 000d:0988 reaches entity_vm_referent_chain_remove_matching_from without indirect-mode flag",
},
{
"stage_address": "000d:177c",
"ir_name": "PUSH_FRAME_WORD_LITERAL",
"opcode_or_lane": "same FUN_000d_ebe3 sequencer family",
"payload_shape": "word scalar pushed to stream stack",
"evidence": "000d:177c subtracts 2 from [context+0xcc] and stores frame-local word before entity_vm_opcode_finish",
},
{
"stage_address": "000d:1acb",
"ir_name": "COMPARE_STREAM_DWORD_AND_PUSH_BOOL",
"opcode_or_lane": "same FUN_000d_ebe3 sequencer family",
"payload_shape": "stream dword pair consumed, predicate word emitted",
"evidence": "000d:1acb reads one 32-bit pair from stream, compares against AX:DX, pushes boolean word result",
},
{
"stage_address": "000d:208b",
"ir_name": "MATERIALIZE_OR_FORWARD_VALUE",
"opcode_or_lane": "slot-backed context consumer",
"payload_shape": "materialized slot value or forwarded object result",
"evidence": "000d:208b builds one VM context then forwards immediate or object-backed value through shared epilogue",
},
{
"stage_address": "000d:21ed",
"ir_name": "PREPEND_INLINE_PAYLOAD",
"opcode_or_lane": "inline payload substage",
"payload_shape": "caller-owned blob copied into context +0x102 buffer",
"evidence": "000d:21ed prepends caller bytes into backward-growing context buffer before metadata-driven follow-on work",
},
{
"stage_address": "000d:22bc",
"ir_name": "BUILD_ENTITY_LINK_MATRIX",
"opcode_or_lane": "inline payload follow-on stage",
"payload_shape": "two signed metadata bytes plus streamed entity/link ids",
"evidence": "000d:22bc consumes two signed bytes from +0xd6/+0xd8 and streamed words for repeated entity_link calls",
},
{
"stage_address": "000d:22bc",
"ir_name": "EMIT_OR_PUSHBACK_RESULT",
"opcode_or_lane": "inline payload follow-on stage",
"payload_shape": "stream writeback filter",
"evidence": "000d:23da..2421 pushes back only results without 0x0400 list flag before opcode finish",
},
{
"stage_address": "000d:2104",
"ir_name": "FINALIZE_MIXED_VALUE_TO_OUTPTR",
"opcode_or_lane": "same FUN_000d_ebe3 sequencer family",
"payload_shape": "mixed immediate/object scalar return",
"evidence": "000d:2104 writes either frame-local dword or object word with high word cleared to caller out-ptr",
},
)
# Static table of mask-wrapper annotations. Every row is a dict of strings with
# the keys wrapper_address, mask_pair, caller_anchor and descriptor_bias.
# NOTE(review): no function in this part of the file reads this table --
# presumably a writer further down emits it; confirm before changing the keys.
VERIFIED_MASK_LADDER_ROWS: tuple[dict[str, str], ...] = (
{
"wrapper_address": "0005:27a4",
"mask_pair": "0x0001:0000",
"caller_anchor": "000c:a09e entity +0x5b bit 0x0004 branch",
"descriptor_bias": "generic active-event-biased lane; no direct class-id bridge",
},
{
"wrapper_address": "0005:2867",
"mask_pair": "0x0002:0001",
"caller_anchor": "stores result into entity field +0x39",
"descriptor_bias": "active-event ecosystem bias stronger than referent-anchor bias",
},
{
"wrapper_address": "0005:2ae2",
"mask_pair": "0x0004:0002",
"caller_anchor": "same verified local mask ladder around entity_vm_context_try_create_masked_for_entity",
"descriptor_bias": "active-event ecosystem bias stronger than referent-anchor bias",
},
{
"wrapper_address": "0005:2c9b",
"mask_pair": "0x0010:0004",
"caller_anchor": "direct callers at 0005:5946 and 0005:59e9",
"descriptor_bias": "active-event ecosystem bias stronger than referent-anchor bias",
},
{
"wrapper_address": "0005:2918",
"mask_pair": "0x0020:0005",
"caller_anchor": "+0x3c == 0x20b object lane, carries caller fields +0x36/+0x38",
"descriptor_bias": "candidate active-event field bridge; still not descriptor-specific",
},
{
"wrapper_address": "0005:2c06",
"mask_pair": "0x0200:0009",
"caller_anchor": "direct caller anchor at 0005:0292",
"descriptor_bias": "active-event ecosystem bias stronger than referent-anchor bias",
},
{
"wrapper_address": "0005:2c35",
"mask_pair": "0x0400:000a",
"caller_anchor": "xref-dark signed slot-offset wrapper forwarding context +0x34",
"descriptor_bias": "offset-specialized masked context creation; descriptor family unresolved",
},
{
"wrapper_address": "0005:2c68",
"mask_pair": "0x0800:000b",
"caller_anchor": "xref-dark signed slot-offset wrapper forwarding context +0x34",
"descriptor_bias": "offset-specialized masked context creation; descriptor family unresolved",
},
{
"wrapper_address": "0005:2cd2",
"mask_pair": "0x1000:000c",
"caller_anchor": "direct caller anchor at 0005:0fee",
"descriptor_bias": "active-event ecosystem bias stronger than referent-anchor bias",
},
{
"wrapper_address": "0004:f05c",
"mask_pair": "0x2000:0015",
"caller_anchor": "0004:f2b3 overlap/proximity branch with entity byte +0x32 state checks",
"descriptor_bias": "gameplay-state lane feeding runtime materialization, not direct descriptor switch",
},
{
"wrapper_address": "0005:2d01",
"mask_pair": "0x4000:000e",
"caller_anchor": "direct callers at 0007:814e and 0007:822e",
"descriptor_bias": "active-event ecosystem bias stronger than referent-anchor bias",
},
{
"wrapper_address": "0004:f033",
"mask_pair": "0x8000:0007",
"caller_anchor": "local wrapper seed recovered from direct instruction evidence",
"descriptor_bias": "gameplay-side materialization lane; still descriptor-agnostic",
},
{
"wrapper_address": "0005:2d30",
"mask_pair": "0x8000:000f",
"caller_anchor": "entity id/class flag gate plus 0x0f16 / 0x20f dispatch-entry emission path",
"descriptor_bias": "strongest current active-event ecosystem candidate in the ladder",
},
)
# Event-name hints indexed by event slot number: position N in this tuple is
# the hint that scummvm_event_name_hint() returns for slot N. The tuple holds
# 32 names, covering slots 0x00 through 0x1F; slots outside that range have no
# hint. Order is significant -- inserting or removing a name shifts every
# later slot.
SCUMMVM_EVENT_NAME_HINTS: tuple[str, ...] = (
"look",
"use",
"anim",
"setActivity",
"cachein",
"hit",
"gotHit",
"hatch",
"schedule",
"release",
"equip",
"unequip",
"combine",
"func0D",
"calledFromAnim",
"enterFastArea",
"leaveFastArea",
"cast",
"justMoved",
"avatarStoleSomething",
"animGetHit",
"unhatch",
"func16",
"func17",
"func18",
"func19",
"func1A",
"func1B",
"func1C",
"func1D",
"func1E",
"func1F",
)
# (family_name, member_labels) pairs consumed by
# build_repeated_template_status_map(): member labels are matched against each
# chunk's primary_label, and family_name becomes the first component of the
# "<family>/shared-slot-0xNN/<suffix>" status string. A family is only
# evaluated when at least two of its labels are present.
VERIFIED_REPEATED_TEMPLATE_FAMILIES: tuple[tuple[str, tuple[str, ...]], ...] = (
("referent-anchor-twin", ("JELYHACK", "JELYH2")),
("boot-event-core", ("AND_BOOT", "BRO_BOOT", "COR_BOOT", "REE_BOOT", "VAR_BOOT")),
("callback-eventtrigger", ("SURCAMNS", "SURCAMEW")),
("environmental-event", ("FLAMEBOX", "NOSTRIL", "STEAMBOX")),
)
# One FamilyArtifactSpec per family artifact. write_family_decompile_artifact()
# turns each spec into "<output_stem>.md" and "<output_stem>.tsv", using title
# as the markdown heading and labels to select chunks by primary_label.
# NOTE(review): the code that iterates this tuple is not visible in this part
# of the file.
FAMILY_ARTIFACT_SPECS: tuple[FamilyArtifactSpec, ...] = (
FamilyArtifactSpec(
output_stem="boot_family_decompile",
title="_BOOT Family Decompiled Event Sketches",
labels=("AND_BOOT", "BRO_BOOT", "COR_BOOT", "REE_BOOT", "VAR_BOOT"),
),
FamilyArtifactSpec(
output_stem="callback_family_decompile",
title="SURCAM Callback Family Decompiled Event Sketches",
labels=("SURCAMNS", "SURCAMEW"),
),
FamilyArtifactSpec(
output_stem="environmental_family_decompile",
title="Environmental Family Decompiled Event Sketches",
labels=("FLAMEBOX", "NOSTRIL", "STEAMBOX"),
),
)
# Expected event rows used by validate_verified_repeated_family_regressions().
# Positional arguments follow the RepeatedFamilyRowExpectation field order:
#   class_name, slot, raw_event_entry_word, raw_code_offset,
#   derived_body_start, derived_body_end, derived_body_length,
#   repeated_template_status
# The set of slots listed for a class is also compared against the class's
# actual set of slots with a non-zero code offset, so a class must list all of
# them here.
VERIFIED_REPEATED_FAMILY_ROW_EXPECTATIONS: tuple[RepeatedFamilyRowExpectation, ...] = (
RepeatedFamilyRowExpectation("JELYHACK", 0x01, 0x002A, 0x00000001, 0x00D4, 0x00FE, 42, "referent-anchor-twin/shared-slot-0x01/same-length-template"),
RepeatedFamilyRowExpectation("JELYH2", 0x01, 0x002A, 0x00000001, 0x00D4, 0x00FE, 42, "referent-anchor-twin/shared-slot-0x01/same-length-template"),
RepeatedFamilyRowExpectation("AND_BOOT", 0x0A, 0x0253, 0x00000001, 0x00D4, 0x0327, 595, "boot-event-core/shared-slot-0x0A/shared-slot-template"),
RepeatedFamilyRowExpectation("AND_BOOT", 0x0F, 0x0237, 0x00000254, 0x0327, 0x055E, 567, "boot-event-core/shared-slot-0x0F/shared-slot-template"),
RepeatedFamilyRowExpectation("AND_BOOT", 0x10, 0x003B, 0x0000048B, 0x055E, 0x0599, 59, "boot-event-core/shared-slot-0x10/same-length-template"),
RepeatedFamilyRowExpectation("BRO_BOOT", 0x0A, 0x02D5, 0x00000001, 0x00D4, 0x03A9, 725, "boot-event-core/shared-slot-0x0A/shared-slot-template"),
RepeatedFamilyRowExpectation("BRO_BOOT", 0x0F, 0x024C, 0x000002D6, 0x03A9, 0x05F5, 588, "boot-event-core/shared-slot-0x0F/shared-slot-template"),
RepeatedFamilyRowExpectation("BRO_BOOT", 0x10, 0x003B, 0x00000522, 0x05F5, 0x0630, 59, "boot-event-core/shared-slot-0x10/same-length-template"),
RepeatedFamilyRowExpectation("COR_BOOT", 0x0A, 0x0227, 0x00000001, 0x00D4, 0x02FB, 551, "boot-event-core/shared-slot-0x0A/shared-slot-template"),
RepeatedFamilyRowExpectation("COR_BOOT", 0x0F, 0x0234, 0x00000228, 0x02FB, 0x052F, 564, "boot-event-core/shared-slot-0x0F/shared-slot-template"),
RepeatedFamilyRowExpectation("COR_BOOT", 0x10, 0x003B, 0x0000045C, 0x052F, 0x056A, 59, "boot-event-core/shared-slot-0x10/same-length-template"),
RepeatedFamilyRowExpectation("REE_BOOT", 0x0A, 0x034B, 0x00000001, 0x00D4, 0x041F, 843, "boot-event-core/shared-slot-0x0A/shared-slot-template"),
RepeatedFamilyRowExpectation("REE_BOOT", 0x0F, 0x025C, 0x0000034C, 0x041F, 0x067B, 604, "boot-event-core/shared-slot-0x0F/shared-slot-template"),
RepeatedFamilyRowExpectation("REE_BOOT", 0x10, 0x003B, 0x000005A8, 0x067B, 0x06B6, 59, "boot-event-core/shared-slot-0x10/same-length-template"),
RepeatedFamilyRowExpectation("VAR_BOOT", 0x0A, 0x029A, 0x00000001, 0x00D4, 0x036E, 666, "boot-event-core/shared-slot-0x0A/shared-slot-template"),
RepeatedFamilyRowExpectation("VAR_BOOT", 0x0F, 0x0244, 0x0000029B, 0x036E, 0x05B2, 580, "boot-event-core/shared-slot-0x0F/shared-slot-template"),
RepeatedFamilyRowExpectation("VAR_BOOT", 0x10, 0x003B, 0x000004DF, 0x05B2, 0x05ED, 59, "boot-event-core/shared-slot-0x10/same-length-template"),
RepeatedFamilyRowExpectation("SURCAMNS", 0x01, 0x0051, 0x000000D2, 0x01B7, 0x0208, 81, "callback-eventtrigger/shared-slot-0x01/shared-slot-template"),
RepeatedFamilyRowExpectation("SURCAMNS", 0x0A, 0x00D1, 0x00000001, 0x00E6, 0x01B7, 209, "callback-eventtrigger/shared-slot-0x0A/same-length-template"),
RepeatedFamilyRowExpectation("SURCAMNS", 0x20, 0x02BA, 0x00000123, 0x0208, 0x04C2, 698, "callback-eventtrigger/shared-slot-0x20/same-length-template"),
RepeatedFamilyRowExpectation("SURCAMNS", 0x21, 0x0709, 0x000003DD, 0x04C2, 0x0BCB, 1801, "callback-eventtrigger/shared-slot-0x21/shared-slot-template"),
RepeatedFamilyRowExpectation("SURCAMNS", 0x22, 0x01A3, 0x00000AE6, 0x0BCB, 0x0D6E, 419, "callback-eventtrigger/shared-slot-0x22/same-length-template"),
RepeatedFamilyRowExpectation("SURCAMEW", 0x01, 0x00F7, 0x000000D2, 0x01B7, 0x02AE, 247, "callback-eventtrigger/shared-slot-0x01/shared-slot-template"),
RepeatedFamilyRowExpectation("SURCAMEW", 0x0A, 0x00D1, 0x00000001, 0x00E6, 0x01B7, 209, "callback-eventtrigger/shared-slot-0x0A/same-length-template"),
RepeatedFamilyRowExpectation("SURCAMEW", 0x20, 0x02BA, 0x000001C9, 0x02AE, 0x0568, 698, "callback-eventtrigger/shared-slot-0x20/same-length-template"),
RepeatedFamilyRowExpectation("SURCAMEW", 0x21, 0x0655, 0x00000483, 0x0568, 0x0BBD, 1621, "callback-eventtrigger/shared-slot-0x21/shared-slot-template"),
RepeatedFamilyRowExpectation("SURCAMEW", 0x22, 0x01A3, 0x00000AD8, 0x0BBD, 0x0D60, 419, "callback-eventtrigger/shared-slot-0x22/same-length-template"),
RepeatedFamilyRowExpectation("FLAMEBOX", 0x0A, 0x026A, 0x00000001, 0x00E0, 0x034A, 618, "environmental-event/shared-slot-0x0A/shared-slot-template"),
RepeatedFamilyRowExpectation("FLAMEBOX", 0x20, 0x01AC, 0x0000026B, 0x034A, 0x04F6, 428, "environmental-event/shared-slot-0x20/shared-slot-template"),
RepeatedFamilyRowExpectation("FLAMEBOX", 0x21, 0x029A, 0x00000417, 0x04F6, 0x0790, 666, "environmental-event/shared-slot-0x21/shared-slot-template"),
RepeatedFamilyRowExpectation("NOSTRIL", 0x0A, 0x00C0, 0x00000001, 0x00E0, 0x01A0, 192, "environmental-event/shared-slot-0x0A/shared-slot-template"),
RepeatedFamilyRowExpectation("NOSTRIL", 0x20, 0x0129, 0x000000C1, 0x01A0, 0x02C9, 297, "environmental-event/shared-slot-0x20/shared-slot-template"),
RepeatedFamilyRowExpectation("NOSTRIL", 0x21, 0x01BE, 0x000001EA, 0x02C9, 0x0487, 446, "environmental-event/shared-slot-0x21/shared-slot-template"),
RepeatedFamilyRowExpectation("STEAMBOX", 0x0A, 0x0266, 0x00000001, 0x00E0, 0x0346, 614, "environmental-event/shared-slot-0x0A/shared-slot-template"),
RepeatedFamilyRowExpectation("STEAMBOX", 0x20, 0x01F6, 0x00000267, 0x0346, 0x053C, 502, "environmental-event/shared-slot-0x20/shared-slot-template"),
RepeatedFamilyRowExpectation("STEAMBOX", 0x21, 0x02A7, 0x0000045D, 0x053C, 0x07E3, 679, "environmental-event/shared-slot-0x21/shared-slot-template"),
)
def scummvm_event_name_hint(slot: int) -> str | None:
    """Look up the event-name hint for ``slot``.

    Returns ``None`` when ``slot`` is negative or beyond the end of
    ``SCUMMVM_EVENT_NAME_HINTS``.
    """
    if slot < 0 or slot >= len(SCUMMVM_EVENT_NAME_HINTS):
        return None
    return SCUMMVM_EVENT_NAME_HINTS[slot]
def object_index_from_table_offset(table_offset: int) -> int | None:
    """Convert an entry-table file offset into a zero-based object index.

    The table starts at 0x80 and rows are 8 bytes wide. Offsets before the
    table, or not aligned to a row boundary, yield ``None``.
    """
    table_base = 0x80
    row_size = 8
    if table_offset < table_base:
        return None
    index, misalignment = divmod(table_offset - table_base, row_size)
    return index if misalignment == 0 else None
def decode_name_table_entry(name_table_data: bytes, class_id: int) -> str | None:
    """Read the name stored for ``class_id`` in the name-table object.

    Names live in fixed 13-byte fields that follow a 4-byte prefix. The name
    ends at the first NUL byte; trailing NULs and spaces are stripped and the
    rest is decoded as latin-1.

    Returns:
        The decoded name, or ``None`` when ``class_id`` is negative, the field
        would extend past the end of the table, or the name is empty.
    """
    if class_id < 0:
        return None
    field_start = 4 + 13 * class_id
    field_end = field_start + 13
    if field_end > len(name_table_data):
        return None
    field = name_table_data[field_start:field_end]
    name_bytes, _, _ = field.partition(b"\x00")
    name_bytes = name_bytes.rstrip(b"\x00 ")
    return name_bytes.decode("latin-1", errors="replace") if name_bytes else None
def annotate_class_layout(chunks: list[ExtractedChunk]) -> None:
    """Fill in the class-layout fields of every chunk, in place.

    First pass: derive ``object_index`` from each chunk's table offset and
    load the bytes of object 1, which is marked ``"name-table"``.

    Second pass: objects 2 and up are treated as classes with
    ``class_id = object_index - 2``. The u32 at byte 8 of the raw object is
    read as the code base; the bytes between the 20-byte header and that base
    are taken to be the event table, made of 6-byte rows.
    ``class_parse_status`` records either ``"parsed-class-layout"`` or the
    reason the object was rejected.

    Each chunk's extracted file is read from ``chunk.raw_path``.
    """
    header_size = 20
    event_row_size = 6
    name_table: bytes | None = None

    for chunk in chunks:
        chunk.object_index = object_index_from_table_offset(chunk.table_offset)
        if chunk.object_index == 1:
            name_table = pathlib.Path(chunk.raw_path).read_bytes()
            chunk.class_parse_status = "name-table"

    for chunk in chunks:
        object_index = chunk.object_index
        if object_index is None:
            # Keep any status that was already recorded for this chunk.
            if not chunk.class_parse_status:
                chunk.class_parse_status = "unaligned-table-offset"
            continue
        if object_index < 2:
            if not chunk.class_parse_status:
                chunk.class_parse_status = "non-class-object"
            continue

        chunk.class_id = object_index - 2
        if name_table is not None:
            chunk.class_name_hint = decode_name_table_entry(name_table, chunk.class_id)

        payload = pathlib.Path(chunk.raw_path).read_bytes()
        if len(payload) < header_size:
            chunk.class_parse_status = "too-small-for-class-header"
            continue

        code_base = read_u32_le(payload, 8)
        chunk.raw_code_base_u32 = code_base
        if code_base > 0:
            chunk.code_base_minus_one = code_base - 1

        event_bytes = code_base - header_size
        if event_bytes < 0:
            chunk.class_parse_status = "header-before-event-table"
            continue
        event_count, leftover = divmod(event_bytes, event_row_size)
        if leftover != 0:
            chunk.class_parse_status = "event-region-not-divisible-by-6"
            continue
        table_end = header_size + event_count * event_row_size
        if table_end > len(payload):
            chunk.class_parse_status = "event-table-past-object-end"
            continue

        chunk.conservative_event_count = event_count
        chunk.event_table_end = table_end
        chunk.class_parse_status = "parsed-class-layout"
def derive_class_event_rows(chunk: ExtractedChunk, raw_data: bytes) -> list[ClassEventRow]:
    """Decode the event table of a parsed class chunk into ``ClassEventRow`` objects.

    Each 6-byte row, starting at byte 20 of ``raw_data``, holds a u16 entry
    word followed by a u32 code offset. For rows with a non-zero code offset a
    body window is derived: it starts at ``code_base_minus_one + offset`` and
    ends where the next larger distinct non-zero offset begins, or at the end
    of ``raw_data`` for the largest offset. Windows that fall outside
    ``raw_data`` are left as ``None``.

    Returns:
        One row per slot, or an empty list when the chunk was not parsed as a
        class or lacks the fields needed to build rows.
    """
    if chunk.class_parse_status != "parsed-class-layout":
        return []
    if None in (chunk.object_index, chunk.class_id, chunk.conservative_event_count):
        return []

    table_rows: list[tuple[int, int, int]] = []
    for slot in range(chunk.conservative_event_count):
        row_offset = 20 + 6 * slot
        entry_word = read_u16_le(raw_data, row_offset)
        code_offset = read_u32_le(raw_data, row_offset + 2)
        table_rows.append((slot, entry_word, code_offset))

    distinct_offsets = sorted(
        {code_offset for _, _, code_offset in table_rows if code_offset != 0}
    )
    # Map every offset to the next larger one; the largest has no successor.
    following = dict(zip(distinct_offsets, distinct_offsets[1:]))

    base = chunk.code_base_minus_one
    object_size = len(raw_data)
    result: list[ClassEventRow] = []
    for slot, entry_word, code_offset in table_rows:
        window_start: int | None = None
        window_end: int | None = None
        window_length: int | None = None
        if code_offset != 0 and base is not None:
            candidate_start = base + code_offset
            successor = following.get(code_offset)
            candidate_end = object_size if successor is None else base + successor
            if 0 <= candidate_start <= candidate_end <= object_size:
                window_start = candidate_start
                window_end = candidate_end
                window_length = candidate_end - candidate_start
        result.append(
            ClassEventRow(
                entry_index=chunk.index,
                object_index=chunk.object_index,
                class_id=chunk.class_id,
                class_name_hint=chunk.class_name_hint or "",
                slot=slot,
                event_name_hint=scummvm_event_name_hint(slot),
                raw_event_entry_word=entry_word,
                raw_code_offset=code_offset,
                derived_body_start=window_start,
                derived_body_end=window_end,
                derived_body_length=window_length,
            )
        )
    return result
def build_class_event_rows(
    parsed_class_chunks: list[ExtractedChunk],
) -> tuple[list[ClassEventRow], dict[int, list[ClassEventRow]], dict[int, bytes]]:
    """Read every chunk's raw file and derive its event rows.

    Returns:
        A 3-tuple of: all rows concatenated in chunk order, the rows grouped
        by ``chunk.index``, and the raw bytes keyed by ``chunk.index``.
    """
    flat_rows: list[ClassEventRow] = []
    grouped_rows: dict[int, list[ClassEventRow]] = {}
    payloads: dict[int, bytes] = {}
    for chunk in parsed_class_chunks:
        payload = pathlib.Path(chunk.raw_path).read_bytes()
        chunk_rows = derive_class_event_rows(chunk, payload)
        payloads[chunk.index] = payload
        grouped_rows[chunk.index] = chunk_rows
        flat_rows += chunk_rows
    return flat_rows, grouped_rows, payloads
def build_repeated_template_status_map(
    parsed_class_chunks: list[ExtractedChunk],
    rows_by_entry: dict[int, list[ClassEventRow]],
    raw_data_by_entry: dict[int, bytes],
) -> dict[tuple[int, int], str]:
    """Label event rows that recur across the members of a known family.

    For every family in ``VERIFIED_REPEATED_TEMPLATE_FAMILIES`` with at least
    two members present, rows with a derived body are grouped by slot. A slot
    shared by two or more members gets the status
    ``"<family>/shared-slot-0xNN/<suffix>"`` where the suffix is
    ``exact-body-clone`` (identical bodies), ``same-length-template`` (equal
    lengths) or ``shared-slot-template`` (anything else).

    Returns:
        The status string keyed by ``(chunk.index, slot)``.
    """
    labelled: dict[str, ExtractedChunk] = {}
    for chunk in parsed_class_chunks:
        if chunk.primary_label:
            # A later chunk with the same label replaces an earlier one.
            labelled[chunk.primary_label] = chunk

    statuses: dict[tuple[int, int], str] = {}
    for family_name, labels in VERIFIED_REPEATED_TEMPLATE_FAMILIES:
        members = [labelled[label] for label in labels if label in labelled]
        if len(members) < 2:
            continue

        by_slot: dict[int, list[tuple[ExtractedChunk, ClassEventRow, bytes]]] = {}
        for member in members:
            payload = raw_data_by_entry.get(member.index)
            if payload is None:
                continue
            for row in rows_by_entry.get(member.index, []):
                start = row.derived_body_start
                end = row.derived_body_end
                if row.raw_code_offset == 0 or start is None or end is None:
                    continue
                by_slot.setdefault(row.slot, []).append((member, row, payload[start:end]))

        for slot, group in by_slot.items():
            if len(group) < 2:
                continue
            bodies = [body for _, _, body in group]
            if len(set(bodies)) == 1:
                suffix = "exact-body-clone"
            elif len({len(body) for body in bodies}) == 1:
                suffix = "same-length-template"
            else:
                suffix = "shared-slot-template"
            status = f"{family_name}/shared-slot-0x{slot:02X}/{suffix}"
            for member, row, _ in group:
                statuses[(member.index, row.slot)] = status
    return statuses
def format_optional_hex(value: int | None, width: int = 0) -> str:
    """Format ``value`` as ``0x``-prefixed upper-case hex, or ``""`` for ``None``.

    A positive ``width`` zero-pads the digits to at least that many characters.
    """
    if value is None:
        return ""
    digits = format(value, f"0{width}X") if width > 0 else format(value, "X")
    return "0x" + digits
def hex_edge(data: bytes, width: int = 8) -> str:
    """Return the first ``width`` bytes of ``data`` as lower-case hex (``""`` if empty)."""
    return data[:width].hex() if data else ""
def hex_tail(data: bytes, width: int = 8) -> str:
    """Return the last ``width`` bytes of ``data`` as lower-case hex.

    Returns ``""`` for empty input and for a non-positive ``width``. The
    explicit width guard is needed because ``data[-0:]`` is the whole buffer,
    so a plain ``data[-width:]`` would return everything for ``width == 0``
    instead of nothing, unlike ``hex_edge`` for the same width.
    """
    if not data or width <= 0:
        return ""
    return data[-width:].hex()
def write_family_decompile_artifact(
    out_dir: pathlib.Path,
    parsed_class_chunks: list[ExtractedChunk],
    rows_by_entry: dict[int, list[ClassEventRow]],
    raw_data_by_entry: dict[int, bytes],
    repeated_status_by_row: dict[tuple[int, int], str],
    spec: FamilyArtifactSpec,
) -> None:
    """Write the markdown and TSV renderings of one class family.

    Chunks whose ``primary_label`` is listed in ``spec.labels`` are rendered in
    label order. Only rows with a non-zero code offset are emitted, and a chunk
    without such rows is skipped entirely. For every emitted row the body
    bytes are sliced out of the chunk's raw data and summarised by SHA-1 plus
    leading/trailing hex.

    Nothing is written when no chunk matches the spec; otherwise
    ``<output_stem>.md`` and ``<output_stem>.tsv`` are written to ``out_dir``
    as UTF-8.
    """
    wanted_labels = set(spec.labels)
    members = [chunk for chunk in parsed_class_chunks if chunk.primary_label in wanted_labels]
    if not members:
        return
    members = sorted(members, key=lambda chunk: chunk.primary_label or "")

    tsv_rows = [
        "entry_index\tclass_id\tclass_name\tslot\tevent_name_hint\traw_event_entry_word\traw_code_offset\tderived_body_start\tderived_body_end\tderived_body_length\trepeated_template_status\tbody_sha1\tbody_prefix_hex\tbody_suffix_hex"
    ]
    markdown = [
        f"# {spec.title}",
        "",
        "This is a reversible per-class rendering derived directly from `class_event_index.tsv` plus the raw extracted chunk bytes.",
        "ScummVM event labels remain hints only; the authoritative data here is the slot id, raw row bytes, and derived body window.",
        "",
    ]

    for member in members:
        live_rows = [
            row for row in rows_by_entry.get(member.index, []) if row.raw_code_offset != 0
        ]
        if not live_rows:
            continue
        payload = raw_data_by_entry[member.index]

        # Per-class header of the fenced YAML block.
        markdown += [
            f"## {member.primary_label}",
            "",
            "```yaml",
            "class:",
            f" entry_index: 0x{member.index:03X}",
            f" class_id: 0x{member.class_id:X}",
            f" class_name: {member.primary_label}",
            f" class_object_index: 0x{member.object_index:X}",
            f" raw_code_base_u32: 0x{member.raw_code_base_u32:X}",
            f" code_base_minus_one: 0x{member.code_base_minus_one:X}",
            f" conservative_event_count: {member.conservative_event_count}",
            " events:",
        ]

        for row in live_rows:
            start = row.derived_body_start
            end = row.derived_body_end
            length = row.derived_body_length
            body = payload[start:end] if start is not None and end is not None else b""
            status = repeated_status_by_row.get((row.entry_index, row.slot), "")
            digest = hashlib.sha1(body).hexdigest() if body else ""
            start_hex = format_optional_hex(start, 4)
            end_hex = format_optional_hex(end, 4)
            prefix_hex = hex_edge(body)
            suffix_hex = hex_tail(body)

            markdown += [
                f" - slot: 0x{row.slot:02x}",
                f" event_name_hint: {row.event_name_hint or ''}",
                f" raw_event_entry_word: 0x{row.raw_event_entry_word:04x}",
                f" raw_code_offset: 0x{row.raw_code_offset:08x}",
                f" derived_body_start: {start_hex.lower() or 'null'}",
                f" derived_body_end: {end_hex.lower() or 'null'}",
                f" derived_body_length: {length if length is not None else 'null'}",
                f" repeated_template_status: {status or 'unique-or-unclassified'}",
                f" body_sha1: {digest or 'null'}",
                f" body_prefix_hex: {prefix_hex or 'null'}",
                f" body_suffix_hex: {suffix_hex or 'null'}",
            ]
            tsv_rows.append(
                "{entry_index}\t0x{class_id:X}\t{class_name}\t0x{slot:02X}\t{event_name_hint}\t0x{raw_event_entry_word:04X}\t0x{raw_code_offset:08X}\t{derived_body_start}\t{derived_body_end}\t{derived_body_length}\t{repeated_template_status}\t{body_sha1}\t{body_prefix_hex}\t{body_suffix_hex}".format(
                    entry_index=row.entry_index,
                    class_id=row.class_id,
                    class_name=member.primary_label or "",
                    slot=row.slot,
                    event_name_hint=row.event_name_hint or "",
                    raw_event_entry_word=row.raw_event_entry_word,
                    raw_code_offset=row.raw_code_offset,
                    derived_body_start=start_hex,
                    derived_body_end=end_hex,
                    derived_body_length=(length if length is not None else ""),
                    repeated_template_status=status,
                    body_sha1=digest,
                    body_prefix_hex=prefix_hex,
                    body_suffix_hex=suffix_hex,
                )
            )

        # Close the fenced block opened for this class.
        markdown += [
            "```",
            "",
        ]

    markdown_path = out_dir / f"{spec.output_stem}.md"
    tsv_path = out_dir / f"{spec.output_stem}.tsv"
    markdown_path.write_text("\n".join(markdown), encoding="utf-8")
    tsv_path.write_text("\n".join(tsv_rows) + "\n", encoding="utf-8")
def validate_verified_repeated_family_regressions(
    parsed_class_chunks: list[ExtractedChunk],
    rows_by_entry: dict[int, list[ClassEventRow]],
    repeated_status_by_row: dict[tuple[int, int], str],
) -> list[str]:
    """Compare parsed class rows with the pinned repeated-family expectations.

    Two passes run over ``VERIFIED_REPEATED_FAMILY_ROW_EXPECTATIONS``:

    1. Per class, the set of slots whose ``raw_code_offset`` is non-zero must
       equal the set of slots named by the expectations for that class.
    2. Per expectation, the row with the same slot must carry the pinned raw
       and derived values and the pinned repeated-template status.

    Returns the TSV report lines, header first.  Raises ``ValueError`` listing
    every recorded mismatch when at least one check fails.
    """

    def hex_slots(slots: set[int]) -> str:
        return ",".join(f"0x{slot:02X}" for slot in sorted(slots))

    label_to_chunk: dict[str, ExtractedChunk] = {}
    for parsed in parsed_class_chunks:
        if parsed.primary_label:
            # A later chunk replaces an earlier one that shares its label.
            label_to_chunk[parsed.primary_label] = parsed

    slots_wanted: dict[str, set[int]] = {}
    for pinned in VERIFIED_REPEATED_FAMILY_ROW_EXPECTATIONS:
        slots_wanted.setdefault(pinned.class_name, set()).add(pinned.slot)

    report_lines = ["record_type\tclass_name\tslot\texpected\tactual\tstatus"]
    errors: list[str] = []

    # Pass 1: slot-set comparison, one report line per expected class.
    for class_name in sorted(slots_wanted):
        wanted = slots_wanted[class_name]
        owner = label_to_chunk.get(class_name)
        found: set[int] = set()
        if owner is not None:
            for parsed_row in rows_by_entry.get(owner.index, []):
                if parsed_row.raw_code_offset != 0:
                    found.add(parsed_row.slot)
        verdict = "ok" if found == wanted else "mismatch"
        report_lines.append(
            f"slot-set\t{class_name}\t*\t{hex_slots(wanted)}\t{hex_slots(found)}\t{verdict}"
        )
        if verdict != "ok":
            errors.append(
                f"{class_name}: expected non-zero slots {sorted(wanted)}, found {sorted(found)}"
            )

    # Pass 2: field-by-field comparison, one report line per expectation.
    for pinned in VERIFIED_REPEATED_FAMILY_ROW_EXPECTATIONS:
        slot_hex = f"0x{pinned.slot:02X}"
        owner = label_to_chunk.get(pinned.class_name)
        if owner is None:
            errors.append(f"missing repeated-family class {pinned.class_name}")
            report_lines.append(
                f"row\t{pinned.class_name}\t{slot_hex}\tpresent\tmissing-class\tmismatch"
            )
            continue

        match = None
        for candidate in rows_by_entry.get(owner.index, []):
            if candidate.slot == pinned.slot:
                match = candidate
                break
        if match is None:
            errors.append(f"missing row {pinned.class_name} slot {slot_hex}")
            report_lines.append(
                f"row\t{pinned.class_name}\t{slot_hex}\tpresent\tmissing-row\tmismatch"
            )
            continue

        template_status = repeated_status_by_row.get((match.entry_index, match.slot), "")
        actual_values = (
            match.raw_event_entry_word,
            match.raw_code_offset,
            match.derived_body_start,
            match.derived_body_end,
            match.derived_body_length,
            template_status,
        )
        expected_values = (
            pinned.raw_event_entry_word,
            pinned.raw_code_offset,
            pinned.derived_body_start,
            pinned.derived_body_end,
            pinned.derived_body_length,
            pinned.repeated_template_status,
        )
        verdict = "ok" if actual_values == expected_values else "mismatch"

        expected_cell = "|".join(
            (
                f"0x{pinned.raw_event_entry_word:04X}",
                f"0x{pinned.raw_code_offset:08X}",
                f"0x{pinned.derived_body_start:04X}",
                f"0x{pinned.derived_body_end:04X}",
                str(pinned.derived_body_length),
                pinned.repeated_template_status,
            )
        )
        actual_length = match.derived_body_length if match.derived_body_length is not None else ""
        actual_cell = "|".join(
            (
                f"0x{match.raw_event_entry_word:04X}",
                f"0x{match.raw_code_offset:08X}",
                format_optional_hex(match.derived_body_start, 4),
                format_optional_hex(match.derived_body_end, 4),
                str(actual_length),
                template_status,
            )
        )
        report_lines.append(
            f"row\t{pinned.class_name}\t{slot_hex}\t{expected_cell}\t{actual_cell}\t{verdict}"
        )
        if verdict != "ok":
            errors.append(
                f"{pinned.class_name} slot {slot_hex}: expected {expected_values}, found {actual_values}"
            )

    if errors:
        raise ValueError(
            "repeated-family regression mismatch:\n- " + "\n- ".join(errors)
        )
    return report_lines
def readable_neighbor_chunks(
    center: ExtractedChunk,
    chunk_by_index: dict[int, ExtractedChunk],
    total_chunks: int,
    window: int,
) -> list[ExtractedChunk]:
    """Collect the chunks around *center* that are worth showing next to it.

    The window covers ``center.index - window`` through ``center.index + window``
    clamped to ``[0, total_chunks)``; the center itself is excluded.  A chunk is
    kept when it has an event tag kind, has a referent field, or shares the
    center's primary label.  Results are in ascending index order.
    """
    first = max(0, center.index - window)
    stop = min(total_chunks, center.index + window + 1)
    selected: list[ExtractedChunk] = []
    for position in range(first, stop):
        if position == center.index:
            continue
        candidate = chunk_by_index[position]
        if (
            event_tag_kind(candidate)
            or has_referent_field(candidate)
            or candidate.primary_label == center.primary_label
        ):
            selected.append(candidate)
    return selected
def unique_preserve_order(values: list[str]) -> list[str]:
    """Return the non-empty values with duplicates dropped, first occurrence first."""
    # dict keys keep insertion order, so the first sighting of a value fixes
    # its position and later repeats are ignored.
    return list(dict.fromkeys(value for value in values if value))
def section_runtime_ops(section_name: str) -> list[str]:
    """Return the runtime operator names listed for a report section.

    Only ``"Callback trigger lane"`` gets the short four-operator list; every
    other section name gets the nine-operator list.  A new list is built on
    each call.
    """
    callback_ops = [
        "MATERIALIZE_OR_FORWARD_VALUE",
        "PUSH_FRAME_WORD_LITERAL",
        "COMPARE_STREAM_DWORD_AND_PUSH_BOOL",
        "FINALIZE_MIXED_VALUE_TO_OUTPTR",
    ]
    default_ops = [
        "APPEND_UNIQUE_INLINE",
        "APPEND_UNIQUE_INDIRECT",
        "REMOVE_MATCHING_INDIRECT",
        "REMOVE_MATCHING_INLINE",
        "MATERIALIZE_OR_FORWARD_VALUE",
        "PREPEND_INLINE_PAYLOAD",
        "BUILD_ENTITY_LINK_MATRIX",
        "EMIT_OR_PUSHBACK_RESULT",
        "FINALIZE_MIXED_VALUE_TO_OUTPTR",
    ]
    return callback_ops if section_name == "Callback trigger lane" else default_ops
def section_mask_pairs(section_name: str) -> list[str]:
    """Return the mask-pair strings listed for a report section.

    The JELYHACK anchor and callback trigger sections get a single explanatory
    sentence instead of mask pairs; every other section name gets the full
    thirteen-entry ladder.
    """
    sentence_only = {
        "JELYHACK anchor lane": "indirect-only active-event-biased ladder; no direct anchor-specific mask proven",
        "Callback trigger lane": "no callback-specific mask pair proven; current ladder favors active event carriers",
    }
    sentence = sentence_only.get(section_name)
    if sentence is not None:
        return [sentence]
    return [
        "0x0001:0000",
        "0x0002:0001",
        "0x0004:0002",
        "0x0010:0004",
        "0x0020:0005",
        "0x0200:0009",
        "0x0400:000a",
        "0x0800:000b",
        "0x1000:000c",
        "0x2000:0015",
        "0x4000:000e",
        "0x8000:0007",
        "0x8000:000f",
    ]
def section_bridge_note(section_name: str) -> str:
    """Return the interpretation sentence printed for a report section.

    Four section names have a dedicated note; any other name gets the generic
    conservative-alignment sentence.
    """
    notes = {
        "JELYHACK anchor lane": "Referent-only anchors are now readable as payload owners, but the current mask ladder still correlates more strongly with active-event descriptors than with anchor-only rows.",
        "Callback trigger lane": "Callback/eventTrigger descriptors are structurally distinct from the active event lane, so the runtime bridge is still generic slot-backed context flow rather than a callback-specific opcode family.",
        "EVENT hub lane": "This is the strongest current descriptor-side bridge into the active event runtime lane: the neighborhood contains explicit event cores and matches the proven payload-chain plus link-matrix VM behavior.",
        "Environmental event lane": "Environmental descriptors share the same active event field grammar, so they likely ride the same generic VM event lane even though no hazard-specific opcode split is proven yet.",
    }
    return notes.get(
        section_name,
        "Descriptor-side and runtime-side evidence align only at the conservative family level.",
    )
def write_runtime_bridge_reports(
    out_dir: pathlib.Path,
    descriptor_chunks: list[ExtractedChunk],
    chunk_by_index: dict[int, ExtractedChunk],
    total_chunks: int,
) -> None:
    """Write the VM IR table, the mask ladder table and the readable script IR.

    Four files are written into ``out_dir``: ``runtime_vm_ir.tsv`` and
    ``vm_mask_ladder.tsv`` (straight dumps of ``VERIFIED_VM_IR_ROWS`` and
    ``VERIFIED_MASK_LADDER_ROWS``), plus ``readable_script_ir.tsv`` and
    ``readable_script_ir.md``.  The script IR has one record per descriptor
    chunk whose primary label belongs to one of four fixed focus sets; each
    record joins the chunk's readable neighbors to the section-level runtime
    operators, mask pairs and bridge note.  Sections without any matching
    chunk are omitted.
    """

    def labels_where(pool, keep):
        # Cleaned primary labels of the chunks accepted by `keep`, de-duplicated.
        return unique_preserve_order(
            [clean_token(member.primary_label or "") for member in pool if keep(member)]
        )

    ir_row = "{stage_address}\t{ir_name}\t{opcode_or_lane}\t{payload_shape}\t{evidence}"
    ir_table = ["stage_address\tir_name\topcode_or_lane\tpayload_shape\tevidence"]
    ir_table.extend(ir_row.format(**entry) for entry in VERIFIED_VM_IR_ROWS)
    (out_dir / "runtime_vm_ir.tsv").write_text("\n".join(ir_table) + "\n", encoding="utf-8")

    ladder_row = "{wrapper_address}\t{mask_pair}\t{caller_anchor}\t{descriptor_bias}"
    ladder_table = ["wrapper_address\tmask_pair\tcaller_anchor\tdescriptor_bias"]
    ladder_table.extend(ladder_row.format(**entry) for entry in VERIFIED_MASK_LADDER_ROWS)
    (out_dir / "vm_mask_ladder.tsv").write_text("\n".join(ladder_table) + "\n", encoding="utf-8")

    # (template id, section title, center labels, neighbor window)
    focus_sets = [
        ("jelyhack_anchor_attachment", "JELYHACK anchor lane", {"JELYHACK", "JELYH2"}, 8),
        ("event_hub_cluster", "EVENT hub lane", {"EVENT", "COR_BOOT", "NPCTRIG"}, 5),
        ("environmental_event_cluster", "Environmental event lane", {"FLAMEBOX", "NOSTRIL", "STEAMBOX"}, 5),
        ("callback_trigger_cluster", "Callback trigger lane", {"SURCAMNS", "SURCAMEW"}, 5),
    ]

    script_tsv_lines = [
        "template_id\tsection\tcenter_index\tcenter_label\tattach_labels\tcallback_labels\tneighbor_labels\tevent_families\truntime_ops\tmask_pairs\towner_source\tmirror_write\tselector_status\tbridge_note"
    ]
    script_md_lines = [
        "# Readable Script IR",
        "",
        "This report joins descriptor neighborhoods to the verified 000d VM/runtime lane.",
        "It stays conservative: opcode-family and mask-family evidence is carried forward only where the binary proves it.",
        "",
        "## Verified Runtime Lane",
        "",
        "- Owner path: `entity_vm_runtime_init_from_path_if_configured -> entity_vm_runtime_create -> entity_vm_runtime_owner_resource_create`",
        "- Slot source: `(+0x10/+0x12) + 0x0d*slot + 4` inside the runtime owner/resource object",
        "- Context seed: `entity_vm_context_create_from_slot_index` copies that source into `+0xd6/+0xd8` and mirrors it to `0x39ca[slot]`",
        "- Selector status: `0x19/0x1a/0x1b` are proven inside `000d:0988`; `0x18` is still implied, and the upstream seed into `[BP-0x32]` remains unresolved",
        "",
        "## Verified VM IR Operators",
        "",
        "| Stage | IR | Opcode / Lane | Payload Shape |",
        "|---|---|---|---|",
    ]
    script_md_lines.extend(
        "| {stage_address} | {ir_name} | {opcode_or_lane} | {payload_shape} |".format(**entry)
        for entry in VERIFIED_VM_IR_ROWS
    )
    script_md_lines.extend([
        "",
        "## Verified Mask Ladder",
        "",
        "| Wrapper | Mask | Caller Anchor | Descriptor Bias |",
        "|---|---|---|---|",
    ])
    script_md_lines.extend(
        "| {wrapper_address} | {mask_pair} | {caller_anchor} | {descriptor_bias} |".format(**entry)
        for entry in VERIFIED_MASK_LADDER_ROWS
    )

    owner_source = (
        "000d:44df -> 000d:4c99 -> 000d:7000 -> (+0x10/+0x12) + 0x0d*slot + 4"
    )
    mirror_write = "entity_vm_context_create_from_slot_index writes the same source pair to 0x39ca[context_slot]"
    selector_status = "0x19/0x1a/0x1b proven in 000d:0988; 0x18 implied; upstream [BP-0x32] seed unresolved"

    for template_id, section_name, center_labels, window in focus_sets:
        centers = [chunk for chunk in descriptor_chunks if chunk.primary_label in center_labels]
        if not centers:
            continue
        script_md_lines.extend(["", f"## {section_name}", ""])

        # These depend on the section only, so they are looked up once per section.
        runtime_ops = section_runtime_ops(section_name)
        mask_pairs = section_mask_pairs(section_name)
        bridge_note = section_bridge_note(section_name)
        ops_tsv = ",".join(runtime_ops)
        masks_tsv = ",".join(mask_pairs)

        for center in centers:
            neighbors = readable_neighbor_chunks(center, chunk_by_index, total_chunks, window)
            attach_labels = labels_where(neighbors, lambda item: event_tag_kind(item) == "event")
            callback_labels = labels_where(neighbors, lambda item: event_tag_kind(item) == "eventTrigger")
            neighbor_labels = labels_where(
                neighbors,
                lambda item: has_referent_field(item) and event_tag_kind(item) == "",
            )
            family_labels = unique_preserve_order(
                [classify_event_family(item) for item in neighbors]
            )

            center_label = clean_token(center.primary_label or "")
            attach_tsv = ",".join(attach_labels)
            callback_tsv = ",".join(callback_labels)
            neighbor_tsv = ",".join(neighbor_labels)
            family_tsv = ",".join(family_labels)
            script_tsv_lines.append(
                f"{template_id}\t{section_name}\t{center.index}\t{center_label}\t"
                f"{attach_tsv}\t{callback_tsv}\t{neighbor_tsv}\t{family_tsv}\t"
                f"{ops_tsv}\t{masks_tsv}\t{owner_source}\t{mirror_write}\t"
                f"{selector_status}\t{bridge_note}"
            )

            signature = readable_signature(center)
            script_md_lines.extend([
                f"### {center.index}: {center.primary_label}",
                "",
                f"Descriptor focus: `{signature}`",
                "",
                "Descriptor-side attachments:",
                f"- Active event neighbors: {', '.join(attach_labels) or 'none proven in window'}",
                f"- Callback neighbors: {', '.join(callback_labels) or 'none proven in window'}",
                f"- Referent-side neighbors: {', '.join(neighbor_labels) or 'none proven in window'}",
                f"- Event families present: {', '.join(family_labels) or 'none'}",
                "",
                "Runtime bridge:",
                f"- Runtime ops: {', '.join(runtime_ops)}",
                f"- Mask pairs: {', '.join(mask_pairs)}",
                f"- Owner source: {owner_source}",
                f"- Mirror write: {mirror_write}",
                f"- Selector status: {selector_status}",
                f"- Interpretation: {bridge_note}",
                "",
                "```text",
                signature,
            ])
            script_md_lines.extend(
                f"attach {label}(...) # active event-bearing neighbor" for label in attach_labels
            )
            script_md_lines.extend(
                f"callback {label}(...) # eventTrigger-bearing neighbor" for label in callback_labels
            )
            script_md_lines.extend(
                f"near {label}(...) # referent-side local context" for label in neighbor_labels
            )
            script_md_lines.extend(["", "vm_effect:"])
            script_md_lines.extend(f" {runtime_op}(...)" for runtime_op in runtime_ops)
            script_md_lines.extend(["```", ""])

    (out_dir / "readable_script_ir.tsv").write_text("\n".join(script_tsv_lines) + "\n", encoding="utf-8")
    (out_dir / "readable_script_ir.md").write_text("\n".join(script_md_lines), encoding="utf-8")
def chunk_bridge_family(chunk: ExtractedChunk) -> str:
    """Return the family name used to group *chunk* in the bridge reports.

    The event family from ``classify_event_family`` wins when it is non-empty;
    otherwise a chunk whose role is ``"referent-anchor"`` is filed under that
    name, and every other chunk yields an empty string.
    """
    anchor_family = "referent-anchor" if chunk_role(chunk) == "referent-anchor" else ""
    # The ternary above is only reached through `or` when no event family exists.
    return classify_event_family(chunk) or anchor_family
def family_runtime_ops(family: str) -> list[str]:
    """Return the runtime operator names listed for a descriptor family.

    ``"callback-eventtrigger"`` gets a four-operator list, ``"referent-anchor"``
    gets the five payload-chain operators, and any other family gets those five
    followed by four more.  A new list is built on each call.
    """
    if family == "callback-eventtrigger":
        return [
            "MATERIALIZE_OR_FORWARD_VALUE",
            "PUSH_FRAME_WORD_LITERAL",
            "COMPARE_STREAM_DWORD_AND_PUSH_BOOL",
            "FINALIZE_MIXED_VALUE_TO_OUTPTR",
        ]

    chain_ops = [
        "APPEND_UNIQUE_INLINE",
        "APPEND_UNIQUE_INDIRECT",
        "REMOVE_MATCHING_INDIRECT",
        "REMOVE_MATCHING_INLINE",
        "MATERIALIZE_OR_FORWARD_VALUE",
    ]
    if family != "referent-anchor":
        # Every remaining family lists the chain operators plus these four.
        chain_ops += [
            "PREPEND_INLINE_PAYLOAD",
            "BUILD_ENTITY_LINK_MATRIX",
            "EMIT_OR_PUSHBACK_RESULT",
            "FINALIZE_MIXED_VALUE_TO_OUTPTR",
        ]
    return chain_ops
def family_mask_pairs(family: str) -> list[str]:
    """Return the mask-pair strings listed for a descriptor family.

    The referent-anchor and callback-eventtrigger families get a single
    explanatory sentence; any other family gets the thirteen-entry ladder.
    """
    sentence_only = {
        "referent-anchor": "anchor role uses referent registry and payload ownership; no anchor-specific slot mask proven",
        "callback-eventtrigger": "no callback-specific mask pair proven; verified ladder still favors active event carriers",
    }
    sentence = sentence_only.get(family)
    if sentence is not None:
        return [sentence]
    return [
        "0x0001:0000",
        "0x0002:0001",
        "0x0004:0002",
        "0x0010:0004",
        "0x0020:0005",
        "0x0200:0009",
        "0x0400:000a",
        "0x0800:000b",
        "0x1000:000c",
        "0x2000:0015",
        "0x4000:000e",
        "0x8000:0007",
        "0x8000:000f",
    ]
def family_bridge_metadata() -> tuple[dict[str, str], ...]:
    """Return the ranked descriptor-family records used by the bridge report.

    Each record is a dict with the string keys ``lane_rank``,
    ``primary_runtime_lane``, ``descriptor_family``, ``fit_strength``,
    ``confidence``, ``why`` and ``exemplar``.  Records are ordered by rank and
    ``lane_rank`` is the 1-based position rendered as a string.
    """
    field_names = (
        "lane_rank",
        "primary_runtime_lane",
        "descriptor_family",
        "fit_strength",
        "confidence",
        "why",
        "exemplar",
    )
    active_lane = "active-event payload lane"
    # One tuple per family, in rank order; the rank itself comes from enumerate().
    ranked = (
        (
            active_lane,
            "event-hub",
            "strongest",
            "high",
            "Explicit 69:0A00 event tag plus the richest source/dest/door/link/time/counter payload shape; best current match for the VM payload-chain plus link-matrix lane.",
            "EVENT",
        ),
        (
            active_lane,
            "boot-event-core",
            "strong",
            "high",
            "All five _BOOT descriptors share one compact referent,event,counter,item schema and sit beside referent-heavy object islands that fit the same active-event runtime lane.",
            "COR_BOOT",
        ),
        (
            active_lane,
            "npc-trigger",
            "strong",
            "moderate-high",
            "NPCTRIG carries an explicit event field and sits in the same compact event-bearing core as EVENT and COR_BOOT, but its narrower field set makes it look more satellite than hub.",
            "NPCTRIG",
        ),
        (
            active_lane,
            "minimal-event-core",
            "moderate",
            "moderate",
            "SFXTRIG keeps the active event tag while stripping most side fields, so it still fits the live event lane but as a smaller attachment form rather than a full hub or boot core.",
            "SFXTRIG",
        ),
        (
            active_lane,
            "environmental-event",
            "moderate",
            "moderate",
            "FLAMEBOX, NOSTRIL, and STEAMBOX share the same active event grammar, but no hazard-specific opcode or mask split is proven yet beyond the generic active-event-biased ladder.",
            "FLAMEBOX",
        ),
        (
            "referent-anchor / payload-owner lane",
            "referent-anchor",
            "strong",
            "moderate-high",
            "JELYHACK and JELYH2 are still referent-only, but the VM referent registry and payload-chain machinery now make that a live anchor role rather than inert metadata.",
            "JELYHACK",
        ),
        (
            "callback / attachment lane",
            "callback-eventtrigger",
            "weak-moderate",
            "moderate",
            "SURCAMNS and SURCAMEW are structurally coherent callback holders with eventTrigger tags, but the current mask ladder and opcode evidence still align more strongly with active event carriers than callback-specific dispatch.",
            "SURCAMNS",
        ),
    )
    return tuple(
        dict(zip(field_names, (str(rank),) + record))
        for rank, record in enumerate(ranked, start=1)
    )
def family_chunk_map(descriptor_chunks: list[ExtractedChunk]) -> dict[str, list[ExtractedChunk]]:
    """Group descriptor chunks by their bridge family.

    Chunks for which ``chunk_bridge_family`` returns an empty value are left
    out.  Within each family the chunks keep their input order.
    """
    by_family: dict[str, list[ExtractedChunk]] = {}
    for candidate in descriptor_chunks:
        family_name = chunk_bridge_family(candidate)
        if family_name:
            by_family.setdefault(family_name, []).append(candidate)
    return by_family
def choose_family_exemplar(family: str, chunks: list[ExtractedChunk], preferred_label: str) -> ExtractedChunk | None:
    """Pick the chunk that represents a family in the report.

    The first chunk whose primary label equals *preferred_label* is returned.
    Without such a chunk the first element of *chunks* is used, and an empty
    list yields ``None``.  The *family* argument is accepted but not consulted.
    """
    preferred = next(
        (candidate for candidate in chunks if candidate.primary_label == preferred_label),
        None,
    )
    if preferred is not None:
        return preferred
    if chunks:
        return chunks[0]
    return None
def family_script_block(exemplar: ExtractedChunk, family: str, labels: list[str]) -> list[str]:
    """Build the pseudo-script lines shown for one descriptor family.

    The first line is ``readable_signature(exemplar)``; the remaining lines are
    fixed text chosen by *family*.  A family without a dedicated script gets the
    callback-side text.  The *labels* argument is accepted but not consulted.
    """
    scripts = {
        "event-hub": [
            "owner_slot = runtime_owner_table[slot]",
            "chain = APPEND_UNIQUE_INLINE(...) or APPEND_UNIQUE_INDIRECT(...)",
            "chain = REMOVE_MATCHING_INDIRECT(...) or REMOVE_MATCHING_INLINE(...)",
            "payload = PREPEND_INLINE_PAYLOAD(...) when caller bytes are present",
            "links = BUILD_ENTITY_LINK_MATRIX(shape_a, shape_b, entity_ids)",
            "emit EVENT-style result through FINALIZE_MIXED_VALUE_TO_OUTPTR(...)",
        ],
        "boot-event-core": [
            "anchor referent/event/counter/item into one compact event core",
            "materialize slot-backed value from runtime_owner_table[slot]",
            "mutate referent payload chain via opcode_0x18_to_0x1b family",
            "emit boot-style active event result",
        ],
        "npc-trigger": [
            "materialize slot-backed trigger payload",
            "attach event plus item/item2/typeNpc side fields",
            "emit NPC-trigger result through shared opcode epilogue",
        ],
        "minimal-event-core": [
            "bind referent to minimal event payload",
            "reuse generic active-event mutation path without hub-style side fields",
        ],
        "environmental-event": [
            "bind referent plus event to hazard-specific side fields",
            "reuse generic active-event lane; no hazard-specific opcode split proven",
        ],
        "referent-anchor": [
            "referent_id = registry anchor",
            "payload_chain = mutable owner-side chain attached to the referent",
            "neighboring event-bearing descriptor supplies live event semantics",
            "likely attachments: REE_BOOT, SURCAMEW, SFXTRIG",
        ],
    }
    callback_script = [
        "callback-side attachment remains descriptor-visible",
        "runtime bridge is still generic slot-backed context flow rather than callback-specific opcode dispatch",
    ]
    return [readable_signature(exemplar)] + scripts.get(family, callback_script)
def write_runtime_family_bridge_reports(out_dir: pathlib.Path, descriptor_chunks: list[ExtractedChunk]) -> None:
    """Write the ranked descriptor-family report as TSV and Markdown.

    Families come from ``family_bridge_metadata()`` in rank order; a family with
    no chunk in *descriptor_chunks* is skipped.  Two files are written into
    ``out_dir``: ``runtime_descriptor_family_rankings.tsv`` (one row per
    family) and ``runtime_descriptor_family_rankings.md`` (a summary table
    followed by one detail section per family).

    The Markdown summary rows and the detail sections are accumulated in
    separate lists and concatenated at the end.  Appending both to one list
    inside the loop would put a heading and a fenced block between consecutive
    table rows, which terminates the table after its first row.
    """
    grouped = family_chunk_map(descriptor_chunks)
    owner_source = "000d:44df -> 000d:4c99 -> 000d:7000 -> (+0x10/+0x12) + 0x0d*slot + 4"
    loader_evidence = (
        "0009:67b6/6916 walk helper-owned +0x10/+0x18 tables, format per-entry paths, and then open/read/close files"
    )
    selector_status = "0x19/0x1a/0x1b proven in 000d:0988; 0x18 implied; upstream [BP-0x32] seed unresolved"
    tsv_lines = [
        "lane_rank\tprimary_runtime_lane\tdescriptor_family\trepresentative_labels\tfit_strength\tconfidence\truntime_ops\tmask_pairs\towner_source\tloader_evidence\tselector_status\twhy"
    ]
    md_lines = [
        "# Runtime Descriptor Family Rankings",
        "",
        "This report ranks descriptor families against the currently verified 000d VM/runtime lanes.",
        "It is intentionally conservative: it scores ecosystem-level fit, not a direct descriptor-id-to-opcode decode.",
        "",
        "## Owner Source",
        "",
        f"- Owner path: `{owner_source}`",
        f"- Loader evidence: `{loader_evidence}`",
        f"- Selector status: `{selector_status}`",
        "",
        "## Ranked Families",
        "",
        "| Rank | Runtime Lane | Descriptor Family | Labels | Fit | Confidence |",
        "|---:|---|---|---|---|---|",
    ]
    table_rows: list[str] = []
    detail_lines: list[str] = []
    for metadata in family_bridge_metadata():
        family = metadata["descriptor_family"]
        family_chunks = grouped.get(family, [])
        if not family_chunks:
            continue
        labels = unique_preserve_order([
            clean_token(chunk.primary_label or "")
            for chunk in family_chunks
            if chunk.primary_label
        ])
        exemplar = choose_family_exemplar(family, family_chunks, metadata["exemplar"])
        if exemplar is None:
            continue
        runtime_ops = family_runtime_ops(family)
        mask_pairs = family_mask_pairs(family)
        tsv_lines.append(
            "{lane_rank}\t{primary_runtime_lane}\t{descriptor_family}\t{representative_labels}\t{fit_strength}\t{confidence}\t{runtime_ops}\t{mask_pairs}\t{owner_source}\t{loader_evidence}\t{selector_status}\t{why}".format(
                lane_rank=metadata["lane_rank"],
                primary_runtime_lane=metadata["primary_runtime_lane"],
                descriptor_family=family,
                representative_labels=",".join(labels),
                fit_strength=metadata["fit_strength"],
                confidence=metadata["confidence"],
                runtime_ops=",".join(runtime_ops),
                mask_pairs=",".join(mask_pairs),
                owner_source=owner_source,
                loader_evidence=loader_evidence,
                selector_status=selector_status,
                why=metadata["why"],
            )
        )
        table_rows.append(
            "| {lane_rank} | {primary_runtime_lane} | {descriptor_family} | {labels} | {fit_strength} | {confidence} |".format(
                lane_rank=metadata["lane_rank"],
                primary_runtime_lane=metadata["primary_runtime_lane"],
                descriptor_family=family,
                labels=", ".join(labels),
                fit_strength=metadata["fit_strength"],
                confidence=metadata["confidence"],
            )
        )
        detail_lines.extend([
            "",
            f"## {metadata['lane_rank']}. {family}",
            "",
            f"- Runtime lane: {metadata['primary_runtime_lane']}",
            f"- Labels: {', '.join(labels)}",
            f"- Fit: {metadata['fit_strength']}",
            f"- Confidence: {metadata['confidence']}",
            f"- Why: {metadata['why']}",
            f"- Runtime ops: {', '.join(runtime_ops)}",
            f"- Mask pairs: {', '.join(mask_pairs)}",
            "",
            "```text",
        ])
        detail_lines.extend(family_script_block(exemplar, family, labels))
        detail_lines.extend([
            "```",
            "",
        ])
    # Whole summary table first, then the per-family sections; each section
    # opens with a blank line, which also closes the table cleanly.
    md_lines.extend(table_rows)
    md_lines.extend(detail_lines)
    (out_dir / "runtime_descriptor_family_rankings.tsv").write_text("\n".join(tsv_lines) + "\n", encoding="utf-8")
    (out_dir / "runtime_descriptor_family_rankings.md").write_text("\n".join(md_lines), encoding="utf-8")
def write_readable_template_reports(
    out_dir: pathlib.Path,
    descriptor_chunks: list[ExtractedChunk],
    chunk_by_index: dict[int, ExtractedChunk],
    total_chunks: int,
) -> None:
    """Write descriptor-side pseudo-script templates as Markdown and TSV.

    For each of four fixed focus sets, every descriptor chunk whose primary
    label is in the set becomes a center.  Around each center the chunks inside
    the set's index window are listed when they have an event tag kind, a
    referent field, or a label from the same focus set.  The relation word is
    ``attach`` for tag kind ``"event"``, ``callback`` for ``"eventTrigger"``
    and ``near`` otherwise.  The Markdown file ends with a table of every chunk
    that has an event family.  Output goes to
    ``readable_descriptor_templates.md`` and ``readable_descriptor_templates.tsv``
    in ``out_dir``.
    """
    # (section title, center labels, neighbor window)
    focus_sets = [
        ("JELYHACK anchor lane", {"JELYHACK", "JELYH2"}, 8),
        ("EVENT hub lane", {"EVENT", "COR_BOOT", "NPCTRIG"}, 5),
        ("Environmental event lane", {"FLAMEBOX", "NOSTRIL", "STEAMBOX"}, 5),
        ("Callback trigger lane", {"SURCAMNS", "SURCAMEW"}, 5),
    ]
    relation_by_kind = {"event": "attach", "eventTrigger": "callback"}

    tsv_lines = [
        "section\tcenter_index\tcenter_label\trelation\tneighbor_index\tdistance\tneighbor_label\trole\tfamily\ttag_kind\tfield_names"
    ]
    md_lines = [
        "# EUSECODE Readable Descriptor Templates",
        "",
        "These are conservative descriptor-side pseudo-script sketches.",
        "They reflect verified field grammar and local table neighborhoods, not a direct opcode dump.",
        "",
    ]

    for section_name, center_labels, window in focus_sets:
        centers = [chunk for chunk in descriptor_chunks if chunk.primary_label in center_labels]
        if not centers:
            continue
        md_lines.extend([f"## {section_name}", ""])

        for center in centers:
            md_lines.extend([
                f"### {center.index}: {center.primary_label}",
                "",
                "```text",
                readable_signature(center),
            ])
            center_label = clean_token(center.primary_label or "")
            first = max(0, center.index - window)
            stop = min(total_chunks, center.index + window + 1)

            for position in range(first, stop):
                if position == center.index:
                    continue
                neighbor = chunk_by_index[position]
                tag_kind = event_tag_kind(neighbor)
                relevant = (
                    tag_kind
                    or has_referent_field(neighbor)
                    or neighbor.primary_label in center_labels
                )
                if not relevant:
                    continue

                distance = neighbor.index - center.index
                relation = relation_by_kind.get(tag_kind, "near")
                family = classify_event_family(neighbor)
                family_suffix = f", family={family}" if family else ""
                md_lines.append(
                    f"{relation} {readable_subject(neighbor)} # offset {distance:+d}{family_suffix}"
                )

                neighbor_label = clean_token(neighbor.primary_label or "")
                field_names = ",".join(clean_token(name) for name in neighbor.field_names)
                tsv_lines.append(
                    f"{section_name}\t{center.index}\t{center_label}\t{relation}\t"
                    f"{neighbor.index}\t{distance:+d}\t{neighbor_label}\t{chunk_role(neighbor)}\t"
                    f"{family}\t{tag_kind}\t{field_names}"
                )

            md_lines.extend(["```", ""])

    family_lines = [
        "## Family Signatures",
        "",
        "| Family | Label | Signature |",
        "|---|---|---|",
    ]
    ordered = sorted(descriptor_chunks, key=lambda value: (classify_event_family(value), value.index))
    for chunk in ordered:
        family = classify_event_family(chunk)
        if not family:
            continue
        label = clean_token(chunk.primary_label or "")
        # A literal pipe in the signature would split the table cell.
        signature = readable_signature(chunk).replace("|", "/")
        family_lines.append(f"| {family} | {label} | {signature} |")
    md_lines.extend(family_lines)
    md_lines.append("")

    (out_dir / "readable_descriptor_templates.md").write_text("\n".join(md_lines), encoding="utf-8")
    (out_dir / "readable_descriptor_templates.tsv").write_text("\n".join(tsv_lines) + "\n", encoding="utf-8")
def header_u16_words(data: bytes, count: int = 16) -> list[str]:
    """Format the leading 16-bit words of *data* as ``0xNNNN`` strings.

    At most *count* words are returned, and never more than the number of
    complete two-byte words in *data*.  Each word is read with ``read_u16_le``.
    """
    word_total = min(len(data) // 2, count)
    byte_offsets = range(0, word_total * 2, 2)
    return [f"0x{read_u16_le(data, byte_offset):04X}" for byte_offset in byte_offsets]
def header_u32_words(data: bytes, count: int = 8) -> list[str]:
    """Format the leading 32-bit words of *data* as ``0xNNNNNNNN`` strings.

    At most *count* words are returned, and never more than the number of
    complete four-byte words in *data*.  Each word is read with ``read_u32_le``.
    """
    word_total = min(len(data) // 4, count)
    byte_offsets = range(0, word_total * 4, 4)
    return [f"0x{read_u32_le(data, byte_offset):08X}" for byte_offset in byte_offsets]
def interesting_printable_markers(data: bytes) -> list[str]:
    """Return up to eight distinct printable runs that contain a marker token.

    Runs come from ``iter_printable_runs(data, min_len=3)``.  A run qualifies
    when it contains any of the fixed marker substrings; qualifying runs are
    de-duplicated in first-seen order before the list is cut to eight.
    """
    marker_tokens = ("wx[", "wt$[", "t$t=t@", "$Q", "?\n", "?\r")
    qualifying = (
        run
        for run in iter_printable_runs(data, min_len=3)
        if any(token in run for token in marker_tokens)
    )
    # dict keys keep insertion order, which gives first-seen de-duplication.
    return list(dict.fromkeys(qualifying))[:8]
def write_island_graph(
    out_dir: pathlib.Path,
    output_name: str,
    title: str,
    center_labels: set[str],
    descriptor_chunks: list[ExtractedChunk],
    chunk_by_index: dict[int, ExtractedChunk],
    total_chunks: int,
    window: int = 5,
) -> None:
    """Write a Markdown node/edge listing for the chunks around given labels.

    Centers are the descriptor chunks whose primary label is in
    *center_labels*; nothing is written when there are none.  The node table
    covers every index inside any center's window (clamped to
    ``[0, total_chunks)``, centers included).  The edge table has one row per
    center and per other index in that center's window; the relation reads
    ``possible-event-attachment`` when the neighbor has event evidence and
    ``table-neighbor`` otherwise, followed by the signed index offset.
    """
    centers = [chunk for chunk in descriptor_chunks if chunk.primary_label in center_labels]
    if not centers:
        return

    def window_around(center):
        return range(max(0, center.index - window), min(total_chunks, center.index + window + 1))

    covered: set[int] = set()
    for center in centers:
        covered.update(window_around(center))

    island_lines = [f"# {title}", "", "## Nodes", "", "| Index | Label | Role | Fields | Event Evidence |", "|---:|---|---|---|---|"]
    for index in sorted(covered):
        node = chunk_by_index[index]
        label = node.primary_label or ""
        role = chunk_role(node)
        fields = ",".join(node.field_names) or "-"
        evidence = ",".join(get_event_evidence(node)) or "-"
        island_lines.append(f"| {index} | {label} | {role} | {fields} | {evidence} |")

    island_lines.extend(["", "## Edges", "", "| Source | Relation | Target | Evidence |", "|---|---|---|---|"])
    for center in centers:
        for position in window_around(center):
            if position == center.index:
                continue
            neighbor = chunk_by_index[position]
            offset = neighbor.index - center.index
            event_evidence = get_event_evidence(neighbor)
            relation_name = "possible-event-attachment" if event_evidence else "table-neighbor"
            target = neighbor.primary_label or ""
            evidence = ",".join(event_evidence) or "same local extraction neighborhood"
            island_lines.append(
                f"| {center.primary_label} ({center.index}) | {relation_name}({offset:+d}) | "
                f"{target} ({neighbor.index}) | {evidence} |"
            )

    (out_dir / output_name).write_text("\n".join(island_lines) + "\n", encoding="utf-8")
def write_descriptor_compare(
    out_dir: pathlib.Path,
    output_name: str,
    labels: set[str],
    descriptor_chunks: list[ExtractedChunk],
) -> None:
    """Write a TSV comparing the raw headers of the chunks named in *labels*.

    Each selected chunk's dumped payload is re-read from ``chunk.raw_path`` and
    summarised as leading u16 words, leading u32 words and printable markers,
    next to the chunk's index, label, role, offset, size and field tags.  The
    file always contains the header row, even when no chunk matches.
    """
    compare_lines = [
        "entry_index\tlabel\trole\tdata_offset\tdeclared_size\theader_u16\theader_u32\tprintable_markers\tfield_tags"
    ]
    selected = (chunk for chunk in descriptor_chunks if chunk.primary_label in labels)
    for chunk in selected:
        payload = pathlib.Path(chunk.raw_path).read_bytes()
        u16_cell = ",".join(header_u16_words(payload))
        u32_cell = ",".join(header_u32_words(payload))
        # NOTE(review): the marker token list includes "?\n" and "?\r", so a
        # marker may carry a raw line break into this cell - confirm against
        # iter_printable_runs before relying on one-row-per-chunk parsing.
        marker_cell = "|".join(interesting_printable_markers(payload))
        tag_cell = ",".join(chunk.field_tags)
        compare_lines.append(
            f"{chunk.index}\t{chunk.primary_label}\t{chunk_role(chunk)}\t"
            f"0x{chunk.data_offset:X}\t0x{chunk.declared_size:X}\t"
            f"{u16_cell}\t{u32_cell}\t{marker_cell}\t{tag_cell}"
        )
    (out_dir / output_name).write_text("\n".join(compare_lines) + "\n", encoding="utf-8")
def write_event_family_reports(
    out_dir: pathlib.Path,
    descriptor_chunks: list[ExtractedChunk],
    chunk_by_index: dict[int, ExtractedChunk],
    total_chunks: int,
) -> None:
    """Write the per-chunk event family index and the per-family summary.

    ``event_family_index.tsv`` gets one row for every descriptor chunk that has
    an event family, in input order.  ``event_family_summary.md`` gets one
    table per family, in a fixed family order, with that family's chunks sorted
    by index; families outside the fixed order do not appear in the summary.
    Both outputs include the number of chunks within five table slots that
    carry an event tag.
    """

    def nearby_event_count(chunk):
        # Chunks at index-5 .. index+5 (clamped, self excluded) with an event tag kind.
        low = max(0, chunk.index - 5)
        high = min(total_chunks, chunk.index + 6)
        return sum(
            1
            for position in range(low, high)
            if position != chunk.index and event_tag_kind(chunk_by_index[position])
        )

    family_lines = [
        "entry_index\tlabel\tfamily\ttag_kind\trole\tfield_count\tfield_names\tfield_tags\tdata_offset\tdeclared_size\tlocal_event_neighbors"
    ]
    families: dict[str, list[ExtractedChunk]] = {}
    for chunk in descriptor_chunks:
        family = classify_event_family(chunk)
        if not family:
            continue
        families.setdefault(family, []).append(chunk)
        neighbor_count = nearby_event_count(chunk)
        label = chunk.primary_label or ""
        names_cell = ",".join(chunk.field_names)
        tags_cell = ",".join(chunk.field_tags)
        family_lines.append(
            f"{chunk.index}\t{label}\t{family}\t{event_tag_kind(chunk)}\t{chunk_role(chunk)}\t"
            f"{len(chunk.field_names)}\t{names_cell}\t{tags_cell}\t"
            f"0x{chunk.data_offset:X}\t0x{chunk.declared_size:X}\t{neighbor_count}"
        )
    (out_dir / "event_family_index.tsv").write_text("\n".join(family_lines) + "\n", encoding="utf-8")

    summary_lines = ["# Event Family Summary", ""]
    family_order = [
        "event-hub",
        "boot-event-core",
        "npc-trigger",
        "minimal-event-core",
        "environmental-event",
        "specialized-event",
        "callback-eventtrigger",
    ]
    for family in family_order:
        members = families.get(family, [])
        if not members:
            continue
        summary_lines.extend([
            f"## {family}",
            "",
            "| Index | Label | Tag Kind | Fields | Size | Local Event Neighbors |",
            "|---:|---|---|---|---:|---:|",
        ])
        for chunk in sorted(members, key=lambda value: value.index):
            neighbor_count = nearby_event_count(chunk)
            label = chunk.primary_label or ""
            fields_cell = ",".join(chunk.field_names) or "-"
            summary_lines.append(
                f"| {chunk.index} | {label} | {event_tag_kind(chunk)} | {fields_cell} | "
                f"0x{chunk.declared_size:X} | {neighbor_count} |"
            )
        summary_lines.append("")
    (out_dir / "event_family_summary.md").write_text("\n".join(summary_lines), encoding="utf-8")
def looks_text_like(data: bytes) -> bool:
    """Heuristically decide whether *data* should also be dumped as text.

    A payload qualifies when it is non-empty, ``printable_ratio`` reports at
    least 0.80 for it, and it either contains a line feed or has a
    ``zero_ratio`` below 0.05.
    """
    if not data or printable_ratio(data) < 0.80:
        return False
    # b"\r\n" always contains b"\n", so one line-feed probe covers both cases.
    has_line_break = b"\n" in data
    return has_line_break or zero_ratio(data) < 0.05
def parse_flx_table(data: bytes, table_offset: int = 0x80, count_offset: int = 0x54) -> FlxTable:
    """Parse the FLX entry table and keep only entries that reference real data.

    Args:
        data: Entire contents of the FLX container.
        table_offset: File offset of the first 8-byte table entry.
        count_offset: File offset of the u32 entry count.

    Returns:
        A ``FlxTable`` carrying the raw slot count, the table bounds, and the
        surviving entries as ``CandidateEntry(table_offset, data_offset,
        declared_size)`` records in table order.

    Raises:
        ValueError: If the declared table would run past the end of *data*.
    """
    file_size = len(data)
    entry_count = read_u32_le(data, count_offset)
    table_end = table_offset + entry_count * 8
    if table_end > file_size:
        raise ValueError(
            f"FLX table extends past EOF: entry_count={entry_count} table_end=0x{table_end:X} file_size=0x{file_size:X}"
        )

    entries: list[CandidateEntry] = []
    for slot_offset in range(table_offset, table_end, 8):
        data_offset = read_u32_le(data, slot_offset)
        declared_size = read_u32_le(data, slot_offset + 4)
        # Slots with no offset or no length carry no payload to extract.
        if data_offset <= 0 or declared_size <= 0:
            continue
        # An offset at (not just past) EOF cannot address a single byte, so it
        # would only ever yield an empty chunk; reject it with the rest.
        if data_offset >= file_size:
            continue
        entries.append(CandidateEntry(slot_offset, data_offset, declared_size))

    return FlxTable(
        entry_count=entry_count,
        table_offset=table_offset,
        table_end=table_end,
        entries=entries,
    )
def dump_chunk(
    base_dir: pathlib.Path, chunk_name: str, data: bytes
) -> tuple[str, str, str | None, bool, float, float, str, str | None, list[str], list[str]]:
    """Write one chunk's raw bytes plus sidecar files and return its summary fields.

    Files written under *base_dir*:
        ``<chunk_name>.bin``          the payload bytes, unchanged
        ``<chunk_name>.strings.txt``  one printable run per line
        ``<chunk_name>.txt``          latin-1 decoding, only when the payload
                                      passes ``looks_text_like``

    Returns:
        ``(raw_path, strings_path, text_path_or_None, text_like,
        printable_ratio, zero_ratio, preview, primary_label, field_names,
        field_tags)``.
    """
    bin_file = base_dir / f"{chunk_name}.bin"
    bin_file.write_bytes(data)

    printable_runs = iter_printable_runs(data)
    strings_body = "\n".join(printable_runs)
    if printable_runs:
        # Terminate the final line only when there is at least one run.
        strings_body += "\n"
    strings_file = base_dir / f"{chunk_name}.strings.txt"
    strings_file.write_text(strings_body, encoding="utf-8")

    primary_label, field_names = summarize_descriptor(printable_runs)
    field_tags = extract_field_tag_records(data, field_names)

    is_text = looks_text_like(data)
    written_text_path: str | None = None
    if is_text:
        text_file = base_dir / f"{chunk_name}.txt"
        text_file.write_text(data.decode("latin-1", errors="replace"), encoding="utf-8")
        written_text_path = str(text_file)

    return (
        str(bin_file),
        str(strings_file),
        written_text_path,
        is_text,
        printable_ratio(data),
        zero_ratio(data),
        ascii_preview(data),
        primary_label,
        field_names,
        field_tags,
    )
def extract_candidates(data: bytes, out_dir: pathlib.Path, entries: list[CandidateEntry]) -> list[ExtractedChunk]:
    """Dump every table entry under ``out_dir/chunks`` and describe each one.

    Each payload is sliced from *data* using the entry's declared size,
    clamped to the end of the file. The offset of the entry that follows it in
    file order is recorded so declared sizes that run into the next payload
    can be flagged as overlapping. ``annotate_class_layout`` is applied to the
    finished list before it is returned.
    """
    chunks_dir = out_dir / "chunks"
    chunks_dir.mkdir(parents=True, exist_ok=True)
    file_size = len(data)

    # Entry indices ordered by payload offset, ties broken by table order.
    file_order = sorted(range(len(entries)), key=lambda i: (entries[i].data_offset, i))
    # Map each entry to the offset of its successor in file order; the last
    # entry has no successor and is simply absent (looked up as None below).
    following_offset = {
        current: entries[successor].data_offset
        for current, successor in zip(file_order, file_order[1:])
    }

    extracted: list[ExtractedChunk] = []
    for index, entry in enumerate(entries):
        next_offset = following_offset.get(index)
        declared_end = entry.data_offset + entry.declared_size
        chunk_data = data[entry.data_offset:min(file_size, declared_end)]
        overlap = next_offset is not None and declared_end > next_offset
        chunk_name = (
            f"chunk_{index:03d}_table_{entry.table_offset:04X}_off_{entry.data_offset:06X}_len_{entry.declared_size:06X}"
        )
        (
            raw_path,
            strings_path,
            text_path,
            text_like,
            print_ratio,
            z_ratio,
            preview,
            primary_label,
            field_names,
            field_tags,
        ) = dump_chunk(chunks_dir, chunk_name, chunk_data)
        record = ExtractedChunk(
            index=index,
            table_offset=entry.table_offset,
            object_index=object_index_from_table_offset(entry.table_offset),
            data_offset=entry.data_offset,
            declared_size=entry.declared_size,
            next_offset=next_offset,
            extracted_size=len(chunk_data),
            overlap_with_next=overlap,
            text_like=text_like,
            printable_ratio=round(print_ratio, 4),
            zero_ratio=round(z_ratio, 4),
            preview=preview,
            raw_path=raw_path,
            strings_path=strings_path,
            text_path=text_path,
            primary_label=primary_label,
            field_names=field_names,
            field_tags=field_tags,
        )
        extracted.append(record)

    annotate_class_layout(extracted)
    return extracted
def write_summary(out_dir: pathlib.Path, input_path: pathlib.Path, data: bytes, entries: list[CandidateEntry], chunks: list[ExtractedChunk]) -> None:
    """Write every index, graph, comparison and README artifact into *out_dir*.

    Args:
        out_dir: Directory that receives all generated files.
        input_path: Path of the source container; only recorded in the output.
        data: Entire contents of the source container.
        entries: Table entries that were extracted.
        chunks: Per-entry extraction records to be indexed.

    Files written directly here: ``summary.json``, ``entry_index.tsv``,
    ``descriptor_index.tsv``, ``class_layout_index.tsv``,
    ``class_event_index.tsv``, ``repeated_family_regressions.tsv``,
    ``descriptor_neighborhoods.tsv``, ``referent_anchor_event_graph.tsv``,
    ``README.md`` and ``all_strings.txt``. Further artifacts are produced by
    the ``write_*`` helpers called along the way.
    """
    # --- summary.json: full machine-readable dump of entries and chunks ---
    summary = {
        "input_path": str(input_path),
        "file_size": len(data),
        "header_preview_hex": data[:128].hex(),
        "header_preview_ascii": ascii_preview(data[:128], 128),
        "candidate_entries": [asdict(entry) for entry in entries],
        "chunks": [asdict(chunk) for chunk in chunks],
    }
    (out_dir / "summary.json").write_text(json.dumps(summary, indent=2), encoding="utf-8")

    # --- entry_index.tsv: one row per chunk; optional fields render as "" ---
    index_lines = [
        "entry_index\ttable_offset\tobject_index\tclass_id\tclass_name_hint\traw_code_base_u32\tcode_base_minus_one\tconservative_event_count\tevent_table_end\tclass_parse_status\tdata_offset\tdeclared_size\textracted_size\ttext_like\tprintable_ratio\tzero_ratio\toverlap_with_next\tprimary_label\tfield_names\tfield_tags\tpreview"
    ]
    for chunk in chunks:
        index_lines.append(
            "{index}\t{table_offset}\t{object_index}\t{class_id}\t{class_name_hint}\t{raw_code_base_u32}\t{code_base_minus_one}\t{conservative_event_count}\t{event_table_end}\t{class_parse_status}\t{data_offset}\t{declared_size}\t{extracted_size}\t{text_like}\t{printable_ratio:.4f}\t{zero_ratio:.4f}\t{overlap}\t{primary_label}\t{field_names}\t{field_tags}\t{preview}".format(
                index=chunk.index,
                table_offset=(f"0x{chunk.table_offset:X}"),
                object_index=(f"0x{chunk.object_index:X}" if chunk.object_index is not None else ""),
                class_id=(f"0x{chunk.class_id:X}" if chunk.class_id is not None else ""),
                class_name_hint=chunk.class_name_hint or "",
                raw_code_base_u32=(f"0x{chunk.raw_code_base_u32:X}" if chunk.raw_code_base_u32 is not None else ""),
                code_base_minus_one=(f"0x{chunk.code_base_minus_one:X}" if chunk.code_base_minus_one is not None else ""),
                conservative_event_count=(chunk.conservative_event_count if chunk.conservative_event_count is not None else ""),
                event_table_end=(f"0x{chunk.event_table_end:X}" if chunk.event_table_end is not None else ""),
                class_parse_status=chunk.class_parse_status or "",
                data_offset=f"0x{chunk.data_offset:X}",
                declared_size=f"0x{chunk.declared_size:X}",
                extracted_size=f"0x{chunk.extracted_size:X}",
                text_like=int(chunk.text_like),
                printable_ratio=chunk.printable_ratio,
                zero_ratio=chunk.zero_ratio,
                overlap=int(chunk.overlap_with_next),
                primary_label=chunk.primary_label or "",
                field_names=",".join(chunk.field_names),
                field_tags=",".join(chunk.field_tags),
                # Tabs inside the preview would break the column layout.
                preview=chunk.preview.replace("\t", " "),
            )
        )
    (out_dir / "entry_index.tsv").write_text("\n".join(index_lines) + "\n", encoding="utf-8")

    # --- descriptor_index.tsv: only chunks with a label or any field names ---
    descriptor_lines = [
        "entry_index\tobject_index\tclass_id\tclass_name_hint\traw_code_base_u32\tcode_base_minus_one\tconservative_event_count\tevent_table_end\tclass_parse_status\tprimary_label\tfield_names\tfield_tags\tdata_offset\tdeclared_size"
    ]
    descriptor_chunks = [chunk for chunk in chunks if chunk.primary_label or chunk.field_names]
    for chunk in descriptor_chunks:
        descriptor_lines.append(
            "{index}\t{object_index}\t{class_id}\t{class_name_hint}\t{raw_code_base_u32}\t{code_base_minus_one}\t{conservative_event_count}\t{event_table_end}\t{class_parse_status}\t{primary_label}\t{field_names}\t{field_tags}\t0x{data_offset:X}\t0x{declared_size:X}".format(
                index=chunk.index,
                object_index=(f"0x{chunk.object_index:X}" if chunk.object_index is not None else ""),
                class_id=(f"0x{chunk.class_id:X}" if chunk.class_id is not None else ""),
                class_name_hint=chunk.class_name_hint or "",
                raw_code_base_u32=(f"0x{chunk.raw_code_base_u32:X}" if chunk.raw_code_base_u32 is not None else ""),
                code_base_minus_one=(f"0x{chunk.code_base_minus_one:X}" if chunk.code_base_minus_one is not None else ""),
                conservative_event_count=(chunk.conservative_event_count if chunk.conservative_event_count is not None else ""),
                event_table_end=(f"0x{chunk.event_table_end:X}" if chunk.event_table_end is not None else ""),
                class_parse_status=chunk.class_parse_status or "",
                primary_label=chunk.primary_label or "",
                field_names=",".join(chunk.field_names),
                field_tags=",".join(chunk.field_tags),
                data_offset=chunk.data_offset,
                declared_size=chunk.declared_size,
            )
        )
    (out_dir / "descriptor_index.tsv").write_text("\n".join(descriptor_lines) + "\n", encoding="utf-8")

    # --- class layout / class event artifacts for successfully parsed classes ---
    class_layout_lines = [
        "entry_index\tobject_index\tclass_id\tclass_name_hint\traw_code_base_u32\tcode_base_minus_one\tconservative_event_count\tevent_table_end\tclass_parse_status\tdata_offset\tdeclared_size\tprimary_label"
    ]
    parsed_class_chunks = [chunk for chunk in chunks if chunk.class_parse_status == "parsed-class-layout"]
    class_event_rows, rows_by_entry, raw_data_by_entry = build_class_event_rows(parsed_class_chunks)
    repeated_status_by_row = build_repeated_template_status_map(
        parsed_class_chunks,
        rows_by_entry,
        raw_data_by_entry,
    )
    repeated_family_regression_lines = validate_verified_repeated_family_regressions(
        parsed_class_chunks,
        rows_by_entry,
        repeated_status_by_row,
    )
    for chunk in parsed_class_chunks:
        # NOTE(review): the hex format specs below require integer values, so
        # this assumes every "parsed-class-layout" chunk has object_index,
        # class_id, raw_code_base_u32, code_base_minus_one and event_table_end
        # populated -- confirm against annotate_class_layout.
        class_layout_lines.append(
            "{index}\t0x{object_index:X}\t0x{class_id:X}\t{class_name_hint}\t0x{raw_code_base_u32:X}\t0x{code_base_minus_one:X}\t{conservative_event_count}\t0x{event_table_end:X}\t{class_parse_status}\t0x{data_offset:X}\t0x{declared_size:X}\t{primary_label}".format(
                index=chunk.index,
                object_index=chunk.object_index,
                class_id=chunk.class_id,
                class_name_hint=chunk.class_name_hint or "",
                raw_code_base_u32=chunk.raw_code_base_u32,
                code_base_minus_one=chunk.code_base_minus_one,
                conservative_event_count=chunk.conservative_event_count,
                event_table_end=chunk.event_table_end,
                class_parse_status=chunk.class_parse_status,
                data_offset=chunk.data_offset,
                declared_size=chunk.declared_size,
                primary_label=chunk.primary_label or "",
            )
        )
    (out_dir / "class_layout_index.tsv").write_text("\n".join(class_layout_lines) + "\n", encoding="utf-8")
    class_event_lines = [
        "entry_index\tobject_index\tclass_id\tclass_name_hint\tslot\tevent_name_hint\traw_event_entry_word\traw_code_offset\tderived_body_start\tderived_body_end\tderived_body_length\trepeated_template_status"
    ]
    for row in class_event_rows:
        class_event_lines.append(
            "{entry_index}\t0x{object_index:X}\t0x{class_id:X}\t{class_name_hint}\t0x{slot:02X}\t{event_name_hint}\t0x{raw_event_entry_word:04X}\t0x{raw_code_offset:08X}\t{derived_body_start}\t{derived_body_end}\t{derived_body_length}\t{repeated_template_status}".format(
                entry_index=row.entry_index,
                object_index=row.object_index,
                class_id=row.class_id,
                class_name_hint=row.class_name_hint,
                slot=row.slot,
                event_name_hint=row.event_name_hint or "",
                raw_event_entry_word=row.raw_event_entry_word,
                raw_code_offset=row.raw_code_offset,
                derived_body_start=format_optional_hex(row.derived_body_start, 4),
                derived_body_end=format_optional_hex(row.derived_body_end, 4),
                derived_body_length=(row.derived_body_length if row.derived_body_length is not None else ""),
                # Rows without a recorded status get an empty column.
                repeated_template_status=repeated_status_by_row.get((row.entry_index, row.slot), ""),
            )
        )
    (out_dir / "class_event_index.tsv").write_text("\n".join(class_event_lines) + "\n", encoding="utf-8")
    # One decompile artifact per configured family spec.
    for family_artifact_spec in FAMILY_ARTIFACT_SPECS:
        write_family_decompile_artifact(
            out_dir,
            parsed_class_chunks,
            rows_by_entry,
            raw_data_by_entry,
            repeated_status_by_row,
            family_artifact_spec,
        )
    (out_dir / "repeated_family_regressions.tsv").write_text(
        "\n".join(repeated_family_regression_lines) + "\n",
        encoding="utf-8",
    )

    # --- descriptor_neighborhoods.tsv: +/-4 entries around selected labels ---
    neighborhood_lines = [
        "center_index\tneighbor_index\tprimary_label\tfield_names\tfield_tags"
    ]
    interesting = {"JELYHACK", "JELYH2", "NPCTRIG", "CRUZTRIG", "TRIGPAD", "SPECIAL", "EVENT", "SFXTRIG"}
    interesting_indices = [chunk.index for chunk in chunks if chunk.primary_label in interesting]
    seen_pairs: set[tuple[int, int]] = set()
    # NOTE(review): the range-based lookups below assume chunk.index values
    # cover 0..len(chunks)-1 without gaps -- confirm for any caller that does
    # not pass the list produced by extract_candidates.
    chunk_by_index = {chunk.index: chunk for chunk in chunks}
    for center_index in interesting_indices:
        # The window includes the center entry itself.
        for neighbor_index in range(max(0, center_index - 4), min(len(chunks), center_index + 5)):
            pair = (center_index, neighbor_index)
            if pair in seen_pairs:
                continue
            seen_pairs.add(pair)
            chunk = chunk_by_index[neighbor_index]
            neighborhood_lines.append(
                "{center_index}\t{neighbor_index}\t{primary_label}\t{field_names}\t{field_tags}".format(
                    center_index=center_index,
                    neighbor_index=neighbor_index,
                    primary_label=chunk.primary_label or "",
                    field_names=",".join(chunk.field_names),
                    field_tags=",".join(chunk.field_tags),
                )
            )
    (out_dir / "descriptor_neighborhoods.tsv").write_text("\n".join(neighborhood_lines) + "\n", encoding="utf-8")

    # --- referent_anchor_event_graph.tsv: labelled referent anchors paired
    # with neighbors (+/-8, excluding the anchor) that show event evidence ---
    anchor_graph_lines = [
        "anchor_index\tanchor_label\tanchor_fields\tneighbor_index\tdistance\tneighbor_label\tneighbor_fields\tneighbor_role\tevent_evidence"
    ]
    for anchor in descriptor_chunks:
        if not anchor.primary_label or not has_referent_field(anchor):
            continue
        for neighbor_index in range(max(0, anchor.index - 8), min(len(chunks), anchor.index + 9)):
            if neighbor_index == anchor.index:
                continue
            neighbor = chunk_by_index[neighbor_index]
            event_evidence = get_event_evidence(neighbor)
            if not event_evidence:
                continue
            anchor_graph_lines.append(
                "{anchor_index}\t{anchor_label}\t{anchor_fields}\t{neighbor_index}\t{distance:+d}\t{neighbor_label}\t{neighbor_fields}\t{neighbor_role}\t{event_evidence}".format(
                    anchor_index=anchor.index,
                    anchor_label=anchor.primary_label,
                    anchor_fields=",".join(anchor.field_names),
                    neighbor_index=neighbor.index,
                    # Signed distance: negative means the neighbor precedes the anchor.
                    distance=neighbor.index - anchor.index,
                    neighbor_label=neighbor.primary_label or "",
                    neighbor_fields=",".join(neighbor.field_names),
                    neighbor_role=chunk_role(neighbor),
                    event_evidence=",".join(event_evidence),
                )
            )
    (out_dir / "referent_anchor_event_graph.tsv").write_text("\n".join(anchor_graph_lines) + "\n", encoding="utf-8")

    # --- per-family island graphs and descriptor comparison tables ---
    write_island_graph(
        out_dir,
        "jelyhack_island_graph.md",
        "JELYHACK Island Graph",
        {"JELYHACK", "JELYH2"},
        descriptor_chunks,
        chunk_by_index,
        len(chunks),
        window=8,
    )
    write_descriptor_compare(
        out_dir,
        "jelyhack_descriptor_compare.tsv",
        {"JELYHACK", "JELYH2", "REE_BOOT", "SURCAMEW", "SFXTRIG"},
        descriptor_chunks,
    )
    # This graph relies on write_island_graph's default window.
    write_island_graph(
        out_dir,
        "event_island_graph.md",
        "EVENT Cluster Graph",
        {"EVENT", "COR_BOOT", "NPCTRIG", "ROLL_NS", "CRUZTRIG"},
        descriptor_chunks,
        chunk_by_index,
        len(chunks),
    )
    write_descriptor_compare(
        out_dir,
        "event_descriptor_compare.tsv",
        {"ROLL_NS", "COR_BOOT", "EVENT", "NPCTRIG", "CRUZTRIG", "NPC_ONLY", "VMAIL"},
        descriptor_chunks,
    )
    write_island_graph(
        out_dir,
        "boot_frontier_graph.md",
        "AND/BRO Boot Frontier Graph",
        {"AND_BOOT", "BRO_BOOT"},
        descriptor_chunks,
        chunk_by_index,
        len(chunks),
        window=6,
    )
    write_descriptor_compare(
        out_dir,
        "boot_family_compare.tsv",
        {"AND_BOOT", "BRO_BOOT", "COR_BOOT", "VAR_BOOT", "REE_BOOT"},
        descriptor_chunks,
    )
    write_island_graph(
        out_dir,
        "environmental_event_graph.md",
        "Environmental Event Graph",
        {"FLAMEBOX", "NOSTRIL", "STEAMBOX"},
        descriptor_chunks,
        chunk_by_index,
        len(chunks),
        window=5,
    )
    write_descriptor_compare(
        out_dir,
        "environmental_family_compare.tsv",
        {"FLAMEBOX", "NOSTRIL", "STEAMBOX"},
        descriptor_chunks,
    )
    write_descriptor_compare(
        out_dir,
        "callback_trigger_compare.tsv",
        {"SURCAMNS", "SURCAMEW"},
        descriptor_chunks,
    )

    # --- higher-level reports built from the descriptor chunks ---
    write_event_family_reports(out_dir, descriptor_chunks, chunk_by_index, len(chunks))
    write_readable_template_reports(out_dir, descriptor_chunks, chunk_by_index, len(chunks))
    write_runtime_bridge_reports(out_dir, descriptor_chunks, chunk_by_index, len(chunks))
    write_runtime_family_bridge_reports(out_dir, descriptor_chunks)

    # --- README.md: overview, chunk table, and notes on each artifact ---
    lines = []
    lines.append("# EUSECODE.FLX First-Pass Extraction")
    lines.append("")
    lines.append(f"Input: {input_path}")
    lines.append(f"File size: 0x{len(data):X} ({len(data)} bytes)")
    lines.append(f"Candidate entries: {len(entries)}")
    lines.append("")
    lines.append("## Header Preview")
    lines.append("")
    lines.append(f"ASCII: `{ascii_preview(data[:128], 128)}`")
    lines.append("")
    lines.append("## Chunks")
    lines.append("")
    lines.append("| # | Table Off | Data Off | Declared Size | Next Off | Text | Overlap | Preview |")
    lines.append("|---:|---:|---:|---:|---:|:---:|:---:|---|")
    for chunk in chunks:
        next_off = f"0x{chunk.next_offset:X}" if chunk.next_offset is not None else "-"
        lines.append(
            "| {index} | 0x{table_offset:X} | 0x{data_offset:X} | 0x{declared_size:X} | {next_off} | {text_like} | {overlap} | {preview} |".format(
                index=chunk.index,
                table_offset=chunk.table_offset,
                data_offset=chunk.data_offset,
                declared_size=chunk.declared_size,
                next_off=next_off,
                text_like="yes" if chunk.text_like else "no",
                overlap="yes" if chunk.overlap_with_next else "no",
                # A literal pipe would be read as a Markdown column separator.
                preview=chunk.preview.replace("|", "/"),
            )
        )
    lines.append("")
    lines.append("## Notes")
    lines.append("")
    lines.append("- The extractor now parses the validated FLX table directly: entry count at `0x54`, table at `0x80`, 8 bytes per entry.")
    lines.append("- Overlapping declared sizes likely mean some entries are counts or record spans rather than exact chunk lengths.")
    lines.append("- `.strings.txt` files are the main human-readable output for now; `.txt` files are emitted only for chunks that look text-like.")
    lines.append("- `descriptor_index.tsv` summarizes guessed class labels, field names, and compact tag patterns for descriptor-like chunks.")
    lines.append("- `class_layout_index.tsv` records the conservative owner-loaded class parsing state: object index, class id, class-name hint, raw bytes-8..11 field, derived code-base-minus-one, and event-count/table-end values when the local divisibility and bounds checks succeed.")
    lines.append("- `class_event_index.tsv` now also emits derived body-window columns (`derived_body_start`, `derived_body_end`, `derived_body_length`) plus conservative `repeated_template_status` tags for verified repeated families.")
    lines.append("- `boot_family_decompile.md` / `.tsv`, `callback_family_decompile.md` / `.tsv`, and `environmental_family_decompile.md` / `.tsv` now provide reversible per-class decompile artifacts for the `_BOOT`, `SURCAM*`, and environmental repeated-family lanes.")
    lines.append("- `repeated_family_regressions.tsv` enforces the current repeated-family slot sets plus the verified raw-row and derived body-window fields for `JELYHACK/JELYH2`, `_BOOT`, `SURCAM*`, and `FLAMEBOX/NOSTRIL/STEAMBOX`.")
    lines.append("- `descriptor_neighborhoods.tsv` captures local table neighborhoods around trigger/event-related classes such as `JELYHACK`, `NPCTRIG`, `CRUZTRIG`, `TRIGPAD`, and `SPECIAL`.")
    lines.append("- `referent_anchor_event_graph.tsv` groups referent-bearing descriptors with nearby event-bearing neighbors so the attachment model can be inspected without ad hoc grepping.")
    lines.append("- `jelyhack_island_graph.md` now uses a wider local window so the `JELYHACK` / `JELYH2` anchors can be inspected alongside the nearby event-bearing `REE_BOOT`, `SURCAMEW`, and `SFXTRIG` descriptors rather than stopping at the referent-only neighbors.")
    lines.append("- `jelyhack_descriptor_compare.tsv` captures the first 16 header words, first 8 dwords, and a few odd printable markers for the core JELYHACK-island descriptors so structural similarity can be compared without raw hex dumps.")
    lines.append("- `event_island_graph.md` renders the denser `EVENT` / `COR_BOOT` / `NPCTRIG` / `ROLL_NS` / `CRUZTRIG` island, which currently looks like the strongest event-explicit neighborhood outside the JELYHACK anchor case.")
    lines.append("- `event_descriptor_compare.tsv` captures the same header-word and printable-marker comparison for the `EVENT` island so large event-bearing descriptors can be contrasted with neighboring trigger and referent records.")
    lines.append("- `boot_frontier_graph.md` renders the upstream referent neighborhood feeding `AND_BOOT` / `BRO_BOOT`, which is currently the clearest unexplored boot-event frontier.")
    lines.append("- `boot_family_compare.tsv` compares the five `_BOOT` event cores (`AND_BOOT`, `BRO_BOOT`, `COR_BOOT`, `VAR_BOOT`, `REE_BOOT`) by header words, markers, and field tags.")
    lines.append("- `environmental_event_graph.md` renders the three hazard/event islands centered on `FLAMEBOX`, `NOSTRIL`, and `STEAMBOX`, each surrounded by its own referent-heavy local neighborhood.")
    lines.append("- `environmental_family_compare.tsv` compares the environmental event trio so the shared hazard pattern (`referent,event,<hazard>,<hazard2>,direction,count`) can be contrasted directly.")
    lines.append("- `callback_trigger_compare.tsv` compares `SURCAMNS` and `SURCAMEW` directly so the callback-only `eventTrigger` lane can be checked against the active `event` families without raw hex dumps.")
    lines.append("- `event_family_index.tsv` and `event_family_summary.md` classify all current `event` and `eventTrigger` descriptors into reusable families such as boot-event cores, minimal event cores, environmental events, and callback-only surveillance triggers.")
    lines.append("- `readable_descriptor_templates.md` and `readable_descriptor_templates.tsv` emit conservative pseudo-script sketches for the strongest current anchor, event-hub, environmental, and callback lanes so USECODE neighborhoods can be read as structured attachments instead of only raw descriptor rows.")
    lines.append("- `runtime_vm_ir.tsv` captures the currently verified 000d VM operator vocabulary as machine-readable rows with stage addresses, opcode/lane status, payload shape, and evidence anchors.")
    lines.append("- `vm_mask_ladder.tsv` records the current `entity_vm_context_try_create_masked_for_entity` wrapper ladder in machine-readable form so gameplay mask lanes can be compared against descriptor-side families without reopening the notes.")
    lines.append("- `readable_script_ir.md` and `readable_script_ir.tsv` join descriptor neighborhoods, the verified VM IR, the runtime owner/source path, and the current mask-family hints into one conservative script-facing bridge artifact.")
    lines.append("- `runtime_descriptor_family_rankings.md` and `runtime_descriptor_family_rankings.tsv` rank descriptor families against the verified runtime lanes so the current human-readable script bridge is searchable by family fit rather than only by neighborhood dumps.")
    (out_dir / "README.md").write_text("\n".join(lines) + "\n", encoding="utf-8")

    # --- all_strings.txt: printable runs found anywhere in the whole file ---
    all_strings = iter_printable_runs(data)
    (out_dir / "all_strings.txt").write_text("\n".join(all_strings) + ("\n" if all_strings else ""), encoding="utf-8")
def parse_args() -> argparse.Namespace:
    """Parse the command line: optional ``input`` and ``output`` path positionals.

    Either positional may be omitted, in which case ``DEFAULT_INPUT`` or
    ``DEFAULT_OUTPUT`` is used.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    positionals = (("input", DEFAULT_INPUT), ("output", DEFAULT_OUTPUT))
    for name, fallback in positionals:
        parser.add_argument(name, nargs="?", type=pathlib.Path, default=fallback)
    return parser.parse_args()
def main() -> int:
    """Run the extraction end to end and return the process exit status.

    Reads the input container, creates the output directory, parses the entry
    table, dumps every entry, writes the summary artifacts, and prints a
    one-line report. Always returns 0; failures propagate as exceptions.
    """
    args = parse_args()
    input_path, output_dir = args.input, args.output

    data = input_path.read_bytes()
    output_dir.mkdir(parents=True, exist_ok=True)

    flx_table = parse_flx_table(data)
    chunks = extract_candidates(data, output_dir, flx_table.entries)
    write_summary(output_dir, input_path, data, flx_table.entries, chunks)

    report = f"Parsed {flx_table.entry_count} table slots with {len(chunks)} non-zero entries; extracted to {args.output}"
    print(report)
    return 0
# Script entry point: when run directly (not imported), exit with the integer
# status returned by main().
if __name__ == "__main__":
    raise SystemExit(main())