Crusader_Decomp/pyghidra_plans/apply_ne_far_call_fixups.py
MaddoScientisto 55b3187469 Add script to apply NE far call fixups from TSV files
This commit introduces a new script, `apply_ne_far_call_fixups.py`, which processes internal and import far call fixups from TSV files. The script includes functions to parse addresses, load fixup data, patch internal far calls, and annotate import calls. It handles conflicts in fixup mappings and provides detailed output on the applied patches and any skipped instructions. The script is designed to enhance the handling of NE format far calls in Ghidra.
2026-03-21 13:25:21 +01:00

160 lines
No EOL
5.9 KiB
Python

from __future__ import annotations
import csv
from pathlib import Path
from ghidra.app.cmd.disassemble import DisassembleCommand
from ghidra.util.task import ConsoleTaskMonitor
# Directory one level above the folder that contains this script
# (parents[0] is the script's own folder, parents[1] is its parent).
REPO_ROOT = Path(__file__).resolve().parents[1]
# Tab-separated table consumed by load_internal_fixups().
INTERNAL_FIXUPS_TSV = REPO_ROOT / "ne_reloc_far_calls.tsv"
# Tab-separated table consumed by load_import_fixups().
IMPORT_FIXUPS_TSV = REPO_ROOT / "ne_reloc_far_imports.tsv"
def parse_address_text(address_text: str) -> int:
    """Convert a ``"SEG:OFF"`` hex string into a single integer.

    The segment is placed in the bits above bit 15 and the offset is added
    to it, i.e. the result is ``(segment << 16) + offset``.

    Raises:
        ValueError: if the text has no ``:`` separator or either part is
            not valid hexadecimal.
    """
    fields = address_text.split(":", 1)
    segment_hex, offset_hex = fields
    segment = int(segment_hex, 16)
    offset = int(offset_hex, 16)
    return (segment << 16) + offset
def to_address(address_text: str):
    """Build an address object in the program's default address space.

    ``address_text`` is a ``"SEG:OFF"`` hex string; it is flattened with
    ``parse_address_text`` and handed to the default address space's
    ``getAddress``.

    NOTE(review): ``program`` is not defined in this file; presumably the
    script runner injects it as a global -- confirm.
    """
    flat_value = parse_address_text(address_text)
    default_space = program.getAddressFactory().getDefaultAddressSpace()
    return default_space.getAddress(flat_value)
def parse_seg_off(address_text: str) -> tuple[int, int]:
    """Split a ``"SEG:OFF"`` hex string into ``(segment, offset)`` integers.

    Raises:
        ValueError: if the text has no ``:`` separator or either part is
            not valid hexadecimal.
    """
    fields = address_text.split(":", 1)
    segment_hex, offset_hex = fields
    segment = int(segment_hex, 16)
    offset = int(offset_hex, 16)
    return segment, offset
def signed_byte(value: int) -> int:
    """Map an unsigned byte value onto its signed 8-bit counterpart.

    Values below 0x80 are returned unchanged; anything else has 0x100
    subtracted, so 0x80..0xFF become -128..-1.
    """
    if value >= 0x80:
        return value - 0x100
    return value
def load_internal_fixups() -> dict[str, tuple[str, str]]:
    """Read the internal far-call fixup table from ``INTERNAL_FIXUPS_TSV``.

    Each row must provide the columns ``source_ghidra``, ``target_ghidra``
    and ``target_label``; surrounding whitespace is stripped from all three.

    Returns:
        A mapping of source address text to ``(target address text, label)``.

    Raises:
        RuntimeError: if the same source appears with two different
            ``(target, label)`` pairs. Exact duplicates are tolerated.
    """
    columns = ("source_ghidra", "target_ghidra", "target_label")
    fixups: dict[str, tuple[str, str]] = {}
    with INTERNAL_FIXUPS_TSV.open("r", encoding="utf-8", newline="") as tsv_file:
        for record in csv.DictReader(tsv_file, delimiter="\t"):
            source, target, label = (record[column].strip() for column in columns)
            entry = (target, label)
            # setdefault stores the entry on first sight and otherwise hands
            # back what was recorded earlier, so one lookup covers both cases.
            existing = fixups.setdefault(source, entry)
            if existing != entry:
                raise RuntimeError(
                    f"conflicting internal fixup mapping for {source}: {existing} vs {entry}"
                )
    return fixups
def load_import_fixups() -> dict[str, str]:
    """Read the import far-call fixup table from ``IMPORT_FIXUPS_TSV``.

    Each row must provide the columns ``source_ghidra`` and ``target``;
    surrounding whitespace is stripped from both.

    Returns:
        A mapping of source address text to import target text.

    Raises:
        RuntimeError: if the same source appears with two different
            targets. Exact duplicates are tolerated.
    """
    fixups: dict[str, str] = {}
    with IMPORT_FIXUPS_TSV.open("r", encoding="utf-8", newline="") as tsv_file:
        for record in csv.DictReader(tsv_file, delimiter="\t"):
            source = record["source_ghidra"].strip()
            target = record["target"].strip()
            # setdefault stores the target on first sight and otherwise hands
            # back what was recorded earlier, so one lookup covers both cases.
            existing = fixups.setdefault(source, target)
            if existing != target:
                raise RuntimeError(
                    f"conflicting import fixup mapping for {source}: {existing} vs {target}"
                )
    return fixups
