"""Generate synthesized ``.unk`` listings from extracted Crusader USECODE data.

For every class found in ``class_event_index.tsv`` the script parses each
event body, places a short per-line summary on the debugger line number
recorded in the body, appends the rendered pseudocode for every slot, and
writes the result as ``<UNIT>.unk``.  A tab-separated manifest describing
all generated files is written next to them.
"""

from __future__ import annotations

import argparse
import csv
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
import sys
from typing import Any

# Make the repository root importable so the ``tools`` package resolves when
# this file is executed directly as a script.
REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

from tools.poc_crusader_usecode_parser import (
    build_listing_labels,
    default_shape_catalog_path,
    format_script_statement,
    load_shape_catalog,
    parse_body_ir,
    render_pseudocode,
)

# Mnemonics that are left out of the one-line summaries built by
# ``summarize_line_ops``.
_LOW_SIGNAL_MNEMONICS = frozenset(
    {
        "add_sp",
        "init",
        "line_number",
        "symbol_info",
        "word_to_dword",
        "dword_to_word",
        "copy_string",
        "ptr_to_string",
        "str_to_ptr",
    }
)
# Mnemonic prefixes that are only shown when nothing else remains for a line.
_LOW_SIGNAL_PREFIXES = ("push_",)
# Maximum number of statements shown in a single line summary.
_SUMMARY_STATEMENT_LIMIT = 3


@dataclass
class ClassManifestRow:
    """One manifest record describing a generated ``.unk`` file."""

    class_name: str
    entry_index: int
    body_count: int
    debug_line_count: int
    mapped_line_count: int
    collision_count: int
    output_path: str


def parse_int(value: str) -> int:
    """Parse an integer literal, honouring ``0x``/``0o``/``0b`` prefixes.

    Raises:
        ValueError: if *value* is not a valid integer literal.
    """
    return int(value, 0)


def load_rows(path: Path) -> list[dict[str, str]]:
    """Read a tab-separated file with a header row into a list of dicts."""
    with path.open("r", encoding="utf-8", newline="") as handle:
        return list(csv.DictReader(handle, delimiter="\t"))


def load_layout_by_entry(path: Path) -> dict[int, dict[str, str]]:
    """Index the layout TSV rows by their integer ``entry_index``.

    Rows whose ``entry_index`` is missing, empty or not an integer literal
    are skipped.  When an index occurs more than once the last row wins.
    """
    layout_by_entry: dict[int, dict[str, str]] = {}
    for row in load_rows(path):
        entry_index = row.get("entry_index")
        if not entry_index:
            continue
        try:
            key = parse_int(entry_index)
        except ValueError:
            # Best effort: an unparsable index simply has no layout entry.
            continue
        layout_by_entry[key] = row
    return layout_by_entry


def safe_unit_name(name: str) -> str:
    """Reduce *name* to an upper-case unit name of at most eight characters.

    Only alphanumeric characters and underscores are kept.  ``"UNKNOWN"`` is
    returned when nothing remains.

    NOTE(review): distinct class names that share their first eight kept
    characters map to the same unit name, and therefore to the same output
    file in ``main``.
    """
    filtered = "".join(
        char for char in name.strip().upper() if char.isalnum() or char == "_"
    )
    if not filtered:
        return "UNKNOWN"
    return filtered[:8]


def chunked(items: list[str], limit: int) -> list[list[str]]:
    """Split *items* into consecutive sub-lists of at most *limit* entries."""
    return [items[index:index + limit] for index in range(0, len(items), limit)]


def summarize_line_ops(ir: dict[str, Any], ops: list[dict[str, Any]]) -> str:
    """Build a short one-line summary of the ops recorded for a debug line.

    Statements are chosen in order of preference:

    1. ops that are neither low-signal mnemonics nor ``push_*`` ops;
    2. otherwise, ops that are not low-signal mnemonics;
    3. otherwise, every op except ``line_number`` markers.

    At most three statements are joined with ``" ; "``; a trailing
    ``" ; ..."`` marks that more were available.  Returns ``""`` when there
    is nothing to show.
    """
    label_map = build_listing_labels(ir)
    body_start = ir["event"]["derived_body_start"]

    # Filter once and keep ops and rendered text aligned by position.
    signal_ops = [op for op in ops if op["mnemonic"] not in _LOW_SIGNAL_MNEMONICS]
    rendered = [
        format_script_statement(op, label_map, body_start) for op in signal_ops
    ]
    preferred = [
        statement
        for statement, op in zip(rendered, signal_ops)
        if not op["mnemonic"].startswith(_LOW_SIGNAL_PREFIXES)
    ]

    selected = preferred or rendered
    if not selected:
        # Nothing but low-signal ops: show those rather than an empty line.
        selected = [
            format_script_statement(op, label_map, body_start)
            for op in ops
            if op["mnemonic"] != "line_number"
        ]
    if not selected:
        return ""

    summary = " ; ".join(selected[:_SUMMARY_STATEMENT_LIMIT])
    if len(selected) > _SUMMARY_STATEMENT_LIMIT:
        summary += " ; ..."
    return summary


