Crusader_Decomp/pyghidra_plans/apply_ne_far_call_fixups.py

160 lines
5.9 KiB
Python
Raw Permalink Normal View History

from __future__ import annotations
import csv
from pathlib import Path
from ghidra.app.cmd.disassemble import DisassembleCommand
from ghidra.util.task import ConsoleTaskMonitor
# Directory one level above the folder that contains this script.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Tab-separated fixup tables, both located directly under REPO_ROOT.
# Read by load_internal_fixups() and load_import_fixups() respectively.
INTERNAL_FIXUPS_TSV = REPO_ROOT / "ne_reloc_far_calls.tsv"
IMPORT_FIXUPS_TSV = REPO_ROOT / "ne_reloc_far_imports.tsv"
def parse_address_text(address_text: str) -> int:
    """Convert a ``SEGMENT:OFFSET`` hex string into a single packed integer.

    Both fields are parsed as base-16. The segment is shifted left by 16 bits
    and the offset is added to it.

    Args:
        address_text: Text of the form ``"ssss:oooo"``; only the first ``:``
            is treated as the separator.

    Returns:
        ``(segment << 16) + offset``.

    Raises:
        ValueError: If there is no ``:`` separator or a field is not valid
            hexadecimal.
    """
    pieces = address_text.split(":", 1)
    segment_hex, offset_hex = pieces
    segment = int(segment_hex, 16)
    offset = int(offset_hex, 16)
    return (segment << 16) + offset
def to_address(address_text: str):
    """Resolve ``SEGMENT:OFFSET`` text to an address in the default address space.

    The text is first packed into an integer by ``parse_address_text`` and then
    handed to the default address space of the global ``program``.

    Args:
        address_text: Text of the form ``"ssss:oooo"`` (hexadecimal fields).

    Returns:
        The address object produced by the address space's ``getAddress``.
    """
    factory = program.getAddressFactory()
    default_space = factory.getDefaultAddressSpace()
    packed_value = parse_address_text(address_text)
    return default_space.getAddress(packed_value)
def parse_seg_off(address_text: str) -> tuple[int, int]:
    """Split a ``SEGMENT:OFFSET`` hex string into its two integer fields.

    Args:
        address_text: Text of the form ``"ssss:oooo"``; only the first ``:``
            is treated as the separator.

    Returns:
        A ``(segment, offset)`` pair, each parsed as base-16.

    Raises:
        ValueError: If there is no ``:`` separator or a field is not valid
            hexadecimal.
    """
    segment_hex, offset_hex = address_text.split(":", 1)
    segment = int(segment_hex, 16)
    offset = int(offset_hex, 16)
    return segment, offset
def signed_byte(value: int) -> int:
    """Map an unsigned byte value onto its two's-complement signed equivalent.

    Values below ``0x80`` are returned unchanged; anything else has ``0x100``
    subtracted. No range check or masking is applied to the input.

    Args:
        value: Byte value, expected by the callers in this file to be 0-255.

    Returns:
        The signed interpretation (``-128`` to ``127`` for in-range input).
    """
    if value >= 0x80:
        return value - 0x100
    return value
def load_internal_fixups() -> dict[str, tuple[str, str]]:
    """Load the internal far-call fixup table from ``INTERNAL_FIXUPS_TSV``.

    The file is read as tab-separated text with a header row. The
    ``source_ghidra``, ``target_ghidra`` and ``target_label`` columns are
    whitespace-stripped. Repeated sources are accepted as long as every
    occurrence maps to the same target and label.

    Returns:
        Mapping of source address text to a ``(target, label)`` pair.

    Raises:
        RuntimeError: If one source appears with two different mappings.
    """
    fixups: dict[str, tuple[str, str]] = {}
    with INTERNAL_FIXUPS_TSV.open("r", encoding="utf-8", newline="") as tsv_file:
        for record in csv.DictReader(tsv_file, delimiter="\t"):
            source = record["source_ghidra"].strip()
            entry = (record["target_ghidra"].strip(), record["target_label"].strip())
            existing = fixups.get(source)
            # First sighting, or an exact duplicate: record it and move on.
            if existing is None or existing == entry:
                fixups[source] = entry
                continue
            raise RuntimeError(
                f"conflicting internal fixup mapping for {source}: {existing} vs {entry}"
            )
    return fixups
def load_import_fixups() -> dict[str, str]:
    """Load the import far-call fixup table from ``IMPORT_FIXUPS_TSV``.

    The file is read as tab-separated text with a header row. The
    ``source_ghidra`` and ``target`` columns are whitespace-stripped. Repeated
    sources are accepted as long as every occurrence names the same target.

    Returns:
        Mapping of source address text to the import target text.

    Raises:
        RuntimeError: If one source appears with two different targets.
    """
    fixups: dict[str, str] = {}
    with IMPORT_FIXUPS_TSV.open("r", encoding="utf-8", newline="") as tsv_file:
        for record in csv.DictReader(tsv_file, delimiter="\t"):
            source = record["source_ghidra"].strip()
            target = record["target"].strip()
            existing = fixups.get(source)
            # First sighting, or an exact duplicate: record it and move on.
            if existing is None or existing == target:
                fixups[source] = target
                continue
            raise RuntimeError(
                f"conflicting import fixup mapping for {source}: {existing} vs {target}"
            )
    return fixups