def patch_internal_far_call(operand_address_text: str, target_address_text: str, label: str) -> str | None:
    """Rewrite the 4-byte operand of a far call so it points at a known target.

    Args:
        operand_address_text: ``"SEG:OFF"`` text of the first operand byte;
            the instruction itself is taken to start one byte earlier.
        target_address_text: ``"SEG:OFF"`` text of the call destination.
        label: Name of the destination, used only in the comment text.

    Returns:
        The instruction address as a string, or ``None`` when the byte in
        front of the operand is not 0x9A (nothing is modified in that case).

    Raises:
        RuntimeError: if re-disassembly of the patched instruction fails.
    """
    mem = program.getMemory()
    code_listing = program.getListing()

    operand_start = to_address(operand_address_text)
    call_start = operand_start.previous()
    # Opcode byte plus four operand bytes: the last byte sits at start + 4.
    call_last_byte = call_start.add(4)

    # 0x9A is the opcode this script treats as CALLF; anything else is skipped.
    if (int(mem.getByte(call_start)) & 0xFF) != 0x9A:
        return None

    code_listing.clearCodeUnits(call_start, call_last_byte, False)

    segment, offset = parse_seg_off(target_address_text)
    # Byte order written to memory: offset low, offset high, segment low, segment high.
    operand_values = [
        (word >> shift) & 0xFF
        for word in (offset, segment)
        for shift in (0, 8)
    ]
    for position, unsigned_value in enumerate(operand_values):
        # presumably setByte wants a signed byte, hence signed_byte() -- confirm
        mem.setByte(operand_start.add(position), signed_byte(unsigned_value))

    redisassemble = DisassembleCommand(call_start, None, True)
    succeeded = redisassemble.applyTo(program, ConsoleTaskMonitor())
    if not succeeded:
        raise RuntimeError(f"failed to re-disassemble patched CALLF at {call_start}")

    call_start_text = str(call_start)
    helpers["set_comment"](
        program,
        call_start_text,
        f"NE FIXUP APPLIED -> {target_address_text} ({label})",
        "eol",
    )
    return str(call_start)
def annotate_import_far_call(operand_address_text: str, import_target: str) -> str:
    """Attach an end-of-line comment naming the import a far call refers to.

    The comment is placed on the address one byte before the operand
    address; no bytes are modified and the opcode is not inspected.

    Returns:
        The annotated instruction address as a string.
    """
    operand_start = to_address(operand_address_text)
    call_start = operand_start.previous()
    note = f"NE IMPORT -> {import_target}"
    helpers["set_comment"](program, str(call_start), note, "eol")
    return str(call_start)
def main() -> None:
    """Apply all fixups from both TSV tables in one transaction and report.

    Internal fixups are patched into memory (entries whose instruction is
    not a literal CALLF are counted as skipped); import fixups only receive
    a comment. The transaction is committed only if every step succeeds;
    on any exception it is ended without committing and the exception
    propagates. A summary with up to ten examples per category is printed
    after a successful run.
    """
    sample_limit = 10

    internal_table = load_internal_fixups()
    import_table = load_import_fixups()

    patched_count = 0
    skipped_count = 0
    import_count = 0
    patched_samples: list[str] = []
    skipped_samples: list[str] = []
    import_samples: list[str] = []

    txn = program.startTransaction("Apply NE far-call fixups")
    succeeded = False
    try:
        for operand_text, (target_text, target_label) in sorted(internal_table.items()):
            patched_at = patch_internal_far_call(operand_text, target_text, target_label)
            if patched_at is not None:
                patched_count += 1
                if len(patched_samples) < sample_limit:
                    patched_samples.append(f"{patched_at} -> {target_text} ({target_label})")
            else:
                skipped_count += 1
                if len(skipped_samples) < sample_limit:
                    skipped_samples.append(f"{operand_text} -> {target_text} ({target_label})")

        for operand_text, import_name in sorted(import_table.items()):
            annotated_at = annotate_import_far_call(operand_text, import_name)
            import_count += 1
            if len(import_samples) < sample_limit:
                import_samples.append(f"{annotated_at} -> {import_name}")

        helpers["set_comment"](
            program,
            "0000:ffff",
            "NE fixup placeholder only. Internal far calls have been patched from the verified relocation table; remaining xrefs, if any, are import callsites or unresolved non-internal cases.",
            "pre",
        )
        # Reached only when nothing above raised.
        succeeded = True
    finally:
        program.endTransaction(txn, succeeded)

    report = (
        (f"Applied internal far-call patches: {patched_count}", patched_samples),
        (
            f"Skipped internal far-ptr fixups that were not literal CALLF instructions: {skipped_count}",
            skipped_samples,
        ),
        (f"Annotated import far calls: {import_count}", import_samples),
    )
    for heading, samples in report:
        print(heading)
        for sample in samples:
            print(f" {sample}")
# Unconditional entry point: the fixups are applied as soon as this file is executed.
main()