def group_ops_by_debug_line(
    ir: dict[str, Any],
) -> tuple[dict[int, list[dict[str, Any]]], list[int]]:
    """Group the ops of *ir* under the most recent ``line_number`` marker.

    Ops that precede the first ``line_number`` marker are dropped, and the
    markers themselves are not included in the groups.

    Returns:
        ``(grouped, debug_lines)`` where ``grouped`` maps a debug line number
        to the ops that follow it and ``debug_lines`` is the sorted list of
        distinct line numbers seen, including lines that own no ops.
    """
    grouped: dict[int, list[dict[str, Any]]] = defaultdict(list)
    # A dedicated set is required for de-duplication: ``grouped`` only gains
    # a key once an op is appended, so testing membership there would list a
    # line repeatedly whenever its marker recurs before any op follows it.
    seen_lines: set[int] = set()
    current_line: int | None = None
    for op in ir["ops"]:
        if op["mnemonic"] == "line_number":
            current_line = op["operands"]["line_number"]
            seen_lines.add(current_line)
            continue
        if current_line is None:
            continue
        grouped[current_line].append(op)
    return grouped, sorted(seen_lines)


def build_sparse_lines_for_ir(ir: dict[str, Any]) -> tuple[dict[int, str], int]:
    """Produce the prefixed summary text for each debug line of one body.

    Returns:
        ``(sparse_lines, debug_line_count)`` where ``sparse_lines`` maps a
        debug line number to ``"[SS:name] summary"`` (``SS`` is the slot in
        two-digit upper-case hex) and ``debug_line_count`` is the number of
        distinct debug lines in the body.
    """
    grouped, debug_lines = group_ops_by_debug_line(ir)

    # The prefix depends only on the event, not on the individual line.
    slot = ir["event"]["slot"]
    slot_name = ir["event"]["event_name_hint"] or f"slot_{slot:02X}"
    prefix = f"[{slot:02X}:{slot_name}]"

    sparse_lines: dict[int, str] = {}
    for line_number in debug_lines:
        summary = summarize_line_ops(ir, grouped.get(line_number, []))
        # rstrip drops the separator space when the summary is empty.
        sparse_lines[line_number] = f"{prefix} {summary}".rstrip()
    return sparse_lines, len(debug_lines)


def render_pseudocode_appendix(
    class_name: str,
    irs: list[dict[str, Any]],
    shape_catalog: dict[int, str],
) -> list[str]:
    """Render the pseudocode of every body, ordered by slot, as text lines.

    The result starts with a blank line and two header comments; every slot
    is introduced by a banner comment and followed by a blank line.
    """
    lines = [
        "",
        f"/* synthesized appendix for {class_name} */",
        "/* sparse lines above preserve recovered debugger line numbers where available */",
    ]
    for ir in sorted(irs, key=lambda value: value["event"]["slot"]):
        slot = ir["event"]["slot"]
        slot_name = ir["event"]["event_name_hint"] or f"slot_{slot:02X}"
        lines.extend([
            "",
            f"/* ===== slot 0x{slot:02X} {slot_name} ===== */",
        ])
        lines.extend(
            render_pseudocode(ir, shape_catalog=shape_catalog).rstrip("\n").splitlines()
        )
        lines.append("")
    return lines


def write_manifest(path: Path, rows: list[ClassManifestRow]) -> None:
    """Write *rows* to *path* as a tab-separated file with a header row."""
    with path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.writer(handle, delimiter="\t")
        writer.writerow([
            "class_name",
            "entry_index",
            "body_count",
            "debug_line_count",
            "mapped_line_count",
            "collision_count",
            "output_path",
        ])
        for row in rows:
            writer.writerow([
                row.class_name,
                row.entry_index,
                row.body_count,
                row.debug_line_count,
                row.mapped_line_count,
                row.collision_count,
                row.output_path,
            ])


