160 lines
5.9 KiB
Python
160 lines
5.9 KiB
Python
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import csv
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
from ghidra.app.cmd.disassemble import DisassembleCommand
|
||
|
|
from ghidra.util.task import ConsoleTaskMonitor
|
||
|
|
|
||
|
|
|
||
|
|
# Directory one level above the folder that contains this script.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Tab-separated table of internal far-call fixups; read by load_internal_fixups().
INTERNAL_FIXUPS_TSV = REPO_ROOT / "ne_reloc_far_calls.tsv"
# Tab-separated table of import far-call fixups; read by load_import_fixups().
IMPORT_FIXUPS_TSV = REPO_ROOT / "ne_reloc_far_imports.tsv"
|
||
|
|
|
||
|
|
|
||
|
|
def parse_address_text(address_text: str) -> int:
    """Convert ``SEGMENT:OFFSET`` hexadecimal text into one packed integer.

    The result is ``(segment << 16) + offset``, i.e. the segment occupies the
    upper 16 bits and the offset the lower 16 bits.

    Args:
        address_text: Text such as ``"1000:0010"``; both parts are parsed as
            base-16 integers.

    Returns:
        The packed integer value.

    Raises:
        ValueError: If the ``:`` separator is missing, a part is not valid
            hexadecimal, or a part does not fit in 16 bits.
    """
    segment_text, separator, offset_text = address_text.partition(":")
    if not separator:
        raise ValueError(f"expected SEGMENT:OFFSET address text, got {address_text!r}")

    segment = int(segment_text, 16)
    offset = int(offset_text, 16)

    # An offset wider than 16 bits would spill into the segment half of the
    # packed value and silently yield a different address, so reject it.
    for part_name, part_value in (("segment", segment), ("offset", offset)):
        if not 0 <= part_value <= 0xFFFF:
            raise ValueError(
                f"{part_name} does not fit in 16 bits in address text {address_text!r}"
            )

    return (segment << 16) + offset
|
||
|
|
|
||
|
|
|
||
|
|
def to_address(address_text: str):
    """Turn ``SEGMENT:OFFSET`` text into an address in the program's default space.

    The text is packed into an integer by ``parse_address_text`` and handed to
    ``getAddress`` on the default address space of the global ``program``.

    Args:
        address_text: Hexadecimal ``SEGMENT:OFFSET`` text.

    Returns:
        Whatever ``getAddress`` returns for the packed value.
    """
    default_space = program.getAddressFactory().getDefaultAddressSpace()
    packed_value = parse_address_text(address_text)
    return default_space.getAddress(packed_value)
|
||
|
|
|
||
|
|
|
||
|
|
def parse_seg_off(address_text: str) -> tuple[int, int]:
    """Split ``SEGMENT:OFFSET`` hexadecimal text into its two integer parts.

    Args:
        address_text: Text such as ``"1000:0010"``; both parts are parsed as
            base-16 integers.

    Returns:
        ``(segment, offset)`` as integers.

    Raises:
        ValueError: If the ``:`` separator is missing, a part is not valid
            hexadecimal, or a part does not fit in 16 bits.
    """
    segment_text, separator, offset_text = address_text.partition(":")
    if not separator:
        raise ValueError(f"expected SEGMENT:OFFSET address text, got {address_text!r}")

    segment = int(segment_text, 16)
    offset = int(offset_text, 16)

    # Each part is later written out as two bytes; a wider value would be
    # truncated there without warning, so fail loudly here instead.
    for part_name, part_value in (("segment", segment), ("offset", offset)):
        if not 0 <= part_value <= 0xFFFF:
            raise ValueError(
                f"{part_name} does not fit in 16 bits in address text {address_text!r}"
            )

    return segment, offset
|
||
|
|
|
||
|
|
|
||
|
|
def signed_byte(value: int) -> int:
    """Map an unsigned byte value onto its signed equivalent.

    Values below ``0x80`` are returned unchanged; anything else has ``0x100``
    subtracted, so ``0x80`` becomes ``-128`` and ``0xFF`` becomes ``-1``.
    """
    if value >= 0x80:
        return value - 0x100
    return value
