This commit introduces a new script, `apply_ne_far_call_fixups.py`, which processes internal and import far-call fixups from TSV files. The script includes functions to parse addresses, load fixup data, patch internal far calls, and annotate import calls. It raises an error when a fixup source maps to conflicting targets, and it prints a summary of the applied patches, the annotated import calls, and any fixups skipped because they are not literal far-call instructions. The script is designed to improve the handling of NE-format far calls in Ghidra.
160 lines
No EOL
5.9 KiB
Python
160 lines
No EOL
5.9 KiB
Python
from __future__ import annotations
|
|
|
|
import csv
|
|
from pathlib import Path
|
|
|
|
from ghidra.app.cmd.disassemble import DisassembleCommand
|
|
from ghidra.util.task import ConsoleTaskMonitor
|
|
|
|
|
|
# Directory two levels above this file's resolved location (the parent of the
# directory that contains this script); the fixup tables are looked up here.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Tab-separated table of internal far-call fixups; read by load_internal_fixups().
INTERNAL_FIXUPS_TSV = REPO_ROOT / "ne_reloc_far_calls.tsv"
# Tab-separated table of import far-call fixups; read by load_import_fixups().
IMPORT_FIXUPS_TSV = REPO_ROOT / "ne_reloc_far_imports.tsv"
|
|
|
|
|
|
def parse_address_text(address_text: str) -> int:
    """Convert a ``SEGMENT:OFFSET`` hexadecimal string into one integer.

    The text is split on the first ``:``; both halves are parsed as base-16
    numbers and combined as ``(segment << 16) + offset``.

    Args:
        address_text: Address text such as ``"1000:0010"``.

    Returns:
        The combined integer value.

    Raises:
        ValueError: If there is no ``:`` separator or either half is not
            valid hexadecimal.
    """
    segment_text, offset_text = address_text.split(":", 1)
    return (int(segment_text, 16) << 16) + int(offset_text, 16)
|
|
|
|
|
|
def to_address(address_text: str):
    """Turn ``SEGMENT:OFFSET`` text into an address in the default address space.

    Relies on a module-level ``program`` object that is not defined in this
    file's visible code (presumably injected by the Ghidra scripting
    environment -- confirm against the launcher).

    Args:
        address_text: Address text accepted by ``parse_address_text``.

    Returns:
        The address object produced by the program's default address space
        for the integer value of ``address_text``.
    """
    address_space = program.getAddressFactory().getDefaultAddressSpace()
    return address_space.getAddress(parse_address_text(address_text))
|
|
|
|
|
|
def parse_seg_off(address_text: str) -> tuple[int, int]:
    """Split ``SEGMENT:OFFSET`` hexadecimal text into its two integer parts.

    Args:
        address_text: Address text such as ``"1000:0010"``.

    Returns:
        A ``(segment, offset)`` tuple, each half parsed as base 16.

    Raises:
        ValueError: If there is no ``:`` separator or either half is not
            valid hexadecimal.
    """
    segment_text, offset_text = address_text.split(":", 1)
    return int(segment_text, 16), int(offset_text, 16)
|
|
|
|
|
|
def signed_byte(value: int) -> int:
    """Reinterpret the low 8 bits of ``value`` as a signed byte.

    The input is masked to 8 bits first, so the result is always within
    ``-128..127`` even for out-of-range inputs. Values already in ``0..255``
    map exactly as before: ``0x00..0x7F`` stay unchanged and ``0x80..0xFF``
    become ``-128..-1``.

    Args:
        value: Integer whose low byte should be converted.

    Returns:
        The signed interpretation of ``value & 0xFF``.
    """
    value &= 0xFF
    return value if value < 0x80 else value - 0x100