def patch_internal_far_call(
    operand_address_text: str,
    target_address_text: str,
    label: str,
) -> str | None:
    """Overwrite a far call's 4-byte operand with a resolved target and re-disassemble.

    The fixup address names the operand; the instruction is taken to start one
    address before it. When the byte at that start is not ``0x9A`` the program
    is left untouched.

    Args:
        operand_address_text: ``SEGMENT:OFFSET`` text locating the operand.
        target_address_text: ``SEGMENT:OFFSET`` text of the call target.
        label: Target label, used only in the end-of-line comment.

    Returns:
        The instruction address as a string, or ``None`` when the opcode byte
        is not ``0x9A`` and nothing was patched.

    Raises:
        RuntimeError: If the disassemble command reports failure.
    """
    memory = program.getMemory()
    listing = program.getListing()
    operand_start = to_address(operand_address_text)
    call_start = operand_start.previous()
    call_last = call_start.add(4)

    # Only a literal CALLF opcode byte (0x9A) qualifies for patching.
    if (int(memory.getByte(call_start)) & 0xFF) != 0x9A:
        return None

    # Drop the existing code units so the bytes can be rewritten and decoded anew.
    listing.clearCodeUnits(call_start, call_last, False)

    segment, offset = parse_seg_off(target_address_text)
    # Operand layout: offset word then segment word, low byte first in each.
    patched = [(word >> shift) & 0xFF for word in (offset, segment) for shift in (0, 8)]
    for position, unsigned in enumerate(patched):
        # setByte receives the signed form of each byte value.
        memory.setByte(operand_start.add(position), signed_byte(unsigned))

    if not DisassembleCommand(call_start, None, True).applyTo(program, ConsoleTaskMonitor()):
        raise RuntimeError(f"failed to re-disassemble patched CALLF at {call_start}")

    helpers["set_comment"](
        program,
        str(call_start),
        f"NE FIXUP APPLIED -> {target_address_text} ({label})",
        "eol",
    )
    return str(call_start)
def annotate_import_far_call(operand_address_text: str, import_target: str) -> str:
    """Attach an end-of-line comment naming the import a far call resolves to.

    The comment is placed one address before the operand address, which is
    where the instruction is taken to start. No bytes are modified.

    Args:
        operand_address_text: ``SEGMENT:OFFSET`` text locating the operand.
        import_target: Import name or description to record in the comment.

    Returns:
        The annotated instruction address as a string.
    """
    operand_start = to_address(operand_address_text)
    call_start_text = str(operand_start.previous())
    note = f"NE IMPORT -> {import_target}"
    helpers["set_comment"](program, call_start_text, note, "eol")
    return call_start_text
def main() -> None:
    """Apply every internal fixup, annotate every import fixup, then print a summary.

    All program modifications happen inside one transaction, which is
    committed only if every step completes; if an exception escapes, the
    transaction is ended with ``commit=False`` and the exception propagates
    before any summary is printed. At most ten example lines are kept per
    category.
    """
    internal_fixups = load_internal_fixups()
    import_fixups = load_import_fixups()

    example_limit = 10
    patched_count = 0
    skipped_count = 0
    import_count = 0
    patched_samples: list[str] = []
    skipped_samples: list[str] = []
    import_samples: list[str] = []

    transaction_id = program.startTransaction("Apply NE far-call fixups")
    succeeded = False
    try:
        for source, (target, label) in sorted(internal_fixups.items()):
            patched_at = patch_internal_far_call(source, target, label)
            if patched_at is not None:
                patched_count += 1
                if len(patched_samples) < example_limit:
                    patched_samples.append(f"{patched_at} -> {target} ({label})")
            else:
                # Opcode byte was not a literal CALLF; nothing was changed.
                skipped_count += 1
                if len(skipped_samples) < example_limit:
                    skipped_samples.append(f"{source} -> {target} ({label})")

        for source, target in sorted(import_fixups.items()):
            annotated_at = annotate_import_far_call(source, target)
            import_count += 1
            if len(import_samples) < example_limit:
                import_samples.append(f"{annotated_at} -> {target}")

        helpers["set_comment"](
            program,
            "0000:ffff",
            "NE fixup placeholder only. Internal far calls have been patched from the verified relocation table; remaining xrefs, if any, are import callsites or unresolved non-internal cases.",
            "pre",
        )
        succeeded = True
    finally:
        program.endTransaction(transaction_id, succeeded)

    report = (
        (f"Applied internal far-call patches: {patched_count}", patched_samples),
        (
            f"Skipped internal far-ptr fixups that were not literal CALLF instructions: {skipped_count}",
            skipped_samples,
        ),
        (f"Annotated import far calls: {import_count}", import_samples),
    )
    for headline, samples in report:
        print(headline)
        for line in samples:
            print(f" {line}")
# Runs unconditionally when this file is evaluated (there is no __main__ guard).
# NOTE(review): `program` and `helpers` are used throughout but are not defined
# in the visible source — presumably injected by the PyGhidra script runner;
# confirm before running this file any other way.
main()