Crusader_Decomp/tools/extract_eusecode_flx.py
MaddoScientisto daa363c3d2 Add 'annotate-usecode' command to import USECODE IR JSON annotations
- Introduced a new command 'annotate-usecode' to import USECODE IR JSON annotation hints as Ghidra comments on compiled anchors.
- Added argument parsing for multiple IR JSON files, comment type selection, and a dry-run option.
- Implemented logic to read annotation records from the provided IR files and set comments on the corresponding addresses in Ghidra.
- Enhanced JSON schema to include response structure for the new command.
2026-03-24 18:14:20 +01:00

3044 lines
140 KiB
Python

#!/usr/bin/env python3
"""Extractor for Crusader's EUSECODE.FLX container.
Current validated layout:
- 0x80-byte header area
- little-endian entry count at file offset 0x54
- entry table begins at 0x80
- each entry is 8 bytes: <u32 data_offset, u32 declared_size>
The exact semantics of the payload records are still under RE, so the extractor dumps
all non-zero entries and emits human-readable sidecars (.strings.txt and index files)
to support the next decoding pass.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import pathlib
import struct
from dataclasses import asdict, dataclass
# Machine-specific absolute Windows paths for the FLX container and the
# extraction output directory.
# NOTE(review): presumably used as command-line fallbacks; the consumer is
# outside this view - confirm before relying on them on another machine.
DEFAULT_INPUT = pathlib.Path(r"k:\ghidra\Crusader_Decomp\USECODE\EUSECODE.FLX")
DEFAULT_OUTPUT = pathlib.Path(r"k:\ghidra\Crusader_Decomp\USECODE\EUSECODE_extracted")
@dataclass(frozen=True)
class CandidateEntry:
    """One row of the FLX entry table.

    The module docstring describes each row as 8 bytes:
    ``<u32 data_offset, u32 declared_size>``.
    """

    # Position of the row itself; object_index_from_table_offset() treats it
    # as an offset counted from the table start at 0x80.
    table_offset: int
    # First u32 of the row.
    data_offset: int
    # Second u32 of the row.
    declared_size: int
@dataclass
class ExtractedChunk:
    """Mutable record describing one extracted FLX entry and its sidecar files.

    The ``class_*`` / event-table fields at the bottom default to ``None`` and
    are filled in afterwards by ``annotate_class_layout``.
    """

    # Key used throughout this file for per-entry lookups (rows, raw bytes).
    index: int
    # Offset of the entry-table row this chunk came from.
    table_offset: int
    # Row number derived from table_offset by annotate_class_layout();
    # None when the offset does not map onto an 8-byte row from 0x80.
    object_index: int | None
    data_offset: int
    declared_size: int
    # NOTE(review): next_offset / extracted_size / overlap_with_next are
    # populated outside this view - presumably from the following entry.
    next_offset: int | None
    extracted_size: int
    overlap_with_next: bool
    # Content heuristics; presumably produced by the same-named helpers
    # (printable_ratio, zero_ratio, ascii_preview) - confirm at the call site.
    text_like: bool
    printable_ratio: float
    zero_ratio: float
    preview: str
    # Path of the dumped payload; re-read with pathlib.Path(...).read_bytes().
    raw_path: str
    strings_path: str
    text_path: str | None
    # Descriptor summary: an upper-case label plus field names and
    # "XX:YYZZ->name" tag strings (shape matches summarize_descriptor /
    # extract_field_tag_records; population happens outside this view).
    primary_label: str | None
    field_names: list[str]
    field_tags: list[str]
    # --- set by annotate_class_layout() ---
    # object_index - 2 for objects at index 2 and above.
    class_id: int | None = None
    # Name decoded from the name table (object index 1), when available.
    class_name_hint: str | None = None
    # u32 read at byte offset 8 of the payload.
    raw_code_base_u32: int | None = None
    # raw_code_base_u32 - 1, only when the raw value is positive.
    code_base_minus_one: int | None = None
    # (raw_code_base_u32 - 20) // 6 when that region divides evenly.
    conservative_event_count: int | None = None
    # 20 + conservative_event_count * 6.
    event_table_end: int | None = None
    # Outcome string such as "parsed-class-layout" or a failure reason.
    class_parse_status: str | None = None
@dataclass(frozen=True)
class ClassEventRow:
    """One 6-byte event-table row of a parsed class object.

    Instances are built by ``derive_class_event_rows``.
    """

    # Index of the owning ExtractedChunk.
    entry_index: int
    object_index: int
    class_id: int
    # Empty string when the chunk carries no name hint.
    class_name_hint: str
    # Zero-based row position inside the event table.
    slot: int
    # Name looked up by slot via scummvm_event_name_hint(); a hint only.
    event_name_hint: str | None
    # u16 at the start of the row and the u32 that follows it.
    raw_event_entry_word: int
    raw_code_offset: int
    # Byte window inside the chunk payload; all three stay None when the row
    # has a zero code offset or the computed window is out of bounds.
    derived_body_start: int | None
    derived_body_end: int | None
    derived_body_length: int | None
@dataclass(frozen=True)
class FlxTable:
    """Parsed FLX entry table: its extent plus the decoded rows.

    NOTE(review): the code that builds this is outside this view; per the
    module docstring the count lives at file offset 0x54 and the table starts
    at 0x80 - confirm that is what these fields carry.
    """

    entry_count: int
    table_offset: int
    table_end: int
    entries: list[CandidateEntry]
@dataclass(frozen=True)
class FamilyArtifactSpec:
    """Describes one per-family report written by write_family_decompile_artifact."""

    # File name stem; ".md" and ".tsv" are appended when writing.
    output_stem: str
    # Heading placed at the top of the Markdown report.
    title: str
    # primary_label values of the chunks that belong to the family.
    labels: tuple[str, ...]
@dataclass(frozen=True)
class RepeatedFamilyRowExpectation:
    """Expected values for one (class, slot) event row, used as a regression pin.

    The field names mirror ClassEventRow plus the status string produced by
    build_repeated_template_status_map.
    """

    # Matched against a chunk's primary_label by the regression validator.
    class_name: str
    slot: int
    raw_event_entry_word: int
    raw_code_offset: int
    derived_body_start: int
    derived_body_end: int
    derived_body_length: int
    # "<family>/shared-slot-0xNN/<suffix>" string.
    repeated_template_status: str
def read_u32_le(data: bytes, offset: int) -> int:
    """Decode the unsigned 32-bit little-endian integer stored at ``offset``.

    Raises ``struct.error`` when fewer than four bytes are available there.
    """
    (value,) = struct.unpack_from("<I", data, offset)
    return value
def read_u16_le(data: bytes, offset: int) -> int:
    """Decode the unsigned 16-bit little-endian integer stored at ``offset``.

    Raises ``struct.error`` when fewer than two bytes are available there.
    """
    (value,) = struct.unpack_from("<H", data, offset)
    return value
def ascii_preview(data: bytes, limit: int = 64) -> str:
    """Render the first ``limit`` bytes as text, one character per byte.

    Bytes in the printable ASCII range 0x20-0x7E are shown as themselves;
    everything else becomes ``"."``.
    """
    return "".join(
        chr(value) if 0x20 <= value <= 0x7E else "."
        for value in data[:limit]
    )
def printable_ratio(data: bytes) -> float:
    """Return the fraction of bytes that are tab, LF, CR or printable ASCII.

    Empty input yields ``0.0``.
    """
    total = len(data)
    if total == 0:
        return 0.0
    hits = 0
    for value in data:
        if value in (0x09, 0x0A, 0x0D) or 0x20 <= value <= 0x7E:
            hits += 1
    return hits / total
def zero_ratio(data: bytes) -> float:
    """Return the fraction of bytes equal to zero (``0.0`` for empty input)."""
    total = len(data)
    return data.count(0) / total if total else 0.0
def iter_printable_runs(data: bytes, min_len: int = 4) -> list[str]:
    """Collect maximal runs of printable bytes that are at least ``min_len`` long.

    A byte counts as printable when it is tab, LF, CR or in 0x20-0x7E. Runs are
    returned in file order, decoded as latin-1.
    """
    found: list[str] = []
    run_start = 0  # index of the first byte of the run currently being scanned
    for position, value in enumerate(data):
        if value in (0x09, 0x0A, 0x0D) or 0x20 <= value <= 0x7E:
            continue
        # A non-printable byte closes the current run (which may be empty).
        if position - run_start >= min_len:
            found.append(bytes(data[run_start:position]).decode("latin-1"))
        run_start = position + 1
    # Flush whatever run reaches the end of the buffer.
    if len(data) - run_start >= min_len:
        found.append(bytes(data[run_start:]).decode("latin-1"))
    return found
def summarize_descriptor(strings: list[str]) -> tuple[str | None, list[str]]:
    """Split extracted strings into one primary label and a list of field names.

    Upper-case strings containing a letter are counted as label candidates; the
    most frequent wins, ties broken alphabetically. Strings that start with a
    lower-case character and are alphanumeric once underscores are removed are
    kept as field names, de-duplicated in first-seen order.

    Returns ``(primary_label_or_None, field_names)``.
    """
    tallies: dict[str, int] = {}
    ordered_fields: list[str] = []
    known_fields: set[str] = set()
    for text in strings:
        if text.isupper() and any(ch.isalpha() for ch in text):
            tallies[text] = tallies.get(text, 0) + 1
        elif (
            text
            and text[0].islower()
            and text.replace("_", "").isalnum()
            and text not in known_fields
        ):
            known_fields.add(text)
            ordered_fields.append(text)
    if not tallies:
        return None, ordered_fields
    # Highest count first, then lexicographically smallest label.
    best_label, _ = min(tallies.items(), key=lambda pair: (-pair[1], pair[0]))
    return best_label, ordered_fields
def extract_field_tag_records(data: bytes, field_names: list[str]) -> list[str]:
    """Return sorted, unique ``"XX:YYZZ->name"`` tags for every field-name hit.

    For each occurrence of a field name in ``data`` that has at least three
    bytes in front of it, the three preceding bytes are rendered as upper-case
    hex (``XX`` = byte at -3, ``YYZZ`` = bytes at -2 and -1).

    An occurrence within the first three bytes cannot yield a tag and is
    skipped; scanning then continues, so later occurrences of the same name
    are still reported. (Previously such an early hit ended the scan for that
    name, silently dropping every later tag.)

    Empty field names are ignored.
    """
    unique_tags: set[str] = set()
    for field_name in field_names:
        needle = field_name.encode("latin-1")
        if not needle:
            # An empty needle matches at every offset; it carries no field.
            continue
        pos = data.find(needle)
        while pos >= 0:
            if pos >= 3:
                unique_tags.add(
                    f"{data[pos - 3]:02X}:{data[pos - 2]:02X}{data[pos - 1]:02X}->{field_name}"
                )
            # Advance by one so overlapping occurrences are still visited.
            pos = data.find(needle, pos + 1)
    return sorted(unique_tags)
def has_referent_field(chunk: ExtractedChunk) -> bool:
    """Tell whether the chunk names a ``referent`` field.

    True when ``"referent"`` is among the field names or any field tag ends
    with ``"->referent"``.
    """
    return "referent" in chunk.field_names or any(
        tag.endswith("->referent") for tag in chunk.field_tags
    )
def get_event_evidence(chunk: ExtractedChunk) -> list[str]:
    """List the markers that make a chunk look event-related.

    Field names containing ``"event"`` (case-insensitive) contribute
    ``"field:<name>"``; field tags containing ``"->event"`` (case-insensitive)
    contribute ``"tag:<tag>"``. Field markers come first, and duplicates are
    dropped while keeping first-seen order.
    """
    markers = [
        f"field:{name}"
        for name in chunk.field_names
        if "event" in name.lower()
    ]
    markers += [
        f"tag:{tag}"
        for tag in chunk.field_tags
        if "->event" in tag.lower()
    ]
    # dict preserves insertion order, which gives an ordered de-duplication.
    return list(dict.fromkeys(markers))
def chunk_role(chunk: ExtractedChunk) -> str:
    """Classify a chunk into one coarse role string.

    Checked in priority order: the JELYHACK/JELYH2 labels give
    ``"referent-anchor"``, any event evidence gives ``"event-bearing"``, a
    referent field gives ``"referent-neighbor"``; everything else is
    ``"neighbor"``.
    """
    if chunk.primary_label in {"JELYHACK", "JELYH2"}:
        role = "referent-anchor"
    elif get_event_evidence(chunk):
        role = "event-bearing"
    elif has_referent_field(chunk):
        role = "referent-neighbor"
    else:
        role = "neighbor"
    return role
def has_event_trigger_field(chunk: ExtractedChunk) -> bool:
    """Tell whether the chunk carries an ``eventTrigger`` field.

    True when a field name equals ``"eventtrigger"`` ignoring case, or a field
    tag contains ``"->eventtrigger"`` ignoring case.
    """
    for name in chunk.field_names:
        if name.lower() == "eventtrigger":
            return True
    for tag in chunk.field_tags:
        if "->eventtrigger" in tag.lower():
            return True
    return False
def event_tag_kind(chunk: ExtractedChunk) -> str:
    """Name the kind of event tag a chunk carries, judged from field tags only.

    Returns ``"eventTrigger"`` when any tag contains ``"->eventtrigger"``,
    otherwise ``"event"`` when any tag ends with ``"->event"`` (both
    case-insensitive), otherwise the empty string.
    """
    lowered = [tag.lower() for tag in chunk.field_tags]
    if any("->eventtrigger" in tag for tag in lowered):
        return "eventTrigger"
    if any(tag.endswith("->event") for tag in lowered):
        return "event"
    return ""
def classify_event_family(chunk: ExtractedChunk) -> str:
    """Assign an event-bearing chunk to a named family.

    Chunks tagged ``eventTrigger`` are ``"callback-eventtrigger"``; chunks with
    no ``event`` tag get the empty string. For ``event`` chunks the first
    matching rule wins: label ``EVENT``, label ending in ``_BOOT``, exactly the
    fields ``[referent, event]``, any flame/fire/steam field, a ``typeNpc``
    field, and finally ``"specialized-event"``.
    """
    kind = event_tag_kind(chunk)
    if kind == "eventTrigger":
        return "callback-eventtrigger"
    if kind != "event":
        return ""

    label = chunk.primary_label
    names = chunk.field_names
    if label == "EVENT":
        return "event-hub"
    if label and label.endswith("_BOOT"):
        return "boot-event-core"
    if names == ["referent", "event"]:
        return "minimal-event-core"
    environmental = ("flame", "flame2", "fire", "fire2", "steam", "steam2")
    for candidate in environmental:
        if candidate in names:
            return "environmental-event"
    if "typeNpc" in names:
        return "npc-trigger"
    return "specialized-event"
def readable_role(chunk: ExtractedChunk) -> str:
    """Return a short role word for human-readable output.

    Priority order: ``hub`` (event-hub family), ``anchor`` (referent anchor),
    ``attach`` (event tag), ``callback`` (eventTrigger tag), ``neighbor``
    (referent field), else ``data``.
    """
    if classify_event_family(chunk) == "event-hub":
        return "hub"
    if chunk_role(chunk) == "referent-anchor":
        return "anchor"
    kind = event_tag_kind(chunk)
    if kind == "event":
        return "attach"
    if kind == "eventTrigger":
        return "callback"
    return "neighbor" if has_referent_field(chunk) else "data"
def clean_token(value: str) -> str:
    """Collapse every whitespace run to one space and trim both ends."""
    words = value.split()
    return " ".join(words)
def readable_subject(chunk: ExtractedChunk) -> str:
    """Format a chunk as ``LABEL(field1,field2,...)``.

    Falls back to ``chunk_<index>`` when there is no primary label and to
    ``?`` when the joined field list is empty. Label and fields are passed
    through ``clean_token``.
    """
    raw_label = chunk.primary_label or f"chunk_{chunk.index}"
    joined_fields = ",".join(clean_token(name) for name in chunk.field_names)
    if not joined_fields:
        joined_fields = "?"
    return f"{clean_token(raw_label)}({joined_fields})"
def readable_signature(chunk: ExtractedChunk) -> str:
    """Return ``"<role> <subject>"`` built from readable_role and readable_subject."""
    role = readable_role(chunk)
    subject = readable_subject(chunk)
    return f"{role} {subject}"
# Hand-maintained table of VM IR names tied to disassembly addresses.
# Every row has the same five string keys: stage_address, ir_name,
# opcode_or_lane, payload_shape and evidence.
# NOTE(review): the consumer of this table is outside this view.
VERIFIED_VM_IR_ROWS: tuple[dict[str, str], ...] = (
    {
        "stage_address": "000d:0988",
        "ir_name": "APPEND_UNIQUE_INLINE",
        "opcode_or_lane": "opcode 0x18 (implied sibling)",
        "payload_shape": "inline referent-chain payload",
        "evidence": "0x19/0x1a/0x1b compares in 000d:0988 imply 0x18 as append-unique inline sibling",
    },
    {
        "stage_address": "000d:0988",
        "ir_name": "APPEND_UNIQUE_INDIRECT",
        "opcode_or_lane": "opcode 0x19",
        "payload_shape": "indirect/string-like referent-chain payload",
        "evidence": "[BP-0x32] == 0x19 path in 000d:0988 with indirect-mode flag",
    },
    {
        "stage_address": "000d:0988",
        "ir_name": "REMOVE_MATCHING_INDIRECT",
        "opcode_or_lane": "opcode 0x1a",
        "payload_shape": "indirect/string-like referent-chain payload",
        "evidence": "[BP-0x32] == 0x1a path in 000d:0988 reaches entity_vm_referent_chain_remove_matching_from",
    },
    {
        "stage_address": "000d:0988",
        "ir_name": "REMOVE_MATCHING_INLINE",
        "opcode_or_lane": "opcode 0x1b",
        "payload_shape": "inline referent-chain payload",
        "evidence": "[BP-0x32] == 0x1b path in 000d:0988 reaches entity_vm_referent_chain_remove_matching_from without indirect-mode flag",
    },
    {
        "stage_address": "000d:177c",
        "ir_name": "PUSH_FRAME_WORD_LITERAL",
        "opcode_or_lane": "same FUN_000d_ebe3 sequencer family",
        "payload_shape": "word scalar pushed to stream stack",
        "evidence": "000d:177c subtracts 2 from [context+0xcc] and stores frame-local word before entity_vm_opcode_finish",
    },
    {
        "stage_address": "000d:1acb",
        "ir_name": "COMPARE_STREAM_DWORD_AND_PUSH_BOOL",
        "opcode_or_lane": "same FUN_000d_ebe3 sequencer family",
        "payload_shape": "stream dword pair consumed, predicate word emitted",
        "evidence": "000d:1acb reads one 32-bit pair from stream, compares against AX:DX, pushes boolean word result",
    },
    {
        "stage_address": "000d:208b",
        "ir_name": "MATERIALIZE_OR_FORWARD_VALUE",
        "opcode_or_lane": "slot-backed context consumer",
        "payload_shape": "materialized slot value or forwarded object result",
        "evidence": "000d:208b builds one VM context then forwards immediate or object-backed value through shared epilogue",
    },
    {
        "stage_address": "000d:21ed",
        "ir_name": "PREPEND_INLINE_PAYLOAD",
        "opcode_or_lane": "inline payload substage",
        "payload_shape": "caller-owned blob copied into context +0x102 buffer",
        "evidence": "000d:21ed prepends caller bytes into backward-growing context buffer before metadata-driven follow-on work",
    },
    {
        "stage_address": "000d:22bc",
        "ir_name": "BUILD_ENTITY_LINK_MATRIX",
        "opcode_or_lane": "inline payload follow-on stage",
        "payload_shape": "two signed metadata bytes plus streamed entity/link ids",
        "evidence": "000d:22bc consumes two signed bytes from +0xd6/+0xd8 and streamed words for repeated entity_link calls",
    },
    {
        "stage_address": "000d:22bc",
        "ir_name": "EMIT_OR_PUSHBACK_RESULT",
        "opcode_or_lane": "inline payload follow-on stage",
        "payload_shape": "stream writeback filter",
        "evidence": "000d:23da..2421 pushes back only results without 0x0400 list flag before opcode finish",
    },
    {
        "stage_address": "000d:2104",
        "ir_name": "FINALIZE_MIXED_VALUE_TO_OUTPTR",
        "opcode_or_lane": "same FUN_000d_ebe3 sequencer family",
        "payload_shape": "mixed immediate/object scalar return",
        "evidence": "000d:2104 writes either frame-local dword or object word with high word cleared to caller out-ptr",
    },
)
# Hand-maintained table of masked-context wrapper routines.
# Every row has the same four string keys: wrapper_address, mask_pair,
# caller_anchor and descriptor_bias.
# NOTE(review): the consumer of this table is outside this view.
VERIFIED_MASK_LADDER_ROWS: tuple[dict[str, str], ...] = (
    {
        "wrapper_address": "0005:27a4",
        "mask_pair": "0x0001:0000",
        "caller_anchor": "000c:a09e entity +0x5b bit 0x0004 branch",
        "descriptor_bias": "generic active-event-biased lane; no direct class-id bridge",
    },
    {
        "wrapper_address": "0005:2867",
        "mask_pair": "0x0002:0001",
        "caller_anchor": "stores result into entity field +0x39",
        "descriptor_bias": "active-event ecosystem bias stronger than referent-anchor bias",
    },
    {
        "wrapper_address": "0005:2ae2",
        "mask_pair": "0x0004:0002",
        "caller_anchor": "same verified local mask ladder around entity_vm_context_try_create_masked_for_entity",
        "descriptor_bias": "active-event ecosystem bias stronger than referent-anchor bias",
    },
    {
        "wrapper_address": "0005:2c9b",
        "mask_pair": "0x0010:0004",
        "caller_anchor": "direct callers at 0005:5946 and 0005:59e9",
        "descriptor_bias": "active-event ecosystem bias stronger than referent-anchor bias",
    },
    {
        "wrapper_address": "0005:2918",
        "mask_pair": "0x0020:0005",
        "caller_anchor": "+0x3c == 0x20b object lane, carries caller fields +0x36/+0x38",
        "descriptor_bias": "candidate active-event field bridge; still not descriptor-specific",
    },
    {
        "wrapper_address": "0005:2c06",
        "mask_pair": "0x0200:0009",
        "caller_anchor": "direct caller anchor at 0005:0292",
        "descriptor_bias": "active-event ecosystem bias stronger than referent-anchor bias",
    },
    {
        "wrapper_address": "0005:2c35",
        "mask_pair": "0x0400:000a",
        "caller_anchor": "xref-dark signed slot-offset wrapper forwarding context +0x34",
        "descriptor_bias": "offset-specialized masked context creation; descriptor family unresolved",
    },
    {
        "wrapper_address": "0005:2c68",
        "mask_pair": "0x0800:000b",
        "caller_anchor": "xref-dark signed slot-offset wrapper forwarding context +0x34",
        "descriptor_bias": "offset-specialized masked context creation; descriptor family unresolved",
    },
    {
        "wrapper_address": "0005:2cd2",
        "mask_pair": "0x1000:000c",
        "caller_anchor": "direct caller anchor at 0005:0fee",
        "descriptor_bias": "active-event ecosystem bias stronger than referent-anchor bias",
    },
    {
        "wrapper_address": "0004:f05c",
        "mask_pair": "0x2000:0015",
        "caller_anchor": "0004:f2b3 overlap/proximity branch with entity byte +0x32 state checks",
        "descriptor_bias": "gameplay-state lane feeding runtime materialization, not direct descriptor switch",
    },
    {
        "wrapper_address": "0005:2d01",
        "mask_pair": "0x4000:000e",
        "caller_anchor": "direct callers at 0007:814e and 0007:822e",
        "descriptor_bias": "active-event ecosystem bias stronger than referent-anchor bias",
    },
    {
        "wrapper_address": "0004:f033",
        "mask_pair": "0x8000:0007",
        "caller_anchor": "local wrapper seed recovered from direct instruction evidence",
        "descriptor_bias": "gameplay-side materialization lane; still descriptor-agnostic",
    },
    {
        "wrapper_address": "0005:2d30",
        "mask_pair": "0x8000:000f",
        "caller_anchor": "entity id/class flag gate plus 0x0f16 / 0x20f dispatch-entry emission path",
        "descriptor_bias": "strongest current active-event ecosystem candidate in the ladder",
    },
)
# Event names indexed by event-table slot number (slot 0 is "look", slot
# 0x1F is "func1F"); looked up by scummvm_event_name_hint(). The generated
# reports describe these labels as hints only, not authoritative names.
SCUMMVM_EVENT_NAME_HINTS: tuple[str, ...] = (
    "look",
    "use",
    "anim",
    "setActivity",
    "cachein",
    "hit",
    "gotHit",
    "hatch",
    "schedule",
    "release",
    "equip",
    "unequip",
    "combine",
    "func0D",
    "calledFromAnim",
    "enterFastArea",
    "leaveFastArea",
    "cast",
    "justMoved",
    "avatarStoleSomething",
    "animGetHit",
    "unhatch",
    "func16",
    "func17",
    "func18",
    "func19",
    "func1A",
    "func1B",
    "func1C",
    "func1D",
    "func1E",
    "func1F",
)
# (family_name, member primary labels) pairs. build_repeated_template_status_map()
# compares event bodies slot by slot across the members of each family and
# uses family_name as the prefix of the resulting status string.
VERIFIED_REPEATED_TEMPLATE_FAMILIES: tuple[tuple[str, tuple[str, ...]], ...] = (
    ("referent-anchor-twin", ("JELYHACK", "JELYH2")),
    ("boot-event-core", ("AND_BOOT", "BRO_BOOT", "COR_BOOT", "REE_BOOT", "VAR_BOOT")),
    ("callback-eventtrigger", ("SURCAMNS", "SURCAMEW")),
    ("environmental-event", ("FLAMEBOX", "NOSTRIL", "STEAMBOX")),
)
# One spec per family report; each names the output file stem, the Markdown
# title and the primary labels that make up the family.
# NOTE(review): presumably each spec is passed to
# write_family_decompile_artifact(); the call site is outside this view.
FAMILY_ARTIFACT_SPECS: tuple[FamilyArtifactSpec, ...] = (
    FamilyArtifactSpec(
        output_stem="boot_family_decompile",
        title="_BOOT Family Decompiled Event Sketches",
        labels=("AND_BOOT", "BRO_BOOT", "COR_BOOT", "REE_BOOT", "VAR_BOOT"),
    ),
    FamilyArtifactSpec(
        output_stem="callback_family_decompile",
        title="SURCAM Callback Family Decompiled Event Sketches",
        labels=("SURCAMNS", "SURCAMEW"),
    ),
    FamilyArtifactSpec(
        output_stem="environmental_family_decompile",
        title="Environmental Family Decompiled Event Sketches",
        labels=("FLAMEBOX", "NOSTRIL", "STEAMBOX"),
    ),
)
# Primary labels selected for the "immortality" analysis, in a fixed order.
# NOTE(review): the code that consumes these three constants is outside this
# view; the comments below describe only what the values contain.
IMMORTALITY_TARGET_LABELS: tuple[str, ...] = (
    "EVENT",
    "NPCTRIG",
    "COR_BOOT",
    "REE_BOOT",
    "SFXTRIG",
    "SPECIAL",
    "TRIGPAD",
)
# Subset of the target labels above (four of the seven).
IMMORTALITY_TEMPLATE_COMPARE_LABELS: frozenset[str] = frozenset(
    {"NPCTRIG", "COR_BOOT", "REE_BOOT", "SFXTRIG"}
)
# Subset of the target labels above (two of the seven).
IMMORTALITY_STRUCTURAL_TARGET_LABELS: frozenset[str] = frozenset({"EVENT", "NPCTRIG"})
# (motif_name, byte pattern) pairs; each name spells out the hex bytes of its
# pattern after a short category prefix.
# NOTE(review): presumably searched for inside event bodies (e.g. with
# find_all_offsets); the consumer is outside this view.
IMMORTALITY_BODY_MOTIFS: tuple[tuple[str, bytes], ...] = (
    ("call_40_06_4c_02", bytes.fromhex("40 06 4c 02")),
    ("call_40_06_0f_04", bytes.fromhex("40 06 0f 04")),
    ("subheader_53_5c", bytes.fromhex("53 5c")),
    ("writeback_57_02", bytes.fromhex("57 02")),
    ("branch_59_0a", bytes.fromhex("59 0a")),
    ("branch_3f_0a", bytes.fromhex("3f 0a")),
    ("field_4b_fe_0f", bytes.fromhex("4b fe 0f")),
    ("field_4b_fc_0f", bytes.fromhex("4b fc 0f")),
    ("push_24_51", bytes.fromhex("24 51")),
    ("event_field_69_0a_00", bytes.fromhex("69 0a 00")),
)
# Regression pins for the repeated-template families, checked by
# validate_verified_repeated_family_regressions().
# Positional argument order follows RepeatedFamilyRowExpectation:
#   class_name, slot, raw_event_entry_word, raw_code_offset,
#   derived_body_start, derived_body_end, derived_body_length,
#   repeated_template_status
VERIFIED_REPEATED_FAMILY_ROW_EXPECTATIONS: tuple[RepeatedFamilyRowExpectation, ...] = (
    RepeatedFamilyRowExpectation("JELYHACK", 0x01, 0x002A, 0x00000001, 0x00D4, 0x00FE, 42, "referent-anchor-twin/shared-slot-0x01/same-length-template"),
    RepeatedFamilyRowExpectation("JELYH2", 0x01, 0x002A, 0x00000001, 0x00D4, 0x00FE, 42, "referent-anchor-twin/shared-slot-0x01/same-length-template"),
    RepeatedFamilyRowExpectation("AND_BOOT", 0x0A, 0x0253, 0x00000001, 0x00D4, 0x0327, 595, "boot-event-core/shared-slot-0x0A/shared-slot-template"),
    RepeatedFamilyRowExpectation("AND_BOOT", 0x0F, 0x0237, 0x00000254, 0x0327, 0x055E, 567, "boot-event-core/shared-slot-0x0F/shared-slot-template"),
    RepeatedFamilyRowExpectation("AND_BOOT", 0x10, 0x003B, 0x0000048B, 0x055E, 0x0599, 59, "boot-event-core/shared-slot-0x10/same-length-template"),
    RepeatedFamilyRowExpectation("BRO_BOOT", 0x0A, 0x02D5, 0x00000001, 0x00D4, 0x03A9, 725, "boot-event-core/shared-slot-0x0A/shared-slot-template"),
    RepeatedFamilyRowExpectation("BRO_BOOT", 0x0F, 0x024C, 0x000002D6, 0x03A9, 0x05F5, 588, "boot-event-core/shared-slot-0x0F/shared-slot-template"),
    RepeatedFamilyRowExpectation("BRO_BOOT", 0x10, 0x003B, 0x00000522, 0x05F5, 0x0630, 59, "boot-event-core/shared-slot-0x10/same-length-template"),
    RepeatedFamilyRowExpectation("COR_BOOT", 0x0A, 0x0227, 0x00000001, 0x00D4, 0x02FB, 551, "boot-event-core/shared-slot-0x0A/shared-slot-template"),
    RepeatedFamilyRowExpectation("COR_BOOT", 0x0F, 0x0234, 0x00000228, 0x02FB, 0x052F, 564, "boot-event-core/shared-slot-0x0F/shared-slot-template"),
    RepeatedFamilyRowExpectation("COR_BOOT", 0x10, 0x003B, 0x0000045C, 0x052F, 0x056A, 59, "boot-event-core/shared-slot-0x10/same-length-template"),
    RepeatedFamilyRowExpectation("REE_BOOT", 0x0A, 0x034B, 0x00000001, 0x00D4, 0x041F, 843, "boot-event-core/shared-slot-0x0A/shared-slot-template"),
    RepeatedFamilyRowExpectation("REE_BOOT", 0x0F, 0x025C, 0x0000034C, 0x041F, 0x067B, 604, "boot-event-core/shared-slot-0x0F/shared-slot-template"),
    RepeatedFamilyRowExpectation("REE_BOOT", 0x10, 0x003B, 0x000005A8, 0x067B, 0x06B6, 59, "boot-event-core/shared-slot-0x10/same-length-template"),
    RepeatedFamilyRowExpectation("VAR_BOOT", 0x0A, 0x029A, 0x00000001, 0x00D4, 0x036E, 666, "boot-event-core/shared-slot-0x0A/shared-slot-template"),
    RepeatedFamilyRowExpectation("VAR_BOOT", 0x0F, 0x0244, 0x0000029B, 0x036E, 0x05B2, 580, "boot-event-core/shared-slot-0x0F/shared-slot-template"),
    RepeatedFamilyRowExpectation("VAR_BOOT", 0x10, 0x003B, 0x000004DF, 0x05B2, 0x05ED, 59, "boot-event-core/shared-slot-0x10/same-length-template"),
    RepeatedFamilyRowExpectation("SURCAMNS", 0x01, 0x0051, 0x000000D2, 0x01B7, 0x0208, 81, "callback-eventtrigger/shared-slot-0x01/shared-slot-template"),
    RepeatedFamilyRowExpectation("SURCAMNS", 0x0A, 0x00D1, 0x00000001, 0x00E6, 0x01B7, 209, "callback-eventtrigger/shared-slot-0x0A/same-length-template"),
    RepeatedFamilyRowExpectation("SURCAMNS", 0x20, 0x02BA, 0x00000123, 0x0208, 0x04C2, 698, "callback-eventtrigger/shared-slot-0x20/same-length-template"),
    RepeatedFamilyRowExpectation("SURCAMNS", 0x21, 0x0709, 0x000003DD, 0x04C2, 0x0BCB, 1801, "callback-eventtrigger/shared-slot-0x21/shared-slot-template"),
    RepeatedFamilyRowExpectation("SURCAMNS", 0x22, 0x01A3, 0x00000AE6, 0x0BCB, 0x0D6E, 419, "callback-eventtrigger/shared-slot-0x22/same-length-template"),
    RepeatedFamilyRowExpectation("SURCAMEW", 0x01, 0x00F7, 0x000000D2, 0x01B7, 0x02AE, 247, "callback-eventtrigger/shared-slot-0x01/shared-slot-template"),
    RepeatedFamilyRowExpectation("SURCAMEW", 0x0A, 0x00D1, 0x00000001, 0x00E6, 0x01B7, 209, "callback-eventtrigger/shared-slot-0x0A/same-length-template"),
    RepeatedFamilyRowExpectation("SURCAMEW", 0x20, 0x02BA, 0x000001C9, 0x02AE, 0x0568, 698, "callback-eventtrigger/shared-slot-0x20/same-length-template"),
    RepeatedFamilyRowExpectation("SURCAMEW", 0x21, 0x0655, 0x00000483, 0x0568, 0x0BBD, 1621, "callback-eventtrigger/shared-slot-0x21/shared-slot-template"),
    RepeatedFamilyRowExpectation("SURCAMEW", 0x22, 0x01A3, 0x00000AD8, 0x0BBD, 0x0D60, 419, "callback-eventtrigger/shared-slot-0x22/same-length-template"),
    RepeatedFamilyRowExpectation("FLAMEBOX", 0x0A, 0x026A, 0x00000001, 0x00E0, 0x034A, 618, "environmental-event/shared-slot-0x0A/shared-slot-template"),
    RepeatedFamilyRowExpectation("FLAMEBOX", 0x20, 0x01AC, 0x0000026B, 0x034A, 0x04F6, 428, "environmental-event/shared-slot-0x20/shared-slot-template"),
    RepeatedFamilyRowExpectation("FLAMEBOX", 0x21, 0x029A, 0x00000417, 0x04F6, 0x0790, 666, "environmental-event/shared-slot-0x21/shared-slot-template"),
    RepeatedFamilyRowExpectation("NOSTRIL", 0x0A, 0x00C0, 0x00000001, 0x00E0, 0x01A0, 192, "environmental-event/shared-slot-0x0A/shared-slot-template"),
    RepeatedFamilyRowExpectation("NOSTRIL", 0x20, 0x0129, 0x000000C1, 0x01A0, 0x02C9, 297, "environmental-event/shared-slot-0x20/shared-slot-template"),
    RepeatedFamilyRowExpectation("NOSTRIL", 0x21, 0x01BE, 0x000001EA, 0x02C9, 0x0487, 446, "environmental-event/shared-slot-0x21/shared-slot-template"),
    RepeatedFamilyRowExpectation("STEAMBOX", 0x0A, 0x0266, 0x00000001, 0x00E0, 0x0346, 614, "environmental-event/shared-slot-0x0A/shared-slot-template"),
    RepeatedFamilyRowExpectation("STEAMBOX", 0x20, 0x01F6, 0x00000267, 0x0346, 0x053C, 502, "environmental-event/shared-slot-0x20/shared-slot-template"),
    RepeatedFamilyRowExpectation("STEAMBOX", 0x21, 0x02A7, 0x0000045D, 0x053C, 0x07E3, 679, "environmental-event/shared-slot-0x21/shared-slot-template"),
)
def scummvm_event_name_hint(slot: int) -> str | None:
    """Look up the hinted event name for ``slot``.

    Returns ``None`` for negative slots and for slots beyond the end of
    ``SCUMMVM_EVENT_NAME_HINTS``.
    """
    if slot < 0 or slot >= len(SCUMMVM_EVENT_NAME_HINTS):
        return None
    return SCUMMVM_EVENT_NAME_HINTS[slot]
def object_index_from_table_offset(table_offset: int) -> int | None:
    """Convert an entry-table offset into a zero-based row number.

    Rows are 8 bytes wide and the table starts at 0x80. Returns ``None`` for
    offsets below 0x80 or not aligned to a row boundary.
    """
    if table_offset < 0x80:
        return None
    row, misalignment = divmod(table_offset - 0x80, 8)
    return row if misalignment == 0 else None
def decode_name_table_entry(name_table_data: bytes, class_id: int) -> str | None:
    """Read the name for ``class_id`` from the name-table payload.

    Names occupy fixed 13-byte slots that start 4 bytes into the table. The
    slot is cut at its first NUL, trailing spaces are stripped, and the result
    is decoded as latin-1. Returns ``None`` for a negative id, a slot that
    does not fit inside the table, or an empty name.
    """
    if class_id < 0:
        return None
    begin = 4 + 13 * class_id
    finish = begin + 13
    if finish > len(name_table_data):
        return None
    slot_bytes = name_table_data[begin:finish]
    terminated, _, _ = slot_bytes.partition(b"\x00")
    trimmed = terminated.rstrip(b"\x00 ")
    if not trimmed:
        return None
    return trimmed.decode("latin-1", errors="replace")
def annotate_class_layout(chunks: list[ExtractedChunk]) -> None:
    """Fill in the class-layout fields of every chunk, in place.

    First pass: derive ``object_index`` from the table offset and load the
    payload of object index 1 as the name table. Second pass: for objects at
    index 2 and above, set ``class_id`` (index - 2), the name hint, the code
    base read at byte 8, and - when the region between the 20-byte header and
    the code base is a whole number of 6-byte rows that fits in the payload -
    the event count and event-table end. ``class_parse_status`` records the
    outcome or the reason parsing stopped.

    Each chunk's payload is read from ``chunk.raw_path``.
    """
    header_size = 20
    row_size = 6
    name_table: bytes | None = None

    for entry in chunks:
        entry.object_index = object_index_from_table_offset(entry.table_offset)
        if entry.object_index == 1:
            name_table = pathlib.Path(entry.raw_path).read_bytes()
            entry.class_parse_status = "name-table"

    for entry in chunks:
        if entry.object_index is None:
            # Keep any status assigned earlier; only fill in when unset.
            entry.class_parse_status = entry.class_parse_status or "unaligned-table-offset"
            continue
        if entry.object_index < 2:
            entry.class_parse_status = entry.class_parse_status or "non-class-object"
            continue

        entry.class_id = entry.object_index - 2
        if name_table is not None:
            entry.class_name_hint = decode_name_table_entry(name_table, entry.class_id)

        payload = pathlib.Path(entry.raw_path).read_bytes()
        if len(payload) < header_size:
            entry.class_parse_status = "too-small-for-class-header"
            continue

        code_base = read_u32_le(payload, 8)
        entry.raw_code_base_u32 = code_base
        if code_base > 0:
            entry.code_base_minus_one = code_base - 1

        region_size = code_base - header_size
        if region_size < 0:
            entry.class_parse_status = "header-before-event-table"
            continue
        row_count, leftover = divmod(region_size, row_size)
        if leftover != 0:
            entry.class_parse_status = "event-region-not-divisible-by-6"
            continue
        table_end = header_size + row_count * row_size
        if table_end > len(payload):
            entry.class_parse_status = "event-table-past-object-end"
            continue

        entry.conservative_event_count = row_count
        entry.event_table_end = table_end
        entry.class_parse_status = "parsed-class-layout"
def derive_class_event_rows(chunk: ExtractedChunk, raw_data: bytes) -> list[ClassEventRow]:
    """Decode a parsed class chunk's event table into ``ClassEventRow`` records.

    Returns an empty list unless the chunk has status ``"parsed-class-layout"``
    and carries an object index, class id and event count. Each 6-byte row
    (starting at byte 20) holds a u16 word followed by a u32 code offset.

    For rows with a non-zero code offset the body window is
    ``code_base_minus_one + offset`` up to the next larger distinct non-zero
    offset in the same table (or the end of the payload for the largest one).
    The window fields stay ``None`` when the window would be out of bounds or
    the chunk has no ``code_base_minus_one``.
    """
    if (
        chunk.class_parse_status != "parsed-class-layout"
        or chunk.object_index is None
        or chunk.class_id is None
        or chunk.conservative_event_count is None
    ):
        return []

    table_rows: list[tuple[int, int, int]] = []
    for slot in range(chunk.conservative_event_count):
        row_base = 20 + 6 * slot
        word = read_u16_le(raw_data, row_base)
        code_offset = read_u32_le(raw_data, row_base + 2)
        table_rows.append((slot, word, code_offset))

    # Map every distinct non-zero offset to the next larger one; the largest
    # offset has no successor and is therefore absent from the mapping.
    distinct_offsets = sorted({offset for _, _, offset in table_rows if offset != 0})
    successor = dict(zip(distinct_offsets, distinct_offsets[1:]))

    payload_size = len(raw_data)
    code_base = chunk.code_base_minus_one
    result: list[ClassEventRow] = []
    for slot, word, code_offset in table_rows:
        window_start: int | None = None
        window_end: int | None = None
        window_length: int | None = None
        if code_offset != 0 and code_base is not None:
            candidate_start = code_base + code_offset
            following = successor.get(code_offset)
            if following is None:
                candidate_end = payload_size
            else:
                candidate_end = code_base + following
            if 0 <= candidate_start <= candidate_end <= payload_size:
                window_start = candidate_start
                window_end = candidate_end
                window_length = candidate_end - candidate_start
        result.append(
            ClassEventRow(
                entry_index=chunk.index,
                object_index=chunk.object_index,
                class_id=chunk.class_id,
                class_name_hint=chunk.class_name_hint or "",
                slot=slot,
                event_name_hint=scummvm_event_name_hint(slot),
                raw_event_entry_word=word,
                raw_code_offset=code_offset,
                derived_body_start=window_start,
                derived_body_end=window_end,
                derived_body_length=window_length,
            )
        )
    return result
def build_class_event_rows(
    parsed_class_chunks: list[ExtractedChunk],
) -> tuple[list[ClassEventRow], dict[int, list[ClassEventRow]], dict[int, bytes]]:
    """Load each chunk's payload and derive its event rows.

    Returns a 3-tuple: all rows in chunk order, rows keyed by chunk index, and
    the raw payload bytes keyed by chunk index. Payloads are read from
    ``chunk.raw_path``.
    """
    combined: list[ClassEventRow] = []
    per_entry: dict[int, list[ClassEventRow]] = {}
    payloads: dict[int, bytes] = {}
    for chunk in parsed_class_chunks:
        key = chunk.index
        payload = pathlib.Path(chunk.raw_path).read_bytes()
        derived = derive_class_event_rows(chunk, payload)
        payloads[key] = payload
        per_entry[key] = derived
        combined += derived
    return combined, per_entry, payloads
def build_repeated_template_status_map(
    parsed_class_chunks: list[ExtractedChunk],
    rows_by_entry: dict[int, list[ClassEventRow]],
    raw_data_by_entry: dict[int, bytes],
) -> dict[tuple[int, int], str]:
    """Label event rows whose slot is shared across members of a known family.

    For each family in ``VERIFIED_REPEATED_TEMPLATE_FAMILIES`` with at least
    two members present, event bodies are grouped by slot. A slot used by two
    or more members gets the status
    ``"<family>/shared-slot-0xNN/<suffix>"`` where the suffix is
    ``exact-body-clone`` (all bodies identical), ``same-length-template`` (all
    the same length) or ``shared-slot-template`` (otherwise).

    Returns a mapping keyed by ``(chunk index, slot)``. Rows with a zero code
    offset or without a derived body window are ignored.
    """
    statuses: dict[tuple[int, int], str] = {}

    # Later chunks overwrite earlier ones that share a primary label.
    by_label = {}
    for candidate in parsed_class_chunks:
        if candidate.primary_label:
            by_label[candidate.primary_label] = candidate

    for family_name, labels in VERIFIED_REPEATED_TEMPLATE_FAMILIES:
        members = [by_label[label] for label in labels if label in by_label]
        if len(members) < 2:
            continue

        # slot -> [(chunk index, body bytes), ...] in member order
        grouped: dict[int, list[tuple[int, bytes]]] = {}
        for member in members:
            payload = raw_data_by_entry.get(member.index)
            if payload is None:
                continue
            for row in rows_by_entry.get(member.index, []):
                if (
                    row.raw_code_offset == 0
                    or row.derived_body_start is None
                    or row.derived_body_end is None
                ):
                    continue
                body = payload[row.derived_body_start:row.derived_body_end]
                grouped.setdefault(row.slot, []).append((member.index, body))

        for slot, occurrences in grouped.items():
            if len(occurrences) < 2:
                continue
            distinct_bodies = {body for _, body in occurrences}
            distinct_lengths = {len(body) for _, body in occurrences}
            if len(distinct_bodies) == 1:
                suffix = "exact-body-clone"
            elif len(distinct_lengths) == 1:
                suffix = "same-length-template"
            else:
                suffix = "shared-slot-template"
            label = f"{family_name}/shared-slot-0x{slot:02X}/{suffix}"
            for entry_index, _ in occurrences:
                statuses[(entry_index, slot)] = label
    return statuses
def format_optional_hex(value: int | None, width: int = 0) -> str:
    """Format ``value`` as ``0x``-prefixed upper-case hex.

    ``None`` becomes the empty string. A positive ``width`` zero-pads the
    digits to that many characters; otherwise no padding is applied.
    """
    if value is None:
        return ""
    digits = f"{value:0{width}X}" if width > 0 else f"{value:X}"
    return "0x" + digits
def hex_edge(data: bytes, width: int = 8) -> str:
    """Return the first ``width`` bytes as lower-case hex ('' for empty input)."""
    leading = data[:width]
    return leading.hex() if data else ""
def hex_tail(data: bytes, width: int = 8) -> str:
    """Return the last ``width`` bytes as lower-case hex ('' for empty input)."""
    trailing = data[-width:]
    return trailing.hex() if data else ""
def find_all_offsets(haystack: bytes, needle: bytes) -> list[int]:
    """Return every offset at which ``needle`` occurs, overlaps included.

    Offsets are reported in ascending order.
    """
    hits: list[int] = []
    position = haystack.find(needle)
    while position >= 0:
        hits.append(position)
        # Resume one byte later so overlapping matches are not skipped.
        position = haystack.find(needle, position + 1)
    return hits
def common_prefix_len(left: bytes, right: bytes) -> int:
    """Count how many leading bytes ``left`` and ``right`` have in common."""
    matched = 0
    for first, second in zip(left, right):
        if first != second:
            break
        matched += 1
    return matched
def common_suffix_len(left: bytes, right: bytes) -> int:
    """Count how many trailing bytes ``left`` and ``right`` have in common."""
    matched = 0
    for first, second in zip(reversed(left), reversed(right)):
        if first != second:
            break
        matched += 1
    return matched
def write_family_decompile_artifact(
    out_dir: pathlib.Path,
    parsed_class_chunks: list[ExtractedChunk],
    rows_by_entry: dict[int, list[ClassEventRow]],
    raw_data_by_entry: dict[int, bytes],
    repeated_status_by_row: dict[tuple[int, int], str],
    spec: FamilyArtifactSpec,
) -> None:
    """Write the Markdown and TSV report for one class family.

    Selects the chunks whose ``primary_label`` is listed in ``spec.labels``,
    sorted by label, and for each one emits every event row with a non-zero
    code offset. Output goes to ``<out_dir>/<spec.output_stem>.md`` and
    ``<out_dir>/<spec.output_stem>.tsv``. Nothing is written when no chunk of
    the family is present.

    Args:
        out_dir: Directory that receives both files.
        parsed_class_chunks: Candidate chunks to pick family members from.
        rows_by_entry: Event rows keyed by chunk index.
        raw_data_by_entry: Raw payload bytes keyed by chunk index.
        repeated_status_by_row: Status strings keyed by (entry index, slot).
        spec: File stem, title and member labels for this family.

    Raises:
        KeyError: If a family chunk with rows has no entry in
            ``raw_data_by_entry``.
    """
    family_labels = set(spec.labels)
    family_chunks = [chunk for chunk in parsed_class_chunks if chunk.primary_label in family_labels]
    if not family_chunks:
        return
    family_chunks.sort(key=lambda chunk: chunk.primary_label or "")
    # TSV header; column order must match the format string used per row below.
    tsv_lines = [
        "entry_index\tclass_id\tclass_name\tslot\tevent_name_hint\traw_event_entry_word\traw_code_offset\tderived_body_start\tderived_body_end\tderived_body_length\trepeated_template_status\tbody_sha1\tbody_prefix_hex\tbody_suffix_hex"
    ]
    md_lines = [
        f"# {spec.title}",
        "",
        "This is a reversible per-class rendering derived directly from `class_event_index.tsv` plus the raw extracted chunk bytes.",
        "ScummVM event labels remain hints only; the authoritative data here is the slot id, raw row bytes, and derived body window.",
        "",
    ]
    for chunk in family_chunks:
        # Only rows that point at code are reported; chunks without any are skipped.
        rows = [row for row in rows_by_entry.get(chunk.index, []) if row.raw_code_offset != 0]
        if not rows:
            continue
        raw_data = raw_data_by_entry[chunk.index]
        # Per-class section: a heading followed by a fenced yaml block.
        md_lines.extend([
            f"## {chunk.primary_label}",
            "",
            "```yaml",
            "class:",
            f" entry_index: 0x{chunk.index:03X}",
            f" class_id: 0x{chunk.class_id:X}",
            f" class_name: {chunk.primary_label}",
            f" class_object_index: 0x{chunk.object_index:X}",
            f" raw_code_base_u32: 0x{chunk.raw_code_base_u32:X}",
            f" code_base_minus_one: 0x{chunk.code_base_minus_one:X}",
            f" conservative_event_count: {chunk.conservative_event_count}",
            " events:",
        ])
        for row in rows:
            # Body stays empty when no window could be derived for the row.
            body = b""
            if row.derived_body_start is not None and row.derived_body_end is not None:
                body = raw_data[row.derived_body_start:row.derived_body_end]
            repeated_status = repeated_status_by_row.get((row.entry_index, row.slot), "")
            # The SHA-1 is used as a content fingerprint for comparing bodies.
            body_sha1 = hashlib.sha1(body).hexdigest() if body else ""
            # Markdown side: lower-case hex, missing values rendered as 'null'.
            md_lines.extend([
                f" - slot: 0x{row.slot:02x}",
                f" event_name_hint: {row.event_name_hint or ''}",
                f" raw_event_entry_word: 0x{row.raw_event_entry_word:04x}",
                f" raw_code_offset: 0x{row.raw_code_offset:08x}",
                f" derived_body_start: {format_optional_hex(row.derived_body_start, 4).lower() or 'null'}",
                f" derived_body_end: {format_optional_hex(row.derived_body_end, 4).lower() or 'null'}",
                f" derived_body_length: {row.derived_body_length if row.derived_body_length is not None else 'null'}",
                f" repeated_template_status: {repeated_status or 'unique-or-unclassified'}",
                f" body_sha1: {body_sha1 or 'null'}",
                f" body_prefix_hex: {hex_edge(body) or 'null'}",
                f" body_suffix_hex: {hex_tail(body) or 'null'}",
            ])
            # TSV side: upper-case hex, missing values rendered as empty cells.
            tsv_lines.append(
                "{entry_index}\t0x{class_id:X}\t{class_name}\t0x{slot:02X}\t{event_name_hint}\t0x{raw_event_entry_word:04X}\t0x{raw_code_offset:08X}\t{derived_body_start}\t{derived_body_end}\t{derived_body_length}\t{repeated_template_status}\t{body_sha1}\t{body_prefix_hex}\t{body_suffix_hex}".format(
                    entry_index=row.entry_index,
                    class_id=row.class_id,
                    class_name=chunk.primary_label or "",
                    slot=row.slot,
                    event_name_hint=row.event_name_hint or "",
                    raw_event_entry_word=row.raw_event_entry_word,
                    raw_code_offset=row.raw_code_offset,
                    derived_body_start=format_optional_hex(row.derived_body_start, 4),
                    derived_body_end=format_optional_hex(row.derived_body_end, 4),
                    derived_body_length=(row.derived_body_length if row.derived_body_length is not None else ""),
                    repeated_template_status=repeated_status,
                    body_sha1=body_sha1,
                    body_prefix_hex=hex_edge(body),
                    body_suffix_hex=hex_tail(body),
                )
            )
        # Close the fenced yaml block for this class.
        md_lines.extend([
            "```",
            "",
        ])
    # The TSV gets a trailing newline; the Markdown file does not.
    (out_dir / f"{spec.output_stem}.md").write_text("\n".join(md_lines), encoding="utf-8")
    (out_dir / f"{spec.output_stem}.tsv").write_text("\n".join(tsv_lines) + "\n", encoding="utf-8")
def validate_verified_repeated_family_regressions(
    parsed_class_chunks: list[ExtractedChunk],
    rows_by_entry: dict[int, list[ClassEventRow]],
    repeated_status_by_row: dict[tuple[int, int], str],
) -> list[str]:
    """Compare parsed event rows against the pinned repeated-family expectations.

    Two passes run over ``VERIFIED_REPEATED_FAMILY_ROW_EXPECTATIONS``:

    1. Per class name, the set of slots whose ``raw_code_offset`` is non-zero
       must equal the set of expected slots.
    2. Per expectation, the matching row must agree on entry word, code
       offset, derived body start/end/length and repeated-template status.

    Args:
        parsed_class_chunks: Chunks to look up by ``primary_label``; chunks
            with an empty label are ignored.
        rows_by_entry: Event rows keyed by chunk index.
        repeated_status_by_row: Status string keyed by ``(entry_index, slot)``;
            a missing key counts as ``""``.

    Returns:
        Tab-separated report lines, header first, when everything matches.

    Raises:
        ValueError: If any slot set or row differs. The message lists every
            mismatch; the report lines are not returned in that case.
    """

    def render_slot_set(slots: set[int]) -> str:
        return ",".join(f"0x{slot:02X}" for slot in sorted(slots))

    chunk_by_label: dict[str, ExtractedChunk] = {}
    for chunk in parsed_class_chunks:
        if chunk.primary_label:
            chunk_by_label[chunk.primary_label] = chunk

    expected_slots_by_class: dict[str, set[int]] = {}
    for expectation in VERIFIED_REPEATED_FAMILY_ROW_EXPECTATIONS:
        bucket = expected_slots_by_class.setdefault(expectation.class_name, set())
        bucket.add(expectation.slot)

    report_lines = ["record_type\tclass_name\tslot\texpected\tactual\tstatus"]
    errors: list[str] = []

    # Pass 1: per-class comparison of the populated slot sets.
    for class_name in sorted(expected_slots_by_class):
        expected_slots = expected_slots_by_class[class_name]
        owner = chunk_by_label.get(class_name)
        owner_rows = [] if owner is None else rows_by_entry.get(owner.index, [])
        actual_slots: set[int] = set()
        for row in owner_rows:
            if row.raw_code_offset != 0:
                actual_slots.add(row.slot)
        status = "mismatch" if actual_slots != expected_slots else "ok"
        report_lines.append(
            "slot-set\t{class_name}\t*\t{expected}\t{actual}\t{status}".format(
                class_name=class_name,
                expected=render_slot_set(expected_slots),
                actual=render_slot_set(actual_slots),
                status=status,
            )
        )
        if status == "mismatch":
            errors.append(
                f"{class_name}: expected non-zero slots {sorted(expected_slots)}, found {sorted(actual_slots)}"
            )

    # Pass 2: field-by-field comparison of every pinned row.
    for expectation in VERIFIED_REPEATED_FAMILY_ROW_EXPECTATIONS:
        chunk = chunk_by_label.get(expectation.class_name)
        if chunk is None:
            errors.append(f"missing repeated-family class {expectation.class_name}")
            report_lines.append(
                f"row\t{expectation.class_name}\t0x{expectation.slot:02X}\tpresent\tmissing-class\tmismatch"
            )
            continue

        # First row carrying the expected slot, if any.
        row = None
        for candidate in rows_by_entry.get(chunk.index, []):
            if candidate.slot == expectation.slot:
                row = candidate
                break
        if row is None:
            errors.append(f"missing row {expectation.class_name} slot 0x{expectation.slot:02X}")
            report_lines.append(
                f"row\t{expectation.class_name}\t0x{expectation.slot:02X}\tpresent\tmissing-row\tmismatch"
            )
            continue

        repeated_status = repeated_status_by_row.get((row.entry_index, row.slot), "")
        actual_values = (
            row.raw_event_entry_word,
            row.raw_code_offset,
            row.derived_body_start,
            row.derived_body_end,
            row.derived_body_length,
            repeated_status,
        )
        expected_values = (
            expectation.raw_event_entry_word,
            expectation.raw_code_offset,
            expectation.derived_body_start,
            expectation.derived_body_end,
            expectation.derived_body_length,
            expectation.repeated_template_status,
        )
        expected_text = "|".join(
            [
                f"0x{expectation.raw_event_entry_word:04X}",
                f"0x{expectation.raw_code_offset:08X}",
                f"0x{expectation.derived_body_start:04X}",
                f"0x{expectation.derived_body_end:04X}",
                str(expectation.derived_body_length),
                expectation.repeated_template_status,
            ]
        )
        actual_text = "|".join(
            [
                f"0x{row.raw_event_entry_word:04X}",
                f"0x{row.raw_code_offset:08X}",
                format_optional_hex(row.derived_body_start, 4),
                format_optional_hex(row.derived_body_end, 4),
                str(row.derived_body_length if row.derived_body_length is not None else ""),
                repeated_status,
            ]
        )
        status = "ok" if actual_values == expected_values else "mismatch"
        report_lines.append(
            "row\t{class_name}\t0x{slot:02X}\t{expected}\t{actual}\t{status}".format(
                class_name=expectation.class_name,
                slot=expectation.slot,
                expected=expected_text,
                actual=actual_text,
                status=status,
            )
        )
        if status != "ok":
            errors.append(
                "{class_name} slot 0x{slot:02X}: expected {expected}, found {actual}".format(
                    class_name=expectation.class_name,
                    slot=expectation.slot,
                    expected=expected_values,
                    actual=actual_values,
                )
            )

    if errors:
        raise ValueError(
            "repeated-family regression mismatch:\n- " + "\n- ".join(errors)
        )
    return report_lines
def write_immortality_target_body_scan(
    out_dir: pathlib.Path,
    parsed_class_chunks: list[ExtractedChunk],
    rows_by_entry: dict[int, list[ClassEventRow]],
    raw_data_by_entry: dict[int, bytes],
) -> None:
    """Scan target event bodies for `0x410` byte patterns and write two reports.

    For every label in ``IMMORTALITY_TARGET_LABELS`` that resolves to a chunk
    with raw data, each row with a non-zero ``raw_code_offset`` and known body
    bounds is sliced out and searched for three byte patterns (``10 04``,
    ``10 04 00 00`` and ``04 10``). Bodies whose label is also in
    ``IMMORTALITY_TEMPLATE_COMPARE_LABELS`` are compared pairwise by common
    prefix/suffix length.

    Writes ``immortality_target_body_scan.tsv`` and
    ``immortality_target_body_scan.md`` into ``out_dir``. The Markdown file
    lists at most the 12 highest-ranked pairings. The TSV is written before
    the Markdown content is assembled.
    """
    # (field-name prefix used by the row templates, hit-table key, needle bytes)
    literal_scans = (
        ("le16", "le16_0410", (0x0410).to_bytes(2, "little")),
        ("le32", "le32_00000410", (0x00000410).to_bytes(4, "little")),
        ("be16", "le16_1004", (0x1004).to_bytes(2, "little")),
    )

    def template_fields(record: dict[str, object], empty_offsets: str) -> dict[str, object]:
        # Flatten one body record into the keyword arguments of the row templates.
        fields = {key: value for key, value in record.items() if key != "hits"}
        hits = record["hits"]
        for prefix, pattern_name, _ in literal_scans:
            offsets = hits[pattern_name]
            fields[f"{prefix}_count"] = len(offsets)
            fields[f"{prefix}_offsets"] = ",".join(f"0x{offset:04X}" for offset in offsets) or empty_offsets
        return fields

    chunk_by_label: dict[str, ExtractedChunk] = {}
    for candidate in parsed_class_chunks:
        if candidate.primary_label:
            chunk_by_label[candidate.primary_label] = candidate

    body_records: list[dict[str, object]] = []
    compare_bodies: list[tuple[str, int, bytes]] = []
    for label in IMMORTALITY_TARGET_LABELS:
        owner = chunk_by_label.get(label)
        raw_data = raw_data_by_entry.get(owner.index) if owner is not None else None
        if raw_data is None:
            continue
        for row in rows_by_entry.get(owner.index, []):
            if row.raw_code_offset == 0:
                continue
            body_start, body_end = row.derived_body_start, row.derived_body_end
            if body_start is None or body_end is None:
                continue
            body = raw_data[body_start:body_end]
            pattern_hits = {}
            for _, pattern_name, needle in literal_scans:
                pattern_hits[pattern_name] = find_all_offsets(body, needle)
            body_records.append(
                {
                    "entry_index": row.entry_index,
                    "class_name": label,
                    "slot": row.slot,
                    "event_name_hint": row.event_name_hint or "",
                    "body_start": body_start,
                    "body_end": body_end,
                    "body_length": row.derived_body_length or len(body),
                    "first16": body[:16].hex(),
                    "last16": body[-16:].hex(),
                    "hits": pattern_hits,
                }
            )
            if label in IMMORTALITY_TEMPLATE_COMPARE_LABELS:
                compare_bodies.append((label, row.slot, body))

    # Every unordered pair of template bodies, ranked by shared prefix + suffix.
    comparison_records: list[tuple[int, int, int, str, int, int, str, int, int]] = []
    for left_index, (left_label, left_slot, left_body) in enumerate(compare_bodies):
        for right_label, right_slot, right_body in compare_bodies[left_index + 1:]:
            prefix = common_prefix_len(left_body, right_body)
            suffix = common_suffix_len(left_body, right_body)
            comparison_records.append(
                (
                    prefix + suffix,
                    prefix,
                    suffix,
                    left_label,
                    left_slot,
                    len(left_body),
                    right_label,
                    right_slot,
                    len(right_body),
                )
            )
    comparison_records.sort(reverse=True)

    tsv_row_template = "{entry_index}\t{class_name}\t0x{slot:02X}\t{event_name_hint}\t0x{body_start:04X}\t0x{body_end:04X}\t{body_length}\t{le16_count}\t{le16_offsets}\t{le32_count}\t{le32_offsets}\t{be16_count}\t{be16_offsets}\t{first16}\t{last16}"
    tsv_lines = [
        "entry_index\tclass_name\tslot\tevent_name_hint\tbody_start\tbody_end\tbody_length\tle16_0410_count\tle16_0410_offsets\tle32_00000410_count\tle32_00000410_offsets\tle16_1004_count\tle16_1004_offsets\tbody_prefix_hex\tbody_suffix_hex"
    ]
    for record in body_records:
        tsv_lines.append(tsv_row_template.format(**template_fields(record, "")))
    (out_dir / "immortality_target_body_scan.tsv").write_text("\n".join(tsv_lines) + "\n", encoding="utf-8")

    md_lines = [
        "# Immortality Target Body Scan",
        "",
        "This report is a focused follow-up on the player-trigger immortality lane.",
        "It scans the current highest-value EUSECODE candidates for inline `0x410` literals and compares the strongest active-event template bodies.",
        "",
    ]
    any_literal_hits = False
    for record in body_records:
        if any(record["hits"][pattern_name] for _, pattern_name, _ in literal_scans):
            any_literal_hits = True
            break
    if any_literal_hits:
        md_lines.append("- At least one target body contains an inline `0x410`-adjacent literal; inspect the TSV rows with non-zero hit counts.")
    else:
        md_lines.append("- No scanned target body contains inline little-endian `0x0410`, inline dword `0x00000410`, or byte-swapped `0x1004` literals.")
    md_lines.extend(
        [
            "- `EVENT` remains the widest unresolved active-event frontier because it still exposes one monolithic slot-`0x0A` body (`8150` bytes) with no finer body split yet.",
            "- `NPCTRIG` remains the strongest compact player-trigger frontier because it is event-bearing and has two non-zero bodies (`0x0A`, `0x20`) but still no inline `0x410` literal.",
            "- `_BOOT` event cores (`COR_BOOT`, `REE_BOOT`) remain near-template event families rather than special immortality emitters: their best pairings share only short common prefixes plus shared suffix-heavy tails.",
            "- `SPECIAL` and `TRIGPAD` stay negative controls here: callable bodies exist, but the new literal scan still shows no inline `0x410` evidence.",
            "",
            "## Body Rows",
            "",
            "| Class | Slot | Hint | Body Range | Len | `0x0410` hits | `0x00000410` hits | `0x1004` hits | Prefix | Suffix |",
            "|---|---:|---|---|---:|---|---|---|---|---|",
        ]
    )

    md_row_template = "| {class_name} | `0x{slot:02X}` | {event_name_hint} | `0x{body_start:04X}..0x{body_end:04X}` | {body_length} | {le16_count}:{le16_offsets} | {le32_count}:{le32_offsets} | {be16_count}:{be16_offsets} | `{first16}` | `{last16}` |"
    for record in body_records:
        fields = template_fields(record, "-")
        # Markdown cells show "-" where the TSV leaves the hint empty.
        fields["event_name_hint"] = fields["event_name_hint"] or "-"
        md_lines.append(md_row_template.format(**fields))

    md_lines.extend([
        "",
        "## Strongest Template Pairings",
        "",
        "These comparisons are limited to `COR_BOOT`, `REE_BOOT`, `NPCTRIG`, and `SFXTRIG` because they are the current highest-value active-event families near the immortality frontier.",
        "",
        "| Left | Right | Prefix | Suffix | Total |",
        "|---|---|---:|---:|---:|",
    ])
    for total, prefix, suffix, left_label, left_slot, left_len, right_label, right_slot, right_len in comparison_records[:12]:
        md_lines.append(
            f"| {left_label} `0x{left_slot:02X}` (`{left_len}`) | {right_label} `0x{right_slot:02X}` (`{right_len}`) | {prefix} | {suffix} | {total} |"
        )
    (out_dir / "immortality_target_body_scan.md").write_text("\n".join(md_lines) + "\n", encoding="utf-8")
def read_ascii_cstring(data: bytes, start: int, max_len: int = 48) -> tuple[str, int] | None:
    """Read a NUL-terminated printable-ASCII string beginning at ``start``.

    At most ``max_len`` characters are scanned. Returns ``(text, next_offset)``
    where ``next_offset`` is the index just past the terminating NUL, or
    ``None`` when the string is empty, contains a byte outside 0x20..0x7E, or
    no NUL terminator is found at the end of the scanned run.
    """
    scan_stop = min(len(data), start + max_len)
    end = start
    while end < scan_stop:
        byte = data[end]
        if byte == 0:
            break
        if byte < 0x20 or byte > 0x7E:
            return None
        end += 1
    # Accept only a non-empty run that stops exactly on a NUL inside the buffer.
    if end == start or end >= len(data) or data[end] != 0:
        return None
    return data[start:end].decode("latin-1"), end + 1
def parse_body_open_header(body: bytes) -> dict[str, object] | None:
    """Heuristically decode the opening header of an event body.

    Two layouts are accepted: ``5A <arg> 5C <u16> <label>`` and the shorter
    ``<arg> 5C <u16> <label>``. The label must be a printable NUL-terminated
    string. After the label, NUL padding is skipped and, if the next byte is
    ``0x0B``, the byte following it is reported as ``event_code``.

    Returns a dict with keys ``open_arg``, ``target``, ``label`` and
    ``event_code`` (``None`` when no ``0x0B`` marker follows), or ``None`` when
    the body is shorter than 7 bytes or matches neither layout.
    """
    if len(body) < 7:
        return None

    # Position of the open-argument byte decides where everything else sits.
    if body[0] == 0x5A and body[2] == 0x5C:
        arg_at = 1
    elif body[1] == 0x5C:
        arg_at = 0
    else:
        return None
    target_offset = arg_at + 2
    label_offset = target_offset + 2

    label_result = read_ascii_cstring(body, label_offset)
    if label_result is None:
        return None
    label, cursor = label_result

    body_len = len(body)
    while cursor < body_len and body[cursor] == 0:
        cursor += 1

    event_code = None
    if cursor + 1 < body_len and body[cursor] == 0x0B:
        event_code = body[cursor + 1]

    return {
        "open_arg": body[arg_at],
        "target": read_u16_le(body, target_offset),
        "label": label,
        "event_code": event_code,
    }
def find_labeled_subheaders(body: bytes, label: str) -> list[tuple[int, int]]:
    """Locate ``53 5C <u16> <label>`` subheaders inside ``body``.

    Returns ``(marker_offset, u16_value)`` pairs in ascending offset order,
    where the u16 is read little-endian from the two bytes after the marker.
    Markers with fewer than five bytes remaining in the body end the search.
    """
    marker = b"\x53\x5c"
    label_bytes = label.encode("latin-1")
    label_len = len(label_bytes)
    body_len = len(body)

    offsets: list[tuple[int, int]] = []
    found = body.find(marker)
    while found >= 0 and found + 4 < body_len:
        label_at = found + 4
        if body[label_at:label_at + label_len] == label_bytes:
            offsets.append((found, read_u16_le(body, found + 2)))
        # Resume one byte later so overlapping marker candidates are not skipped.
        found = body.find(marker, found + 1)
    return offsets
def scan_body_field_tokens(body: bytes, tail_window: int | None = None) -> list[str]:
    """Collect distinct ``OP:WORD->name`` field tokens found in ``body``.

    A candidate is a ``0x24`` or ``0x69`` byte followed by a little-endian u16
    and a printable NUL-terminated name starting three bytes after the opcode.
    When ``tail_window`` is given, only the last ``tail_window`` bytes are
    scanned. Tokens are returned in first-seen order without duplicates.
    """
    if tail_window is None:
        start = 0
    else:
        start = max(0, len(body) - tail_window)

    # A dict keeps insertion order, giving ordered de-duplication in one pass.
    ordered_tokens: dict[str, None] = {}
    for offset in range(start, len(body) - 4):
        if body[offset] != 0x24 and body[offset] != 0x69:
            continue
        field_result = read_ascii_cstring(body, offset + 3)
        if field_result is None:
            continue
        field_name = field_result[0]
        token = f"{body[offset]:02X}:{read_u16_le(body, offset + 1):04X}->{field_name}"
        ordered_tokens.setdefault(token, None)
    return list(ordered_tokens)
def format_offset_list(offsets: list[int], limit: int = 10) -> str:
    """Render offsets as comma-separated ``0xNNNN`` values.

    Only the first ``limit`` offsets are shown; ``,...`` is appended when more
    exist. An empty list renders as the empty string.
    """
    if not offsets:
        return ""
    shown = ",".join([f"0x{offset:04X}" for offset in offsets[:limit]])
    return shown + ",..." if len(offsets) > limit else shown
def scan_body_field_offsets(body: bytes, tail_window: int | None = None) -> list[tuple[int, str]]:
    """Collect ``(offset, token)`` pairs for field tags found in ``body``.

    A candidate is a ``0x24`` or ``0x69`` byte followed by a little-endian u16
    and a printable NUL-terminated name starting three bytes after the opcode;
    its token is rendered as ``OP:WORD->name``. When ``tail_window`` is given,
    only the last ``tail_window`` bytes are scanned. Pairs are returned in
    ascending offset order.
    """
    start = 0 if tail_window is None else max(0, len(body) - tail_window)

    # Each offset is visited exactly once, so every (offset, token) pair is
    # already unique and no separate de-duplication set is needed.
    tokens: list[tuple[int, str]] = []
    for offset in range(start, len(body) - 4):
        if body[offset] != 0x24 and body[offset] != 0x69:
            continue
        field_result = read_ascii_cstring(body, offset + 3)
        if field_result is None:
            continue
        field_name = field_result[0]
        token = f"{body[offset]:02X}:{read_u16_le(body, offset + 1):04X}->{field_name}"
        tokens.append((offset, token))
    return tokens
def count_offsets_in_range(offsets: list[int], start: int, end: int) -> int:
    """Count the offsets that fall inside the half-open range ``[start, end)``."""
    total = 0
    for offset in offsets:
        if start <= offset < end:
            total += 1
    return total
def relative_offsets_in_range(offsets: list[int], start: int, end: int) -> list[int]:
    """Return offsets inside ``[start, end)`` rebased so that ``start`` maps to 0."""
    rebased: list[int] = []
    for offset in offsets:
        if offset < start or offset >= end:
            continue
        rebased.append(offset - start)
    return rebased
def format_relative_offsets(offsets: list[int], limit: int = 8) -> str:
    """Render offsets as comma-separated ``+0xNN`` values.

    Only the first ``limit`` offsets are shown; ``,...`` is appended when more
    exist. An empty list renders as ``-``.
    """
    if not offsets:
        return "-"
    shown = ",".join([f"+0x{offset:02X}" for offset in offsets[:limit]])
    return shown + ",..." if len(offsets) > limit else shown
def find_repeated_windows(body: bytes, size: int, min_count: int = 2, max_results: int = 6) -> list[tuple[bytes, list[int]]]:
    """Find ``size``-byte windows that occur at least ``min_count`` times.

    Every start offset is considered (occurrences may overlap) and windows
    consisting only of zero bytes are ignored. Results are ordered by
    descending occurrence count, then first offset, then window bytes, and
    truncated to ``max_results`` entries.
    """
    if size <= 0 or len(body) < size:
        return []

    offsets_by_window: dict[bytes, list[int]] = {}
    last_start = len(body) - size
    for offset in range(last_start + 1):
        window = body[offset:offset + size]
        if not any(window):
            # All-zero padding would otherwise dominate the ranking.
            continue
        offsets_by_window.setdefault(window, []).append(offset)

    def rank(item: tuple[bytes, list[int]]) -> tuple[int, int, bytes]:
        window, offsets = item
        return (-len(offsets), offsets[0], window)

    repeated = sorted(
        (item for item in offsets_by_window.items() if len(item[1]) >= min_count),
        key=rank,
    )
    return repeated[:max_results]
def format_hex_window(window: bytes) -> str:
    """Render bytes as space-separated two-digit uppercase hex values."""
    rendered = [f"{byte:02X}" for byte in window]
    return " ".join(rendered)
def build_npctrig_clause_segments(
    body: bytes,
    subheaders: list[tuple[int, int]],
) -> tuple[list[tuple[str, int, int]], int]:
    """Split a body into named ``(name, start, end)`` segments.

    Segments are, in order: an optional ``prefix`` before the first subheader,
    one ``clause_N`` per subheader (each ending where the next one starts, the
    last ending at the tail start), and an optional ``tail`` beginning at the
    first field tag found in the final 192 bytes. When no field tag is found,
    or it lies at or before the first subheader, the tail start is the body
    length and no ``tail`` segment is produced.

    Returns:
        ``(segments, tail_start)``.

    NOTE(review): the tail start is only compared against the *first*
    subheader; a tail start that falls before a later subheader would give
    the final clause an end smaller than its start - confirm whether that can
    occur with real data.
    """
    body_len = len(body)
    clause_starts = [start for start, _ in subheaders]
    first_subheader = clause_starts[0] if clause_starts else 0

    tail_fields = scan_body_field_offsets(body, tail_window=min(body_len, 192))
    tail_start = body_len
    if tail_fields and tail_fields[0][0] > first_subheader:
        tail_start = tail_fields[0][0]

    segments: list[tuple[str, int, int]] = []
    if first_subheader > 0:
        segments.append(("prefix", 0, first_subheader))

    # Pair every clause start with the following start; the last pairs with the tail.
    clause_ends = clause_starts[1:] + [tail_start]
    for index, (start, next_start) in enumerate(zip(clause_starts, clause_ends)):
        segments.append((f"clause_{index + 1}", start, next_start))

    if tail_start < body_len:
        segments.append(("tail", tail_start, body_len))
    return segments, tail_start
def write_npctrig_clause_report(
    out_dir: pathlib.Path,
    parsed_class_chunks: list[ExtractedChunk],
    rows_by_entry: dict[int, list[ClassEventRow]],
    raw_data_by_entry: dict[int, bytes],
) -> None:
    """Write the per-slot clause breakdown for the ``NPCTRIG`` class.

    The first chunk whose ``primary_label`` is ``"NPCTRIG"`` is selected. For
    each of its rows with a non-zero ``raw_code_offset`` and known body bounds
    the body is sliced out, its open header and ``NPCTRIG``-labelled
    subheaders are parsed, the body is split into prefix/clause/tail segments,
    and motif hits, repeated 8- and 6-byte windows and tail field tags are
    collected.

    Writes ``immortality_npctrig_clauses.tsv`` and
    ``immortality_npctrig_clauses.md`` into ``out_dir``. Nothing is written
    when the chunk, its raw data, or any usable row is missing. A closing
    "Current Read" section is added only when both slot ``0x0A`` and slot
    ``0x20`` produced a record.

    NOTE: the Markdown lines below nest f-strings that reuse the same quote
    character inside a replacement field; that syntax is only accepted by
    Python 3.12+ (PEP 701).
    """
    chunk = next((candidate for candidate in parsed_class_chunks if candidate.primary_label == "NPCTRIG"), None)
    if chunk is None:
        return
    raw_data = raw_data_by_entry.get(chunk.index)
    if raw_data is None:
        return
    records: list[dict[str, object]] = []
    # Motifs tabulated per segment. Each name is used as a key into motif_hits,
    # which is built from IMMORTALITY_BODY_MOTIFS (defined elsewhere); a name
    # missing there would raise KeyError below.
    clause_motif_names = ["subheader_53_5c", "branch_3f_0a", "writeback_57_02", "push_24_51", "field_4b_fe_0f"]
    for row in rows_by_entry.get(chunk.index, []):
        if row.raw_code_offset == 0 or row.derived_body_start is None or row.derived_body_end is None:
            continue
        body = raw_data[row.derived_body_start:row.derived_body_end]
        header = parse_body_open_header(body)
        subheaders = find_labeled_subheaders(body, "NPCTRIG")
        segments, tail_start = build_npctrig_clause_segments(body, subheaders)
        # Body-relative offsets of every motif occurrence, keyed by motif name.
        motif_hits = {
            motif_name: find_all_offsets(body, motif_bytes)
            for motif_name, motif_bytes in IMMORTALITY_BODY_MOTIFS
        }
        repeated_windows_8 = find_repeated_windows(body, 8)
        repeated_windows_6 = find_repeated_windows(body, 6)
        # Field tags found in the last (at most) 192 bytes of the body.
        tail_fields = scan_body_field_offsets(body, tail_window=min(len(body), 192))
        segment_rows: list[dict[str, object]] = []
        for segment_name, start, end in segments:
            segment_body = body[start:end]
            # Offsets of every 0x5B byte, relative to the segment start.
            labels = [offset for offset in find_all_offsets(segment_body, bytes.fromhex("5B"))]
            # Motif hits inside this segment, rebased to the segment start.
            motif_offsets = {
                motif_name: relative_offsets_in_range(motif_hits[motif_name], start, end)
                for motif_name in clause_motif_names
            }
            segment_rows.append(
                {
                    "segment": segment_name,
                    "start": start,
                    "end": end,
                    "length": end - start,
                    "prefix_hex": hex_edge(segment_body, width=16),
                    "suffix_hex": hex_tail(segment_body, width=12),
                    "local_labels": [start + offset for offset in labels[:8]],
                    "motif_counts": {
                        motif_name: count_offsets_in_range(motif_hits[motif_name], start, end)
                        for motif_name in clause_motif_names
                    },
                    "motif_offsets": motif_offsets,
                }
            )
        # Differences between consecutive subheader offsets and between their u16 targets.
        subheader_offset_deltas = [subheaders[index + 1][0] - subheaders[index][0] for index in range(len(subheaders) - 1)]
        subheader_target_deltas = [subheaders[index + 1][1] - subheaders[index][1] for index in range(len(subheaders) - 1)]
        # Set only when every offset delta is identical; otherwise None.
        uniform_stride = subheader_offset_deltas[0] if subheader_offset_deltas and len(set(subheader_offset_deltas)) == 1 else None
        # Clause segments that contain at least one push_24_51 and one writeback_57_02 hit.
        full_clause_segments = [
            segment
            for segment in segment_rows
            if segment["segment"].startswith("clause_")
            and segment["motif_counts"]["push_24_51"]
            and segment["motif_counts"]["writeback_57_02"]
        ]
        records.append(
            {
                "slot": row.slot,
                "event_name_hint": row.event_name_hint or "",
                "body_length": len(body),
                "header": header,
                "subheaders": subheaders,
                "subheader_offset_deltas": subheader_offset_deltas,
                "subheader_target_deltas": subheader_target_deltas,
                "segments": segment_rows,
                "tail_start": tail_start,
                "tail_fields": tail_fields,
                "repeated_windows_8": repeated_windows_8,
                "repeated_windows_6": repeated_windows_6,
                "has_writeback": bool(motif_hits["writeback_57_02"]),
                "has_push_2451": bool(motif_hits["push_24_51"]),
                "field_4b_fe_0f_count": len(motif_hits["field_4b_fe_0f"]),
                "uniform_stride": uniform_stride,
                "full_clause_count": len(full_clause_segments),
                "selector_offsets": [offset for offset, _ in subheaders],
                "selector_targets": [target for _, target in subheaders],
            }
        )
    if not records:
        return
    # TSV output: one header line plus one line per slot record.
    tsv_lines = [
        "slot\tevent_name_hint\tbody_length\theader_target\theader_event_code\tsubheader_offsets\tsubheader_targets\tsubheader_offset_deltas\tsubheader_target_deltas\tuniform_stride\tfull_clause_count\ttail_start\thas_writeback\thas_push_2451\tfield_4b_fe_0f_count\trepeated_windows_8\trepeated_windows_6"
    ]
    for record in records:
        # An unrecognized header is treated as an empty dict so the header columns render empty.
        header = record["header"] or {}
        # Target deltas are masked to 16 bits, so a negative delta renders as its two's-complement value.
        tsv_lines.append(
            "0x{slot:02X}\t{event_name_hint}\t{body_length}\t{header_target}\t{header_event_code}\t{subheader_offsets}\t{subheader_targets}\t{subheader_offset_deltas}\t{subheader_target_deltas}\t{uniform_stride}\t{full_clause_count}\t0x{tail_start:04X}\t{has_writeback}\t{has_push_2451}\t{field_4b_fe_0f_count}\t{repeated_windows_8}\t{repeated_windows_6}".format(
                slot=record["slot"],
                event_name_hint=record["event_name_hint"],
                body_length=record["body_length"],
                header_target=(f"0x{header['target']:04X}" if header else ""),
                header_event_code=(f"0x{header['event_code']:02X}" if header and header.get("event_code") is not None else ""),
                subheader_offsets=",".join(f"0x{offset:04X}" for offset, _ in record["subheaders"]),
                subheader_targets=",".join(f"0x{target:04X}" for _, target in record["subheaders"]),
                subheader_offset_deltas=",".join(f"0x{delta:02X}" for delta in record["subheader_offset_deltas"]),
                subheader_target_deltas=",".join(f"0x{delta & 0xFFFF:04X}" for delta in record["subheader_target_deltas"]),
                uniform_stride=(f"0x{record['uniform_stride']:02X}" if record["uniform_stride"] is not None else ""),
                full_clause_count=record["full_clause_count"],
                tail_start=record["tail_start"],
                has_writeback="yes" if record["has_writeback"] else "no",
                has_push_2451="yes" if record["has_push_2451"] else "no",
                field_4b_fe_0f_count=record["field_4b_fe_0f_count"],
                repeated_windows_8=";".join(
                    f"{window.hex()}@{','.join(f'0x{offset:04X}' for offset in offsets)}"
                    for window, offsets in record["repeated_windows_8"]
                ),
                repeated_windows_6=";".join(
                    f"{window.hex()}@{','.join(f'0x{offset:04X}' for offset in offsets)}"
                    for window, offsets in record["repeated_windows_6"]
                ),
            )
        )
    (out_dir / "immortality_npctrig_clauses.tsv").write_text("\n".join(tsv_lines) + "\n", encoding="utf-8")
    # Markdown output: one section per slot record, then an optional summary.
    md_lines = [
        "# Immortality NPCTRIG Clauses",
        "",
        "This report focuses on the surviving compact NPCTRIG frontier and splits the extracted slot bodies into prefix, clause, and tail regions.",
        "It is intended to make the slot `0x0A` versus slot `0x20` difference explicit enough to compare against the runtime-side slot-`0x0A` consumer path.",
        "",
    ]
    for record in records:
        header = record["header"] or {}
        # NOTE(review): the "Open header" line always prints a leading `0x5A`,
        # although parse_body_open_header also accepts a layout without that
        # byte - confirm whether the rendering should distinguish the two.
        md_lines.extend([
            f"## NPCTRIG slot `0x{record['slot']:02X}`",
            "",
            f"- Event hint: `{record['event_name_hint'] or '-'}`.",
            f"- Open header: `0x5A 0x{header['open_arg']:02X} 0x5C 0x{header['target']:04X}` -> `NPCTRIG` with event-code byte `{f'0x{header['event_code']:02X}' if header.get('event_code') is not None else '-'}`." if header else "- Open header: not recognized.",
            f"- First tail-field offset: `0x{record['tail_start']:04X}`.",
            f"- Subheader offsets: {', '.join(f'`0x{offset:04X}`' for offset, _ in record['subheaders']) or '`-`'}.",
            f"- Subheader targets: {', '.join(f'`0x{target:04X}`' for _, target in record['subheaders']) or '`-`'}.",
            f"- Subheader offset deltas: {', '.join(f'`0x{delta:02X}`' for delta in record['subheader_offset_deltas']) or '`-`'}.",
            f"- Subheader target deltas: {', '.join(f'`0x{delta & 0xFFFF:04X}`' for delta in record['subheader_target_deltas']) or '`-`'}.",
            f"- Runtime-shape motifs: `writeback_57_02={'yes' if record['has_writeback'] else 'no'}`, `push_24_51={'yes' if record['has_push_2451'] else 'no'}`, `field_4b_fe_0f={record['field_4b_fe_0f_count']}`.",
            "",
            "| Segment | Range | Len | Local Labels | Subheaders | Branch 3F 0A | Writeback 57 02 | Push 24 51 | Field 4B FE 0F | Motif Offsets | Prefix | Suffix |",
            "|---|---|---:|---|---:|---:|---:|---:|---:|---|---|---|",
        ])
        for segment in record["segments"]:
            motif_counts = segment["motif_counts"]
            motif_offsets = segment["motif_offsets"]
            # Only motifs with at least one hit in the segment are listed; "-" when none hit.
            motif_offset_render = "; ".join(
                f"{motif_name}={format_relative_offsets(offsets)}"
                for motif_name, offsets in motif_offsets.items()
                if offsets
            ) or "-"
            label_render = ",".join(f"0x{offset:04X}" for offset in segment["local_labels"]) or "-"
            md_lines.append(
                "| {segment} | `0x{start:04X}..0x{end:04X}` | {length} | `{labels}` | {subheaders} | {branch} | {writeback} | {push_2451} | {field_4b_fe_0f} | `{motif_offsets}` | `{prefix}` | `{suffix}` |".format(
                    segment=segment["segment"],
                    start=segment["start"],
                    end=segment["end"],
                    length=segment["length"],
                    labels=label_render,
                    subheaders=motif_counts["subheader_53_5c"],
                    branch=motif_counts["branch_3f_0a"],
                    writeback=motif_counts["writeback_57_02"],
                    push_2451=motif_counts["push_24_51"],
                    field_4b_fe_0f=motif_counts["field_4b_fe_0f"],
                    motif_offsets=motif_offset_render,
                    prefix=segment["prefix_hex"],
                    suffix=segment["suffix_hex"],
                )
            )
        md_lines.extend([
            "",
            "Repeated windows (8-byte):",
            "",
        ])
        for window, offsets in record["repeated_windows_8"]:
            md_lines.append(
                f"- `{format_hex_window(window)}` at {', '.join(f'`0x{offset:04X}`' for offset in offsets)}"
            )
        md_lines.extend([
            "",
            "Repeated windows (6-byte):",
            "",
        ])
        for window, offsets in record["repeated_windows_6"]:
            md_lines.append(
                f"- `{format_hex_window(window)}` at {', '.join(f'`0x{offset:04X}`' for offset in offsets)}"
            )
        md_lines.extend([
            "",
            "Runtime-fit candidates:",
            "",
            f"- Candidate clause selector starts: {', '.join(f'`0x{offset:04X}`' for offset in record['selector_offsets']) or '`-`'}.",
            f"- Candidate clause selector targets: {', '.join(f'`0x{target:04X}`' for target in record['selector_targets']) or '`-`'}.",
            f"- Uniform selector stride: `{f'0x{record['uniform_stride']:02X}' if record['uniform_stride'] is not None else '-'}`; full clauses carrying both `push_24_51` and `writeback_57_02`: `{record['full_clause_count']}`.",
            "- Runtime side anchor: `000d:5572` proves the wrapper extra word is additive (`entity_vm_slot_load_value(...) + offset`), while `000d:21ed -> 000d:2433` copies one inline blob, reads two signed metadata bytes, then consumes a word matrix where byte A controls the lead-word row count and byte B controls the shared target-list width.",
            "",
            "Tail field offsets:",
            "",
        ])
        for offset, token in record["tail_fields"]:
            md_lines.append(f"- `0x{offset:04X}` -> `{token}`")
        md_lines.append("")
    # Cross-slot summary, emitted only when both slots produced a record.
    slot_0a = next((record for record in records if record["slot"] == 0x0A), None)
    slot_20 = next((record for record in records if record["slot"] == 0x20), None)
    if slot_0a and slot_20:
        slot_0a_header = slot_0a["header"] or {}
        slot_20_header = slot_20["header"] or {}
        # NOTE(review): parts of this narrative are fixed text (for example the
        # `5` clause starts and stride `0x2F` in the last bullet) and are not
        # derived from the records - confirm they still match the data.
        md_lines.extend([
            "## Current Read",
            "",
            f"- Slot `0x0A` now reads as a repeated clause ladder, not a monolithic blob: `{len(slot_0a['subheaders'])}` subheaders sit on a uniform `{', '.join(f'0x{delta:02X}' for delta in slot_0a['subheader_offset_deltas']) or '-'}` byte stride, and their targets walk backward by `{', '.join(f'0x{delta & 0xFFFF:04X}' for delta in slot_0a['subheader_target_deltas']) or '-'}`. Each clause block carries one `branch_3f_0a`, one `push_24_51`, and one `writeback_57_02`, which fits an event-bearing clause stream better than a pure type filter.",
            f"- Slot `0x20` is structurally different even before the tail fields: its open event-code byte is `{f'0x{slot_20_header['event_code']:02X}' if slot_20_header.get('event_code') is not None else '-'}` instead of `{f'0x{slot_0a_header['event_code']:02X}' if slot_0a_header.get('event_code') is not None else '-'}`, it has only one class-labelled subheader, no `writeback_57_02`, no `push_24_51`, and `{slot_20['field_4b_fe_0f_count']}` `field_4b_fe_0f` hits concentrated around repeated `0x0A 00/05 4B FE 0F ...` windows. That is a materially better fit for a typed gate or setup/attachment body than for the live event-emission ladder.",
            "- This split matches the current runtime-side bridge better than the previous undifferentiated frontier. The verified slot-`0x0A` wrapper `0005:2c35` seeds mask `0x0400`, slot `0x0A`, and one additive word that `000d:5572` applies directly to the loaded slot value before `000d:21ed` consumes the result. The exact `000d:21ed -> 000d:22bc` contract is now narrower too: after copying the inline blob it reads two signed bytes, uses byte A as the lead-word row count, uses byte B as the shared target-list width, performs `A x B` `entity_link` calls, and pushes back only non-`0x0400` words. `NPCTRIG slot 0x0A` is the only surviving compact body here with a natural five-row selector family (`5` evenly spaced clause starts at stride `0x2F`), while slot `0x20` offers only one clause and no matching writeback/push motif.",
        ])
    (out_dir / "immortality_npctrig_clauses.md").write_text("\n".join(md_lines) + "\n", encoding="utf-8")
def write_immortality_body_structure_report(
    out_dir: pathlib.Path,
    parsed_class_chunks: list[ExtractedChunk],
    rows_by_entry: dict[int, list[ClassEventRow]],
    raw_data_by_entry: dict[int, bytes],
) -> None:
    """Write a structural summary of event bodies for the structural target classes.

    For every label in ``IMMORTALITY_STRUCTURAL_TARGET_LABELS`` (visited in
    sorted order) that resolves to a chunk with raw data, each row with a
    non-zero ``raw_code_offset`` and known body bounds is sliced out and
    summarized: open header, counts of ``0x7A`` and ``0x5B`` bytes, subheaders
    labelled with the class name, field tokens (whole body and last 256
    bytes) and motif hits.

    Writes ``immortality_body_structure.tsv`` and
    ``immortality_body_structure.md`` into ``out_dir``; both files are written
    even when no record was collected. A closing "Current Read" section is
    added only when ``EVENT`` slot ``0x0A`` and ``NPCTRIG`` slots ``0x0A`` and
    ``0x20`` all produced a record.

    NOTE: the Markdown lines below nest f-strings that reuse the same quote
    character inside a replacement field; that syntax is only accepted by
    Python 3.12+ (PEP 701).
    """
    chunk_by_label = {
        chunk.primary_label: chunk
        for chunk in parsed_class_chunks
        if chunk.primary_label in IMMORTALITY_STRUCTURAL_TARGET_LABELS
    }
    records: list[dict[str, object]] = []
    for label in sorted(IMMORTALITY_STRUCTURAL_TARGET_LABELS):
        chunk = chunk_by_label.get(label)
        if chunk is None:
            continue
        raw_data = raw_data_by_entry.get(chunk.index)
        if raw_data is None:
            continue
        for row in rows_by_entry.get(chunk.index, []):
            if row.raw_code_offset == 0 or row.derived_body_start is None or row.derived_body_end is None:
                continue
            body = raw_data[row.derived_body_start:row.derived_body_end]
            header = parse_body_open_header(body)
            subheaders = find_labeled_subheaders(body, label)
            # Body-relative offsets of every motif occurrence, keyed by motif name.
            motif_hits = {
                motif_name: find_all_offsets(body, motif_bytes)
                for motif_name, motif_bytes in IMMORTALITY_BODY_MOTIFS
            }
            # "clause_terminators" and "local_labels" are plain byte counts of
            # 0x7A and 0x5B respectively, wherever those bytes occur in the body.
            records.append(
                {
                    "entry_index": row.entry_index,
                    "class_name": label,
                    "slot": row.slot,
                    "event_name_hint": row.event_name_hint or "",
                    "body_length": len(body),
                    "header": header,
                    "clause_terminators": body.count(0x7A),
                    "local_labels": body.count(0x5B),
                    "subheaders": subheaders,
                    "tail_fields": scan_body_field_tokens(body, tail_window=256),
                    "all_fields": scan_body_field_tokens(body),
                    "motif_hits": motif_hits,
                }
            )
    # TSV output: one header line plus one line per body record.
    tsv_lines = [
        "entry_index\tclass_name\tslot\tevent_name_hint\tbody_length\theader_open_arg\theader_target\theader_label\theader_event_code\tclause_terminator_count\tlocal_label_count\tsubheader_count\tsubheader_targets\ttail_fields\tall_fields\tmotif_counts\tmotif_offsets"
    ]
    for record in records:
        # An unrecognized header is treated as an empty dict so the header columns render empty.
        header = record["header"] or {}
        motif_hits = record["motif_hits"]
        tsv_lines.append(
            "{entry_index}\t{class_name}\t0x{slot:02X}\t{event_name_hint}\t{body_length}\t{header_open_arg}\t{header_target}\t{header_label}\t{header_event_code}\t{clause_terminators}\t{local_labels}\t{subheader_count}\t{subheader_targets}\t{tail_fields}\t{all_fields}\t{motif_counts}\t{motif_offsets}".format(
                entry_index=record["entry_index"],
                class_name=record["class_name"],
                slot=record["slot"],
                event_name_hint=record["event_name_hint"],
                body_length=record["body_length"],
                header_open_arg=(f"0x{header['open_arg']:02X}" if header else ""),
                header_target=(f"0x{header['target']:04X}" if header else ""),
                header_label=(header.get("label", "") if header else ""),
                header_event_code=(f"0x{header['event_code']:02X}" if header and header.get("event_code") is not None else ""),
                clause_terminators=record["clause_terminators"],
                local_labels=record["local_labels"],
                subheader_count=len(record["subheaders"]),
                subheader_targets=",".join(
                    f"0x{offset:04X}->0x{target:04X}" for offset, target in record["subheaders"]
                ),
                tail_fields=",".join(record["tail_fields"]),
                all_fields=",".join(record["all_fields"]),
                motif_counts=",".join(
                    f"{motif_name}:{len(motif_hits[motif_name])}" for motif_name, _ in IMMORTALITY_BODY_MOTIFS
                ),
                motif_offsets=",".join(
                    f"{motif_name}={format_offset_list(motif_hits[motif_name])}" for motif_name, _ in IMMORTALITY_BODY_MOTIFS if motif_hits[motif_name]
                ),
            )
        )
    (out_dir / "immortality_body_structure.tsv").write_text("\n".join(tsv_lines) + "\n", encoding="utf-8")
    # Markdown output: one section per body record, then an optional summary.
    md_lines = [
        "# Immortality Body Structure",
        "",
        "This report decodes one layer deeper than the literal scan for the surviving EVENT and NPCTRIG frontier.",
        "It is still heuristic: the output is limited to repeatable byte grammar, subheader boundaries, field-tag trailers, and motif offsets that can be cross-checked against the 000d slot-backed runtime lane.",
        "",
    ]
    for record in records:
        header = record["header"] or {}
        motif_hits = record["motif_hits"]
        # At most the first 12 subheaders are listed in the bullet below; the count shown is the full count.
        md_lines.extend([
            f"## {record['class_name']} slot `0x{record['slot']:02X}`",
            "",
            f"- Body length: `{record['body_length']}` bytes.",
            f"- Open header: `0x5A 0x{header['open_arg']:02X} 0x5C 0x{header['target']:04X}` -> `{header.get('label', '')}` with embedded event-code byte `{f'0x{header['event_code']:02X}' if header.get('event_code') is not None else '-'}`." if header else "- Open header: not recognized by the current heuristic.",
            f"- Clause terminators (`0x7A`): `{record['clause_terminators']}`; local labels (`0x5B`): `{record['local_labels']}`.",
            f"- Internal labeled subheaders (`0x53 0x5C <u16> {record['class_name']}`): `{len(record['subheaders'])}` -> {', '.join(f'`0x{offset:04X}->0x{target:04X}`' for offset, target in record['subheaders'][:12]) or '`-`'}." ,
            f"- Tail field tags: {', '.join(f'`{value}`' for value in record['tail_fields']) or '`-`' }.",
            "",
            "| Motif | Count | First Offsets |",
            "|---|---:|---|",
        ])
        for motif_name, _ in IMMORTALITY_BODY_MOTIFS:
            offsets = motif_hits[motif_name]
            md_lines.append(
                f"| `{motif_name}` | {len(offsets)} | `{format_offset_list(offsets) or '-'}` |"
            )
        md_lines.append("")
    # Cross-class summary, emitted only when all three reference records exist.
    event_slot_0a = next((record for record in records if record["class_name"] == "EVENT" and record["slot"] == 0x0A), None)
    npctrig_slot_0a = next((record for record in records if record["class_name"] == "NPCTRIG" and record["slot"] == 0x0A), None)
    npctrig_slot_20 = next((record for record in records if record["class_name"] == "NPCTRIG" and record["slot"] == 0x20), None)
    if event_slot_0a and npctrig_slot_0a and npctrig_slot_20:
        npctrig_slot_0a_header = npctrig_slot_0a.get("header") or {}
        npctrig_slot_20_header = npctrig_slot_20.get("header") or {}
        # NOTE(review): apart from the interpolated counts, field lists and
        # event-code bytes, this narrative is fixed text and is not re-derived
        # from the records - confirm it still matches the data.
        md_lines.extend([
            "## Current Read",
            "",
            f"- `EVENT 0x0A` is the generic hub-shaped body: it has `{len(event_slot_0a['subheaders'])}` internal labeled subheaders and the widest field trailer (`{', '.join(event_slot_0a['tail_fields'])}`).",
            f"- `NPCTRIG 0x0A` is the compact player-trigger candidate: it reuses the same class-labelled open header and subheader grammar, but it stays constrained to `{', '.join(npctrig_slot_0a['tail_fields'])}` instead of the wider EVENT field set.",
            f"- `NPCTRIG 0x20` keeps the same constrained field set as `NPCTRIG 0x0A` and changes only the embedded prolog event-code byte (`{f'0x{npctrig_slot_20_header['event_code']:02X}' if npctrig_slot_20_header.get('event_code') is not None else '-'}` vs `{f'0x{npctrig_slot_0a_header['event_code']:02X}' if npctrig_slot_0a_header.get('event_code') is not None else '-'}`), which fits a variant trigger/setup lane better than a separate generic hub.",
            "- The repeated `0x53 0x5C <u16> LABEL` subheaders and dense `0x5B <u16>` local labels make these bodies look like inline clause streams rather than single flat payloads, which is consistent with the `000d:21ed -> 000d:22bc` runtime lane that copies variable-length inline bytes first and only then consumes compact metadata bytes plus streamed words.",
            "- The surviving slot focus is still `0x0A`: both EVENT and NPCTRIG expose non-zero slot-`0x0A` bodies, and the runtime side has an exact offset-specialized masked wrapper for slot `0x0A` at `0005:2c35` (`entity_vm_context_try_create_mask_0400_slot0a_with_offset`).",
        ])
    (out_dir / "immortality_body_structure.md").write_text("\n".join(md_lines) + "\n", encoding="utf-8")
def readable_neighbor_chunks(
    center: ExtractedChunk,
    chunk_by_index: dict[int, ExtractedChunk],
    total_chunks: int,
    window: int,
) -> list[ExtractedChunk]:
    """Collect the descriptor-relevant chunks that surround *center*.

    Every index within ``window`` positions of ``center.index`` (clamped to
    ``[0, total_chunks)``) is looked up in ``chunk_by_index``; the centre's
    own index is skipped. A neighbour is kept when ``event_tag_kind`` returns
    something truthy, when ``has_referent_field`` is true, or when it has the
    same primary label as the centre.

    Returns the kept neighbours in ascending index order. An index in the
    window that is absent from ``chunk_by_index`` raises ``KeyError``.
    """
    first = max(0, center.index - window)
    stop = min(total_chunks, center.index + window + 1)
    # Lazy lookup keeps the per-index order of dictionary access and filtering.
    candidates = (
        chunk_by_index[position]
        for position in range(first, stop)
        if position != center.index
    )
    return [
        candidate
        for candidate in candidates
        if event_tag_kind(candidate)
        or has_referent_field(candidate)
        or candidate.primary_label == center.primary_label
    ]
def unique_preserve_order(values: list[str]) -> list[str]:
    """Return the truthy entries of *values* with duplicates removed.

    The first occurrence of each value wins and the original relative order
    is kept; empty strings (and any other falsy entries) are dropped.
    """
    # dict keys keep insertion order, so fromkeys() de-duplicates in place.
    return list(dict.fromkeys(value for value in values if value))
def section_runtime_ops(section_name: str) -> list[str]:
    """Return the runtime operator names reported for a report section.

    ``"Callback trigger lane"`` maps to a four-operator list; every other
    section name maps to the nine-operator list. A fresh list is returned on
    each call.
    """
    callback_ops = (
        "MATERIALIZE_OR_FORWARD_VALUE",
        "PUSH_FRAME_WORD_LITERAL",
        "COMPARE_STREAM_DWORD_AND_PUSH_BOOL",
        "FINALIZE_MIXED_VALUE_TO_OUTPTR",
    )
    general_ops = (
        "APPEND_UNIQUE_INLINE",
        "APPEND_UNIQUE_INDIRECT",
        "REMOVE_MATCHING_INDIRECT",
        "REMOVE_MATCHING_INLINE",
        "MATERIALIZE_OR_FORWARD_VALUE",
        "PREPEND_INLINE_PAYLOAD",
        "BUILD_ENTITY_LINK_MATRIX",
        "EMIT_OR_PUSHBACK_RESULT",
        "FINALIZE_MIXED_VALUE_TO_OUTPTR",
    )
    selected = callback_ops if section_name == "Callback trigger lane" else general_ops
    return list(selected)
def section_mask_pairs(section_name: str) -> list[str]:
if section_name == "JELYHACK anchor lane":
return ["indirect-only active-event-biased ladder; no direct anchor-specific mask proven"]
if section_name == "Callback trigger lane":
return ["no callback-specific mask pair proven; current ladder favors active event carriers"]
return [
"0x0001:0000",
"0x0002:0001",
"0x0004:0002",
"0x0010:0004",
"0x0020:0005",
"0x0200:0009",
"0x0400:000a",
"0x0800:000b",
"0x1000:000c",
"0x2000:0015",
"0x4000:000e",
"0x8000:0007",
"0x8000:000f",
]
def section_bridge_note(section_name: str) -> str:
    """Return the interpretation sentence shown for a report section.

    Four section names have dedicated notes; anything else gets the generic
    "conservative family level" sentence.
    """
    notes_by_section = {
        "JELYHACK anchor lane": "Referent-only anchors are now readable as payload owners, but the current mask ladder still correlates more strongly with active-event descriptors than with anchor-only rows.",
        "Callback trigger lane": "Callback/eventTrigger descriptors are structurally distinct from the active event lane, so the runtime bridge is still generic slot-backed context flow rather than a callback-specific opcode family.",
        "EVENT hub lane": "This is the strongest current descriptor-side bridge into the active event runtime lane: the neighborhood contains explicit event cores and matches the proven payload-chain plus link-matrix VM behavior.",
        "Environmental event lane": "Environmental descriptors share the same active event field grammar, so they likely ride the same generic VM event lane even though no hazard-specific opcode split is proven yet.",
    }
    fallback = "Descriptor-side and runtime-side evidence align only at the conservative family level."
    return notes_by_section.get(section_name, fallback)
def write_runtime_bridge_reports(
    out_dir: pathlib.Path,
    descriptor_chunks: list[ExtractedChunk],
    chunk_by_index: dict[int, ExtractedChunk],
    total_chunks: int,
) -> None:
    """Write the runtime-bridge report files into *out_dir*.

    Files produced, in this order:

    - ``runtime_vm_ir.tsv``: one row per ``VERIFIED_VM_IR_ROWS`` entry.
    - ``vm_mask_ladder.tsv``: one row per ``VERIFIED_MASK_LADDER_ROWS`` entry.
    - ``readable_script_ir.tsv`` and ``readable_script_ir.md``: for each
      descriptor whose primary label is in one of the hard-coded focus sets,
      the labels of its readable neighbours (split into event, eventTrigger
      and referent-only groups) together with the section's runtime ops, mask
      pairs and bridge note.

    The TSV files get a trailing newline appended; the Markdown lines are
    joined as-is. Focus sets with no matching descriptor are omitted.
    """

    def labels_where(chunks: list[ExtractedChunk], keep) -> list[str]:
        # Cleaned primary labels of the chunks accepted by *keep*, de-duplicated in order.
        return unique_preserve_order(
            [clean_token(chunk.primary_label or "") for chunk in chunks if keep(chunk)]
        )

    # Standalone dumps of the two verified runtime tables.
    vm_ir_rows = ["stage_address\tir_name\topcode_or_lane\tpayload_shape\tevidence"]
    vm_ir_rows += [
        f"{row['stage_address']}\t{row['ir_name']}\t{row['opcode_or_lane']}\t{row['payload_shape']}\t{row['evidence']}"
        for row in VERIFIED_VM_IR_ROWS
    ]
    (out_dir / "runtime_vm_ir.tsv").write_text("\n".join(vm_ir_rows) + "\n", encoding="utf-8")

    ladder_rows = ["wrapper_address\tmask_pair\tcaller_anchor\tdescriptor_bias"]
    ladder_rows += [
        f"{row['wrapper_address']}\t{row['mask_pair']}\t{row['caller_anchor']}\t{row['descriptor_bias']}"
        for row in VERIFIED_MASK_LADDER_ROWS
    ]
    (out_dir / "vm_mask_ladder.tsv").write_text("\n".join(ladder_rows) + "\n", encoding="utf-8")

    # (template id, section title, centre labels, neighbourhood half-width)
    focus_sets = [
        ("jelyhack_anchor_attachment", "JELYHACK anchor lane", {"JELYHACK", "JELYH2"}, 8),
        ("event_hub_cluster", "EVENT hub lane", {"EVENT", "COR_BOOT", "NPCTRIG"}, 5),
        ("environmental_event_cluster", "Environmental event lane", {"FLAMEBOX", "NOSTRIL", "STEAMBOX"}, 5),
        ("callback_trigger_cluster", "Callback trigger lane", {"SURCAMNS", "SURCAMEW"}, 5),
    ]

    script_tsv_lines = [
        "template_id\tsection\tcenter_index\tcenter_label\tattach_labels\tcallback_labels\tneighbor_labels\tevent_families\truntime_ops\tmask_pairs\towner_source\tmirror_write\tselector_status\tbridge_note"
    ]
    script_md_lines = [
        "# Readable Script IR",
        "",
        "This report joins descriptor neighborhoods to the verified 000d VM/runtime lane.",
        "It stays conservative: opcode-family and mask-family evidence is carried forward only where the binary proves it.",
        "",
        "## Verified Runtime Lane",
        "",
        "- Owner path: `entity_vm_runtime_init_from_path_if_configured -> entity_vm_runtime_create -> entity_vm_runtime_owner_resource_create`",
        "- Slot source: `(+0x10/+0x12) + 0x0d*slot + 4` inside the runtime owner/resource object",
        "- Context seed: `entity_vm_context_create_from_slot_index` copies that source into `+0xd6/+0xd8` and mirrors it to `0x39ca[slot]`",
        "- Selector status: `0x19/0x1a/0x1b` are proven inside `000d:0988`; `0x18` is still implied, and the upstream seed into `[BP-0x32]` remains unresolved",
        "",
        "## Verified VM IR Operators",
        "",
        "| Stage | IR | Opcode / Lane | Payload Shape |",
        "|---|---|---|---|",
    ]
    script_md_lines += [
        f"| {row['stage_address']} | {row['ir_name']} | {row['opcode_or_lane']} | {row['payload_shape']} |"
        for row in VERIFIED_VM_IR_ROWS
    ]
    script_md_lines += [
        "",
        "## Verified Mask Ladder",
        "",
        "| Wrapper | Mask | Caller Anchor | Descriptor Bias |",
        "|---|---|---|---|",
    ]
    script_md_lines += [
        f"| {row['wrapper_address']} | {row['mask_pair']} | {row['caller_anchor']} | {row['descriptor_bias']} |"
        for row in VERIFIED_MASK_LADDER_ROWS
    ]

    # Constant provenance columns repeated on every script row.
    owner_source = "000d:44df -> 000d:4c99 -> 000d:7000 -> (+0x10/+0x12) + 0x0d*slot + 4"
    mirror_write = "entity_vm_context_create_from_slot_index writes the same source pair to 0x39ca[context_slot]"
    selector_status = "0x19/0x1a/0x1b proven in 000d:0988; 0x18 implied; upstream [BP-0x32] seed unresolved"

    for template_id, section_name, center_labels, window in focus_sets:
        centers = [chunk for chunk in descriptor_chunks if chunk.primary_label in center_labels]
        if not centers:
            continue
        script_md_lines += ["", f"## {section_name}", ""]

        # These depend only on the section, so they are resolved once per section.
        runtime_ops = section_runtime_ops(section_name)
        mask_pairs = section_mask_pairs(section_name)
        bridge_note = section_bridge_note(section_name)

        for center in centers:
            neighbors = readable_neighbor_chunks(center, chunk_by_index, total_chunks, window)
            attach_labels = labels_where(neighbors, lambda chunk: event_tag_kind(chunk) == "event")
            callback_labels = labels_where(neighbors, lambda chunk: event_tag_kind(chunk) == "eventTrigger")
            neighbor_labels = labels_where(
                neighbors,
                lambda chunk: has_referent_field(chunk) and event_tag_kind(chunk) == "",
            )
            family_labels = unique_preserve_order(
                [classify_event_family(neighbor) for neighbor in neighbors]
            )

            tsv_cells = (
                template_id,
                section_name,
                center.index,
                clean_token(center.primary_label or ""),
                ",".join(attach_labels),
                ",".join(callback_labels),
                ",".join(neighbor_labels),
                ",".join(family_labels),
                ",".join(runtime_ops),
                ",".join(mask_pairs),
                owner_source,
                mirror_write,
                selector_status,
                bridge_note,
            )
            script_tsv_lines.append("\t".join(f"{cell}" for cell in tsv_cells))

            signature = readable_signature(center)
            script_md_lines += [
                f"### {center.index}: {center.primary_label}",
                "",
                f"Descriptor focus: `{signature}`",
                "",
                "Descriptor-side attachments:",
                f"- Active event neighbors: {', '.join(attach_labels) or 'none proven in window'}",
                f"- Callback neighbors: {', '.join(callback_labels) or 'none proven in window'}",
                f"- Referent-side neighbors: {', '.join(neighbor_labels) or 'none proven in window'}",
                f"- Event families present: {', '.join(family_labels) or 'none'}",
                "",
                "Runtime bridge:",
                f"- Runtime ops: {', '.join(runtime_ops)}",
                f"- Mask pairs: {', '.join(mask_pairs)}",
                f"- Owner source: {owner_source}",
                f"- Mirror write: {mirror_write}",
                f"- Selector status: {selector_status}",
                f"- Interpretation: {bridge_note}",
                "",
                "```text",
                signature,
            ]
            script_md_lines += [
                f"attach {label}(...) # active event-bearing neighbor" for label in attach_labels
            ]
            script_md_lines += [
                f"callback {label}(...) # eventTrigger-bearing neighbor" for label in callback_labels
            ]
            script_md_lines += [
                f"near {label}(...) # referent-side local context" for label in neighbor_labels
            ]
            script_md_lines += ["", "vm_effect:"]
            script_md_lines += [f" {runtime_op}(...)" for runtime_op in runtime_ops]
            script_md_lines += ["```", ""]

    (out_dir / "readable_script_ir.tsv").write_text("\n".join(script_tsv_lines) + "\n", encoding="utf-8")
    (out_dir / "readable_script_ir.md").write_text("\n".join(script_md_lines), encoding="utf-8")
def chunk_bridge_family(chunk: ExtractedChunk) -> str:
    """Return the bridge family name for *chunk*, or ``""`` if it has none.

    The result of ``classify_event_family`` wins when it is truthy. Otherwise
    a chunk whose ``chunk_role`` is ``"referent-anchor"`` is reported under
    that same name, and everything else maps to the empty string.
    """
    return classify_event_family(chunk) or (
        "referent-anchor" if chunk_role(chunk) == "referent-anchor" else ""
    )
def family_runtime_ops(family: str) -> list[str]:
    """Return the runtime operator names reported for a descriptor family.

    ``"callback-eventtrigger"`` gets its own four-operator list.
    ``"referent-anchor"`` gets the five chain operators. Every other family
    gets those five followed by four further operators. A new list is
    returned on each call.
    """
    if family == "callback-eventtrigger":
        return [
            "MATERIALIZE_OR_FORWARD_VALUE",
            "PUSH_FRAME_WORD_LITERAL",
            "COMPARE_STREAM_DWORD_AND_PUSH_BOOL",
            "FINALIZE_MIXED_VALUE_TO_OUTPTR",
        ]

    # The anchor list is exactly the leading part of the default list.
    chain_ops = [
        "APPEND_UNIQUE_INLINE",
        "APPEND_UNIQUE_INDIRECT",
        "REMOVE_MATCHING_INDIRECT",
        "REMOVE_MATCHING_INLINE",
        "MATERIALIZE_OR_FORWARD_VALUE",
    ]
    if family == "referent-anchor":
        return chain_ops
    return chain_ops + [
        "PREPEND_INLINE_PAYLOAD",
        "BUILD_ENTITY_LINK_MATRIX",
        "EMIT_OR_PUSHBACK_RESULT",
        "FINALIZE_MIXED_VALUE_TO_OUTPTR",
    ]
def family_mask_pairs(family: str) -> list[str]:
    """Return the mask-pair strings reported for a descriptor family.

    The ``"referent-anchor"`` and ``"callback-eventtrigger"`` families each
    yield one explanatory sentence; all other families yield the thirteen
    ``mask:slot`` entries of the ladder.
    """
    single_notes = {
        "referent-anchor": "anchor role uses referent registry and payload ownership; no anchor-specific slot mask proven",
        "callback-eventtrigger": "no callback-specific mask pair proven; verified ladder still favors active event carriers",
    }
    if family in single_notes:
        return [single_notes[family]]

    ladder = (
        "0x0001:0000",
        "0x0002:0001",
        "0x0004:0002",
        "0x0010:0004",
        "0x0020:0005",
        "0x0200:0009",
        "0x0400:000a",
        "0x0800:000b",
        "0x1000:000c",
        "0x2000:0015",
        "0x4000:000e",
        "0x8000:0007",
        "0x8000:000f",
    )
    return list(ladder)
def family_bridge_metadata() -> tuple[dict[str, str], ...]:
    """Return the ranked descriptor-family table as a tuple of dicts.

    Each dict has the keys ``lane_rank``, ``primary_runtime_lane``,
    ``descriptor_family``, ``fit_strength``, ``confidence``, ``why`` and
    ``exemplar``. ``lane_rank`` is the 1-based position in the table, stored
    as a string. New dicts are built on every call.
    """
    columns = (
        "lane_rank",
        "primary_runtime_lane",
        "descriptor_family",
        "fit_strength",
        "confidence",
        "why",
        "exemplar",
    )
    active_lane = "active-event payload lane"
    # One tuple per family, in rank order; the rank itself is derived from position.
    ranked_rows = (
        (
            active_lane,
            "event-hub",
            "strongest",
            "high",
            "Explicit 69:0A00 event tag plus the richest source/dest/door/link/time/counter payload shape; best current match for the VM payload-chain plus link-matrix lane.",
            "EVENT",
        ),
        (
            active_lane,
            "boot-event-core",
            "strong",
            "high",
            "All five _BOOT descriptors share one compact referent,event,counter,item schema and sit beside referent-heavy object islands that fit the same active-event runtime lane.",
            "COR_BOOT",
        ),
        (
            active_lane,
            "npc-trigger",
            "strong",
            "moderate-high",
            "NPCTRIG carries an explicit event field and sits in the same compact event-bearing core as EVENT and COR_BOOT, but its narrower field set makes it look more satellite than hub.",
            "NPCTRIG",
        ),
        (
            active_lane,
            "minimal-event-core",
            "moderate",
            "moderate",
            "SFXTRIG keeps the active event tag while stripping most side fields, so it still fits the live event lane but as a smaller attachment form rather than a full hub or boot core.",
            "SFXTRIG",
        ),
        (
            active_lane,
            "environmental-event",
            "moderate",
            "moderate",
            "FLAMEBOX, NOSTRIL, and STEAMBOX share the same active event grammar, but no hazard-specific opcode or mask split is proven yet beyond the generic active-event-biased ladder.",
            "FLAMEBOX",
        ),
        (
            "referent-anchor / payload-owner lane",
            "referent-anchor",
            "strong",
            "moderate-high",
            "JELYHACK and JELYH2 are still referent-only, but the VM referent registry and payload-chain machinery now make that a live anchor role rather than inert metadata.",
            "JELYHACK",
        ),
        (
            "callback / attachment lane",
            "callback-eventtrigger",
            "weak-moderate",
            "moderate",
            "SURCAMNS and SURCAMEW are structurally coherent callback holders with eventTrigger tags, but the current mask ladder and opcode evidence still align more strongly with active event carriers than callback-specific dispatch.",
            "SURCAMNS",
        ),
    )
    return tuple(
        dict(zip(columns, (str(rank), *row)))
        for rank, row in enumerate(ranked_rows, start=1)
    )
def family_chunk_map(descriptor_chunks: list[ExtractedChunk]) -> dict[str, list[ExtractedChunk]]:
    """Group *descriptor_chunks* by their bridge family.

    Chunks for which ``chunk_bridge_family`` returns a falsy value are left
    out. Within each family the chunks keep their input order.
    """
    by_family: dict[str, list[ExtractedChunk]] = {}
    for chunk in descriptor_chunks:
        if family := chunk_bridge_family(chunk):
            by_family.setdefault(family, []).append(chunk)
    return by_family
def choose_family_exemplar(family: str, chunks: list[ExtractedChunk], preferred_label: str) -> ExtractedChunk | None:
    """Pick the chunk that represents a family in the reports.

    The first chunk whose ``primary_label`` equals *preferred_label* is
    returned. If none matches, the first chunk is used; an empty list yields
    ``None``. *family* is accepted for interface compatibility and not used.
    """
    fallback = chunks[0] if chunks else None
    return next(
        (candidate for candidate in chunks if candidate.primary_label == preferred_label),
        fallback,
    )
def family_script_block(exemplar: ExtractedChunk, family: str, labels: list[str]) -> list[str]:
    """Build the pseudo-script lines shown for a family's exemplar.

    The first line is ``readable_signature(exemplar)``; it is followed by a
    fixed set of description lines chosen by *family*. Families without a
    dedicated entry get the callback-side wording. *labels* is accepted for
    interface compatibility and not used.
    """
    body_by_family = {
        "event-hub": (
            "owner_slot = runtime_owner_table[slot]",
            "chain = APPEND_UNIQUE_INLINE(...) or APPEND_UNIQUE_INDIRECT(...)",
            "chain = REMOVE_MATCHING_INDIRECT(...) or REMOVE_MATCHING_INLINE(...)",
            "payload = PREPEND_INLINE_PAYLOAD(...) when caller bytes are present",
            "links = BUILD_ENTITY_LINK_MATRIX(shape_a, shape_b, entity_ids)",
            "emit EVENT-style result through FINALIZE_MIXED_VALUE_TO_OUTPTR(...)",
        ),
        "boot-event-core": (
            "anchor referent/event/counter/item into one compact event core",
            "materialize slot-backed value from runtime_owner_table[slot]",
            "mutate referent payload chain via opcode_0x18_to_0x1b family",
            "emit boot-style active event result",
        ),
        "npc-trigger": (
            "materialize slot-backed trigger payload",
            "attach event plus item/item2/typeNpc side fields",
            "emit NPC-trigger result through shared opcode epilogue",
        ),
        "minimal-event-core": (
            "bind referent to minimal event payload",
            "reuse generic active-event mutation path without hub-style side fields",
        ),
        "environmental-event": (
            "bind referent plus event to hazard-specific side fields",
            "reuse generic active-event lane; no hazard-specific opcode split proven",
        ),
        "referent-anchor": (
            "referent_id = registry anchor",
            "payload_chain = mutable owner-side chain attached to the referent",
            "neighboring event-bearing descriptor supplies live event semantics",
            "likely attachments: REE_BOOT, SURCAMEW, SFXTRIG",
        ),
    }
    callback_body = (
        "callback-side attachment remains descriptor-visible",
        "runtime bridge is still generic slot-backed context flow rather than callback-specific opcode dispatch",
    )
    return [readable_signature(exemplar), *body_by_family.get(family, callback_body)]
def write_runtime_family_bridge_reports(out_dir: pathlib.Path, descriptor_chunks: list[ExtractedChunk]) -> None:
    """Write the ranked descriptor-family reports into *out_dir*.

    Produces ``runtime_descriptor_family_rankings.tsv`` (one row per family
    that has at least one chunk, trailing newline appended) and
    ``runtime_descriptor_family_rankings.md``. Families are visited in the
    order given by ``family_bridge_metadata``; families with no chunks in
    *descriptor_chunks* are left out of both files.

    The Markdown file holds the intro, the complete "Ranked Families" table,
    and then one detail section per family with a pseudo-script block for the
    family's exemplar.
    """
    grouped = family_chunk_map(descriptor_chunks)
    owner_source = "000d:44df -> 000d:4c99 -> 000d:7000 -> (+0x10/+0x12) + 0x0d*slot + 4"
    loader_evidence = (
        "0009:67b6/6916 walk helper-owned +0x10/+0x18 tables, format per-entry paths, and then open/read/close files"
    )
    selector_status = "0x19/0x1a/0x1b proven in 000d:0988; 0x18 implied; upstream [BP-0x32] seed unresolved"

    tsv_lines = [
        "lane_rank\tprimary_runtime_lane\tdescriptor_family\trepresentative_labels\tfit_strength\tconfidence\truntime_ops\tmask_pairs\towner_source\tloader_evidence\tselector_status\twhy"
    ]
    md_intro = [
        "# Runtime Descriptor Family Rankings",
        "",
        "This report ranks descriptor families against the currently verified 000d VM/runtime lanes.",
        "It is intentionally conservative: it scores ecosystem-level fit, not a direct descriptor-id-to-opcode decode.",
        "",
        "## Owner Source",
        "",
        f"- Owner path: `{owner_source}`",
        f"- Loader evidence: `{loader_evidence}`",
        f"- Selector status: `{selector_status}`",
        "",
        "## Ranked Families",
        "",
        "| Rank | Runtime Lane | Descriptor Family | Labels | Fit | Confidence |",
        "|---:|---|---|---|---|---|",
    ]
    # Table rows and per-family sections are collected separately so that the
    # table rows stay contiguous under their header. Interleaving them would
    # end the Markdown table after its first row.
    md_table_rows: list[str] = []
    md_detail_lines: list[str] = []

    for metadata in family_bridge_metadata():
        family = metadata["descriptor_family"]
        family_chunks = grouped.get(family, [])
        if not family_chunks:
            continue
        labels = unique_preserve_order([
            clean_token(chunk.primary_label or "")
            for chunk in family_chunks
            if chunk.primary_label
        ])
        exemplar = choose_family_exemplar(family, family_chunks, metadata["exemplar"])
        if exemplar is None:
            continue
        runtime_ops = family_runtime_ops(family)
        mask_pairs = family_mask_pairs(family)

        tsv_lines.append(
            "{lane_rank}\t{primary_runtime_lane}\t{descriptor_family}\t{representative_labels}\t{fit_strength}\t{confidence}\t{runtime_ops}\t{mask_pairs}\t{owner_source}\t{loader_evidence}\t{selector_status}\t{why}".format(
                lane_rank=metadata["lane_rank"],
                primary_runtime_lane=metadata["primary_runtime_lane"],
                descriptor_family=family,
                representative_labels=",".join(labels),
                fit_strength=metadata["fit_strength"],
                confidence=metadata["confidence"],
                runtime_ops=",".join(runtime_ops),
                mask_pairs=",".join(mask_pairs),
                owner_source=owner_source,
                loader_evidence=loader_evidence,
                selector_status=selector_status,
                why=metadata["why"],
            )
        )
        md_table_rows.append(
            "| {lane_rank} | {primary_runtime_lane} | {descriptor_family} | {labels} | {fit_strength} | {confidence} |".format(
                lane_rank=metadata["lane_rank"],
                primary_runtime_lane=metadata["primary_runtime_lane"],
                descriptor_family=family,
                labels=", ".join(labels),
                fit_strength=metadata["fit_strength"],
                confidence=metadata["confidence"],
            )
        )
        md_detail_lines.extend([
            "",
            f"## {metadata['lane_rank']}. {family}",
            "",
            f"- Runtime lane: {metadata['primary_runtime_lane']}",
            f"- Labels: {', '.join(labels)}",
            f"- Fit: {metadata['fit_strength']}",
            f"- Confidence: {metadata['confidence']}",
            f"- Why: {metadata['why']}",
            f"- Runtime ops: {', '.join(runtime_ops)}",
            f"- Mask pairs: {', '.join(mask_pairs)}",
            "",
            "```text",
        ])
        md_detail_lines.extend(family_script_block(exemplar, family, labels))
        md_detail_lines.extend([
            "```",
            "",
        ])

    md_lines = md_intro + md_table_rows + md_detail_lines
    (out_dir / "runtime_descriptor_family_rankings.tsv").write_text("\n".join(tsv_lines) + "\n", encoding="utf-8")
    (out_dir / "runtime_descriptor_family_rankings.md").write_text("\n".join(md_lines), encoding="utf-8")
def write_readable_template_reports(
    out_dir: pathlib.Path,
    descriptor_chunks: list[ExtractedChunk],
    chunk_by_index: dict[int, ExtractedChunk],
    total_chunks: int,
) -> None:
    """Write ``readable_descriptor_templates.md`` and ``.tsv`` into *out_dir*.

    For each hard-coded focus set, every descriptor whose primary label is in
    the set becomes a centre. Each centre gets a ``text`` block holding its
    readable signature followed by one line per retained neighbour inside the
    window. A neighbour is retained when it has an event tag kind, a referent
    field, or a primary label from the same focus set. The same neighbours
    are emitted as TSV rows.

    The Markdown file ends with a "Family Signatures" table listing every
    descriptor that has an event family, ordered by family and then index.
    """
    # (section title, centre labels, neighbourhood half-width)
    focus_sets = [
        ("JELYHACK anchor lane", {"JELYHACK", "JELYH2"}, 8),
        ("EVENT hub lane", {"EVENT", "COR_BOOT", "NPCTRIG"}, 5),
        ("Environmental event lane", {"FLAMEBOX", "NOSTRIL", "STEAMBOX"}, 5),
        ("Callback trigger lane", {"SURCAMNS", "SURCAMEW"}, 5),
    ]
    relation_by_tag_kind = {"event": "attach", "eventTrigger": "callback"}

    tsv_lines = [
        "section\tcenter_index\tcenter_label\trelation\tneighbor_index\tdistance\tneighbor_label\trole\tfamily\ttag_kind\tfield_names"
    ]
    md_lines = [
        "# EUSECODE Readable Descriptor Templates",
        "",
        "These are conservative descriptor-side pseudo-script sketches.",
        "They reflect verified field grammar and local table neighborhoods, not a direct opcode dump.",
        "",
    ]

    for section_name, center_labels, window in focus_sets:
        centers = [chunk for chunk in descriptor_chunks if chunk.primary_label in center_labels]
        if not centers:
            continue
        md_lines += [f"## {section_name}", ""]

        for center in centers:
            md_lines += [
                f"### {center.index}: {center.primary_label}",
                "",
                "```text",
                readable_signature(center),
            ]
            first = max(0, center.index - window)
            stop = min(total_chunks, center.index + window + 1)
            for neighbor_index in range(first, stop):
                if neighbor_index == center.index:
                    continue
                neighbor = chunk_by_index[neighbor_index]
                tag_kind = event_tag_kind(neighbor)
                relevant = (
                    tag_kind
                    or has_referent_field(neighbor)
                    or neighbor.primary_label in center_labels
                )
                if not relevant:
                    continue

                distance = neighbor.index - center.index
                relation = relation_by_tag_kind.get(tag_kind, "near")
                family = classify_event_family(neighbor)
                family_suffix = f", family={family}" if family else ""
                md_lines.append(
                    f"{relation} {readable_subject(neighbor)} # offset {distance:+d}{family_suffix}"
                )

                tsv_cells = (
                    section_name,
                    center.index,
                    clean_token(center.primary_label or ""),
                    relation,
                    neighbor.index,
                    f"{distance:+d}",
                    clean_token(neighbor.primary_label or ""),
                    chunk_role(neighbor),
                    family,
                    tag_kind,
                    ",".join(clean_token(field_name) for field_name in neighbor.field_names),
                )
                tsv_lines.append("\t".join(f"{cell}" for cell in tsv_cells))
            md_lines += ["```", ""]

    md_lines += [
        "## Family Signatures",
        "",
        "| Family | Label | Signature |",
        "|---|---|---|",
    ]
    ordered_chunks = sorted(
        descriptor_chunks,
        key=lambda value: (classify_event_family(value), value.index),
    )
    for chunk in ordered_chunks:
        family = classify_event_family(chunk)
        if not family:
            continue
        label = clean_token(chunk.primary_label or "")
        # A literal "|" inside the signature would split the table cell.
        signature = readable_signature(chunk).replace("|", "/")
        md_lines.append(f"| {family} | {label} | {signature} |")
    md_lines.append("")

    (out_dir / "readable_descriptor_templates.md").write_text("\n".join(md_lines), encoding="utf-8")
    (out_dir / "readable_descriptor_templates.tsv").write_text("\n".join(tsv_lines) + "\n", encoding="utf-8")
def header_u16_words(data: bytes, count: int = 16) -> list[str]:
    """Format the leading 16-bit words of *data* as ``0xNNNN`` strings.

    At most *count* words are returned, and never more than the number of
    whole 2-byte words in *data*. Each word is read with ``read_u16_le``.
    """
    word_total = min(count, len(data) // 2)
    return [
        f"0x{read_u16_le(data, byte_offset):04X}"
        for byte_offset in range(0, word_total * 2, 2)
    ]
def header_u32_words(data: bytes, count: int = 8) -> list[str]:
    """Format the leading 32-bit words of *data* as ``0xNNNNNNNN`` strings.

    At most *count* words are returned, and never more than the number of
    whole 4-byte words in *data*. Each word is read with ``read_u32_le``.
    """
    word_total = min(count, len(data) // 4)
    return [
        f"0x{read_u32_le(data, byte_offset):08X}"
        for byte_offset in range(0, word_total * 4, 4)
    ]
def interesting_printable_markers(data: bytes) -> list[str]:
    """Return up to eight distinct printable runs that contain a marker token.

    Runs come from ``iter_printable_runs(data, min_len=3)``. A run qualifies
    when any of the marker substrings below occurs in it; duplicates are
    dropped and first-seen order is kept.
    """
    marker_tokens = ("wx[", "wt$[", "t$t=t@", "$Q", "?\n", "?\r")
    matching_runs = (
        run
        for run in iter_printable_runs(data, min_len=3)
        if any(token in run for token in marker_tokens)
    )
    # dict keys de-duplicate while keeping insertion order.
    return list(dict.fromkeys(matching_runs))[:8]
def write_island_graph(
    out_dir: pathlib.Path,
    output_name: str,
    title: str,
    center_labels: set[str],
    descriptor_chunks: list[ExtractedChunk],
    chunk_by_index: dict[int, ExtractedChunk],
    total_chunks: int,
    window: int = 5,
) -> None:
    """Write a Markdown node/edge listing for the islands around some labels.

    Centres are the descriptors whose primary label is in *center_labels*; if
    there are none, nothing is written. The node table covers every index
    within *window* of any centre (clamped to ``[0, total_chunks)``). The
    edge table has one row per (centre, neighbour) pair, labelled
    ``possible-event-attachment`` when ``get_event_evidence`` returns anything
    for the neighbour and ``table-neighbor`` otherwise.
    """
    centers = [chunk for chunk in descriptor_chunks if chunk.primary_label in center_labels]
    if not centers:
        return

    def window_indices(center: ExtractedChunk) -> range:
        # Table indices within `window` of the centre, clamped to the table.
        return range(max(0, center.index - window), min(total_chunks, center.index + window + 1))

    island_lines = [
        f"# {title}",
        "",
        "## Nodes",
        "",
        "| Index | Label | Role | Fields | Event Evidence |",
        "|---:|---|---|---|---|",
    ]
    node_indices = sorted({position for center in centers for position in window_indices(center)})
    for index in node_indices:
        chunk = chunk_by_index[index]
        label = chunk.primary_label or ""
        role = chunk_role(chunk)
        fields = ",".join(chunk.field_names) or "-"
        evidence = ",".join(get_event_evidence(chunk)) or "-"
        island_lines.append(f"| {index} | {label} | {role} | {fields} | {evidence} |")

    island_lines += ["", "## Edges", "", "| Source | Relation | Target | Evidence |", "|---|---|---|---|"]
    for center in centers:
        for neighbor_index in window_indices(center):
            if neighbor_index == center.index:
                continue
            neighbor = chunk_by_index[neighbor_index]
            event_evidence = get_event_evidence(neighbor)
            relation_kind = "possible-event-attachment" if event_evidence else "table-neighbor"
            relation = f"{relation_kind}({neighbor.index - center.index:+d})"
            target = neighbor.primary_label or ""
            evidence = ",".join(event_evidence) or "same local extraction neighborhood"
            island_lines.append(
                f"| {center.primary_label} ({center.index}) | {relation} | {target} ({neighbor.index}) | {evidence} |"
            )

    (out_dir / output_name).write_text("\n".join(island_lines) + "\n", encoding="utf-8")
def write_descriptor_compare(
    out_dir: pathlib.Path,
    output_name: str,
    labels: set[str],
    descriptor_chunks: list[ExtractedChunk],
) -> None:
    """Write a TSV comparing the raw headers of the selected descriptors.

    One row is emitted for each chunk whose primary label is in *labels*. The
    chunk's bytes are re-read from ``chunk.raw_path`` to produce the leading
    16-bit and 32-bit header words and the printable marker list. The file
    is written to ``out_dir / output_name`` with a trailing newline.
    """
    compare_lines = [
        "entry_index\tlabel\trole\tdata_offset\tdeclared_size\theader_u16\theader_u32\tprintable_markers\tfield_tags"
    ]
    selected = (chunk for chunk in descriptor_chunks if chunk.primary_label in labels)
    for chunk in selected:
        raw_data = pathlib.Path(chunk.raw_path).read_bytes()
        cells = (
            chunk.index,
            chunk.primary_label,
            chunk_role(chunk),
            f"0x{chunk.data_offset:X}",
            f"0x{chunk.declared_size:X}",
            ",".join(header_u16_words(raw_data)),
            ",".join(header_u32_words(raw_data)),
            "|".join(interesting_printable_markers(raw_data)),
            ",".join(chunk.field_tags),
        )
        compare_lines.append("\t".join(f"{cell}" for cell in cells))
    (out_dir / output_name).write_text("\n".join(compare_lines) + "\n", encoding="utf-8")
def write_event_family_reports(
    out_dir: pathlib.Path,
    descriptor_chunks: list[ExtractedChunk],
    chunk_by_index: dict[int, ExtractedChunk],
    total_chunks: int,
) -> None:
    """Write ``event_family_index.tsv`` and ``event_family_summary.md``.

    Only descriptors for which ``classify_event_family`` returns a non-empty
    family are reported. Each row also carries the number of neighbours
    within five table positions that have an event tag kind. The summary
    groups descriptors by family in a fixed order, sorted by index inside
    each family; families outside that fixed order are not listed in the
    summary.
    """

    def count_event_neighbors(chunk: ExtractedChunk) -> int:
        # Neighbours within +/-5 table positions (centre excluded) that carry an event tag.
        first = max(0, chunk.index - 5)
        stop = min(total_chunks, chunk.index + 6)
        return sum(
            1
            for position in range(first, stop)
            if position != chunk.index and event_tag_kind(chunk_by_index[position])
        )

    family_lines = [
        "entry_index\tlabel\tfamily\ttag_kind\trole\tfield_count\tfield_names\tfield_tags\tdata_offset\tdeclared_size\tlocal_event_neighbors"
    ]
    families: dict[str, list[ExtractedChunk]] = {}
    for chunk in descriptor_chunks:
        family = classify_event_family(chunk)
        if not family:
            continue
        families.setdefault(family, []).append(chunk)
        neighbor_count = count_event_neighbors(chunk)
        cells = (
            chunk.index,
            chunk.primary_label or "",
            family,
            event_tag_kind(chunk),
            chunk_role(chunk),
            len(chunk.field_names),
            ",".join(chunk.field_names),
            ",".join(chunk.field_tags),
            f"0x{chunk.data_offset:X}",
            f"0x{chunk.declared_size:X}",
            neighbor_count,
        )
        family_lines.append("\t".join(f"{cell}" for cell in cells))
    (out_dir / "event_family_index.tsv").write_text("\n".join(family_lines) + "\n", encoding="utf-8")

    summary_lines = ["# Event Family Summary", ""]
    family_order = (
        "event-hub",
        "boot-event-core",
        "npc-trigger",
        "minimal-event-core",
        "environmental-event",
        "specialized-event",
        "callback-eventtrigger",
    )
    for family in family_order:
        members = families.get(family, [])
        if not members:
            continue
        summary_lines += [
            f"## {family}",
            "",
            "| Index | Label | Tag Kind | Fields | Size | Local Event Neighbors |",
            "|---:|---|---|---|---:|---:|",
        ]
        for chunk in sorted(members, key=lambda value: value.index):
            neighbor_count = count_event_neighbors(chunk)
            label = chunk.primary_label or ""
            tag_kind = event_tag_kind(chunk)
            fields = ",".join(chunk.field_names) or "-"
            summary_lines.append(
                f"| {chunk.index} | {label} | {tag_kind} | {fields} | 0x{chunk.declared_size:X} | {neighbor_count} |"
            )
        summary_lines.append("")
    (out_dir / "event_family_summary.md").write_text("\n".join(summary_lines), encoding="utf-8")
def looks_text_like(data: bytes) -> bool:
    """Heuristically decide whether *data* should also be dumped as text.

    Empty input and input whose ``printable_ratio`` is below 0.80 are
    rejected. Otherwise the data counts as text when it contains a line feed
    or when its ``zero_ratio`` is below 0.05.
    """
    if not data or printable_ratio(data) < 0.80:
        return False
    # Any CRLF pair contains a line feed, so one membership test covers both.
    return b"\n" in data or zero_ratio(data) < 0.05
def parse_flx_table(data: bytes, table_offset: int = 0x80, count_offset: int = 0x54) -> FlxTable:
    """Read the FLX entry table from *data* and keep the usable entries.

    The entry count is the 32-bit value at *count_offset*. The table starts
    at *table_offset* and holds that many 8-byte records: a data offset
    followed by a declared size, each read with ``read_u32_le``.

    A record is dropped when either field is zero (or negative), or when its
    data offset does not point at a byte inside *data*. Surviving records are
    returned as ``CandidateEntry`` objects in table order, each carrying the
    file offset of its own table record.

    Raises:
        ValueError: if *data* ends before the entry-count field, or if the
            declared table would extend past the end of *data*.
    """
    file_size = len(data)
    # Report a truncated header with the same exception type as a truncated table.
    if count_offset + 4 > file_size:
        raise ValueError(
            f"FLX header truncated: entry count at 0x{count_offset:X} needs 4 bytes but file_size=0x{file_size:X}"
        )
    entry_count = read_u32_le(data, count_offset)
    table_end = table_offset + entry_count * 8
    if table_end > file_size:
        raise ValueError(
            f"FLX table extends past EOF: entry_count={entry_count} table_end=0x{table_end:X} file_size=0x{file_size:X}"
        )

    entries: list[CandidateEntry] = []
    for record_offset in range(table_offset, table_end, 8):
        data_offset = read_u32_le(data, record_offset)
        declared_size = read_u32_le(data, record_offset + 4)
        # Unused table slots have a zero offset and/or a zero size.
        if data_offset <= 0 or declared_size <= 0:
            continue
        # An offset equal to file_size points at EOF and could only yield an
        # empty payload, so it is rejected together with offsets beyond EOF.
        if data_offset >= file_size:
            continue
        entries.append(CandidateEntry(record_offset, data_offset, declared_size))

    return FlxTable(
        entry_count=entry_count,
        table_offset=table_offset,
        table_end=table_end,
        entries=entries,
    )
def dump_chunk(
    base_dir: pathlib.Path, chunk_name: str, data: bytes
) -> tuple[str, str, str | None, bool, float, float, str, str | None, list[str], list[str]]:
    """Write one chunk's sidecar files and return its descriptive fields.

    Always writes ``<chunk_name>.bin`` (the raw bytes) and
    ``<chunk_name>.strings.txt`` (one printable run per line). When
    ``looks_text_like`` accepts the data, ``<chunk_name>.txt`` is written as
    well, holding the bytes decoded as latin-1.

    Returns, in order: raw path, strings path, text path (``None`` when no
    text file was written), the text-like flag, printable ratio, zero ratio,
    ASCII preview, primary label, field names and field tags.
    """
    raw_file = base_dir / f"{chunk_name}.bin"
    strings_file = base_dir / f"{chunk_name}.strings.txt"

    raw_file.write_bytes(data)

    printable_runs = iter_printable_runs(data)
    strings_body = "\n".join(printable_runs)
    if printable_runs:
        strings_body += "\n"
    strings_file.write_text(strings_body, encoding="utf-8")

    primary_label, field_names = summarize_descriptor(printable_runs)
    field_tags = extract_field_tag_records(data, field_names)

    is_text = looks_text_like(data)
    written_text_path: str | None = None
    if is_text:
        text_file = base_dir / f"{chunk_name}.txt"
        text_file.write_text(data.decode("latin-1", errors="replace"), encoding="utf-8")
        written_text_path = str(text_file)

    return (
        str(raw_file),
        str(strings_file),
        written_text_path,
        is_text,
        printable_ratio(data),
        zero_ratio(data),
        ascii_preview(data),
        primary_label,
        field_names,
        field_tags,
    )
def extract_candidates(data: bytes, out_dir: pathlib.Path, entries: list[CandidateEntry]) -> list[ExtractedChunk]:
    """Dump every table entry under ``out_dir / "chunks"`` and describe it.

    Each chunk is sliced from ``data`` using the entry's declared size, clamped
    to the end of the file. ``next_offset`` is the data offset of the entry that
    follows in offset order (ties broken by table order), and the chunk is
    flagged as overlapping when its declared end runs past that offset.

    Returns:
        One ``ExtractedChunk`` per entry, in the order of ``entries``, after
        ``annotate_class_layout`` has been applied to the list.
    """
    chunks_dir = out_dir / "chunks"
    chunks_dir.mkdir(parents=True, exist_ok=True)
    file_size = len(data)

    # Rank entry indices by payload offset; ties keep their table order.
    offset_order = sorted(range(len(entries)), key=lambda i: (entries[i].data_offset, i))
    # Map each entry index to the offset of its successor in that ranking.
    # The last-ranked entry has no successor and resolves to None via .get().
    successor_offset = {
        current: entries[following].data_offset
        for current, following in zip(offset_order, offset_order[1:])
    }

    results: list[ExtractedChunk] = []
    for index, entry in enumerate(entries):
        declared_end = entry.data_offset + entry.declared_size
        payload = data[entry.data_offset:min(file_size, declared_end)]
        next_offset = successor_offset.get(index)
        overlap = next_offset is not None and declared_end > next_offset
        chunk_name = (
            f"chunk_{index:03d}_table_{entry.table_offset:04X}_off_{entry.data_offset:06X}_len_{entry.declared_size:06X}"
        )
        (
            raw_path,
            strings_path,
            text_path,
            text_like,
            print_ratio,
            z_ratio,
            preview,
            primary_label,
            field_names,
            field_tags,
        ) = dump_chunk(chunks_dir, chunk_name, payload)
        results.append(
            ExtractedChunk(
                index=index,
                table_offset=entry.table_offset,
                object_index=object_index_from_table_offset(entry.table_offset),
                data_offset=entry.data_offset,
                declared_size=entry.declared_size,
                next_offset=next_offset,
                extracted_size=len(payload),
                overlap_with_next=overlap,
                text_like=text_like,
                printable_ratio=round(print_ratio, 4),
                zero_ratio=round(z_ratio, 4),
                preview=preview,
                raw_path=raw_path,
                strings_path=strings_path,
                text_path=text_path,
                primary_label=primary_label,
                field_names=field_names,
                field_tags=field_tags,
            )
        )

    annotate_class_layout(results)
    return results
def write_summary(out_dir: pathlib.Path, input_path: pathlib.Path, data: bytes, entries: list[CandidateEntry], chunks: list[ExtractedChunk]) -> None:
    """Write every index, report, and README artifact for one extraction run.

    Files written directly by this function into ``out_dir``:
    ``summary.json``, ``entry_index.tsv``, ``descriptor_index.tsv``,
    ``class_layout_index.tsv``, ``class_event_index.tsv``,
    ``repeated_family_regressions.tsv``, ``descriptor_neighborhoods.tsv``,
    ``referent_anchor_event_graph.tsv``, ``README.md`` and ``all_strings.txt``.
    The remaining graph, compare, and family reports are produced by helper
    functions defined elsewhere in this file.

    Args:
        out_dir: Directory that receives all artifacts.
        input_path: Path of the source FLX file (recorded in the outputs).
        data: Full contents of the source file.
        entries: Table entries that were extracted.
        chunks: Extraction results, one per entry.
    """
    # --- summary.json: machine-readable dump of the inputs and results. ---
    summary = {
        "input_path": str(input_path),
        "file_size": len(data),
        "header_preview_hex": data[:128].hex(),
        "header_preview_ascii": ascii_preview(data[:128], 128),
        "candidate_entries": [asdict(entry) for entry in entries],
        "chunks": [asdict(chunk) for chunk in chunks],
    }
    (out_dir / "summary.json").write_text(json.dumps(summary, indent=2), encoding="utf-8")

    # --- entry_index.tsv: one row per chunk; optional fields render as "". ---
    index_lines = [
        "entry_index\ttable_offset\tobject_index\tclass_id\tclass_name_hint\traw_code_base_u32\tcode_base_minus_one\tconservative_event_count\tevent_table_end\tclass_parse_status\tdata_offset\tdeclared_size\textracted_size\ttext_like\tprintable_ratio\tzero_ratio\toverlap_with_next\tprimary_label\tfield_names\tfield_tags\tpreview"
    ]
    for chunk in chunks:
        index_lines.append(
            "{index}\t{table_offset}\t{object_index}\t{class_id}\t{class_name_hint}\t{raw_code_base_u32}\t{code_base_minus_one}\t{conservative_event_count}\t{event_table_end}\t{class_parse_status}\t{data_offset}\t{declared_size}\t{extracted_size}\t{text_like}\t{printable_ratio:.4f}\t{zero_ratio:.4f}\t{overlap}\t{primary_label}\t{field_names}\t{field_tags}\t{preview}".format(
                index=chunk.index,
                table_offset=(f"0x{chunk.table_offset:X}"),
                object_index=(f"0x{chunk.object_index:X}" if chunk.object_index is not None else ""),
                class_id=(f"0x{chunk.class_id:X}" if chunk.class_id is not None else ""),
                class_name_hint=chunk.class_name_hint or "",
                raw_code_base_u32=(f"0x{chunk.raw_code_base_u32:X}" if chunk.raw_code_base_u32 is not None else ""),
                code_base_minus_one=(f"0x{chunk.code_base_minus_one:X}" if chunk.code_base_minus_one is not None else ""),
                conservative_event_count=(chunk.conservative_event_count if chunk.conservative_event_count is not None else ""),
                event_table_end=(f"0x{chunk.event_table_end:X}" if chunk.event_table_end is not None else ""),
                class_parse_status=chunk.class_parse_status or "",
                data_offset=f"0x{chunk.data_offset:X}",
                declared_size=f"0x{chunk.declared_size:X}",
                extracted_size=f"0x{chunk.extracted_size:X}",
                text_like=int(chunk.text_like),
                printable_ratio=chunk.printable_ratio,
                zero_ratio=chunk.zero_ratio,
                overlap=int(chunk.overlap_with_next),
                primary_label=chunk.primary_label or "",
                field_names=",".join(chunk.field_names),
                field_tags=",".join(chunk.field_tags),
                # Tabs in the preview would break the TSV column layout.
                preview=chunk.preview.replace("\t", " "),
            )
        )
    (out_dir / "entry_index.tsv").write_text("\n".join(index_lines) + "\n", encoding="utf-8")

    # --- descriptor_index.tsv: only chunks with a label or field names. ---
    descriptor_lines = [
        "entry_index\tobject_index\tclass_id\tclass_name_hint\traw_code_base_u32\tcode_base_minus_one\tconservative_event_count\tevent_table_end\tclass_parse_status\tprimary_label\tfield_names\tfield_tags\tdata_offset\tdeclared_size"
    ]
    descriptor_chunks = [chunk for chunk in chunks if chunk.primary_label or chunk.field_names]
    for chunk in descriptor_chunks:
        descriptor_lines.append(
            "{index}\t{object_index}\t{class_id}\t{class_name_hint}\t{raw_code_base_u32}\t{code_base_minus_one}\t{conservative_event_count}\t{event_table_end}\t{class_parse_status}\t{primary_label}\t{field_names}\t{field_tags}\t0x{data_offset:X}\t0x{declared_size:X}".format(
                index=chunk.index,
                object_index=(f"0x{chunk.object_index:X}" if chunk.object_index is not None else ""),
                class_id=(f"0x{chunk.class_id:X}" if chunk.class_id is not None else ""),
                class_name_hint=chunk.class_name_hint or "",
                raw_code_base_u32=(f"0x{chunk.raw_code_base_u32:X}" if chunk.raw_code_base_u32 is not None else ""),
                code_base_minus_one=(f"0x{chunk.code_base_minus_one:X}" if chunk.code_base_minus_one is not None else ""),
                conservative_event_count=(chunk.conservative_event_count if chunk.conservative_event_count is not None else ""),
                event_table_end=(f"0x{chunk.event_table_end:X}" if chunk.event_table_end is not None else ""),
                class_parse_status=chunk.class_parse_status or "",
                primary_label=chunk.primary_label or "",
                field_names=",".join(chunk.field_names),
                field_tags=",".join(chunk.field_tags),
                data_offset=chunk.data_offset,
                declared_size=chunk.declared_size,
            )
        )
    (out_dir / "descriptor_index.tsv").write_text("\n".join(descriptor_lines) + "\n", encoding="utf-8")

    # --- class_layout_index.tsv: chunks whose status is "parsed-class-layout". ---
    class_layout_lines = [
        "entry_index\tobject_index\tclass_id\tclass_name_hint\traw_code_base_u32\tcode_base_minus_one\tconservative_event_count\tevent_table_end\tclass_parse_status\tdata_offset\tdeclared_size\tprimary_label"
    ]
    parsed_class_chunks = [chunk for chunk in chunks if chunk.class_parse_status == "parsed-class-layout"]
    # These helper results are shared by the class-event index, the family
    # decompile artifacts, the regression file, and the immortality reports.
    class_event_rows, rows_by_entry, raw_data_by_entry = build_class_event_rows(parsed_class_chunks)
    repeated_status_by_row = build_repeated_template_status_map(
        parsed_class_chunks,
        rows_by_entry,
        raw_data_by_entry,
    )
    repeated_family_regression_lines = validate_verified_repeated_family_regressions(
        parsed_class_chunks,
        rows_by_entry,
        repeated_status_by_row,
    )
    for chunk in parsed_class_chunks:
        # The numeric fields are formatted as hex without a None guard.
        # NOTE(review): presumably the parsed status guarantees they are set — confirm.
        class_layout_lines.append(
            "{index}\t0x{object_index:X}\t0x{class_id:X}\t{class_name_hint}\t0x{raw_code_base_u32:X}\t0x{code_base_minus_one:X}\t{conservative_event_count}\t0x{event_table_end:X}\t{class_parse_status}\t0x{data_offset:X}\t0x{declared_size:X}\t{primary_label}".format(
                index=chunk.index,
                object_index=chunk.object_index,
                class_id=chunk.class_id,
                class_name_hint=chunk.class_name_hint or "",
                raw_code_base_u32=chunk.raw_code_base_u32,
                code_base_minus_one=chunk.code_base_minus_one,
                conservative_event_count=chunk.conservative_event_count,
                event_table_end=chunk.event_table_end,
                class_parse_status=chunk.class_parse_status,
                data_offset=chunk.data_offset,
                declared_size=chunk.declared_size,
                primary_label=chunk.primary_label or "",
            )
        )
    (out_dir / "class_layout_index.tsv").write_text("\n".join(class_layout_lines) + "\n", encoding="utf-8")

    # --- class_event_index.tsv: one row per class event slot. ---
    class_event_lines = [
        "entry_index\tobject_index\tclass_id\tclass_name_hint\tslot\tevent_name_hint\traw_event_entry_word\traw_code_offset\tderived_body_start\tderived_body_end\tderived_body_length\trepeated_template_status"
    ]
    for row in class_event_rows:
        class_event_lines.append(
            "{entry_index}\t0x{object_index:X}\t0x{class_id:X}\t{class_name_hint}\t0x{slot:02X}\t{event_name_hint}\t0x{raw_event_entry_word:04X}\t0x{raw_code_offset:08X}\t{derived_body_start}\t{derived_body_end}\t{derived_body_length}\t{repeated_template_status}".format(
                entry_index=row.entry_index,
                object_index=row.object_index,
                class_id=row.class_id,
                class_name_hint=row.class_name_hint,
                slot=row.slot,
                event_name_hint=row.event_name_hint or "",
                raw_event_entry_word=row.raw_event_entry_word,
                raw_code_offset=row.raw_code_offset,
                derived_body_start=format_optional_hex(row.derived_body_start, 4),
                derived_body_end=format_optional_hex(row.derived_body_end, 4),
                derived_body_length=(row.derived_body_length if row.derived_body_length is not None else ""),
                # Rows without a recorded status get an empty column.
                repeated_template_status=repeated_status_by_row.get((row.entry_index, row.slot), ""),
            )
        )
    (out_dir / "class_event_index.tsv").write_text("\n".join(class_event_lines) + "\n", encoding="utf-8")

    # --- Per-family decompile artifacts, one helper call per configured spec. ---
    for family_artifact_spec in FAMILY_ARTIFACT_SPECS:
        write_family_decompile_artifact(
            out_dir,
            parsed_class_chunks,
            rows_by_entry,
            raw_data_by_entry,
            repeated_status_by_row,
            family_artifact_spec,
        )
    (out_dir / "repeated_family_regressions.tsv").write_text(
        "\n".join(repeated_family_regression_lines) + "\n",
        encoding="utf-8",
    )

    # --- descriptor_neighborhoods.tsv: +/-4 index window around selected labels. ---
    neighborhood_lines = [
        "center_index\tneighbor_index\tprimary_label\tfield_names\tfield_tags"
    ]
    interesting = {"JELYHACK", "JELYH2", "NPCTRIG", "CRUZTRIG", "TRIGPAD", "SPECIAL", "EVENT", "SFXTRIG"}
    interesting_indices = [chunk.index for chunk in chunks if chunk.primary_label in interesting]
    seen_pairs: set[tuple[int, int]] = set()
    # Neighbor lookups below index this map with values from range(0, len(chunks)),
    # so they rely on chunk.index being a dense 0-based numbering.
    chunk_by_index = {chunk.index: chunk for chunk in chunks}
    for center_index in interesting_indices:
        # The window is clamped to the valid index range and includes the center.
        for neighbor_index in range(max(0, center_index - 4), min(len(chunks), center_index + 5)):
            pair = (center_index, neighbor_index)
            if pair in seen_pairs:
                continue
            seen_pairs.add(pair)
            chunk = chunk_by_index[neighbor_index]
            neighborhood_lines.append(
                "{center_index}\t{neighbor_index}\t{primary_label}\t{field_names}\t{field_tags}".format(
                    center_index=center_index,
                    neighbor_index=neighbor_index,
                    primary_label=chunk.primary_label or "",
                    field_names=",".join(chunk.field_names),
                    field_tags=",".join(chunk.field_tags),
                )
            )
    (out_dir / "descriptor_neighborhoods.tsv").write_text("\n".join(neighborhood_lines) + "\n", encoding="utf-8")

    # --- referent_anchor_event_graph.tsv: anchors paired with nearby event evidence. ---
    anchor_graph_lines = [
        "anchor_index\tanchor_label\tanchor_fields\tneighbor_index\tdistance\tneighbor_label\tneighbor_fields\tneighbor_role\tevent_evidence"
    ]
    for anchor in descriptor_chunks:
        # Only labelled descriptors accepted by has_referent_field act as anchors.
        if not anchor.primary_label or not has_referent_field(anchor):
            continue
        # +/-8 index window around the anchor, excluding the anchor itself.
        for neighbor_index in range(max(0, anchor.index - 8), min(len(chunks), anchor.index + 9)):
            if neighbor_index == anchor.index:
                continue
            neighbor = chunk_by_index[neighbor_index]
            event_evidence = get_event_evidence(neighbor)
            # Neighbors without any event evidence are left out of the graph.
            if not event_evidence:
                continue
            anchor_graph_lines.append(
                "{anchor_index}\t{anchor_label}\t{anchor_fields}\t{neighbor_index}\t{distance:+d}\t{neighbor_label}\t{neighbor_fields}\t{neighbor_role}\t{event_evidence}".format(
                    anchor_index=anchor.index,
                    anchor_label=anchor.primary_label,
                    anchor_fields=",".join(anchor.field_names),
                    neighbor_index=neighbor.index,
                    distance=neighbor.index - anchor.index,
                    neighbor_label=neighbor.primary_label or "",
                    neighbor_fields=",".join(neighbor.field_names),
                    neighbor_role=chunk_role(neighbor),
                    event_evidence=",".join(event_evidence),
                )
            )
    (out_dir / "referent_anchor_event_graph.tsv").write_text("\n".join(anchor_graph_lines) + "\n", encoding="utf-8")

    # --- Island graphs and descriptor comparisons, delegated to helpers. ---
    # Each call names its output file and the set of primary labels it covers.
    write_island_graph(
        out_dir,
        "jelyhack_island_graph.md",
        "JELYHACK Island Graph",
        {"JELYHACK", "JELYH2"},
        descriptor_chunks,
        chunk_by_index,
        len(chunks),
        window=8,
    )
    write_descriptor_compare(
        out_dir,
        "jelyhack_descriptor_compare.tsv",
        {"JELYHACK", "JELYH2", "REE_BOOT", "SURCAMEW", "SFXTRIG"},
        descriptor_chunks,
    )
    # No explicit window here, so the helper's default applies.
    write_island_graph(
        out_dir,
        "event_island_graph.md",
        "EVENT Cluster Graph",
        {"EVENT", "COR_BOOT", "NPCTRIG", "ROLL_NS", "CRUZTRIG"},
        descriptor_chunks,
        chunk_by_index,
        len(chunks),
    )
    write_descriptor_compare(
        out_dir,
        "event_descriptor_compare.tsv",
        {"ROLL_NS", "COR_BOOT", "EVENT", "NPCTRIG", "CRUZTRIG", "NPC_ONLY", "VMAIL"},
        descriptor_chunks,
    )
    write_island_graph(
        out_dir,
        "boot_frontier_graph.md",
        "AND/BRO Boot Frontier Graph",
        {"AND_BOOT", "BRO_BOOT"},
        descriptor_chunks,
        chunk_by_index,
        len(chunks),
        window=6,
    )
    write_descriptor_compare(
        out_dir,
        "boot_family_compare.tsv",
        {"AND_BOOT", "BRO_BOOT", "COR_BOOT", "VAR_BOOT", "REE_BOOT"},
        descriptor_chunks,
    )
    write_island_graph(
        out_dir,
        "environmental_event_graph.md",
        "Environmental Event Graph",
        {"FLAMEBOX", "NOSTRIL", "STEAMBOX"},
        descriptor_chunks,
        chunk_by_index,
        len(chunks),
        window=5,
    )
    write_descriptor_compare(
        out_dir,
        "environmental_family_compare.tsv",
        {"FLAMEBOX", "NOSTRIL", "STEAMBOX"},
        descriptor_chunks,
    )
    write_descriptor_compare(
        out_dir,
        "callback_trigger_compare.tsv",
        {"SURCAMNS", "SURCAMEW"},
        descriptor_chunks,
    )

    # --- Higher-level family, template, runtime-bridge, and immortality reports. ---
    write_event_family_reports(out_dir, descriptor_chunks, chunk_by_index, len(chunks))
    write_readable_template_reports(out_dir, descriptor_chunks, chunk_by_index, len(chunks))
    write_runtime_bridge_reports(out_dir, descriptor_chunks, chunk_by_index, len(chunks))
    write_runtime_family_bridge_reports(out_dir, descriptor_chunks)
    write_immortality_target_body_scan(out_dir, parsed_class_chunks, rows_by_entry, raw_data_by_entry)
    write_immortality_body_structure_report(out_dir, parsed_class_chunks, rows_by_entry, raw_data_by_entry)
    write_npctrig_clause_report(out_dir, parsed_class_chunks, rows_by_entry, raw_data_by_entry)

    # --- README.md: overview, chunk table, and notes on each artifact. ---
    lines = []
    lines.append("# EUSECODE.FLX First-Pass Extraction")
    lines.append("")
    lines.append(f"Input: {input_path}")
    lines.append(f"File size: 0x{len(data):X} ({len(data)} bytes)")
    lines.append(f"Candidate entries: {len(entries)}")
    lines.append("")
    lines.append("## Header Preview")
    lines.append("")
    lines.append(f"ASCII: `{ascii_preview(data[:128], 128)}`")
    lines.append("")
    lines.append("## Chunks")
    lines.append("")
    lines.append("| # | Table Off | Data Off | Declared Size | Next Off | Text | Overlap | Preview |")
    lines.append("|---:|---:|---:|---:|---:|:---:|:---:|---|")
    for chunk in chunks:
        next_off = f"0x{chunk.next_offset:X}" if chunk.next_offset is not None else "-"
        lines.append(
            "| {index} | 0x{table_offset:X} | 0x{data_offset:X} | 0x{declared_size:X} | {next_off} | {text_like} | {overlap} | {preview} |".format(
                index=chunk.index,
                table_offset=chunk.table_offset,
                data_offset=chunk.data_offset,
                declared_size=chunk.declared_size,
                next_off=next_off,
                text_like="yes" if chunk.text_like else "no",
                overlap="yes" if chunk.overlap_with_next else "no",
                # A literal pipe would split the Markdown table cell.
                preview=chunk.preview.replace("|", "/"),
            )
        )
    lines.append("")
    lines.append("## Notes")
    lines.append("")
    lines.append("- The extractor now parses the validated FLX table directly: entry count at `0x54`, table at `0x80`, 8 bytes per entry.")
    lines.append("- Overlapping declared sizes likely mean some entries are counts or record spans rather than exact chunk lengths.")
    lines.append("- `.strings.txt` files are the main human-readable output for now; `.txt` files are emitted only for chunks that look text-like.")
    lines.append("- `descriptor_index.tsv` summarizes guessed class labels, field names, and compact tag patterns for descriptor-like chunks.")
    lines.append("- `class_layout_index.tsv` records the conservative owner-loaded class parsing state: object index, class id, class-name hint, raw bytes-8..11 field, derived code-base-minus-one, and event-count/table-end values when the local divisibility and bounds checks succeed.")
    lines.append("- `class_event_index.tsv` now also emits derived body-window columns (`derived_body_start`, `derived_body_end`, `derived_body_length`) plus conservative `repeated_template_status` tags for verified repeated families.")
    lines.append("- `boot_family_decompile.md` / `.tsv`, `callback_family_decompile.md` / `.tsv`, and `environmental_family_decompile.md` / `.tsv` now provide reversible per-class decompile artifacts for the `_BOOT`, `SURCAM*`, and environmental repeated-family lanes.")
    lines.append("- `repeated_family_regressions.tsv` enforces the current repeated-family slot sets plus the verified raw-row and derived body-window fields for `JELYHACK/JELYH2`, `_BOOT`, `SURCAM*`, and `FLAMEBOX/NOSTRIL/STEAMBOX`.")
    lines.append("- `descriptor_neighborhoods.tsv` captures local table neighborhoods around trigger/event-related classes such as `JELYHACK`, `NPCTRIG`, `CRUZTRIG`, `TRIGPAD`, and `SPECIAL`.")
    lines.append("- `referent_anchor_event_graph.tsv` groups referent-bearing descriptors with nearby event-bearing neighbors so the attachment model can be inspected without ad hoc grepping.")
    lines.append("- `jelyhack_island_graph.md` now uses a wider local window so the `JELYHACK` / `JELYH2` anchors can be inspected alongside the nearby event-bearing `REE_BOOT`, `SURCAMEW`, and `SFXTRIG` descriptors rather than stopping at the referent-only neighbors.")
    lines.append("- `jelyhack_descriptor_compare.tsv` captures the first 16 header words, first 8 dwords, and a few odd printable markers for the core JELYHACK-island descriptors so structural similarity can be compared without raw hex dumps.")
    lines.append("- `event_island_graph.md` renders the denser `EVENT` / `COR_BOOT` / `NPCTRIG` / `ROLL_NS` / `CRUZTRIG` island, which currently looks like the strongest event-explicit neighborhood outside the JELYHACK anchor case.")
    lines.append("- `event_descriptor_compare.tsv` captures the same header-word and printable-marker comparison for the `EVENT` island so large event-bearing descriptors can be contrasted with neighboring trigger and referent records.")
    lines.append("- `boot_frontier_graph.md` renders the upstream referent neighborhood feeding `AND_BOOT` / `BRO_BOOT`, which is currently the clearest unexplored boot-event frontier.")
    lines.append("- `boot_family_compare.tsv` compares the five `_BOOT` event cores (`AND_BOOT`, `BRO_BOOT`, `COR_BOOT`, `VAR_BOOT`, `REE_BOOT`) by header words, markers, and field tags.")
    lines.append("- `environmental_event_graph.md` renders the three hazard/event islands centered on `FLAMEBOX`, `NOSTRIL`, and `STEAMBOX`, each surrounded by its own referent-heavy local neighborhood.")
    lines.append("- `environmental_family_compare.tsv` compares the environmental event trio so the shared hazard pattern (`referent,event,<hazard>,<hazard2>,direction,count`) can be contrasted directly.")
    lines.append("- `callback_trigger_compare.tsv` compares `SURCAMNS` and `SURCAMEW` directly so the callback-only `eventTrigger` lane can be checked against the active `event` families without raw hex dumps.")
    lines.append("- `event_family_index.tsv` and `event_family_summary.md` classify all current `event` and `eventTrigger` descriptors into reusable families such as boot-event cores, minimal event cores, environmental events, and callback-only surveillance triggers.")
    lines.append("- `readable_descriptor_templates.md` and `readable_descriptor_templates.tsv` emit conservative pseudo-script sketches for the strongest current anchor, event-hub, environmental, and callback lanes so USECODE neighborhoods can be read as structured attachments instead of only raw descriptor rows.")
    lines.append("- `runtime_vm_ir.tsv` captures the currently verified 000d VM operator vocabulary as machine-readable rows with stage addresses, opcode/lane status, payload shape, and evidence anchors.")
    lines.append("- `vm_mask_ladder.tsv` records the current `entity_vm_context_try_create_masked_for_entity` wrapper ladder in machine-readable form so gameplay mask lanes can be compared against descriptor-side families without reopening the notes.")
    lines.append("- `readable_script_ir.md` and `readable_script_ir.tsv` join descriptor neighborhoods, the verified VM IR, the runtime owner/source path, and the current mask-family hints into one conservative script-facing bridge artifact.")
    lines.append("- `runtime_descriptor_family_rankings.md` and `runtime_descriptor_family_rankings.tsv` rank descriptor families against the verified runtime lanes so the current human-readable script bridge is searchable by family fit rather than only by neighborhood dumps.")
    lines.append("- `immortality_target_body_scan.md` and `immortality_target_body_scan.tsv` now scan the strongest current immortality candidates (`EVENT`, `NPCTRIG`, `_BOOT`, `SFXTRIG`, `SPECIAL`, `TRIGPAD`) for inline `0x410` literals and record the tightest remaining active-event template frontier.")
    lines.append("- `immortality_npctrig_clauses.md` and `immortality_npctrig_clauses.tsv` now split the compact `NPCTRIG` slot `0x0A` / `0x20` bodies into prefix, clause, and tail regions so the event-bearing ladder can be compared against the typed/setup companion body without reopening raw hex.")
    (out_dir / "README.md").write_text("\n".join(lines) + "\n", encoding="utf-8")

    # --- all_strings.txt: printable runs gathered from the whole input file. ---
    all_strings = iter_printable_runs(data)
    (out_dir / "all_strings.txt").write_text("\n".join(all_strings) + ("\n" if all_strings else ""), encoding="utf-8")
def parse_args() -> argparse.Namespace:
    """Parse the command line: optional input file, then optional output directory.

    Both positionals are converted to ``pathlib.Path`` and fall back to the
    module-level ``DEFAULT_INPUT`` / ``DEFAULT_OUTPUT`` when omitted.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    # Both positionals share the same shape; only the name and fallback differ.
    for dest, fallback in (("input", DEFAULT_INPUT), ("output", DEFAULT_OUTPUT)):
        cli.add_argument(dest, nargs="?", type=pathlib.Path, default=fallback)
    return cli.parse_args()
def main() -> int:
    """Run the extraction: read the input, parse the table, dump chunks and reports.

    Returns:
        ``0`` as the process exit status on success.
    """
    options = parse_args()
    source_path = options.input
    target_dir = options.output

    # Read the input first so a missing file fails before any directory is created.
    blob = source_path.read_bytes()
    target_dir.mkdir(parents=True, exist_ok=True)

    table = parse_flx_table(blob)
    extracted = extract_candidates(blob, target_dir, table.entries)
    write_summary(target_dir, source_path, blob, table.entries, extracted)

    report = f"Parsed {table.entry_count} table slots with {len(extracted)} non-zero entries; extracted to {target_dir}"
    print(report)
    return 0
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())