def main() -> None:
    """Command-line entry point: generate ``.unk`` files and the manifest.

    Raises:
        FileExistsError: if an output file already exists and ``--overwrite``
            was not given.
    """
    parser = argparse.ArgumentParser(description="Generate synthesized .unk files from extracted Crusader USECODE data")
    parser.add_argument(
        "--extracted-root",
        required=True,
        help="Extracted USECODE root containing class_event_index.tsv and class_layout_index.tsv",
    )
    parser.add_argument(
        "--output-dir",
        required=True,
        help="Directory that will receive generated .unk files",
    )
    parser.add_argument(
        "--variant",
        choices=["auto", "regret", "remorse"],
        default="auto",
        help="Intrinsic numbering variant to apply during decompilation",
    )
    parser.add_argument(
        "--shape-csv",
        help="Optional shape catalog CSV; defaults to the catalog matching the extracted root and variant",
    )
    parser.add_argument(
        "--class-filter",
        action="append",
        default=[],
        help="Limit generation to one or more class names",
    )
    parser.add_argument(
        "--overwrite",
        action="store_true",
        help="Overwrite existing .unk files in the output directory",
    )
    args = parser.parse_args()

    extracted_root = Path(args.extracted_root)
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    class_event_index = extracted_root / "class_event_index.tsv"
    class_layout_index = extracted_root / "class_layout_index.tsv"
    event_rows = load_rows(class_event_index)
    layout_by_entry = load_layout_by_entry(class_layout_index)
    allowed_classes = {value.upper() for value in args.class_filter}
    shape_csv = (
        Path(args.shape_csv)
        if args.shape_csv
        else default_shape_catalog_path(extracted_root, args.variant)
    )
    shape_catalog = load_shape_catalog(shape_csv)
    # "auto" is passed on as None so the parser picks the variant itself.
    variant = None if args.variant == "auto" else args.variant

    # Collect the event rows that have a derived body, grouped by class name.
    rows_by_class: dict[str, list[dict[str, str]]] = defaultdict(list)
    for row in event_rows:
        if not row.get("derived_body_start") or not row.get("derived_body_end"):
            continue
        class_name = row.get("class_name_hint", "")
        if not class_name:
            continue
        if allowed_classes and class_name.upper() not in allowed_classes:
            continue
        rows_by_class[class_name].append(row)

    manifest_rows: list[ClassManifestRow] = []
    for class_name, rows in sorted(rows_by_class.items()):
        irs: list[dict[str, Any]] = []
        sparse_map: dict[int, list[str]] = defaultdict(list)
        debug_line_count = 0
        collision_count = 0
        entry_index = parse_int(rows[0]["entry_index"])

        for row in sorted(rows, key=lambda value: parse_int(value["slot"])):
            layout_row = layout_by_entry.get(parse_int(row["entry_index"]))
            if layout_row is None:
                continue
            ir = parse_body_ir(row, layout_row, variant, extracted_root)
            irs.append(ir)
            slot_lines, slot_debug_count = build_sparse_lines_for_ir(ir)
            debug_line_count += slot_debug_count
            for line_number, content in slot_lines.items():
                if line_number < 1:
                    # Output lines are 1-based.  A value of zero or below
                    # would index from the END of the listing and silently
                    # overwrite another line (or fail on an empty listing).
                    print(
                        f"warning: {class_name}: ignoring debug line {line_number}",
                        file=sys.stderr,
                    )
                    continue
                bucket = sparse_map[line_number]
                if bucket:
                    # Another slot already claimed this line number.
                    collision_count += 1
                bucket.append(content)

        if not irs:
            continue

        # Lay the summaries out on their recorded (1-based) line numbers.
        max_line = max(sparse_map) if sparse_map else 0
        output_lines = [""] * max_line
        for line_number in sorted(sparse_map):
            output_lines[line_number - 1] = " || ".join(sparse_map[line_number])
        output_lines.extend(render_pseudocode_appendix(class_name, irs, shape_catalog))

        output_path = output_dir / f"{safe_unit_name(class_name)}.unk"
        if output_path.exists() and not args.overwrite:
            raise FileExistsError(f"Refusing to overwrite existing file: {output_path}")
        # Characters that cannot be encoded as ASCII are replaced on write.
        output_path.write_text(
            "\n".join(output_lines).rstrip("\n") + "\n",
            encoding="ascii",
            errors="replace",
        )
        manifest_rows.append(
            ClassManifestRow(
                class_name=class_name,
                entry_index=entry_index,
                body_count=len(irs),
                debug_line_count=debug_line_count,
                mapped_line_count=len(sparse_map),
                collision_count=collision_count,
                output_path=output_path.name,
            )
        )

    write_manifest(output_dir / "SYNTH_UNK_MANIFEST.tsv", manifest_rows)


if __name__ == "__main__":
    main()