|
||
|
|
|
||
|
|
|
||
|
|
def load_internal_fixups() -> dict[str, tuple[str, str]]:
|
||
|
|
rows: dict[str, tuple[str, str]] = {}
|
||
|
|
with INTERNAL_FIXUPS_TSV.open("r", encoding="utf-8", newline="") as handle:
|
||
|
|
reader = csv.DictReader(handle, delimiter="\t")
|
||
|
|
for row in reader:
|
||
|
|
source = row["source_ghidra"].strip()
|
||
|
|
target = row["target_ghidra"].strip()
|
||
|
|
label = row["target_label"].strip()
|
||
|
|
previous = rows.get(source)
|
||
|
|
if previous is not None and previous != (target, label):
|
||
|
|
raise RuntimeError(
|
||
|
|
f"conflicting internal fixup mapping for {source}: {previous} vs {(target, label)}"
|
||
|
|
)
|
||
|
|
rows[source] = (target, label)
|
||
|
|
return rows
|
||
|
|
|
||
|
|
|
||
|
|
def load_import_fixups() -> dict[str, str]:
|
||
|
|
rows: dict[str, str] = {}
|
||
|
|
with IMPORT_FIXUPS_TSV.open("r", encoding="utf-8", newline="") as handle:
|
||
|
|
reader = csv.DictReader(handle, delimiter="\t")
|
||
|
|
for row in reader:
|
||
|
|
source = row["source_ghidra"].strip()
|
||
|
|
target = row["target"].strip()
|
||
|
|
previous = rows.get(source)
|
||
|
|
if previous is not None and previous != target:
|
||
|
|
raise RuntimeError(
|
||
|
|
f"conflicting import fixup mapping for {source}: {previous} vs {target}"
|
||
|
|
)
|
||
|
|
rows[source] = target
|
||
|
|
return rows
|
||
|
|
|
||
|
|
|
||
|
|
def patch_internal_far_call(operand_address_text: str, target_address_text: str, label: str) -> str | None:
    """Overwrite the 4-byte operand of a far call with a resolved target.

    The byte immediately before the operand must be ``0x9A`` (the opcode the
    error message below calls CALLF); otherwise nothing is modified. On a
    match, the five bytes of the instruction are cleared in the listing, the
    operand is rewritten as offset (low byte first) followed by segment (low
    byte first), the instruction is disassembled again, and a comment is
    recorded through ``helpers["set_comment"]``.

    Args:
        operand_address_text: ``SEGMENT:OFFSET`` text of the first operand byte.
        target_address_text: ``SEGMENT:OFFSET`` text of the call target.
        label: Target label, used only in the comment text.

    Returns:
        The instruction address as a string, or ``None`` when the preceding
        byte is not ``0x9A``.

    Raises:
        RuntimeError: If the disassemble command reports failure.
    """
    memory = program.getMemory()
    listing = program.getListing()

    operand_start = to_address(operand_address_text)
    call_start = operand_start.previous()
    call_last_byte = call_start.add(4)  # opcode byte + four operand bytes

    # getByte yields a signed value; mask it before comparing to the opcode.
    if (int(memory.getByte(call_start)) & 0xFF) != 0x9A:
        return None

    listing.clearCodeUnits(call_start, call_last_byte, False)

    segment, offset = parse_seg_off(target_address_text)
    # Offset word first, then segment word, each low byte first.
    encoded_operand = [
        (word >> shift) & 0xFF
        for word in (offset, segment)
        for shift in (0, 8)
    ]
    for position, unsigned_value in enumerate(encoded_operand):
        memory.setByte(operand_start.add(position), signed_byte(unsigned_value))

    redisassemble = DisassembleCommand(call_start, None, True)
    succeeded = redisassemble.applyTo(program, ConsoleTaskMonitor())
    if not succeeded:
        raise RuntimeError(f"failed to re-disassemble patched CALLF at {call_start}")

    # NOTE(review): "eol" presumably selects an end-of-line comment - confirm
    # against the set_comment helper.
    helpers["set_comment"](
        program,
        str(call_start),
        f"NE FIXUP APPLIED -> {target_address_text} ({label})",
        "eol",
    )
    return str(call_start)
|
||
|
|
|
||
|
|
|
||
|
|
def annotate_import_far_call(operand_address_text: str, import_target: str) -> str:
    """Record which import a far call refers to, without touching any bytes.

    The comment is placed, via ``helpers["set_comment"]``, at the address one
    before the given operand address.

    Args:
        operand_address_text: ``SEGMENT:OFFSET`` text of the first operand byte.
        import_target: Import description inserted into the comment.

    Returns:
        The annotated address as a string.
    """
    operand_start = to_address(operand_address_text)
    call_site_text = str(operand_start.previous())
    note = f"NE IMPORT -> {import_target}"
    helpers["set_comment"](program, call_site_text, note, "eol")
    return call_site_text
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> None:
    """Apply all far-call fixups inside one program transaction and report.

    Steps:
      1. Load both fixup tables.
      2. In source order, patch every internal fixup; entries for which
         ``patch_internal_far_call`` returns ``None`` are recorded as skipped.
      3. In source order, annotate every import fixup.
      4. Attach an explanatory comment at ``0000:ffff``.
      5. End the transaction, committing only if every step above finished
         without raising.
      6. Print the three totals, each followed by at most ten example lines.
    """
    internal_table = load_internal_fixups()
    import_table = load_import_fixups()

    patched_lines: list[str] = []
    skipped_lines: list[str] = []
    import_lines: list[str] = []

    transaction_id = program.startTransaction("Apply NE far-call fixups")
    succeeded = False
    try:
        # Keys are unique, so sorting the keys gives the same order as
        # sorting the (key, value) items.
        for operand_text in sorted(internal_table):
            target_text, target_label = internal_table[operand_text]
            patched_at = patch_internal_far_call(operand_text, target_text, target_label)
            if patched_at is None:
                skipped_lines.append(f"{operand_text} -> {target_text} ({target_label})")
            else:
                patched_lines.append(f"{patched_at} -> {target_text} ({target_label})")

        for operand_text in sorted(import_table):
            import_name = import_table[operand_text]
            annotated_at = annotate_import_far_call(operand_text, import_name)
            import_lines.append(f"{annotated_at} -> {import_name}")

        helpers["set_comment"](
            program,
            "0000:ffff",
            "NE fixup placeholder only. Internal far calls have been patched from the verified relocation table; remaining xrefs, if any, are import callsites or unresolved non-internal cases.",
            "pre",
        )

        succeeded = True
    finally:
        # Always close the transaction; the flag decides commit vs. discard.
        program.endTransaction(transaction_id, succeeded)

    report_sections = (
        (f"Applied internal far-call patches: {len(patched_lines)}", patched_lines),
        (
            f"Skipped internal far-ptr fixups that were not literal CALLF instructions: {len(skipped_lines)}",
            skipped_lines,
        ),
        (f"Annotated import far calls: {len(import_lines)}", import_lines),
    )
    for heading, examples in report_sections:
        print(heading)
        # Only the first ten examples of each kind are shown.
        for line in examples[:10]:
            print(f" {line}")
|
||
|
|
|
||
|
|
|
||
|
|
# Runs unconditionally when the script is loaded (there is no __name__ guard).
# NOTE(review): `program` and `helpers` are used above but not defined in the
# visible source; presumably the hosting script runner injects them - confirm.
main()
|