Crusader_Decomp/ne_reloc_parser.py

379 lines
15 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
NE Relocation Table Parser for Crusader: No Remorse
====================================================
Reads the NE header + per-segment relocation entries from CRUSADER.EXE.
Resolves each CALLF 0x0000:FFFF fixup to its real inter-segment target.
Emits a mapping file suitable for Ghidra annotation.
NE binary: CRUSADER.EXE (bound MZ+NE, NE header at 0x36F70)
Raw import: Ghidra loads the whole file as flat RAM.
Ghidra flat address = file_offset (since it's a raw binary import)
Ghidra seg:off = (flat >> 16) : (flat & 0xFFFF)
"""
import struct, sys, os, json
from collections import defaultdict
# Input executable and the file offset of its NE header.
EXE_PATH = r'k:\ghidra\Crusader_Decomp\CRUSADER.EXE'
NE_HEADER_OFFSET = 0x36F70  # e_lfanew from MZ header

# -- NE relocation entry address-type codes --
ADDR_LOBYTE = 0
ADDR_SELECTOR = 2
ADDR_FARPTR = 3  # 16:16 far pointer <- this is the CALLF target
ADDR_OFFSET = 5
ADDR_48PTR = 11
ADDR_OFFSET32 = 13

# -- NE relocation entry relocation-type codes --
REL_INTERNAL = 0   # intra-module (segment:offset)
REL_IMPORTORD = 1  # imported by ordinal
REL_IMPORTNAM = 2  # imported by name
REL_OSFIXUP = 3    # OS fixup

# Human-readable labels for the address-type codes above.
ADDR_TYPE_NAMES = {
    ADDR_LOBYTE: 'lobyte',
    ADDR_SELECTOR: 'selector',
    ADDR_FARPTR: 'far_ptr_16:16',
    ADDR_OFFSET: 'offset16',
    ADDR_48PTR: 'ptr_48',
    ADDR_OFFSET32: 'offset32',
}

# Human-readable labels for the relocation-type codes above.
REL_TYPE_NAMES = {
    REL_INTERNAL: 'internal',
    REL_IMPORTORD: 'import_ordinal',
    REL_IMPORTNAM: 'import_name',
    REL_OSFIXUP: 'osfixup',
}
def read_u8(data, off):
    """Return the element of ``data`` at index ``off``.

    For a bytes-like ``data`` this is the unsigned byte value (0..255).
    """
    byte_value = data[off]
    return byte_value
def read_u16(data, off):
    """Decode the little-endian unsigned 16-bit value at ``data[off:off+2]``."""
    (word,) = struct.unpack_from('<H', data, off)
    return word
def read_u32(data, off):
    """Decode the little-endian unsigned 32-bit value at ``data[off:off+4]``."""
    (dword,) = struct.unpack_from('<I', data, off)
    return dword
def parse_ne_header(data, ne_off):
    """Parse key fields from the NE header.

    Each field is read exactly once from its offset relative to the ``NE``
    signature (all values little-endian).

    Args:
        data: Bytes of the whole executable file.
        ne_off: File offset of the ``NE`` signature.

    Returns:
        dict of header fields.  Every ``*_off`` entry except
        ``nonresident_name_off`` has ``ne_off`` added to the stored value, so
        it can be used directly as an index into ``data``;
        ``nonresident_name_off`` is returned exactly as stored.

    Raises:
        AssertionError: if the two bytes at ``ne_off`` are not ``b'NE'``.
    """
    magic = data[ne_off:ne_off + 2]
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if magic != b'NE':
        raise AssertionError(f"Bad NE magic at 0x{ne_off:X}: {magic!r}")

    def u8(rel):
        return data[ne_off + rel]

    def u16(rel):
        return struct.unpack_from('<H', data, ne_off + rel)[0]

    def u32(rel):
        return struct.unpack_from('<I', data, ne_off + rel)[0]

    return {
        'linker_ver': u8(0x02),
        'linker_rev': u8(0x03),
        'entry_table_off': u16(0x04) + ne_off,
        'entry_table_len': u16(0x06),
        'flags': u16(0x0C),
        'auto_data_seg': u16(0x0E),
        'seg_table_off': u16(0x22) + ne_off,
        'resource_table_off': u16(0x24) + ne_off,
        'resident_name_off': u16(0x26) + ne_off,
        'module_ref_off': u16(0x28) + ne_off,
        'imported_name_off': u16(0x2A) + ne_off,
        'nonresident_name_off': u32(0x2C),  # stored value, ne_off NOT added
        'moveable_entries': u16(0x30),
        'alignment_shift': u16(0x32),
        'num_resource_segs': u16(0x34),
        'target_os': u8(0x36),
        'num_segments': u16(0x1C),
        'num_module_refs': u16(0x1E),
    }
def parse_segment_table(data, hdr):
    """Parse the NE segment table entries (8 bytes each).

    Args:
        data: Bytes of the whole executable file.
        hdr: Header dict providing ``seg_table_off``, ``alignment_shift`` and
            ``num_segments``.

    Returns:
        One dict per table entry, in table order, with keys ``index``
        (1-based), ``file_offset``, ``length``, ``flags``, ``min_alloc`` and
        ``has_reloc`` (flag bit 0x0100).

    NOTE(review): a stored alignment shift of 0 is applied as-is; confirm
    against the NE format documentation whether 0 should mean a default shift.
    """
    table_start = hdr['seg_table_off']
    align = hdr['alignment_shift']
    result = []
    for number in range(1, hdr['num_segments'] + 1):
        entry_off = table_start + 8 * (number - 1)
        sector, length, flags, min_alloc = struct.unpack_from('<4H', data, entry_off)
        # Fix zero length = 64K (only when the entry has a non-zero sector).
        if sector != 0 and length == 0:
            length = 0x10000
        result.append({
            'index': number,  # 1-based segment number
            # A zero sector shifts to zero, so no special case is needed.
            'file_offset': sector << align,
            'length': length,
            'flags': flags,
            'min_alloc': min_alloc,
            'has_reloc': bool(flags & 0x0100),
        })
    return result
def parse_module_refs(data, hdr):
    """Parse the module reference table into imported module names.

    Each 16-bit table slot holds an offset relative to
    ``hdr['imported_name_off']``; at that position a length byte is followed
    by that many name bytes.

    Args:
        data: Bytes of the whole executable file.
        hdr: Header dict providing ``module_ref_off``, ``imported_name_off``
            and ``num_module_refs``.

    Returns:
        List of names in table order, decoded as ASCII with undecodable
        bytes replaced.
    """
    table = hdr['module_ref_off']
    names_base = hdr['imported_name_off']
    names = []
    for slot in range(hdr['num_module_refs']):
        (rel,) = struct.unpack_from('<H', data, table + 2 * slot)
        start = names_base + rel
        length = data[start]  # length-prefixed string
        raw = data[start + 1:start + 1 + length]
        names.append(raw.decode('ascii', errors='replace'))
    return names
def parse_relocations(data, seg):
    """Parse relocation entries for a single segment.

    The table is read from ``seg['file_offset'] + seg['length']``: a 16-bit
    entry count followed by that many 8-byte records.

    Args:
        data: Bytes of the whole executable file.
        seg: Segment dict with ``has_reloc``, ``file_offset``, ``length`` and
            ``index``.

    Returns:
        List of entry dicts (empty when ``seg['has_reloc']`` is false).  Each
        has ``addr_type``, ``addr_type_name``, ``rel_type`` (low two bits of
        the flag byte), ``rel_type_name``, ``additive`` (bit 0x04 of the flag
        byte), ``seg_offset``, ``seg_index`` and ``target_type`` plus the
        target fields specific to that type.
    """
    if not seg['has_reloc']:
        return []

    # Relocation table starts right after the segment data in the file.
    cursor = seg['file_offset'] + seg['length']
    count = read_u16(data, cursor)
    cursor += 2

    records = []
    for _ in range(count):
        source_kind = read_u8(data, cursor)
        flag_byte = read_u8(data, cursor + 1)
        # Offset within the segment where the fixup applies.
        source_off = read_u16(data, cursor + 2)
        kind = flag_byte & 0x03

        record = {
            'addr_type': source_kind,
            'addr_type_name': ADDR_TYPE_NAMES.get(source_kind, f'unk_{source_kind}'),
            'rel_type': kind,
            'rel_type_name': REL_TYPE_NAMES.get(kind, f'unk_{kind}'),
            'additive': bool(flag_byte & 0x04),
            'seg_offset': source_off,
            'seg_index': seg['index'],
        }

        if kind == REL_INTERNAL:
            seg_byte = read_u8(data, cursor + 4)
            _reserved = read_u8(data, cursor + 5)  # read but not used
            value = read_u16(data, cursor + 6)
            if seg_byte == 0xFF:
                # Moveable segment: the word is an entry-table ordinal.
                record.update({
                    'target_type': 'moveable_entry',
                    'entry_ordinal': value,
                })
            else:
                record.update({
                    'target_type': 'fixed',
                    'target_seg': seg_byte,  # 1-based segment number
                    'target_offset': value,
                })
        elif kind == REL_IMPORTORD:
            module_no = read_u16(data, cursor + 4)  # 1-based
            ordinal_no = read_u16(data, cursor + 6)
            record.update({
                'target_type': 'import_ordinal',
                'module_index': module_no,
                'ordinal': ordinal_no,
            })
        elif kind == REL_IMPORTNAM:
            module_no = read_u16(data, cursor + 4)  # 1-based
            name_rel = read_u16(data, cursor + 6)
            record.update({
                'target_type': 'import_name',
                'module_index': module_no,
                'name_offset': name_rel,
            })
        elif kind == REL_OSFIXUP:
            record.update({
                'target_type': 'osfixup',
                'osfixup_type': read_u16(data, cursor + 4),
            })

        records.append(record)
        cursor += 8  # fixed-size records
    return records
def follow_reloc_chain(data, seg, first_offset, addr_type):
    """Collect every segment offset linked from ``first_offset``.

    NE relocations use a chain: the word stored at each fixup location is
    the segment offset of the next location needing the same fixup.  The walk
    ends at a link of 0xFFFF, a link at or beyond ``seg['length']``, a link
    already visited (cycle protection), or once a location's word would lie
    past the end of ``data`` (that location is still reported).

    Args:
        data: Bytes of the whole executable file.
        seg: Segment dict with ``file_offset`` and ``length``.
        first_offset: Segment offset of the first fixup location.
        addr_type: Accepted for the caller's convenience; not used here.

    Returns:
        List of segment offsets in visiting order.
    """
    base = seg['file_offset']
    limit = seg['length']
    total = len(data)

    chain = []
    seen = set()
    link = first_offset
    while True:
        if link == 0xFFFF or link >= limit or link in seen:
            return chain
        seen.add(link)
        chain.append(link)
        # The first word at the fixup location holds the next chain link.
        pos = base + link
        if pos + 2 > total:
            return chain
        (link,) = struct.unpack_from('<H', data, pos)
def file_offset_to_ghidra(file_off):
    """Format a file offset as a ``ssss:oooo`` address string (raw import).

    The high part is ``file_off >> 16`` and the low part is
    ``file_off & 0xFFFF``, each printed as at least four lowercase hex digits.
    """
    high, low = divmod(file_off, 0x10000)
    return '{:04x}:{:04x}'.format(high, low)
def _module_name(modules, mod_idx):
    """Return the imported module name for a 1-based index, or ``mod<N>``."""
    # The lower bound matters: index 0 would otherwise wrap to modules[-1].
    if 1 <= mod_idx <= len(modules):
        return modules[mod_idx - 1]
    return f'mod{mod_idx}'


def _describe_target(data, hdr, segments, modules, reloc):
    """Build the target fields (``target``, ``target_ghidra``, ...) for a reloc.

    Only ``fixed`` internal targets whose segment number is within the
    segment table get a real ``target_ghidra`` address and a
    ``target_file_offset``; everything else is reported with
    ``target_ghidra == '?'``.
    """
    kind = reloc.get('target_type')

    if kind == 'fixed':
        seg_no = reloc['target_seg']
        target_off = reloc['target_offset']
        label = f'seg{seg_no:03d}:{target_off:04x}'
        # Segment numbers are 1-based; 0 or a number past the table cannot be
        # resolved (and 0 would silently index the last segment).
        if not 1 <= seg_no <= len(segments):
            return {'target': label, 'target_ghidra': '?'}
        target_file_off = segments[seg_no - 1]['file_offset'] + target_off
        return {
            'target': label,
            'target_ghidra': file_offset_to_ghidra(target_file_off),
            'target_file_offset': target_file_off,
        }

    if kind == 'moveable_entry':
        label = f'entry_ordinal_{reloc["entry_ordinal"]}'
    elif kind == 'import_ordinal':
        mod_name = _module_name(modules, reloc['module_index'])
        label = f'{mod_name}.{reloc["ordinal"]}'
    elif kind == 'import_name':
        mod_name = _module_name(modules, reloc['module_index'])
        # Imported names are length-prefixed strings in the imported-name table.
        name_off = hdr['imported_name_off'] + reloc['name_offset']
        name_len = read_u8(data, name_off)
        name = data[name_off + 1:name_off + 1 + name_len].decode('ascii', errors='replace')
        label = f'{mod_name}.{name}'
    elif kind == 'osfixup':
        label = f'osfixup_{reloc["osfixup_type"]}'
    else:
        label = '???'
    return {'target': label, 'target_ghidra': '?'}


def _write_tsv(path, header, rows):
    """Write ``header`` followed by ``rows`` (already newline-terminated)."""
    # UTF-8 so names decoded with errors='replace' (U+FFFD) cannot make the
    # write fail under a narrow platform default encoding.
    with open(path, 'w', encoding='utf-8') as f:
        f.write(header)
        f.writelines(rows)


def _print_far_call_sample(far_calls, seg_no, title, limit=30):
    """Print up to ``limit`` far-call fixups whose source segment is ``seg_no``."""
    print(title)
    picked = [rec for rec in far_calls if rec['source_seg'] == seg_no]
    for rec in sorted(picked, key=lambda r: r['source_offset_in_seg'])[:limit]:
        print(f" {rec['source_ghidra']} -> {rec['target_ghidra']} ({rec['target']})")


def main():
    """Parse the relocations of ``EXE_PATH`` and write the fixup tables.

    Writes ``ne_reloc_fixups.json``, ``ne_reloc_far_calls.tsv`` and
    ``ne_reloc_far_imports.tsv`` next to the executable and prints a summary
    plus two per-segment samples.

    Raises:
        AssertionError: if the file does not start with ``MZ`` or has no
            ``NE`` signature at ``NE_HEADER_OFFSET``.
    """
    print(f"Reading {EXE_PATH}...")
    with open(EXE_PATH, 'rb') as f:
        data = f.read()
    print(f" File size: {len(data)} bytes (0x{len(data):X})")

    # Check the MZ header first (explicit raise survives ``python -O``).
    if data[0:2] != b'MZ':
        raise AssertionError("Not an MZ executable")
    lfanew = read_u32(data, 0x3C)
    print(f" e_lfanew from MZ header: 0x{lfanew:X}")

    # Use the known NE offset, but flag a disagreement with the MZ header.
    ne_off = NE_HEADER_OFFSET
    print(f" Using NE header at: 0x{ne_off:X}")
    if lfanew != ne_off:
        print(f" WARNING: e_lfanew (0x{lfanew:X}) differs from the NE offset in use")

    hdr = parse_ne_header(data, ne_off)
    print(f" Segments: {hdr['num_segments']}")
    print(f" Alignment shift: {hdr['alignment_shift']}")
    print(f" Module refs: {hdr['num_module_refs']}")
    modules = parse_module_refs(data, hdr)
    print(f" Imported modules: {modules}")
    segments = parse_segment_table(data, hdr)

    # Parse all relocations.
    all_fixups = []  # list of resolved fixup records
    stats = defaultdict(int)
    for seg in segments:
        for reloc in parse_relocations(data, seg):
            if reloc['additive']:
                # An additive fixup's location holds an addend, not a link to
                # the next location, so there is exactly one fixup site.
                chain = [reloc['seg_offset']]
            else:
                # Follow the chain to find ALL offsets needing this fixup.
                chain = follow_reloc_chain(data, seg, reloc['seg_offset'], reloc['addr_type'])

            # The target is the same for every location in the chain.
            target_fields = _describe_target(data, hdr, segments, modules, reloc)
            for fixup_off in chain:
                fixup_file_off = seg['file_offset'] + fixup_off
                all_fixups.append({
                    'source_seg': seg['index'],
                    'source_offset_in_seg': fixup_off,
                    'source_file_offset': fixup_file_off,
                    'source_ghidra': file_offset_to_ghidra(fixup_file_off),
                    'addr_type': reloc['addr_type_name'],
                    'rel_type': reloc['rel_type_name'],
                    **target_fields,
                })
                stats[reloc['addr_type_name']] += 1

    print(f"\n Total resolved fixup points: {len(all_fixups)}")
    print(f" By address type: {dict(stats)}")

    # Filter to just far_ptr (CALLF) fixups.  Those with a resolved internal
    # target are the ones that decompile as CALLF 0000:ffff in Ghidra; the
    # "import" list holds every far_ptr fixup without a resolved address.
    far_ptr = [f for f in all_fixups if f['addr_type'] == 'far_ptr_16:16']
    far_calls = [f for f in far_ptr if f.get('target_ghidra', '?') != '?']
    far_imports = [f for f in far_ptr if f.get('target_ghidra', '?') == '?']
    print(f" Far-call internal fixups: {len(far_calls)}")
    print(f" Far-call import fixups: {len(far_imports)}")

    out_dir = os.path.dirname(EXE_PATH)

    # Save full results.
    out_path = os.path.join(out_dir, 'ne_reloc_fixups.json')
    with open(out_path, 'w', encoding='utf-8') as f:
        json.dump(all_fixups, f, indent=2)
    print(f"\n Full fixup table written to: {out_path}")

    # Save a focused far-call table (TSV) for easy use.
    tsv_path = os.path.join(out_dir, 'ne_reloc_far_calls.tsv')
    _write_tsv(
        tsv_path,
        "source_ghidra\ttarget_ghidra\ttarget_label\tsource_seg\tsource_off_in_seg\n",
        (
            f"{rec['source_ghidra']}\t{rec['target_ghidra']}\t{rec['target']}\t"
            f"seg{rec['source_seg']:03d}\t0x{rec['source_offset_in_seg']:04x}\n"
            for rec in sorted(far_calls, key=lambda r: r['source_file_offset'])
        ),
    )
    print(f" Far-call internal TSV: {tsv_path}")

    # Also save import far-calls.
    imp_path = os.path.join(out_dir, 'ne_reloc_far_imports.tsv')
    _write_tsv(
        imp_path,
        "source_ghidra\ttarget\tsource_seg\tsource_off_in_seg\n",
        (
            f"{rec['source_ghidra']}\t{rec['target']}\t"
            f"seg{rec['source_seg']:03d}\t0x{rec['source_offset_in_seg']:04x}\n"
            for rec in sorted(far_imports, key=lambda r: r['source_file_offset'])
        ),
    )
    print(f" Far-call import TSV: {imp_path}")

    # Print a sample of game-segment far calls (seg039=seg001 region in raw, file offset 0x6E200).
    _print_far_call_sample(
        far_calls, 39,
        "\n── Sample: seg039 (NE seg 39, game seg001 area) far-call fixups ──",
    )
    # Print a sample around the entity_ai_update_loop / entity_animation area.
    _print_far_call_sample(
        far_calls, 59,
        "\n── Sample: seg059 (NE seg 59, game 0007: area) far-call fixups ──",
    )


if __name__ == '__main__':
    main()