|
|
|
|
|
|
def load_internal_fixups() -> dict[str, tuple[str, str]]:
    """Read the internal far-call fixup table from ``INTERNAL_FIXUPS_TSV``.

    The file is parsed as tab-separated text with a header row. Each data row
    must provide the columns ``source_ghidra``, ``target_ghidra`` and
    ``target_label``; surrounding whitespace is stripped from every value.

    Returns:
        A mapping from source address text to ``(target address text, label)``.
        Exact duplicate rows are tolerated.

    Raises:
        RuntimeError: If the same source appears with a different
            ``(target, label)`` pair.
    """
    rows: dict[str, tuple[str, str]] = {}
    with INTERNAL_FIXUPS_TSV.open("r", encoding="utf-8", newline="") as handle:
        reader = csv.DictReader(handle, delimiter="\t")
        for row in reader:
            source = row["source_ghidra"].strip()
            target = row["target_ghidra"].strip()
            label = row["target_label"].strip()
            previous = rows.get(source)
            # A repeated source is only acceptable when it repeats the same mapping.
            if previous is not None and previous != (target, label):
                raise RuntimeError(
                    f"conflicting internal fixup mapping for {source}: {previous} vs {(target, label)}"
                )
            rows[source] = (target, label)
    return rows
|
|
|
|
|
|
def load_import_fixups() -> dict[str, str]:
    """Read the import far-call fixup table from ``IMPORT_FIXUPS_TSV``.

    The file is parsed as tab-separated text with a header row. Each data row
    must provide the columns ``source_ghidra`` and ``target``; surrounding
    whitespace is stripped from both values.

    Returns:
        A mapping from source address text to the import target text.
        Exact duplicate rows are tolerated.

    Raises:
        RuntimeError: If the same source appears with a different target.
    """
    rows: dict[str, str] = {}
    with IMPORT_FIXUPS_TSV.open("r", encoding="utf-8", newline="") as handle:
        reader = csv.DictReader(handle, delimiter="\t")
        for row in reader:
            source = row["source_ghidra"].strip()
            target = row["target"].strip()
            previous = rows.get(source)
            # A repeated source is only acceptable when it repeats the same target.
            if previous is not None and previous != target:
                raise RuntimeError(
                    f"conflicting import fixup mapping for {source}: {previous} vs {target}"
                )
            rows[source] = target
    return rows
|
|
|
|
|
|
def patch_internal_far_call(operand_address_text: str, target_address_text: str, label: str) -> str | None:
    """Rewrite the operand of a far call so it points at its resolved target.

    ``operand_address_text`` names the first operand byte; the opcode is the
    byte immediately before it. When that byte is not ``0x9A`` (the CALLF
    opcode this script patches) nothing is modified. Otherwise the existing
    code units over the instruction are cleared, the four operand bytes are
    overwritten with the target offset followed by the target segment (each
    as a little-endian 16-bit word), the instruction is disassembled again,
    and an end-of-line comment recording the fixup is attached.

    Relies on module-level ``program`` and ``helpers`` objects that are not
    defined in this file's visible code (presumably supplied by the Ghidra
    scripting environment -- confirm against the launcher).

    Args:
        operand_address_text: ``SEGMENT:OFFSET`` text of the operand location.
        target_address_text: ``SEGMENT:OFFSET`` text of the call target.
        label: Target label, used only in the comment text.

    Returns:
        The instruction address as a string, or ``None`` when the byte before
        the operand is not ``0x9A``.

    Raises:
        RuntimeError: If re-disassembly of the patched instruction fails.
    """
    memory = program.getMemory()
    listing = program.getListing()

    operand_address = to_address(operand_address_text)
    # The fixup location is the operand; the opcode byte sits just before it.
    instruction_address = operand_address.previous()
    # Opcode byte plus four operand bytes: the range ends four bytes after the opcode.
    instruction_end = instruction_address.add(4)

    # Mask so the comparison is made against an unsigned byte value.
    opcode = int(memory.getByte(instruction_address)) & 0xFF
    if opcode != 0x9A:
        return None

    # Existing code units are cleared before the underlying bytes are changed.
    listing.clearCodeUnits(instruction_address, instruction_end, False)

    target_segment, target_offset = parse_seg_off(target_address_text)
    # Operand layout: offset low, offset high, segment low, segment high.
    operand_bytes = [
        target_offset & 0xFF,
        (target_offset >> 8) & 0xFF,
        target_segment & 0xFF,
        (target_segment >> 8) & 0xFF,
    ]

    for index, byte_value in enumerate(operand_bytes):
        # setByte is given the signed form of each byte value.
        memory.setByte(operand_address.add(index), signed_byte(byte_value))

    command = DisassembleCommand(instruction_address, None, True)
    if not command.applyTo(program, ConsoleTaskMonitor()):
        raise RuntimeError(f"failed to re-disassemble patched CALLF at {instruction_address}")

    comment = f"NE FIXUP APPLIED -> {target_address_text} ({label})"
    helpers["set_comment"](program, str(instruction_address), comment, "eol")
    return str(instruction_address)
