from __future__ import annotations import struct from dataclasses import dataclass from pathlib import Path from typing import Callable FLEX_TABLE_OFFSET = 0x80 FLEX_COUNT_OFFSET = 0x54 FIXED_MAP_COUNT_OFFSET = 0x54 FIXED_MAP_TABLE_OFFSET = 0x80 CRUSADER_COORD_SCALE = 2 GLOB_COORD_MASK = ~0x3FF GLOB_COORD_SHIFT = 2 GLOB_COORD_OFFSET = 2 FLAG_INVISIBLE = 0x0010 FLAG_FLIPPED = 0x0020 EGG_FAMILIES = {3, 4, 7, 8} SI_FIXED = 0x0001 SI_SOLID = 0x0002 SI_LAND = 0x0008 SI_OCCL = 0x0010 SI_NOISY = 0x0080 SI_DRAW = 0x0100 SI_ROOF = 0x0400 SI_TRANSL = 0x0800 @dataclass(frozen=True) class FlexEntry: offset: int size: int @dataclass(frozen=True) class ShapeInfo: family: int flags: int x: int y: int z: int anim_type: int @property def is_editor(self) -> bool: return bool(self.flags & 0x1000) @property def is_fixed(self) -> bool: return bool(self.flags & SI_FIXED) @property def is_solid(self) -> bool: return bool(self.flags & SI_SOLID) @property def is_land(self) -> bool: return bool(self.flags & SI_LAND) @property def is_occl(self) -> bool: return bool(self.flags & SI_OCCL) @property def is_noisy(self) -> bool: return bool(self.flags & SI_NOISY) @property def is_draw(self) -> bool: return bool(self.flags & SI_DRAW) @property def is_roof(self) -> bool: return bool(self.flags & SI_ROOF) @property def is_translucent(self) -> bool: return bool(self.flags & SI_TRANSL) @property def is_invitem(self) -> bool: return self.family == 13 @dataclass(frozen=True) class GlobItem: x: int y: int z: int shape: int frame: int @dataclass(frozen=True) class MapItem: x: int y: int z: int shape: int frame: int flags: int quality: int npc_num: int map_num: int next_item: int source: str @dataclass(frozen=True) class ShapeFrame: compressed: bool width: int height: int xoff: int yoff: int line_offsets: tuple[int, ...] rle_data: bytes def read_u16_le(data: bytes, offset: int) -> int: return struct.unpack_from(" int: return data[offset] | (data[offset + 1] << 8) | (data[offset + 2] << 16) def read_u32_le(data: bytes, offset: int) -> int: return struct.unpack_from(" None: self.path = path self.data = path.read_bytes() self.entries = self._read_entries(self.data) @staticmethod def _read_entries(data: bytes) -> list[FlexEntry]: count = read_u32_le(data, FLEX_COUNT_OFFSET) entries: list[FlexEntry] = [] for index in range(count): base = FLEX_TABLE_OFFSET + index * 8 entries.append(FlexEntry(read_u32_le(data, base), read_u32_le(data, base + 4))) return entries def get(self, index: int) -> bytes: entry = self.entries[index] if entry.size == 0: return b"" return self.data[entry.offset : entry.offset + entry.size] def __len__(self) -> int: return len(self.entries) class ShapeArchive: def __init__(self, path: Path) -> None: self.archive = FlexArchive(path) self._shape_cache: dict[int, tuple[ShapeFrame, ...]] = {} self._decoded_frame_cache: dict[tuple[int, int], list[int]] = {} def get_frame(self, shape_index: int, frame_index: int) -> ShapeFrame: frames = self._get_shape(shape_index) if frame_index < 0 or frame_index >= len(frames): raise IndexError(f"shape {shape_index} frame {frame_index} out of range") return frames[frame_index] def decode_frame(self, shape_index: int, frame_index: int) -> tuple[ShapeFrame, list[int]]: cache_key = (shape_index, frame_index) decoded = self._decoded_frame_cache.get(cache_key) frame = self.get_frame(shape_index, frame_index) if decoded is None: decoded = self._decode_pixels(frame) self._decoded_frame_cache[cache_key] = decoded return frame, decoded def _get_shape(self, shape_index: int) -> tuple[ShapeFrame, ...]: cached = self._shape_cache.get(shape_index) if cached is not None: return cached raw = self.archive.get(shape_index) if not raw: raise ValueError(f"shape {shape_index} has no data") frames = self._parse_shape(raw) self._shape_cache[shape_index] = frames return frames @staticmethod def _parse_shape(data: bytes) -> tuple[ShapeFrame, ...]: frame_count = read_u16_le(data, 4) frames: list[ShapeFrame] = [] for index in range(frame_count): header_offset = 6 + index * 8 frame_offset = read_u24_le(data, header_offset) frame_size = read_u32_le(data, header_offset + 4) frame_data = data[frame_offset : frame_offset + frame_size] if len(frame_data) < 28: raise ValueError(f"frame {index} too small: {len(frame_data)}") compressed = bool(read_u32_le(frame_data, 8)) width = read_u32_le(frame_data, 12) height = read_u32_le(frame_data, 16) xoff = struct.unpack_from(" list[int]: pixels = [-1] * (frame.width * frame.height) rle = frame.rle_data for row in range(frame.height): pos = frame.line_offsets[row] xpos = 0 while xpos < frame.width: if pos >= len(rle): raise ValueError(f"row {row} overran RLE data") xpos += rle[pos] pos += 1 if xpos == frame.width: break if pos >= len(rle): raise ValueError(f"row {row} missing run header") dlen = rle[pos] pos += 1 run_type = 0 if frame.compressed: run_type = dlen & 1 dlen >>= 1 if dlen <= 0 or xpos + dlen > frame.width: raise ValueError(f"invalid run length {dlen} at row {row}") row_base = row * frame.width + xpos if run_type == 0: end = pos + dlen if end > len(rle): raise ValueError(f"row {row} literal run overruns RLE data") run = rle[pos:end] for index, color in enumerate(run): pixels[row_base + index] = color pos = end else: if pos >= len(rle): raise ValueError(f"row {row} repeated-color run missing color byte") color = rle[pos] pos += 1 for index in range(dlen): pixels[row_base + index] = color xpos += dlen return pixels def load_palette(path: Path) -> list[tuple[int, int, int]]: data = path.read_bytes() if len(data) < 768: raise ValueError(f"palette too small: {path}") palette: list[tuple[int, int, int]] = [] for index in range(256): r = (data[index * 3] * 255) // 63 g = (data[index * 3 + 1] * 255) // 63 b = (data[index * 3 + 2] * 255) // 63 palette.append((r, g, b)) return palette def load_typeflags(path: Path) -> list[ShapeInfo]: data = path.read_bytes() infos: list[ShapeInfo] = [] for base in range(0, len(data), 9): block = data[base : base + 9] if len(block) < 9: break flags = 0 if block[0] & 0x01: flags |= 0x0001 if block[0] & 0x02: flags |= 0x0002 if block[0] & 0x04: flags |= 0x0004 if block[0] & 0x08: flags |= 0x0008 if block[0] & 0x10: flags |= 0x0010 if block[0] & 0x20: flags |= 0x0020 if block[0] & 0x40: flags |= 0x0040 if block[0] & 0x80: flags |= 0x0080 if block[1] & 0x01: flags |= 0x0100 if block[1] & 0x02: flags |= 0x0200 if block[1] & 0x04: flags |= 0x0400 if block[1] & 0x08: flags |= 0x0800 if block[6] & 0x01: flags |= 0x1000 if block[6] & 0x02: flags |= 0x2000 if block[6] & 0x04: flags |= 0x4000 if block[6] & 0x08: flags |= 0x8000 if block[6] & 0x10: flags |= 0x10000 if block[6] & 0x20: flags |= 0x20000 if block[6] & 0x40: flags |= 0x40000 if block[6] & 0x80: flags |= 0x80000 family = (block[1] >> 4) + ((block[2] & 1) << 4) x = ((block[3] << 3) | (block[2] >> 5)) & 0x1F y = (block[3] >> 2) & 0x1F z = ((block[4] << 1) | (block[3] >> 7)) & 0x1F anim_type = block[4] >> 4 infos.append(ShapeInfo(family=family, flags=flags, x=x, y=y, z=z, anim_type=anim_type)) return infos def load_globs(path: Path) -> list[list[GlobItem]]: archive = FlexArchive(path) globs: list[list[GlobItem]] = [] for index in range(len(archive)): raw = archive.get(index) if not raw: globs.append([]) continue count = read_u16_le(raw, 0) items: list[GlobItem] = [] for item_index in range(count): base = 2 + item_index * 6 items.append( GlobItem( x=raw[base], y=raw[base + 1], z=raw[base + 2], shape=read_u16_le(raw, base + 3), frame=raw[base + 5], ) ) globs.append(items) return globs def load_map_items(path: Path, map_index: int) -> list[MapItem]: if not path.exists(): raise FileNotFoundError(path) data = path.read_bytes() map_count = read_u16_le(data, FIXED_MAP_COUNT_OFFSET) if map_index < 0 or map_index >= map_count: raise ValueError(f"map index {map_index} out of range 0..{map_count - 1}") table_offset = FIXED_MAP_TABLE_OFFSET + map_index * 8 map_offset = read_u32_le(data, table_offset) map_size = read_u32_le(data, table_offset + 4) payload = data[map_offset : map_offset + map_size] if len(payload) != map_size: raise ValueError(f"map {map_index} payload truncated") items: list[MapItem] = [] for base in range(0, len(payload), 16): record = payload[base : base + 16] if len(record) < 16: break x = read_u16_le(record, 0) * CRUSADER_COORD_SCALE y = read_u16_le(record, 2) * CRUSADER_COORD_SCALE items.append( MapItem( x=x, y=y, z=record[4], shape=read_u16_le(record, 5), frame=record[7], flags=read_u16_le(record, 8), quality=read_u16_le(record, 10), npc_num=record[12], map_num=record[13], next_item=read_u16_le(record, 14), source="fixed", ) ) return items def expand_glob_item(item: MapItem, globs: list[list[GlobItem]]) -> list[MapItem]: if item.quality < 0 or item.quality >= len(globs): return [] expanded: list[MapItem] = [] for glob_item in globs[item.quality]: expanded.append( MapItem( x=(item.x & GLOB_COORD_MASK) + (glob_item.x << GLOB_COORD_SHIFT) + GLOB_COORD_OFFSET, y=(item.y & GLOB_COORD_MASK) + (glob_item.y << GLOB_COORD_SHIFT) + GLOB_COORD_OFFSET, z=item.z + glob_item.z, shape=glob_item.shape, frame=glob_item.frame, flags=0, quality=0, npc_num=0, map_num=item.map_num, next_item=0, source="glob", ) ) return expanded def collect_render_items( base_items: list[MapItem], shape_infos: list[ShapeInfo], globs: list[list[GlobItem]], include_editor: bool, expand_globs: bool, world_rect: tuple[int, int, int, int] | None, include_roofs: bool = True, include_hidden_markers: bool = False, progress: Callable[[str], None] | None = None, checkpoint_every: int = 0, ) -> list[MapItem]: render_items: list[MapItem] = [] pending = list(base_items) index = 0 skipped_invisible = 0 skipped_world_rect = 0 skipped_invalid_shape = 0 skipped_editor = 0 skipped_egg = 0 skipped_roof = 0 skipped_hidden = 0 expanded_globs = 0 while index < len(pending): item = pending[index] index += 1 if item.flags & FLAG_INVISIBLE: if not include_hidden_markers: skipped_hidden += 1 continue skipped_invisible += 1 if world_rect is not None: min_x, min_y, max_x, max_y = world_rect if item.x < min_x or item.y < min_y or item.x > max_x or item.y > max_y: skipped_world_rect += 1 continue if item.shape >= len(shape_infos): skipped_invalid_shape += 1 continue info = shape_infos[item.shape] if info.is_editor and not include_editor: skipped_editor += 1 continue if info.is_roof and not include_roofs: skipped_roof += 1 continue if expand_globs and info.family == 3 and item.source == "fixed": pending.extend(expand_glob_item(item, globs)) expanded_globs += 1 if not include_hidden_markers: continue if info.family in EGG_FAMILIES and not include_hidden_markers: skipped_egg += 1 continue render_items.append(item) if progress is not None and checkpoint_every > 0 and index % checkpoint_every == 0: progress( "collect " f"processed={index} pending={len(pending)} rendered={len(render_items)} " f"expanded_globs={expanded_globs} skipped=(invisible:{skipped_invisible}, world:{skipped_world_rect}, " f"shape:{skipped_invalid_shape}, editor:{skipped_editor}, roof:{skipped_roof}, hidden:{skipped_hidden}, egg:{skipped_egg})" ) if progress is not None: progress( "collect complete " f"processed={index} pending={len(pending)} rendered={len(render_items)} " f"expanded_globs={expanded_globs} skipped=(invisible:{skipped_invisible}, world:{skipped_world_rect}, " f"shape:{skipped_invalid_shape}, editor:{skipped_editor}, roof:{skipped_roof}, hidden:{skipped_hidden}, egg:{skipped_egg})" ) return render_items def parse_world_rect(values: list[str] | None) -> tuple[int, int, int, int] | None: if values is None: return None if len(values) != 4: raise ValueError("--world-rect expects four integers: min_x min_y max_x max_y") min_x, min_y, max_x, max_y = (int(value, 0) for value in values) if min_x > max_x or min_y > max_y: raise ValueError("invalid --world-rect bounds") return min_x, min_y, max_x, max_y def resolve_fixed_dat(static_dir: Path, fixed_dat: str | None) -> Path: if fixed_dat: return Path(fixed_dat) return static_dir / "FIXED.DAT" def resolve_static_dir(repo_root: Path, game: str, static_dir: str | None) -> Path: if static_dir: return Path(static_dir) if game == "regret": return repo_root / "STATIC_REGRET" return repo_root / "STATIC"