Add new modules for Crusader map rendering and processing
- Implemented `formats.py` to define data structures and functions for handling map data, including reading and decoding shape and map items. - Created `png.py` for generating PNG images from shape frames and pixel data. - Developed `sorting.py` to manage the sorting and rendering order of map items based on their properties and spatial relationships. - Introduced `render_all_maps.py` to facilitate the rendering of all maps for specified games, including command-line argument parsing and subprocess management for rendering tasks.
This commit is contained in:
parent
af5b77ea13
commit
82ae89865a
47 changed files with 1602 additions and 1562 deletions
516
tools/crusader_map/formats.py
Normal file
516
tools/crusader_map/formats.py
Normal file
|
|
@ -0,0 +1,516 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
|
||||
FLEX_TABLE_OFFSET = 0x80
|
||||
FLEX_COUNT_OFFSET = 0x54
|
||||
FIXED_MAP_COUNT_OFFSET = 0x54
|
||||
FIXED_MAP_TABLE_OFFSET = 0x80
|
||||
CRUSADER_COORD_SCALE = 2
|
||||
GLOB_COORD_MASK = ~0x3FF
|
||||
GLOB_COORD_SHIFT = 2
|
||||
GLOB_COORD_OFFSET = 2
|
||||
FLAG_INVISIBLE = 0x0010
|
||||
FLAG_FLIPPED = 0x0020
|
||||
EGG_FAMILIES = {3, 4, 7, 8}
|
||||
|
||||
SI_FIXED = 0x0001
|
||||
SI_SOLID = 0x0002
|
||||
SI_LAND = 0x0008
|
||||
SI_OCCL = 0x0010
|
||||
SI_NOISY = 0x0080
|
||||
SI_DRAW = 0x0100
|
||||
SI_ROOF = 0x0400
|
||||
SI_TRANSL = 0x0800
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class FlexEntry:
|
||||
offset: int
|
||||
size: int
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ShapeInfo:
|
||||
family: int
|
||||
flags: int
|
||||
x: int
|
||||
y: int
|
||||
z: int
|
||||
anim_type: int
|
||||
|
||||
@property
|
||||
def is_editor(self) -> bool:
|
||||
return bool(self.flags & 0x1000)
|
||||
|
||||
@property
|
||||
def is_fixed(self) -> bool:
|
||||
return bool(self.flags & SI_FIXED)
|
||||
|
||||
@property
|
||||
def is_solid(self) -> bool:
|
||||
return bool(self.flags & SI_SOLID)
|
||||
|
||||
@property
|
||||
def is_land(self) -> bool:
|
||||
return bool(self.flags & SI_LAND)
|
||||
|
||||
@property
|
||||
def is_occl(self) -> bool:
|
||||
return bool(self.flags & SI_OCCL)
|
||||
|
||||
@property
|
||||
def is_noisy(self) -> bool:
|
||||
return bool(self.flags & SI_NOISY)
|
||||
|
||||
@property
|
||||
def is_draw(self) -> bool:
|
||||
return bool(self.flags & SI_DRAW)
|
||||
|
||||
@property
|
||||
def is_roof(self) -> bool:
|
||||
return bool(self.flags & SI_ROOF)
|
||||
|
||||
@property
|
||||
def is_translucent(self) -> bool:
|
||||
return bool(self.flags & SI_TRANSL)
|
||||
|
||||
@property
|
||||
def is_invitem(self) -> bool:
|
||||
return self.family == 13
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class GlobItem:
|
||||
x: int
|
||||
y: int
|
||||
z: int
|
||||
shape: int
|
||||
frame: int
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MapItem:
|
||||
x: int
|
||||
y: int
|
||||
z: int
|
||||
shape: int
|
||||
frame: int
|
||||
flags: int
|
||||
quality: int
|
||||
npc_num: int
|
||||
map_num: int
|
||||
next_item: int
|
||||
source: str
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ShapeFrame:
|
||||
compressed: bool
|
||||
width: int
|
||||
height: int
|
||||
xoff: int
|
||||
yoff: int
|
||||
line_offsets: tuple[int, ...]
|
||||
rle_data: bytes
|
||||
|
||||
|
||||
def read_u16_le(data: bytes, offset: int) -> int:
|
||||
return struct.unpack_from("<H", data, offset)[0]
|
||||
|
||||
|
||||
def read_u24_le(data: bytes, offset: int) -> int:
|
||||
return data[offset] | (data[offset + 1] << 8) | (data[offset + 2] << 16)
|
||||
|
||||
|
||||
def read_u32_le(data: bytes, offset: int) -> int:
|
||||
return struct.unpack_from("<I", data, offset)[0]
|
||||
|
||||
|
||||
class FlexArchive:
|
||||
def __init__(self, path: Path) -> None:
|
||||
self.path = path
|
||||
self.data = path.read_bytes()
|
||||
self.entries = self._read_entries(self.data)
|
||||
|
||||
@staticmethod
|
||||
def _read_entries(data: bytes) -> list[FlexEntry]:
|
||||
count = read_u32_le(data, FLEX_COUNT_OFFSET)
|
||||
entries: list[FlexEntry] = []
|
||||
for index in range(count):
|
||||
base = FLEX_TABLE_OFFSET + index * 8
|
||||
entries.append(FlexEntry(read_u32_le(data, base), read_u32_le(data, base + 4)))
|
||||
return entries
|
||||
|
||||
def get(self, index: int) -> bytes:
|
||||
entry = self.entries[index]
|
||||
if entry.size == 0:
|
||||
return b""
|
||||
return self.data[entry.offset : entry.offset + entry.size]
|
||||
|
||||
def __len__(self) -> int:
|
||||
return len(self.entries)
|
||||
|
||||
|
||||
class ShapeArchive:
|
||||
def __init__(self, path: Path) -> None:
|
||||
self.archive = FlexArchive(path)
|
||||
self._shape_cache: dict[int, tuple[ShapeFrame, ...]] = {}
|
||||
self._decoded_frame_cache: dict[tuple[int, int], list[int]] = {}
|
||||
|
||||
def get_frame(self, shape_index: int, frame_index: int) -> ShapeFrame:
|
||||
frames = self._get_shape(shape_index)
|
||||
if frame_index < 0 or frame_index >= len(frames):
|
||||
raise IndexError(f"shape {shape_index} frame {frame_index} out of range")
|
||||
return frames[frame_index]
|
||||
|
||||
def decode_frame(self, shape_index: int, frame_index: int) -> tuple[ShapeFrame, list[int]]:
|
||||
cache_key = (shape_index, frame_index)
|
||||
decoded = self._decoded_frame_cache.get(cache_key)
|
||||
frame = self.get_frame(shape_index, frame_index)
|
||||
if decoded is None:
|
||||
decoded = self._decode_pixels(frame)
|
||||
self._decoded_frame_cache[cache_key] = decoded
|
||||
return frame, decoded
|
||||
|
||||
def _get_shape(self, shape_index: int) -> tuple[ShapeFrame, ...]:
|
||||
cached = self._shape_cache.get(shape_index)
|
||||
if cached is not None:
|
||||
return cached
|
||||
raw = self.archive.get(shape_index)
|
||||
if not raw:
|
||||
raise ValueError(f"shape {shape_index} has no data")
|
||||
frames = self._parse_shape(raw)
|
||||
self._shape_cache[shape_index] = frames
|
||||
return frames
|
||||
|
||||
@staticmethod
|
||||
def _parse_shape(data: bytes) -> tuple[ShapeFrame, ...]:
|
||||
frame_count = read_u16_le(data, 4)
|
||||
frames: list[ShapeFrame] = []
|
||||
for index in range(frame_count):
|
||||
header_offset = 6 + index * 8
|
||||
frame_offset = read_u24_le(data, header_offset)
|
||||
frame_size = read_u32_le(data, header_offset + 4)
|
||||
frame_data = data[frame_offset : frame_offset + frame_size]
|
||||
if len(frame_data) < 28:
|
||||
raise ValueError(f"frame {index} too small: {len(frame_data)}")
|
||||
compressed = bool(read_u32_le(frame_data, 8))
|
||||
width = read_u32_le(frame_data, 12)
|
||||
height = read_u32_le(frame_data, 16)
|
||||
xoff = struct.unpack_from("<i", frame_data, 20)[0]
|
||||
yoff = struct.unpack_from("<i", frame_data, 24)[0]
|
||||
line_offsets = tuple(read_u32_le(frame_data, 28 + row * 4) - ((height - row) * 4) for row in range(height))
|
||||
rle_offset = 28 + height * 4
|
||||
frames.append(
|
||||
ShapeFrame(
|
||||
compressed=compressed,
|
||||
width=width,
|
||||
height=height,
|
||||
xoff=xoff,
|
||||
yoff=yoff,
|
||||
line_offsets=line_offsets,
|
||||
rle_data=frame_data[rle_offset:],
|
||||
)
|
||||
)
|
||||
return tuple(frames)
|
||||
|
||||
@staticmethod
|
||||
def _decode_pixels(frame: ShapeFrame) -> list[int]:
|
||||
pixels = [-1] * (frame.width * frame.height)
|
||||
rle = frame.rle_data
|
||||
for row in range(frame.height):
|
||||
pos = frame.line_offsets[row]
|
||||
xpos = 0
|
||||
while xpos < frame.width:
|
||||
if pos >= len(rle):
|
||||
raise ValueError(f"row {row} overran RLE data")
|
||||
xpos += rle[pos]
|
||||
pos += 1
|
||||
if xpos == frame.width:
|
||||
break
|
||||
if pos >= len(rle):
|
||||
raise ValueError(f"row {row} missing run header")
|
||||
dlen = rle[pos]
|
||||
pos += 1
|
||||
run_type = 0
|
||||
if frame.compressed:
|
||||
run_type = dlen & 1
|
||||
dlen >>= 1
|
||||
if dlen <= 0 or xpos + dlen > frame.width:
|
||||
raise ValueError(f"invalid run length {dlen} at row {row}")
|
||||
row_base = row * frame.width + xpos
|
||||
if run_type == 0:
|
||||
end = pos + dlen
|
||||
if end > len(rle):
|
||||
raise ValueError(f"row {row} literal run overruns RLE data")
|
||||
run = rle[pos:end]
|
||||
for index, color in enumerate(run):
|
||||
pixels[row_base + index] = color
|
||||
pos = end
|
||||
else:
|
||||
if pos >= len(rle):
|
||||
raise ValueError(f"row {row} repeated-color run missing color byte")
|
||||
color = rle[pos]
|
||||
pos += 1
|
||||
for index in range(dlen):
|
||||
pixels[row_base + index] = color
|
||||
xpos += dlen
|
||||
return pixels
|
||||
|
||||
|
||||
def load_palette(path: Path) -> list[tuple[int, int, int]]:
|
||||
data = path.read_bytes()
|
||||
if len(data) < 768:
|
||||
raise ValueError(f"palette too small: {path}")
|
||||
palette: list[tuple[int, int, int]] = []
|
||||
for index in range(256):
|
||||
r = (data[index * 3] * 255) // 63
|
||||
g = (data[index * 3 + 1] * 255) // 63
|
||||
b = (data[index * 3 + 2] * 255) // 63
|
||||
palette.append((r, g, b))
|
||||
return palette
|
||||
|
||||
|
||||
def load_typeflags(path: Path) -> list[ShapeInfo]:
|
||||
data = path.read_bytes()
|
||||
infos: list[ShapeInfo] = []
|
||||
for base in range(0, len(data), 9):
|
||||
block = data[base : base + 9]
|
||||
if len(block) < 9:
|
||||
break
|
||||
flags = 0
|
||||
if block[0] & 0x01:
|
||||
flags |= 0x0001
|
||||
if block[0] & 0x02:
|
||||
flags |= 0x0002
|
||||
if block[0] & 0x04:
|
||||
flags |= 0x0004
|
||||
if block[0] & 0x08:
|
||||
flags |= 0x0008
|
||||
if block[0] & 0x10:
|
||||
flags |= 0x0010
|
||||
if block[0] & 0x20:
|
||||
flags |= 0x0020
|
||||
if block[0] & 0x40:
|
||||
flags |= 0x0040
|
||||
if block[0] & 0x80:
|
||||
flags |= 0x0080
|
||||
if block[1] & 0x01:
|
||||
flags |= 0x0100
|
||||
if block[1] & 0x02:
|
||||
flags |= 0x0200
|
||||
if block[1] & 0x04:
|
||||
flags |= 0x0400
|
||||
if block[1] & 0x08:
|
||||
flags |= 0x0800
|
||||
if block[6] & 0x01:
|
||||
flags |= 0x1000
|
||||
if block[6] & 0x02:
|
||||
flags |= 0x2000
|
||||
if block[6] & 0x04:
|
||||
flags |= 0x4000
|
||||
if block[6] & 0x08:
|
||||
flags |= 0x8000
|
||||
if block[6] & 0x10:
|
||||
flags |= 0x10000
|
||||
if block[6] & 0x20:
|
||||
flags |= 0x20000
|
||||
if block[6] & 0x40:
|
||||
flags |= 0x40000
|
||||
if block[6] & 0x80:
|
||||
flags |= 0x80000
|
||||
family = (block[1] >> 4) + ((block[2] & 1) << 4)
|
||||
x = ((block[3] << 3) | (block[2] >> 5)) & 0x1F
|
||||
y = (block[3] >> 2) & 0x1F
|
||||
z = ((block[4] << 1) | (block[3] >> 7)) & 0x1F
|
||||
anim_type = block[4] >> 4
|
||||
infos.append(ShapeInfo(family=family, flags=flags, x=x, y=y, z=z, anim_type=anim_type))
|
||||
return infos
|
||||
|
||||
|
||||
def load_globs(path: Path) -> list[list[GlobItem]]:
|
||||
archive = FlexArchive(path)
|
||||
globs: list[list[GlobItem]] = []
|
||||
for index in range(len(archive)):
|
||||
raw = archive.get(index)
|
||||
if not raw:
|
||||
globs.append([])
|
||||
continue
|
||||
count = read_u16_le(raw, 0)
|
||||
items: list[GlobItem] = []
|
||||
for item_index in range(count):
|
||||
base = 2 + item_index * 6
|
||||
items.append(
|
||||
GlobItem(
|
||||
x=raw[base],
|
||||
y=raw[base + 1],
|
||||
z=raw[base + 2],
|
||||
shape=read_u16_le(raw, base + 3),
|
||||
frame=raw[base + 5],
|
||||
)
|
||||
)
|
||||
globs.append(items)
|
||||
return globs
|
||||
|
||||
|
||||
def load_map_items(path: Path, map_index: int) -> list[MapItem]:
|
||||
if not path.exists():
|
||||
raise FileNotFoundError(path)
|
||||
data = path.read_bytes()
|
||||
map_count = read_u16_le(data, FIXED_MAP_COUNT_OFFSET)
|
||||
if map_index < 0 or map_index >= map_count:
|
||||
raise ValueError(f"map index {map_index} out of range 0..{map_count - 1}")
|
||||
table_offset = FIXED_MAP_TABLE_OFFSET + map_index * 8
|
||||
map_offset = read_u32_le(data, table_offset)
|
||||
map_size = read_u32_le(data, table_offset + 4)
|
||||
payload = data[map_offset : map_offset + map_size]
|
||||
if len(payload) != map_size:
|
||||
raise ValueError(f"map {map_index} payload truncated")
|
||||
items: list[MapItem] = []
|
||||
for base in range(0, len(payload), 16):
|
||||
record = payload[base : base + 16]
|
||||
if len(record) < 16:
|
||||
break
|
||||
x = read_u16_le(record, 0) * CRUSADER_COORD_SCALE
|
||||
y = read_u16_le(record, 2) * CRUSADER_COORD_SCALE
|
||||
items.append(
|
||||
MapItem(
|
||||
x=x,
|
||||
y=y,
|
||||
z=record[4],
|
||||
shape=read_u16_le(record, 5),
|
||||
frame=record[7],
|
||||
flags=read_u16_le(record, 8),
|
||||
quality=read_u16_le(record, 10),
|
||||
npc_num=record[12],
|
||||
map_num=record[13],
|
||||
next_item=read_u16_le(record, 14),
|
||||
source="fixed",
|
||||
)
|
||||
)
|
||||
return items
|
||||
|
||||
|
||||
def expand_glob_item(item: MapItem, globs: list[list[GlobItem]]) -> list[MapItem]:
|
||||
if item.quality < 0 or item.quality >= len(globs):
|
||||
return []
|
||||
expanded: list[MapItem] = []
|
||||
for glob_item in globs[item.quality]:
|
||||
expanded.append(
|
||||
MapItem(
|
||||
x=(item.x & GLOB_COORD_MASK) + (glob_item.x << GLOB_COORD_SHIFT) + GLOB_COORD_OFFSET,
|
||||
y=(item.y & GLOB_COORD_MASK) + (glob_item.y << GLOB_COORD_SHIFT) + GLOB_COORD_OFFSET,
|
||||
z=item.z + glob_item.z,
|
||||
shape=glob_item.shape,
|
||||
frame=glob_item.frame,
|
||||
flags=0,
|
||||
quality=0,
|
||||
npc_num=0,
|
||||
map_num=item.map_num,
|
||||
next_item=0,
|
||||
source="glob",
|
||||
)
|
||||
)
|
||||
return expanded
|
||||
|
||||
|
||||
def collect_render_items(
|
||||
base_items: list[MapItem],
|
||||
shape_infos: list[ShapeInfo],
|
||||
globs: list[list[GlobItem]],
|
||||
include_editor: bool,
|
||||
expand_globs: bool,
|
||||
world_rect: tuple[int, int, int, int] | None,
|
||||
include_roofs: bool = True,
|
||||
include_hidden_markers: bool = False,
|
||||
progress: Callable[[str], None] | None = None,
|
||||
checkpoint_every: int = 0,
|
||||
) -> list[MapItem]:
|
||||
render_items: list[MapItem] = []
|
||||
pending = list(base_items)
|
||||
index = 0
|
||||
skipped_invisible = 0
|
||||
skipped_world_rect = 0
|
||||
skipped_invalid_shape = 0
|
||||
skipped_editor = 0
|
||||
skipped_egg = 0
|
||||
skipped_roof = 0
|
||||
skipped_hidden = 0
|
||||
expanded_globs = 0
|
||||
while index < len(pending):
|
||||
item = pending[index]
|
||||
index += 1
|
||||
if item.flags & FLAG_INVISIBLE:
|
||||
if not include_hidden_markers:
|
||||
skipped_hidden += 1
|
||||
continue
|
||||
skipped_invisible += 1
|
||||
if world_rect is not None:
|
||||
min_x, min_y, max_x, max_y = world_rect
|
||||
if item.x < min_x or item.y < min_y or item.x > max_x or item.y > max_y:
|
||||
skipped_world_rect += 1
|
||||
continue
|
||||
if item.shape >= len(shape_infos):
|
||||
skipped_invalid_shape += 1
|
||||
continue
|
||||
info = shape_infos[item.shape]
|
||||
if info.is_editor and not include_editor:
|
||||
skipped_editor += 1
|
||||
continue
|
||||
if info.is_roof and not include_roofs:
|
||||
skipped_roof += 1
|
||||
continue
|
||||
if expand_globs and info.family == 3 and item.source == "fixed":
|
||||
pending.extend(expand_glob_item(item, globs))
|
||||
expanded_globs += 1
|
||||
if not include_hidden_markers:
|
||||
continue
|
||||
if info.family in EGG_FAMILIES and not include_hidden_markers:
|
||||
skipped_egg += 1
|
||||
continue
|
||||
render_items.append(item)
|
||||
if progress is not None and checkpoint_every > 0 and index % checkpoint_every == 0:
|
||||
progress(
|
||||
"collect "
|
||||
f"processed={index} pending={len(pending)} rendered={len(render_items)} "
|
||||
f"expanded_globs={expanded_globs} skipped=(invisible:{skipped_invisible}, world:{skipped_world_rect}, "
|
||||
f"shape:{skipped_invalid_shape}, editor:{skipped_editor}, roof:{skipped_roof}, hidden:{skipped_hidden}, egg:{skipped_egg})"
|
||||
)
|
||||
if progress is not None:
|
||||
progress(
|
||||
"collect complete "
|
||||
f"processed={index} pending={len(pending)} rendered={len(render_items)} "
|
||||
f"expanded_globs={expanded_globs} skipped=(invisible:{skipped_invisible}, world:{skipped_world_rect}, "
|
||||
f"shape:{skipped_invalid_shape}, editor:{skipped_editor}, roof:{skipped_roof}, hidden:{skipped_hidden}, egg:{skipped_egg})"
|
||||
)
|
||||
return render_items
|
||||
|
||||
|
||||
def parse_world_rect(values: list[str] | None) -> tuple[int, int, int, int] | None:
|
||||
if values is None:
|
||||
return None
|
||||
if len(values) != 4:
|
||||
raise ValueError("--world-rect expects four integers: min_x min_y max_x max_y")
|
||||
min_x, min_y, max_x, max_y = (int(value, 0) for value in values)
|
||||
if min_x > max_x or min_y > max_y:
|
||||
raise ValueError("invalid --world-rect bounds")
|
||||
return min_x, min_y, max_x, max_y
|
||||
|
||||
|
||||
def resolve_fixed_dat(static_dir: Path, fixed_dat: str | None) -> Path:
|
||||
if fixed_dat:
|
||||
return Path(fixed_dat)
|
||||
return static_dir / "FIXED.DAT"
|
||||
|
||||
|
||||
def resolve_static_dir(repo_root: Path, game: str, static_dir: str | None) -> Path:
|
||||
if static_dir:
|
||||
return Path(static_dir)
|
||||
if game == "regret":
|
||||
return repo_root / "STATIC_REGRET"
|
||||
return repo_root / "STATIC"
|
||||
Loading…
Add table
Add a link
Reference in a new issue