Add 'annotate-usecode' command to import USECODE IR JSON annotations

- Introduced a new command 'annotate-usecode' to import USECODE IR JSON annotation hints as Ghidra comments on compiled anchors.
- Added argument parsing for multiple IR JSON files, comment type selection, and a dry-run option.
- Implemented logic to read annotation records from the provided IR files and set comments on the corresponding addresses in Ghidra.
- Enhanced JSON schema to include response structure for the new command.
This commit is contained in:
MaddoScientisto 2026-03-24 18:14:20 +01:00
commit daa363c3d2
39 changed files with 41450 additions and 871 deletions

View file

@ -516,6 +516,39 @@ FAMILY_ARTIFACT_SPECS: tuple[FamilyArtifactSpec, ...] = (
)
# Class labels whose chunks write_immortality_target_body_scan looks up (by
# primary_label) and scans; the tuple order is the order rows are emitted in.
IMMORTALITY_TARGET_LABELS: tuple[str, ...] = (
"EVENT",
"NPCTRIG",
"COR_BOOT",
"REE_BOOT",
"SFXTRIG",
"SPECIAL",
"TRIGPAD",
)
# Subset of the target labels whose bodies are additionally compared pairwise
# (common prefix / common suffix length) by write_immortality_target_body_scan.
IMMORTALITY_TEMPLATE_COMPARE_LABELS: frozenset[str] = frozenset(
{"NPCTRIG", "COR_BOOT", "REE_BOOT", "SFXTRIG"}
)
# Labels covered by write_immortality_body_structure_report.
IMMORTALITY_STRUCTURAL_TARGET_LABELS: frozenset[str] = frozenset({"EVENT", "NPCTRIG"})
# (motif name, byte pattern) pairs; the report writers search each body for
# every pattern with find_all_offsets and key the results by motif name.
IMMORTALITY_BODY_MOTIFS: tuple[tuple[str, bytes], ...] = (
("call_40_06_4c_02", bytes.fromhex("40 06 4c 02")),
("call_40_06_0f_04", bytes.fromhex("40 06 0f 04")),
("subheader_53_5c", bytes.fromhex("53 5c")),
("writeback_57_02", bytes.fromhex("57 02")),
("branch_59_0a", bytes.fromhex("59 0a")),
("branch_3f_0a", bytes.fromhex("3f 0a")),
("field_4b_fe_0f", bytes.fromhex("4b fe 0f")),
("field_4b_fc_0f", bytes.fromhex("4b fc 0f")),
("push_24_51", bytes.fromhex("24 51")),
("event_field_69_0a_00", bytes.fromhex("69 0a 00")),
)
VERIFIED_REPEATED_FAMILY_ROW_EXPECTATIONS: tuple[RepeatedFamilyRowExpectation, ...] = (
RepeatedFamilyRowExpectation("JELYHACK", 0x01, 0x002A, 0x00000001, 0x00D4, 0x00FE, 42, "referent-anchor-twin/shared-slot-0x01/same-length-template"),
RepeatedFamilyRowExpectation("JELYH2", 0x01, 0x002A, 0x00000001, 0x00D4, 0x00FE, 42, "referent-anchor-twin/shared-slot-0x01/same-length-template"),
@ -776,6 +809,33 @@ def hex_tail(data: bytes, width: int = 8) -> str:
return data[-width:].hex()
def find_all_offsets(haystack: bytes, needle: bytes) -> list[int]:
    """Return every index at which ``needle`` occurs in ``haystack``.

    Occurrences may overlap: after each hit the search resumes one byte past
    the start of that hit, not past its end. Indices are in ascending order.
    """
    hits: list[int] = []
    cursor = haystack.find(needle)
    while cursor >= 0:
        hits.append(cursor)
        cursor = haystack.find(needle, cursor + 1)
    return hits
def common_prefix_len(left: bytes, right: bytes) -> int:
    """Return how many leading bytes ``left`` and ``right`` have in common."""
    matched = 0
    for left_byte, right_byte in zip(left, right):
        if left_byte != right_byte:
            break
        matched += 1
    return matched
def common_suffix_len(left: bytes, right: bytes) -> int:
    """Return how many trailing bytes ``left`` and ``right`` have in common."""
    matched = 0
    # Walk both inputs from their last byte backwards in lockstep.
    for left_byte, right_byte in zip(reversed(left), reversed(right)):
        if left_byte != right_byte:
            break
        matched += 1
    return matched
def write_family_decompile_artifact(
out_dir: pathlib.Path,
parsed_class_chunks: list[ExtractedChunk],
@ -996,6 +1056,683 @@ def validate_verified_repeated_family_regressions(
return report_lines
def write_immortality_target_body_scan(
    out_dir: pathlib.Path,
    parsed_class_chunks: list[ExtractedChunk],
    rows_by_entry: dict[int, list[ClassEventRow]],
    raw_data_by_entry: dict[int, bytes],
) -> None:
    """Write ``immortality_target_body_scan.tsv`` and ``.md`` into ``out_dir``.

    For every label in ``IMMORTALITY_TARGET_LABELS`` that has a chunk and raw
    data, each row with a non-zero ``raw_code_offset`` and known derived body
    bounds is sliced out of the raw data and searched for three fixed byte
    patterns. Bodies whose label is in ``IMMORTALITY_TEMPLATE_COMPARE_LABELS``
    are also compared pairwise by common prefix/suffix length.

    Args:
        out_dir: Directory that receives both report files.
        parsed_class_chunks: Chunks indexed here by ``primary_label``.
        rows_by_entry: Event rows keyed by ``chunk.index``.
        raw_data_by_entry: Raw chunk bytes keyed by ``chunk.index``.
    """
    # If several chunks share a primary_label, the last one wins.
    chunk_by_label = {
        chunk.primary_label: chunk
        for chunk in parsed_class_chunks
        if chunk.primary_label
    }
    # (pattern name, packed bytes) searched in every body.
    scan_patterns = (
        ("le16_0410", struct.pack("<H", 0x0410)),
        ("le32_00000410", struct.pack("<I", 0x00000410)),
        ("le16_1004", struct.pack("<H", 0x1004)),
    )
    body_records: list[dict[str, object]] = []
    comparison_records: list[tuple[int, int, int, str, int, int, str, int, int]] = []
    compare_bodies: list[tuple[str, int, bytes]] = []
    for label in IMMORTALITY_TARGET_LABELS:
        chunk = chunk_by_label.get(label)
        if chunk is None:
            continue
        raw_data = raw_data_by_entry.get(chunk.index)
        if raw_data is None:
            continue
        for row in rows_by_entry.get(chunk.index, []):
            # Rows without code or without derived body bounds are not scanned.
            if row.raw_code_offset == 0:
                continue
            if row.derived_body_start is None or row.derived_body_end is None:
                continue
            body = raw_data[row.derived_body_start:row.derived_body_end]
            pattern_hits = {
                pattern_name: find_all_offsets(body, needle)
                for pattern_name, needle in scan_patterns
            }
            body_records.append(
                {
                    "entry_index": row.entry_index,
                    "class_name": label,
                    "slot": row.slot,
                    "event_name_hint": row.event_name_hint or "",
                    "body_start": row.derived_body_start,
                    "body_end": row.derived_body_end,
                    # Falls back to the sliced length when the row's own length is falsy.
                    "body_length": row.derived_body_length or len(body),
                    "first16": body[:16].hex(),
                    "last16": body[-16:].hex(),
                    "hits": pattern_hits,
                }
            )
            if label in IMMORTALITY_TEMPLATE_COMPARE_LABELS:
                compare_bodies.append((label, row.slot, body))
    # Compare every unordered pair of collected bodies exactly once.
    for left_index, left in enumerate(compare_bodies):
        left_label, left_slot, left_body = left
        for right in compare_bodies[left_index + 1:]:
            right_label, right_slot, right_body = right
            prefix = common_prefix_len(left_body, right_body)
            suffix = common_suffix_len(left_body, right_body)
            comparison_records.append(
                (
                    prefix + suffix,
                    prefix,
                    suffix,
                    left_label,
                    left_slot,
                    len(left_body),
                    right_label,
                    right_slot,
                    len(right_body),
                )
            )
    # Tuples lead with prefix + suffix, so a descending sort puts the largest total first.
    comparison_records.sort(reverse=True)
    tsv_lines = [
        "entry_index\tclass_name\tslot\tevent_name_hint\tbody_start\tbody_end\tbody_length\tle16_0410_count\tle16_0410_offsets\tle32_00000410_count\tle32_00000410_offsets\tle16_1004_count\tle16_1004_offsets\tbody_prefix_hex\tbody_suffix_hex"
    ]
    for record in body_records:
        hits = record["hits"]
        # The be16_* placeholders carry the "le16_1004" pattern hits.
        tsv_lines.append(
            "{entry_index}\t{class_name}\t0x{slot:02X}\t{event_name_hint}\t0x{body_start:04X}\t0x{body_end:04X}\t{body_length}\t{le16_count}\t{le16_offsets}\t{le32_count}\t{le32_offsets}\t{be16_count}\t{be16_offsets}\t{first16}\t{last16}".format(
                entry_index=record["entry_index"],
                class_name=record["class_name"],
                slot=record["slot"],
                event_name_hint=record["event_name_hint"],
                body_start=record["body_start"],
                body_end=record["body_end"],
                body_length=record["body_length"],
                le16_count=len(hits["le16_0410"]),
                le16_offsets=",".join(f"0x{offset:04X}" for offset in hits["le16_0410"]),
                le32_count=len(hits["le32_00000410"]),
                le32_offsets=",".join(f"0x{offset:04X}" for offset in hits["le32_00000410"]),
                be16_count=len(hits["le16_1004"]),
                be16_offsets=",".join(f"0x{offset:04X}" for offset in hits["le16_1004"]),
                first16=record["first16"],
                last16=record["last16"],
            )
        )
    (out_dir / "immortality_target_body_scan.tsv").write_text("\n".join(tsv_lines) + "\n", encoding="utf-8")
    md_lines = [
        "# Immortality Target Body Scan",
        "",
        "This report is a focused follow-up on the player-trigger immortality lane.",
        "It scans the current highest-value EUSECODE candidates for inline `0x410` literals and compares the strongest active-event template bodies.",
        "",
    ]
    any_literal_hits = any(
        record["hits"]["le16_0410"] or record["hits"]["le32_00000410"] or record["hits"]["le16_1004"]
        for record in body_records
    )
    if any_literal_hits:
        md_lines.append("- At least one target body contains an inline `0x410`-adjacent literal; inspect the TSV rows with non-zero hit counts.")
    else:
        md_lines.append("- No scanned target body contains inline little-endian `0x0410`, inline dword `0x00000410`, or byte-swapped `0x1004` literals.")
    # NOTE(review): the next four bullets are fixed text and are not derived from
    # the scanned data (for example the `8150` byte figure) — confirm they still hold.
    md_lines.append("- `EVENT` remains the widest unresolved active-event frontier because it still exposes one monolithic slot-`0x0A` body (`8150` bytes) with no finer body split yet.")
    md_lines.append("- `NPCTRIG` remains the strongest compact player-trigger frontier because it is event-bearing and has two non-zero bodies (`0x0A`, `0x20`) but still no inline `0x410` literal.")
    md_lines.append("- `_BOOT` event cores (`COR_BOOT`, `REE_BOOT`) remain near-template event families rather than special immortality emitters: their best pairings share only short common prefixes plus shared suffix-heavy tails.")
    md_lines.append("- `SPECIAL` and `TRIGPAD` stay negative controls here: callable bodies exist, but the new literal scan still shows no inline `0x410` evidence.")
    md_lines.append("")
    md_lines.append("## Body Rows")
    md_lines.append("")
    md_lines.append("| Class | Slot | Hint | Body Range | Len | `0x0410` hits | `0x00000410` hits | `0x1004` hits | Prefix | Suffix |")
    md_lines.append("|---|---:|---|---|---:|---|---|---|---|---|")
    for record in body_records:
        hits = record["hits"]
        md_lines.append(
            "| {class_name} | `0x{slot:02X}` | {event_name_hint} | `0x{body_start:04X}..0x{body_end:04X}` | {body_length} | {le16_count}:{le16_offsets} | {le32_count}:{le32_offsets} | {be16_count}:{be16_offsets} | `{first16}` | `{last16}` |".format(
                class_name=record["class_name"],
                slot=record["slot"],
                event_name_hint=record["event_name_hint"] or "-",
                body_start=record["body_start"],
                body_end=record["body_end"],
                body_length=record["body_length"],
                le16_count=len(hits["le16_0410"]),
                le16_offsets=",".join(f"0x{offset:04X}" for offset in hits["le16_0410"]) or "-",
                le32_count=len(hits["le32_00000410"]),
                le32_offsets=",".join(f"0x{offset:04X}" for offset in hits["le32_00000410"]) or "-",
                be16_count=len(hits["le16_1004"]),
                be16_offsets=",".join(f"0x{offset:04X}" for offset in hits["le16_1004"]) or "-",
                first16=record["first16"],
                last16=record["last16"],
            )
        )
    md_lines.extend([
        "",
        "## Strongest Template Pairings",
        "",
        "These comparisons are limited to `COR_BOOT`, `REE_BOOT`, `NPCTRIG`, and `SFXTRIG` because they are the current highest-value active-event families near the immortality frontier.",
        "",
        "| Left | Right | Prefix | Suffix | Total |",
        "|---|---|---:|---:|---:|",
    ])
    # Only the twelve highest-ranked pairings are listed.
    for total, prefix, suffix, left_label, left_slot, left_len, right_label, right_slot, right_len in comparison_records[:12]:
        md_lines.append(
            f"| {left_label} `0x{left_slot:02X}` (`{left_len}`) | {right_label} `0x{right_slot:02X}` (`{right_len}`) | {prefix} | {suffix} | {total} |"
        )
    (out_dir / "immortality_target_body_scan.md").write_text("\n".join(md_lines) + "\n", encoding="utf-8")
def read_ascii_cstring(data: bytes, start: int, max_len: int = 48) -> tuple[str, int] | None:
end = start
limit = min(len(data), start + max_len)
while end < limit and data[end] != 0:
byte = data[end]
if not (0x20 <= byte <= 0x7E):
return None
end += 1
if end >= len(data) or end == start or data[end] != 0:
return None
return data[start:end].decode("latin-1"), end + 1
def parse_body_open_header(body: bytes) -> dict[str, object] | None:
    """Decode the opening header of a body, or return ``None`` if unrecognized.

    Two layouts are accepted: ``5A <arg> 5C <u16> <label>`` and
    ``<arg> 5C <u16> <label>``. The label must be a string accepted by
    ``read_ascii_cstring``. After the label, NUL padding is skipped; if the
    next byte is ``0x0B`` the byte following it is reported as ``event_code``.

    Returns:
        A dict with keys ``open_arg``, ``target``, ``label`` and
        ``event_code`` (``None`` when no ``0x0B`` marker follows the label).
    """
    if len(body) < 7:
        return None
    if body[0] == 0x5A and body[2] == 0x5C:
        arg_index = 1
    elif body[1] == 0x5C:
        arg_index = 0
    else:
        return None
    # The u16 target sits right after the 0x5C byte, the label right after it.
    target_offset = arg_index + 2
    label_offset = arg_index + 4
    parsed_label = read_ascii_cstring(body, label_offset)
    if parsed_label is None:
        return None
    label, cursor = parsed_label
    body_len = len(body)
    while cursor < body_len and body[cursor] == 0:
        cursor += 1
    event_code = None
    if cursor + 1 < body_len and body[cursor] == 0x0B:
        event_code = body[cursor + 1]
    return {
        "open_arg": body[arg_index],
        "target": read_u16_le(body, target_offset),
        "label": label,
        "event_code": event_code,
    }
def find_labeled_subheaders(body: bytes, label: str) -> list[tuple[int, int]]:
    """Locate ``53 5C <u16> <label>`` subheaders inside ``body``.

    Returns ``(marker_offset, u16_value)`` pairs in ascending offset order,
    where the u16 is read with ``read_u16_le`` two bytes after the marker.
    Scanning stops at the first marker that leaves no room for label bytes.
    """
    wanted = label.encode("latin-1")
    marker = b"\x53\x5c"
    matches: list[tuple[int, int]] = []
    cursor = body.find(marker)
    # A marker is only usable while at least one byte exists at marker + 4.
    while 0 <= cursor < len(body) - 4:
        if body.startswith(wanted, cursor + 4):
            matches.append((cursor, read_u16_le(body, cursor + 2)))
        cursor = body.find(marker, cursor + 1)
    return matches
def scan_body_field_tokens(body: bytes, tail_window: int | None = None) -> list[str]:
    """Return the distinct field tokens found in ``body``, in first-seen order.

    A token has the form ``"<tag>:<u16>-><name>"`` and is produced wherever a
    ``0x24`` or ``0x69`` byte is followed by a u16 and a printable
    NUL-terminated name. The scan itself is shared with
    ``scan_body_field_offsets`` instead of being duplicated here; this
    function only drops the offsets and removes repeated tokens.

    Args:
        body: Bytes to scan.
        tail_window: When given, only the last ``tail_window`` bytes are
            scanned; otherwise the whole body is.
    """
    located = scan_body_field_offsets(body, tail_window=tail_window)
    # dict.fromkeys keeps the first occurrence of each token and its order.
    return list(dict.fromkeys(token for _, token in located))
def format_offset_list(offsets: list[int], limit: int = 10) -> str:
    """Render offsets as comma-separated ``0xNNNN`` values.

    At most ``limit`` offsets are shown; ``",..."`` is appended when more
    exist. An empty list renders as the empty string.
    """
    if not offsets:
        return ""
    shown = ",".join(f"0x{value:04X}" for value in offsets[:limit])
    return shown + ",..." if len(offsets) > limit else shown
def scan_body_field_offsets(body: bytes, tail_window: int | None = None) -> list[tuple[int, str]]:
    """Return ``(offset, token)`` for every field reference found in ``body``.

    A field reference is a ``0x24`` or ``0x69`` byte followed by a u16 (read
    with ``read_u16_le``) and a name accepted by ``read_ascii_cstring``. The
    token has the form ``"<tag>:<u16>-><name>"``. Results are in ascending
    offset order.

    Each offset is visited exactly once, so an ``(offset, token)`` pair can
    never repeat; the former ``seen`` set was dead code and has been removed.

    Args:
        body: Bytes to scan.
        tail_window: When given, only the last ``tail_window`` bytes are
            scanned; otherwise the whole body is.
    """
    start = max(0, len(body) - tail_window) if tail_window is not None else 0
    located: list[tuple[int, str]] = []
    # Stop 4 bytes early: a tag, a u16 and at least one name byte plus its
    # terminator must fit after the offset.
    for offset in range(start, len(body) - 4):
        tag = body[offset]
        if tag not in {0x24, 0x69}:
            continue
        field_result = read_ascii_cstring(body, offset + 3)
        if field_result is None:
            continue
        field_name, _ = field_result
        located.append((offset, f"{tag:02X}:{read_u16_le(body, offset + 1):04X}->{field_name}"))
    return located
def count_offsets_in_range(offsets: list[int], start: int, end: int) -> int:
    """Count the offsets that fall in the half-open range ``[start, end)``."""
    inside = [value for value in offsets if start <= value < end]
    return len(inside)
def relative_offsets_in_range(offsets: list[int], start: int, end: int) -> list[int]:
    """Return offsets within ``[start, end)`` rebased so ``start`` becomes 0."""
    rebased: list[int] = []
    for value in offsets:
        if start <= value < end:
            rebased.append(value - start)
    return rebased
def format_relative_offsets(offsets: list[int], limit: int = 8) -> str:
    """Render offsets as comma-separated ``+0xNN`` values.

    At most ``limit`` offsets are shown; ``",..."`` is appended when more
    exist. An empty list renders as ``"-"``.
    """
    if not offsets:
        return "-"
    shown = ",".join(f"+0x{value:02X}" for value in offsets[:limit])
    return shown + ",..." if len(offsets) > limit else shown
def find_repeated_windows(body: bytes, size: int, min_count: int = 2, max_results: int = 6) -> list[tuple[bytes, list[int]]]:
    """Find ``size``-byte windows that occur at least ``min_count`` times.

    Every start offset is considered, so occurrences may overlap. Windows made
    up entirely of zero bytes are ignored. Results are ordered by descending
    occurrence count, then by first offset, then by window bytes, and cut to
    ``max_results`` entries.

    Returns:
        ``(window, offsets)`` pairs; an empty list when ``size`` is not
        positive or ``body`` is shorter than ``size``.
    """
    if size <= 0 or len(body) < size:
        return []
    positions: dict[bytes, list[int]] = {}
    last_start = len(body) - size
    for begin in range(last_start + 1):
        chunk = body[begin:begin + size]
        # Skip all-zero windows; any() is false only when every byte is 0.
        if any(chunk):
            positions.setdefault(chunk, []).append(begin)
    candidates = [
        (chunk, found_at)
        for chunk, found_at in positions.items()
        if len(found_at) >= min_count
    ]
    ranked = sorted(candidates, key=lambda pair: (-len(pair[1]), pair[1][0], pair[0]))
    return ranked[:max_results]
def format_hex_window(window: bytes) -> str:
    """Render ``window`` as space-separated two-digit uppercase hex bytes."""
    return window.hex(" ").upper()
def build_npctrig_clause_segments(
    body: bytes,
    subheaders: list[tuple[int, int]],
) -> tuple[list[tuple[str, int, int]], int]:
    """Split ``body`` into named ``prefix`` / ``clause_N`` / ``tail`` ranges.

    The tail starts at the first field reference that
    ``scan_body_field_offsets`` finds in the last 192 bytes (or fewer, for
    short bodies). Each subheader offset opens a clause that runs to the next
    subheader, or to the tail start for the last one. Bytes before the first
    subheader form the prefix.

    The tail start is only accepted when it lies after the *last* subheader.
    Checking only the first subheader let a tail field located between two
    subheaders give the final clause a negative-length range overlapping the
    tail. In that case the tail is now dropped (tail start = ``len(body)``),
    as already happened when the field preceded the first subheader.

    Args:
        body: The body bytes being segmented.
        subheaders: ``(offset, target)`` pairs, as produced by
            ``find_labeled_subheaders``.

    Returns:
        ``(segments, tail_start)`` where each segment is
        ``(name, start, end)`` with ``end`` exclusive.
    """
    first_subheader = subheaders[0][0] if subheaders else 0
    last_subheader = max((offset for offset, _ in subheaders), default=0)
    tail_fields = scan_body_field_offsets(body, tail_window=min(len(body), 192))
    tail_start = tail_fields[0][0] if tail_fields else len(body)
    if tail_start <= last_subheader:
        tail_start = len(body)
    segments: list[tuple[str, int, int]] = []
    if first_subheader > 0:
        segments.append(("prefix", 0, first_subheader))
    for index, (start, _) in enumerate(subheaders):
        next_start = subheaders[index + 1][0] if index + 1 < len(subheaders) else tail_start
        segments.append((f"clause_{index + 1}", start, next_start))
    if tail_start < len(body):
        segments.append(("tail", tail_start, len(body)))
    return segments, tail_start
def write_npctrig_clause_report(
    out_dir: pathlib.Path,
    parsed_class_chunks: list[ExtractedChunk],
    rows_by_entry: dict[int, list[ClassEventRow]],
    raw_data_by_entry: dict[int, bytes],
) -> None:
    """Write ``immortality_npctrig_clauses.tsv`` and ``.md`` into ``out_dir``.

    Takes the first chunk whose ``primary_label`` is ``"NPCTRIG"`` and, for
    each row with a non-zero ``raw_code_offset`` and known derived body
    bounds, splits the body into prefix / clause / tail segments, counts the
    ``IMMORTALITY_BODY_MOTIFS`` hits per segment and records repeated 8- and
    6-byte windows. Nothing is written when the chunk, its raw data, or all
    usable rows are missing.

    Values that used to be rendered by nested f-strings reusing the enclosing
    quote character are now pre-rendered into locals: that nesting is only
    valid syntax from Python 3.12 on. The emitted text is unchanged.

    Args:
        out_dir: Directory that receives both report files.
        parsed_class_chunks: Chunks searched for the ``NPCTRIG`` label.
        rows_by_entry: Event rows keyed by ``chunk.index``.
        raw_data_by_entry: Raw chunk bytes keyed by ``chunk.index``.
    """
    chunk = next((candidate for candidate in parsed_class_chunks if candidate.primary_label == "NPCTRIG"), None)
    if chunk is None:
        return
    raw_data = raw_data_by_entry.get(chunk.index)
    if raw_data is None:
        return
    records: list[dict[str, object]] = []
    clause_motif_names = ["subheader_53_5c", "branch_3f_0a", "writeback_57_02", "push_24_51", "field_4b_fe_0f"]
    for row in rows_by_entry.get(chunk.index, []):
        if row.raw_code_offset == 0 or row.derived_body_start is None or row.derived_body_end is None:
            continue
        body = raw_data[row.derived_body_start:row.derived_body_end]
        header = parse_body_open_header(body)
        subheaders = find_labeled_subheaders(body, "NPCTRIG")
        segments, tail_start = build_npctrig_clause_segments(body, subheaders)
        motif_hits = {
            motif_name: find_all_offsets(body, motif_bytes)
            for motif_name, motif_bytes in IMMORTALITY_BODY_MOTIFS
        }
        repeated_windows_8 = find_repeated_windows(body, 8)
        repeated_windows_6 = find_repeated_windows(body, 6)
        tail_fields = scan_body_field_offsets(body, tail_window=min(len(body), 192))
        segment_rows: list[dict[str, object]] = []
        for segment_name, start, end in segments:
            segment_body = body[start:end]
            # Offsets of every 0x5B byte, relative to the segment start.
            labels = find_all_offsets(segment_body, bytes.fromhex("5B"))
            motif_offsets = {
                motif_name: relative_offsets_in_range(motif_hits[motif_name], start, end)
                for motif_name in clause_motif_names
            }
            segment_rows.append(
                {
                    "segment": segment_name,
                    "start": start,
                    "end": end,
                    "length": end - start,
                    "prefix_hex": hex_edge(segment_body, width=16),
                    "suffix_hex": hex_tail(segment_body, width=12),
                    # Only the first eight are kept, rebased to body offsets.
                    "local_labels": [start + offset for offset in labels[:8]],
                    "motif_counts": {
                        motif_name: count_offsets_in_range(motif_hits[motif_name], start, end)
                        for motif_name in clause_motif_names
                    },
                    "motif_offsets": motif_offsets,
                }
            )
        subheader_offset_deltas = [subheaders[index + 1][0] - subheaders[index][0] for index in range(len(subheaders) - 1)]
        subheader_target_deltas = [subheaders[index + 1][1] - subheaders[index][1] for index in range(len(subheaders) - 1)]
        # A stride is reported only when every gap between subheaders is equal.
        uniform_stride = subheader_offset_deltas[0] if subheader_offset_deltas and len(set(subheader_offset_deltas)) == 1 else None
        full_clause_segments = [
            segment
            for segment in segment_rows
            if segment["segment"].startswith("clause_")
            and segment["motif_counts"]["push_24_51"]
            and segment["motif_counts"]["writeback_57_02"]
        ]
        records.append(
            {
                "slot": row.slot,
                "event_name_hint": row.event_name_hint or "",
                "body_length": len(body),
                "header": header,
                "subheaders": subheaders,
                "subheader_offset_deltas": subheader_offset_deltas,
                "subheader_target_deltas": subheader_target_deltas,
                "segments": segment_rows,
                "tail_start": tail_start,
                "tail_fields": tail_fields,
                "repeated_windows_8": repeated_windows_8,
                "repeated_windows_6": repeated_windows_6,
                "has_writeback": bool(motif_hits["writeback_57_02"]),
                "has_push_2451": bool(motif_hits["push_24_51"]),
                "field_4b_fe_0f_count": len(motif_hits["field_4b_fe_0f"]),
                "uniform_stride": uniform_stride,
                "full_clause_count": len(full_clause_segments),
                "selector_offsets": [offset for offset, _ in subheaders],
                "selector_targets": [target for _, target in subheaders],
            }
        )
    if not records:
        return
    tsv_lines = [
        "slot\tevent_name_hint\tbody_length\theader_target\theader_event_code\tsubheader_offsets\tsubheader_targets\tsubheader_offset_deltas\tsubheader_target_deltas\tuniform_stride\tfull_clause_count\ttail_start\thas_writeback\thas_push_2451\tfield_4b_fe_0f_count\trepeated_windows_8\trepeated_windows_6"
    ]
    for record in records:
        header = record["header"] or {}
        tsv_lines.append(
            "0x{slot:02X}\t{event_name_hint}\t{body_length}\t{header_target}\t{header_event_code}\t{subheader_offsets}\t{subheader_targets}\t{subheader_offset_deltas}\t{subheader_target_deltas}\t{uniform_stride}\t{full_clause_count}\t0x{tail_start:04X}\t{has_writeback}\t{has_push_2451}\t{field_4b_fe_0f_count}\t{repeated_windows_8}\t{repeated_windows_6}".format(
                slot=record["slot"],
                event_name_hint=record["event_name_hint"],
                body_length=record["body_length"],
                header_target=(f"0x{header['target']:04X}" if header else ""),
                header_event_code=(f"0x{header['event_code']:02X}" if header and header.get("event_code") is not None else ""),
                subheader_offsets=",".join(f"0x{offset:04X}" for offset, _ in record["subheaders"]),
                subheader_targets=",".join(f"0x{target:04X}" for _, target in record["subheaders"]),
                subheader_offset_deltas=",".join(f"0x{delta:02X}" for delta in record["subheader_offset_deltas"]),
                # Target deltas may be negative; they are shown as 16-bit values.
                subheader_target_deltas=",".join(f"0x{delta & 0xFFFF:04X}" for delta in record["subheader_target_deltas"]),
                uniform_stride=(f"0x{record['uniform_stride']:02X}" if record["uniform_stride"] is not None else ""),
                full_clause_count=record["full_clause_count"],
                tail_start=record["tail_start"],
                has_writeback="yes" if record["has_writeback"] else "no",
                has_push_2451="yes" if record["has_push_2451"] else "no",
                field_4b_fe_0f_count=record["field_4b_fe_0f_count"],
                repeated_windows_8=";".join(
                    f"{window.hex()}@{','.join(f'0x{offset:04X}' for offset in offsets)}"
                    for window, offsets in record["repeated_windows_8"]
                ),
                repeated_windows_6=";".join(
                    f"{window.hex()}@{','.join(f'0x{offset:04X}' for offset in offsets)}"
                    for window, offsets in record["repeated_windows_6"]
                ),
            )
        )
    (out_dir / "immortality_npctrig_clauses.tsv").write_text("\n".join(tsv_lines) + "\n", encoding="utf-8")
    md_lines = [
        "# Immortality NPCTRIG Clauses",
        "",
        "This report focuses on the surviving compact NPCTRIG frontier and splits the extracted slot bodies into prefix, clause, and tail regions.",
        "It is intended to make the slot `0x0A` versus slot `0x20` difference explicit enough to compare against the runtime-side slot-`0x0A` consumer path.",
        "",
    ]
    for record in records:
        header = record["header"] or {}
        # Pre-rendered so the f-strings below need no same-quote nesting.
        event_code = header.get("event_code")
        event_code_text = f"0x{event_code:02X}" if event_code is not None else "-"
        if header:
            open_header_line = f"- Open header: `0x5A 0x{header['open_arg']:02X} 0x5C 0x{header['target']:04X}` -> `NPCTRIG` with event-code byte `{event_code_text}`."
        else:
            open_header_line = "- Open header: not recognized."
        uniform_stride_value = record["uniform_stride"]
        uniform_stride_text = f"0x{uniform_stride_value:02X}" if uniform_stride_value is not None else "-"
        md_lines.extend([
            f"## NPCTRIG slot `0x{record['slot']:02X}`",
            "",
            f"- Event hint: `{record['event_name_hint'] or '-'}`.",
            open_header_line,
            f"- First tail-field offset: `0x{record['tail_start']:04X}`.",
            f"- Subheader offsets: {', '.join(f'`0x{offset:04X}`' for offset, _ in record['subheaders']) or '`-`'}.",
            f"- Subheader targets: {', '.join(f'`0x{target:04X}`' for _, target in record['subheaders']) or '`-`'}.",
            f"- Subheader offset deltas: {', '.join(f'`0x{delta:02X}`' for delta in record['subheader_offset_deltas']) or '`-`'}.",
            f"- Subheader target deltas: {', '.join(f'`0x{delta & 0xFFFF:04X}`' for delta in record['subheader_target_deltas']) or '`-`'}.",
            f"- Runtime-shape motifs: `writeback_57_02={'yes' if record['has_writeback'] else 'no'}`, `push_24_51={'yes' if record['has_push_2451'] else 'no'}`, `field_4b_fe_0f={record['field_4b_fe_0f_count']}`.",
            "",
            "| Segment | Range | Len | Local Labels | Subheaders | Branch 3F 0A | Writeback 57 02 | Push 24 51 | Field 4B FE 0F | Motif Offsets | Prefix | Suffix |",
            "|---|---|---:|---|---:|---:|---:|---:|---:|---|---|---|",
        ])
        for segment in record["segments"]:
            motif_counts = segment["motif_counts"]
            motif_offsets = segment["motif_offsets"]
            motif_offset_render = "; ".join(
                f"{motif_name}={format_relative_offsets(offsets)}"
                for motif_name, offsets in motif_offsets.items()
                if offsets
            ) or "-"
            label_render = ",".join(f"0x{offset:04X}" for offset in segment["local_labels"]) or "-"
            md_lines.append(
                "| {segment} | `0x{start:04X}..0x{end:04X}` | {length} | `{labels}` | {subheaders} | {branch} | {writeback} | {push_2451} | {field_4b_fe_0f} | `{motif_offsets}` | `{prefix}` | `{suffix}` |".format(
                    segment=segment["segment"],
                    start=segment["start"],
                    end=segment["end"],
                    length=segment["length"],
                    labels=label_render,
                    subheaders=motif_counts["subheader_53_5c"],
                    branch=motif_counts["branch_3f_0a"],
                    writeback=motif_counts["writeback_57_02"],
                    push_2451=motif_counts["push_24_51"],
                    field_4b_fe_0f=motif_counts["field_4b_fe_0f"],
                    motif_offsets=motif_offset_render,
                    prefix=segment["prefix_hex"],
                    suffix=segment["suffix_hex"],
                )
            )
        md_lines.extend([
            "",
            "Repeated windows (8-byte):",
            "",
        ])
        for window, offsets in record["repeated_windows_8"]:
            md_lines.append(
                f"- `{format_hex_window(window)}` at {', '.join(f'`0x{offset:04X}`' for offset in offsets)}"
            )
        md_lines.extend([
            "",
            "Repeated windows (6-byte):",
            "",
        ])
        for window, offsets in record["repeated_windows_6"]:
            md_lines.append(
                f"- `{format_hex_window(window)}` at {', '.join(f'`0x{offset:04X}`' for offset in offsets)}"
            )
        md_lines.extend([
            "",
            "Runtime-fit candidates:",
            "",
            f"- Candidate clause selector starts: {', '.join(f'`0x{offset:04X}`' for offset in record['selector_offsets']) or '`-`'}.",
            f"- Candidate clause selector targets: {', '.join(f'`0x{target:04X}`' for target in record['selector_targets']) or '`-`'}.",
            f"- Uniform selector stride: `{uniform_stride_text}`; full clauses carrying both `push_24_51` and `writeback_57_02`: `{record['full_clause_count']}`.",
            "- Runtime side anchor: `000d:5572` proves the wrapper extra word is additive (`entity_vm_slot_load_value(...) + offset`), while `000d:21ed -> 000d:2433` copies one inline blob, reads two signed metadata bytes, then consumes a word matrix where byte A controls the lead-word row count and byte B controls the shared target-list width.",
            "",
            "Tail field offsets:",
            "",
        ])
        for offset, token in record["tail_fields"]:
            md_lines.append(f"- `0x{offset:04X}` -> `{token}`")
        md_lines.append("")
    # The closing comparison is only written when both slots produced a record.
    slot_0a = next((record for record in records if record["slot"] == 0x0A), None)
    slot_20 = next((record for record in records if record["slot"] == 0x20), None)
    if slot_0a and slot_20:
        slot_0a_header = slot_0a["header"] or {}
        slot_20_header = slot_20["header"] or {}
        slot_0a_code = slot_0a_header.get("event_code")
        slot_20_code = slot_20_header.get("event_code")
        slot_0a_code_text = f"0x{slot_0a_code:02X}" if slot_0a_code is not None else "-"
        slot_20_code_text = f"0x{slot_20_code:02X}" if slot_20_code is not None else "-"
        md_lines.extend([
            "## Current Read",
            "",
            f"- Slot `0x0A` now reads as a repeated clause ladder, not a monolithic blob: `{len(slot_0a['subheaders'])}` subheaders sit on a uniform `{', '.join(f'0x{delta:02X}' for delta in slot_0a['subheader_offset_deltas']) or '-'}` byte stride, and their targets walk backward by `{', '.join(f'0x{delta & 0xFFFF:04X}' for delta in slot_0a['subheader_target_deltas']) or '-'}`. Each clause block carries one `branch_3f_0a`, one `push_24_51`, and one `writeback_57_02`, which fits an event-bearing clause stream better than a pure type filter.",
            f"- Slot `0x20` is structurally different even before the tail fields: its open event-code byte is `{slot_20_code_text}` instead of `{slot_0a_code_text}`, it has only one class-labelled subheader, no `writeback_57_02`, no `push_24_51`, and `{slot_20['field_4b_fe_0f_count']}` `field_4b_fe_0f` hits concentrated around repeated `0x0A 00/05 4B FE 0F ...` windows. That is a materially better fit for a typed gate or setup/attachment body than for the live event-emission ladder.",
            "- This split matches the current runtime-side bridge better than the previous undifferentiated frontier. The verified slot-`0x0A` wrapper `0005:2c35` seeds mask `0x0400`, slot `0x0A`, and one additive word that `000d:5572` applies directly to the loaded slot value before `000d:21ed` consumes the result. The exact `000d:21ed -> 000d:22bc` contract is now narrower too: after copying the inline blob it reads two signed bytes, uses byte A as the lead-word row count, uses byte B as the shared target-list width, performs `A x B` `entity_link` calls, and pushes back only non-`0x0400` words. `NPCTRIG slot 0x0A` is the only surviving compact body here with a natural five-row selector family (`5` evenly spaced clause starts at stride `0x2F`), while slot `0x20` offers only one clause and no matching writeback/push motif.",
        ])
    (out_dir / "immortality_npctrig_clauses.md").write_text("\n".join(md_lines) + "\n", encoding="utf-8")
def write_immortality_body_structure_report(
    out_dir: pathlib.Path,
    parsed_class_chunks: list[ExtractedChunk],
    rows_by_entry: dict[int, list[ClassEventRow]],
    raw_data_by_entry: dict[int, bytes],
) -> None:
    """Write ``immortality_body_structure.tsv`` and ``.md`` into ``out_dir``.

    For each label in ``IMMORTALITY_STRUCTURAL_TARGET_LABELS`` (visited in
    sorted order) and each row with a non-zero ``raw_code_offset`` and known
    derived body bounds, records the parsed open header, the counts of
    ``0x7A`` and ``0x5B`` bytes, the class-labelled subheaders, the field
    tokens (whole body and last 256 bytes) and the offsets of every
    ``IMMORTALITY_BODY_MOTIFS`` pattern. Both files are written even when no
    record was collected.

    Event-code values that used to be rendered by nested f-strings reusing the
    enclosing quote character are now pre-rendered into locals: that nesting
    is only valid syntax from Python 3.12 on. The emitted text is unchanged.

    Args:
        out_dir: Directory that receives both report files.
        parsed_class_chunks: Chunks filtered here by ``primary_label``.
        rows_by_entry: Event rows keyed by ``chunk.index``.
        raw_data_by_entry: Raw chunk bytes keyed by ``chunk.index``.
    """
    chunk_by_label = {
        chunk.primary_label: chunk
        for chunk in parsed_class_chunks
        if chunk.primary_label in IMMORTALITY_STRUCTURAL_TARGET_LABELS
    }
    records: list[dict[str, object]] = []
    for label in sorted(IMMORTALITY_STRUCTURAL_TARGET_LABELS):
        chunk = chunk_by_label.get(label)
        if chunk is None:
            continue
        raw_data = raw_data_by_entry.get(chunk.index)
        if raw_data is None:
            continue
        for row in rows_by_entry.get(chunk.index, []):
            if row.raw_code_offset == 0 or row.derived_body_start is None or row.derived_body_end is None:
                continue
            body = raw_data[row.derived_body_start:row.derived_body_end]
            header = parse_body_open_header(body)
            subheaders = find_labeled_subheaders(body, label)
            motif_hits = {
                motif_name: find_all_offsets(body, motif_bytes)
                for motif_name, motif_bytes in IMMORTALITY_BODY_MOTIFS
            }
            records.append(
                {
                    "entry_index": row.entry_index,
                    "class_name": label,
                    "slot": row.slot,
                    "event_name_hint": row.event_name_hint or "",
                    "body_length": len(body),
                    "header": header,
                    # Raw byte counts; the bytes are not checked for opcode context.
                    "clause_terminators": body.count(0x7A),
                    "local_labels": body.count(0x5B),
                    "subheaders": subheaders,
                    "tail_fields": scan_body_field_tokens(body, tail_window=256),
                    "all_fields": scan_body_field_tokens(body),
                    "motif_hits": motif_hits,
                }
            )
    tsv_lines = [
        "entry_index\tclass_name\tslot\tevent_name_hint\tbody_length\theader_open_arg\theader_target\theader_label\theader_event_code\tclause_terminator_count\tlocal_label_count\tsubheader_count\tsubheader_targets\ttail_fields\tall_fields\tmotif_counts\tmotif_offsets"
    ]
    for record in records:
        header = record["header"] or {}
        motif_hits = record["motif_hits"]
        tsv_lines.append(
            "{entry_index}\t{class_name}\t0x{slot:02X}\t{event_name_hint}\t{body_length}\t{header_open_arg}\t{header_target}\t{header_label}\t{header_event_code}\t{clause_terminators}\t{local_labels}\t{subheader_count}\t{subheader_targets}\t{tail_fields}\t{all_fields}\t{motif_counts}\t{motif_offsets}".format(
                entry_index=record["entry_index"],
                class_name=record["class_name"],
                slot=record["slot"],
                event_name_hint=record["event_name_hint"],
                body_length=record["body_length"],
                header_open_arg=(f"0x{header['open_arg']:02X}" if header else ""),
                header_target=(f"0x{header['target']:04X}" if header else ""),
                header_label=(header.get("label", "") if header else ""),
                header_event_code=(f"0x{header['event_code']:02X}" if header and header.get("event_code") is not None else ""),
                clause_terminators=record["clause_terminators"],
                local_labels=record["local_labels"],
                subheader_count=len(record["subheaders"]),
                subheader_targets=",".join(
                    f"0x{offset:04X}->0x{target:04X}" for offset, target in record["subheaders"]
                ),
                tail_fields=",".join(record["tail_fields"]),
                all_fields=",".join(record["all_fields"]),
                motif_counts=",".join(
                    f"{motif_name}:{len(motif_hits[motif_name])}" for motif_name, _ in IMMORTALITY_BODY_MOTIFS
                ),
                # Motifs without hits are left out of the offsets column.
                motif_offsets=",".join(
                    f"{motif_name}={format_offset_list(motif_hits[motif_name])}" for motif_name, _ in IMMORTALITY_BODY_MOTIFS if motif_hits[motif_name]
                ),
            )
        )
    (out_dir / "immortality_body_structure.tsv").write_text("\n".join(tsv_lines) + "\n", encoding="utf-8")
    md_lines = [
        "# Immortality Body Structure",
        "",
        "This report decodes one layer deeper than the literal scan for the surviving EVENT and NPCTRIG frontier.",
        "It is still heuristic: the output is limited to repeatable byte grammar, subheader boundaries, field-tag trailers, and motif offsets that can be cross-checked against the 000d slot-backed runtime lane.",
        "",
    ]
    for record in records:
        header = record["header"] or {}
        motif_hits = record["motif_hits"]
        # Pre-rendered so the f-string below needs no same-quote nesting.
        event_code = header.get("event_code")
        event_code_text = f"0x{event_code:02X}" if event_code is not None else "-"
        if header:
            open_header_line = f"- Open header: `0x5A 0x{header['open_arg']:02X} 0x5C 0x{header['target']:04X}` -> `{header.get('label', '')}` with embedded event-code byte `{event_code_text}`."
        else:
            open_header_line = "- Open header: not recognized by the current heuristic."
        md_lines.extend([
            f"## {record['class_name']} slot `0x{record['slot']:02X}`",
            "",
            f"- Body length: `{record['body_length']}` bytes.",
            open_header_line,
            f"- Clause terminators (`0x7A`): `{record['clause_terminators']}`; local labels (`0x5B`): `{record['local_labels']}`.",
            f"- Internal labeled subheaders (`0x53 0x5C <u16> {record['class_name']}`): `{len(record['subheaders'])}` -> {', '.join(f'`0x{offset:04X}->0x{target:04X}`' for offset, target in record['subheaders'][:12]) or '`-`'}.",
            f"- Tail field tags: {', '.join(f'`{value}`' for value in record['tail_fields']) or '`-`'}.",
            "",
            "| Motif | Count | First Offsets |",
            "|---|---:|---|",
        ])
        for motif_name, _ in IMMORTALITY_BODY_MOTIFS:
            offsets = motif_hits[motif_name]
            md_lines.append(
                f"| `{motif_name}` | {len(offsets)} | `{format_offset_list(offsets) or '-'}` |"
            )
        md_lines.append("")
    # The closing summary is only written when all three reference bodies exist.
    event_slot_0a = next((record for record in records if record["class_name"] == "EVENT" and record["slot"] == 0x0A), None)
    npctrig_slot_0a = next((record for record in records if record["class_name"] == "NPCTRIG" and record["slot"] == 0x0A), None)
    npctrig_slot_20 = next((record for record in records if record["class_name"] == "NPCTRIG" and record["slot"] == 0x20), None)
    if event_slot_0a and npctrig_slot_0a and npctrig_slot_20:
        npctrig_slot_0a_header = npctrig_slot_0a.get("header") or {}
        npctrig_slot_20_header = npctrig_slot_20.get("header") or {}
        slot_0a_code = npctrig_slot_0a_header.get("event_code")
        slot_20_code = npctrig_slot_20_header.get("event_code")
        slot_0a_code_text = f"0x{slot_0a_code:02X}" if slot_0a_code is not None else "-"
        slot_20_code_text = f"0x{slot_20_code:02X}" if slot_20_code is not None else "-"
        md_lines.extend([
            "## Current Read",
            "",
            f"- `EVENT 0x0A` is the generic hub-shaped body: it has `{len(event_slot_0a['subheaders'])}` internal labeled subheaders and the widest field trailer (`{', '.join(event_slot_0a['tail_fields'])}`).",
            f"- `NPCTRIG 0x0A` is the compact player-trigger candidate: it reuses the same class-labelled open header and subheader grammar, but it stays constrained to `{', '.join(npctrig_slot_0a['tail_fields'])}` instead of the wider EVENT field set.",
            f"- `NPCTRIG 0x20` keeps the same constrained field set as `NPCTRIG 0x0A` and changes only the embedded prolog event-code byte (`{slot_20_code_text}` vs `{slot_0a_code_text}`), which fits a variant trigger/setup lane better than a separate generic hub.",
            "- The repeated `0x53 0x5C <u16> LABEL` subheaders and dense `0x5B <u16>` local labels make these bodies look like inline clause streams rather than single flat payloads, which is consistent with the `000d:21ed -> 000d:22bc` runtime lane that copies variable-length inline bytes first and only then consumes compact metadata bytes plus streamed words.",
            "- The surviving slot focus is still `0x0A`: both EVENT and NPCTRIG expose non-zero slot-`0x0A` bodies, and the runtime side has an exact offset-specialized masked wrapper for slot `0x0A` at `0005:2c35` (`entity_vm_context_try_create_mask_0400_slot0a_with_offset`).",
        ])
    (out_dir / "immortality_body_structure.md").write_text("\n".join(md_lines) + "\n", encoding="utf-8")
def readable_neighbor_chunks(
center: ExtractedChunk,
chunk_by_index: dict[int, ExtractedChunk],
@ -2211,6 +2948,9 @@ def write_summary(out_dir: pathlib.Path, input_path: pathlib.Path, data: bytes,
write_readable_template_reports(out_dir, descriptor_chunks, chunk_by_index, len(chunks))
write_runtime_bridge_reports(out_dir, descriptor_chunks, chunk_by_index, len(chunks))
write_runtime_family_bridge_reports(out_dir, descriptor_chunks)
write_immortality_target_body_scan(out_dir, parsed_class_chunks, rows_by_entry, raw_data_by_entry)
write_immortality_body_structure_report(out_dir, parsed_class_chunks, rows_by_entry, raw_data_by_entry)
write_npctrig_clause_report(out_dir, parsed_class_chunks, rows_by_entry, raw_data_by_entry)
lines = []
lines.append("# EUSECODE.FLX First-Pass Extraction")
@ -2269,6 +3009,8 @@ def write_summary(out_dir: pathlib.Path, input_path: pathlib.Path, data: bytes,
lines.append("- `vm_mask_ladder.tsv` records the current `entity_vm_context_try_create_masked_for_entity` wrapper ladder in machine-readable form so gameplay mask lanes can be compared against descriptor-side families without reopening the notes.")
lines.append("- `readable_script_ir.md` and `readable_script_ir.tsv` join descriptor neighborhoods, the verified VM IR, the runtime owner/source path, and the current mask-family hints into one conservative script-facing bridge artifact.")
lines.append("- `runtime_descriptor_family_rankings.md` and `runtime_descriptor_family_rankings.tsv` rank descriptor families against the verified runtime lanes so the current human-readable script bridge is searchable by family fit rather than only by neighborhood dumps.")
lines.append("- `immortality_target_body_scan.md` and `immortality_target_body_scan.tsv` now scan the strongest current immortality candidates (`EVENT`, `NPCTRIG`, `_BOOT`, `SFXTRIG`, `SPECIAL`, `TRIGPAD`) for inline `0x410` literals and record the tightest remaining active-event template frontier.")
lines.append("- `immortality_npctrig_clauses.md` and `immortality_npctrig_clauses.tsv` now split the compact `NPCTRIG` slot `0x0A` / `0x20` bodies into prefix, clause, and tail regions so the event-bearing ladder can be compared against the typed/setup companion body without reopening raw hex.")
(out_dir / "README.md").write_text("\n".join(lines) + "\n", encoding="utf-8")
all_strings = iter_printable_runs(data)