|
|
|
|
|
|
def annotate_import_far_call(operand_address_text: str, import_target: str) -> str:
    """Attach an ``NE IMPORT -> ...`` end-of-line comment to an import call site.

    The comment is placed on the address one byte before the operand
    location. No bytes are modified, and unlike the internal patching path
    the opcode at that address is not checked.

    Relies on module-level ``program`` and ``helpers`` objects that are not
    defined in this file's visible code.

    Args:
        operand_address_text: ``SEGMENT:OFFSET`` text of the operand location.
        import_target: Import target text to record in the comment.

    Returns:
        The annotated address as a string.
    """
    instruction_address = to_address(operand_address_text).previous()
    helpers["set_comment"](program, str(instruction_address), f"NE IMPORT -> {import_target}", "eol")
    return str(instruction_address)
|
|
|
|
|
|
def main() -> None:
    """Apply all internal fixups, annotate all import fixups, and print a summary.

    Both fixup tables are loaded first. All program changes then run inside a
    single transaction named "Apply NE far-call fixups": the transaction is
    ended with ``commit=True`` only if every step completes, and with
    ``commit=False`` if any step raises. Fixups are processed in sorted order
    of their source address text. Afterwards the counts and up to ten example
    lines per category are printed.

    Relies on module-level ``program`` and ``helpers`` objects that are not
    defined in this file's visible code.
    """
    internal_fixups = load_internal_fixups()
    import_fixups = load_import_fixups()

    # Cap on how many example lines are kept for each summary section.
    max_examples = 10

    applied_internal = 0
    skipped_non_call_internal = 0
    annotated_imports = 0
    internal_examples: list[str] = []
    skipped_examples: list[str] = []
    import_examples: list[str] = []

    transaction_id = program.startTransaction("Apply NE far-call fixups")
    # Stays False unless every step below succeeds.
    commit = False
    try:
        for source, (target, label) in sorted(internal_fixups.items()):
            instruction_address = patch_internal_far_call(source, target, label)
            if instruction_address is None:
                # The fixup location is not preceded by a CALLF opcode; leave it untouched.
                skipped_non_call_internal += 1
                if len(skipped_examples) < max_examples:
                    skipped_examples.append(f"{source} -> {target} ({label})")
                continue
            applied_internal += 1
            if len(internal_examples) < max_examples:
                internal_examples.append(f"{instruction_address} -> {target} ({label})")

        for source, target in sorted(import_fixups.items()):
            instruction_address = annotate_import_far_call(source, target)
            annotated_imports += 1
            if len(import_examples) < max_examples:
                import_examples.append(f"{instruction_address} -> {target}")

        # Explain the placeholder address with a "pre" comment.
        helpers["set_comment"](
            program,
            "0000:ffff",
            "NE fixup placeholder only. Internal far calls have been patched from the verified relocation table; remaining xrefs, if any, are import callsites or unresolved non-internal cases.",
            "pre",
        )

        commit = True
    finally:
        program.endTransaction(transaction_id, commit)

    print(f"Applied internal far-call patches: {applied_internal}")
    for line in internal_examples:
        print(f"  {line}")
    print(f"Skipped internal far-ptr fixups that were not literal CALLF instructions: {skipped_non_call_internal}")
    for line in skipped_examples:
        print(f"  {line}")
    print(f"Annotated import far calls: {annotated_imports}")
    for line in import_examples:
        print(f"  {line}")
|
|
|
|
|
|
# Run the fixup pass when the script is executed.
main()