from __future__ import annotations import csv from pathlib import Path from ghidra.app.cmd.disassemble import DisassembleCommand from ghidra.util.task import ConsoleTaskMonitor REPO_ROOT = Path(__file__).resolve().parents[1] INTERNAL_FIXUPS_TSV = REPO_ROOT / "ne_reloc_far_calls.tsv" IMPORT_FIXUPS_TSV = REPO_ROOT / "ne_reloc_far_imports.tsv" def parse_address_text(address_text: str) -> int: segment_text, offset_text = address_text.split(":", 1) return (int(segment_text, 16) << 16) + int(offset_text, 16) def to_address(address_text: str): address_space = program.getAddressFactory().getDefaultAddressSpace() return address_space.getAddress(parse_address_text(address_text)) def parse_seg_off(address_text: str) -> tuple[int, int]: segment_text, offset_text = address_text.split(":", 1) return int(segment_text, 16), int(offset_text, 16) def signed_byte(value: int) -> int: return value if value < 0x80 else value - 0x100 def load_internal_fixups() -> dict[str, tuple[str, str]]: rows: dict[str, tuple[str, str]] = {} with INTERNAL_FIXUPS_TSV.open("r", encoding="utf-8", newline="") as handle: reader = csv.DictReader(handle, delimiter="\t") for row in reader: source = row["source_ghidra"].strip() target = row["target_ghidra"].strip() label = row["target_label"].strip() previous = rows.get(source) if previous is not None and previous != (target, label): raise RuntimeError( f"conflicting internal fixup mapping for {source}: {previous} vs {(target, label)}" ) rows[source] = (target, label) return rows def load_import_fixups() -> dict[str, str]: rows: dict[str, str] = {} with IMPORT_FIXUPS_TSV.open("r", encoding="utf-8", newline="") as handle: reader = csv.DictReader(handle, delimiter="\t") for row in reader: source = row["source_ghidra"].strip() target = row["target"].strip() previous = rows.get(source) if previous is not None and previous != target: raise RuntimeError( f"conflicting import fixup mapping for {source}: {previous} vs {target}" ) rows[source] = target return rows def patch_internal_far_call(operand_address_text: str, target_address_text: str, label: str) -> str | None: memory = program.getMemory() listing = program.getListing() operand_address = to_address(operand_address_text) instruction_address = operand_address.previous() instruction_end = instruction_address.add(4) opcode = int(memory.getByte(instruction_address)) & 0xFF if opcode != 0x9A: return None listing.clearCodeUnits(instruction_address, instruction_end, False) target_segment, target_offset = parse_seg_off(target_address_text) operand_bytes = [ target_offset & 0xFF, (target_offset >> 8) & 0xFF, target_segment & 0xFF, (target_segment >> 8) & 0xFF, ] for index, byte_value in enumerate(operand_bytes): memory.setByte(operand_address.add(index), signed_byte(byte_value)) command = DisassembleCommand(instruction_address, None, True) if not command.applyTo(program, ConsoleTaskMonitor()): raise RuntimeError(f"failed to re-disassemble patched CALLF at {instruction_address}") comment = f"NE FIXUP APPLIED -> {target_address_text} ({label})" helpers["set_comment"](program, str(instruction_address), comment, "eol") return str(instruction_address) def annotate_import_far_call(operand_address_text: str, import_target: str) -> str: instruction_address = to_address(operand_address_text).previous() helpers["set_comment"](program, str(instruction_address), f"NE IMPORT -> {import_target}", "eol") return str(instruction_address) def main() -> None: internal_fixups = load_internal_fixups() import_fixups = load_import_fixups() applied_internal = 0 skipped_non_call_internal = 0 annotated_imports = 0 internal_examples: list[str] = [] skipped_examples: list[str] = [] import_examples: list[str] = [] transaction_id = program.startTransaction("Apply NE far-call fixups") commit = False try: for source, (target, label) in sorted(internal_fixups.items()): instruction_address = patch_internal_far_call(source, target, label) if instruction_address is None: skipped_non_call_internal += 1 if len(skipped_examples) < 10: skipped_examples.append(f"{source} -> {target} ({label})") continue applied_internal += 1 if len(internal_examples) < 10: internal_examples.append(f"{instruction_address} -> {target} ({label})") for source, target in sorted(import_fixups.items()): instruction_address = annotate_import_far_call(source, target) annotated_imports += 1 if len(import_examples) < 10: import_examples.append(f"{instruction_address} -> {target}") helpers["set_comment"]( program, "0000:ffff", "NE fixup placeholder only. Internal far calls have been patched from the verified relocation table; remaining xrefs, if any, are import callsites or unresolved non-internal cases.", "pre", ) commit = True finally: program.endTransaction(transaction_id, commit) print(f"Applied internal far-call patches: {applied_internal}") for line in internal_examples: print(f" {line}") print(f"Skipped internal far-ptr fixups that were not literal CALLF instructions: {skipped_non_call_internal}") for line in skipped_examples: print(f" {line}") print(f"Annotated import far calls: {annotated_imports}") for line in import_examples: print(f" {line}") main()