diff --git a/.github/instructions/ghidra.instructions.md b/.github/instructions/ghidra.instructions.md index 157b7f6..1b1d5db 100644 --- a/.github/instructions/ghidra.instructions.md +++ b/.github/instructions/ghidra.instructions.md @@ -39,6 +39,7 @@ applyTo: "**" - Invoke the toolkit with `\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader ...` from the repo root. - Keep PyGhidra batches small too: prefer one focused repair plan or 1-5 direct edits at a time. - Write operations require the Ghidra project to open successfully. If `Crusader.lock` is present because the GUI owns the project, close Ghidra first or operate on a project copy. +- If the workflow needs the user to change Ghidra state, use the ask-questions tool with a yes/no confirmation prompt instead of plain text. Ask the user to close Ghidra before PyGhidra write commands, and ask the user to open the Ghidra project before MCP server commands. The prompt should briefly describe exactly what to do and instruct the user to answer `Yes` only after the action is complete. # Current Verified Raw-Import Ports diff --git a/.github/skills/pyghidra-ghidra-ops/SKILL.md b/.github/skills/pyghidra-ghidra-ops/SKILL.md index ef676b8..f4732da 100644 --- a/.github/skills/pyghidra-ghidra-ops/SKILL.md +++ b/.github/skills/pyghidra-ghidra-ops/SKILL.md @@ -1,12 +1,14 @@ # PyGhidra Ghidra Ops -Use this skill when Ghidra MCP is missing a needed write operation and you need native CPython access to the Ghidra API for the local Crusader project. +Use this skill when Ghidra MCP is missing a needed operation and you need native CPython access to the Ghidra API for the local Crusader project. ## Use Cases - Create or delete functions in `CRUSADER-RAW.EXE`. - Apply small batched repairs driven by verified addresses. - Add comments or rename functions by address from a repeatable JSON plan. +- Decompile or disassemble functions without switching back to the MCP server. 
+- Query function metadata, search by name, and inspect xrefs from the same local CLI. - Inspect project root files to confirm the program name/path before running edits. ## Workspace Defaults @@ -56,6 +58,63 @@ Rename a function by entry address: .\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader rename-function --entry 0006:02cc --name entity_class_get_flag20 ``` +MCP-style read/query commands are also available from the same CLI: + +```powershell +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader get-function-by-address --address 000a:48ff +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader get-function-containing --address 000a:4901 +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader decompile-function-by-address --address 000a:48ff +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader disassemble-function --address 000a:48ff +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader read-region --start 000a:48ff --end 000a:4912 +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader search-functions-by-name --query rng_ +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader list-strings --limit 20 +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader list-imports --limit 20 +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader list-exports --limit 20 +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader list-namespaces --limit 20 +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader list-segments --limit 20 +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader list-data-items --limit 20 +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader list-classes --limit 20 +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader get-xrefs-to --address 000a:48ff +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader get-function-xrefs --name rng_next_modulo +``` + +All 
commands also support structured output for scripting: + +```powershell +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader --format json get-function-by-address --address 000a:48ff +``` + +For ad hoc investigation, prefer `run-script` over multiline `python -c` or pasted PowerShell here-strings. It avoids leaving the shared shell stuck in an unfinished string/block state: + +```powershell +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader run-script --script .\pyghidra_plans\inspect_rng.py --read-only +``` + +Script globals available inside `run-script`: + +```python +config +project +program +helpers["get_function"] +helpers["get_function_containing"] +helpers["decompile_function"] +helpers["disassemble_function"] +helpers["get_xrefs_to"] +helpers["get_xrefs_from"] +helpers["read_region_bytes"] +helpers["rename_function"] +helpers["set_comment"] +``` + +Write-side MCP-style aliases are available too: + +```powershell +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader rename-function-by-address --entry 000a:48ff --name rng_next_modulo +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader set-decompiler-comment --address 000a:48ff --text "Returns RNG output modulo the requested bound." +.\.venv-pyghidra311\Scripts\python.exe -m tools.pyghidra_crusader set-disassembly-comment --address 000a:48ff --text "Modulo wrapper around rng_advance_state" +``` + Apply a small JSON plan: ```json @@ -109,4 +168,9 @@ Dry-run a plan before touching the project: - Address strings accept raw `SSSS:OOOO` form or plain integers such as `0x75a90`. - The CLI tries a few root folder path variants when opening the program so it can tolerate minor project path differences. -- Plan files support `remove_functions`, `rename_functions`, `create_functions`, `comments`, and `assert_functions`. \ No newline at end of file +- Plan files support `remove_functions`, `rename_functions`, `create_functions`, `comments`, and `assert_functions`. 
+- `set-decompiler-comment` maps to a pre-comment and `set-disassembly-comment` maps to an EOL comment. +- Read/query commands open the program read-only; create/rename/comment/plan commands still require the project to be writable. +- `run-script --read-only` is the safest way to do one-off inspection without getting the shared PowerShell session stuck in a multiline Python string. +- `read-region` now reads bytes one address at a time instead of relying on a bulk `getBytes` path that produced misleading all-zero results in this project under PyGhidra. +- PyGhidra startup now suppresses the noisy local GhidraMCP `Module.manifest` warnings during normal CLI operation. \ No newline at end of file diff --git a/.gitignore b/.gitignore index 95db82e..e3938a4 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,26 @@ Thumbs.db # Local Python environments .venv-pyghidra311/ + +# Python caches, bytecode, and tooling state +__pycache__/ +*.py[cod] +*$py.class +.python-version +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ +.pyre/ +.hypothesis/ +.tox/ +.nox/ +.coverage +.coverage.* +htmlcov/ +build/ +dist/ +*.egg-info/ + +# Local scratch and probe files +.tmp_*.txt +.tmp_*.py diff --git a/Crusader.rep/idata/00/~00000006.db/db.34.gbf b/Crusader.rep/idata/00/~00000006.db/db.42.gbf similarity index 99% rename from Crusader.rep/idata/00/~00000006.db/db.34.gbf rename to Crusader.rep/idata/00/~00000006.db/db.42.gbf index 5a1037b..8983d8b 100644 Binary files a/Crusader.rep/idata/00/~00000006.db/db.34.gbf and b/Crusader.rep/idata/00/~00000006.db/db.42.gbf differ diff --git a/Crusader.rep/idata/00/~00000006.db/db.33.gbf b/Crusader.rep/idata/00/~00000006.db/db.43.gbf similarity index 99% rename from Crusader.rep/idata/00/~00000006.db/db.33.gbf rename to Crusader.rep/idata/00/~00000006.db/db.43.gbf index afd9d4e..8c8c5d1 100644 Binary files a/Crusader.rep/idata/00/~00000006.db/db.33.gbf and b/Crusader.rep/idata/00/~00000006.db/db.43.gbf differ diff --git 
a/Crusader.rep/user/00/~00000005.db/db.2.gbf b/Crusader.rep/user/00/~00000005.db/db.7.gbf similarity index 99% rename from Crusader.rep/user/00/~00000005.db/db.2.gbf rename to Crusader.rep/user/00/~00000005.db/db.7.gbf index a40d688..a8e917e 100644 Binary files a/Crusader.rep/user/00/~00000005.db/db.2.gbf and b/Crusader.rep/user/00/~00000005.db/db.7.gbf differ diff --git a/Crusader.rep/user/00/~00000005.db/db.3.gbf b/Crusader.rep/user/00/~00000005.db/db.8.gbf similarity index 98% rename from Crusader.rep/user/00/~00000005.db/db.3.gbf rename to Crusader.rep/user/00/~00000005.db/db.8.gbf index a8726a6..1778358 100644 Binary files a/Crusader.rep/user/00/~00000005.db/db.3.gbf and b/Crusader.rep/user/00/~00000005.db/db.8.gbf differ diff --git a/crusader_decompilation_notes.md b/crusader_decompilation_notes.md index c182a24..805fa51 100644 --- a/crusader_decompilation_notes.md +++ b/crusader_decompilation_notes.md @@ -75,6 +75,23 @@ Known call-site classifications (by argument pattern): - `entity_fire_weapon` currently decompiles as a thin wrapper that calls `projectile_init_vector`. - `fire_weapon_from_cursor` still decompiles poorly in the raw import, but disassembly shows it begins by pushing cursor sprite/state data from the `0x27d6` area, consistent with the existing seg001 notes. 
+### Raw seg091 Boundary Recovery (init/context + RNG helpers) + +- Conservative PyGhidra boundary repair created the missing seg091 functions in `CRUSADER-RAW.EXE`: + - `000a:44fd` = `seg091_func_00fd`, body `000a:44fd-000a:454c` + - `000a:454d` = `seg091_func_014d`, body `000a:454d-000a:45fd` + - `000a:48a0` = `rng_advance_state`, body `000a:48a0-000a:48e2` + - `000a:48ff` = `rng_next_modulo`, body `000a:48ff-000a:4912` +- Additional adjacent helper identified directly in the raw import: + - `000a:48e3` = `rng_set_seed` +- Verified current behavior from the raw import: + - `seg091_func_00fd` shares runtime flag `0x44a4` with `runtime_init_or_abort`; if the flag is clear it sets it and dispatches through an unresolved far thunk, then falls into a second unresolved thunk path that Ghidra currently marks as non-returning. + - `seg091_func_014d` also shares flag `0x44a4`; it checks an optional long argument against the global context/cookie at `0x45a6`, zeroes the pointed byte when the argument is null, then dispatches through an unresolved far thunk. Keep the positional name until caller-side analysis resolves the thunk target and full signature. + - `rng_set_seed` writes the 32-bit RNG seed/state pair at `0x4584:0x4586` and forces the low word odd. + - `rng_advance_state` updates the same 32-bit state with a simple multiply/add step. + - `rng_next_modulo` advances the RNG state and returns the result modulo the requested bound, or `0` when the bound is zero. +- Short decompiler comments were added in Ghidra at all five seg091 entries so the current evidence stays attached to the raw database. + ### Raw 0007 Gameplay Helper Batch (entity/tile aux state) - New conservative gameplay-side helper renames (direct analysis from field writes and call structure): @@ -826,7 +843,7 @@ A scroll/camera management cluster found in the `0007:bxxx–0007:dxxx` range. 
| Address | Name | Evidence | |---------|------|---------| -| `0007:5b6f` | `entity_set_at_target_update_facing` *(likely internal block, not true top-level function)* | Direct raw-analysis name from the visible local behavior: sets entity `+0x3a = 1` (arrived flag); calls `entity_set_facing_direction`; clears bit `0x10` from entity type table `0x7e1e[type*0x79+0x59]`; then tail-calls onward. Relocation data places it at `seg043:016f`, and resolved call sites exist immediately before/after it (`5b36`, `5b44`, `5bb9`), so this address is likely an internal labeled block inside the larger missing `0007:5a00` seg043 function rather than a true entrypoint. | +| `0007:5b6f` | internal block only *(no function after repair)* | Direct raw-analysis behavior remains useful as a local label: this block sets entity `+0x3a = 1` (arrived flag), calls `entity_set_facing_direction`, clears bit `0x10` from entity type table `0x7e1e[type*0x79+0x59]`, then tail-calls onward. After the PyGhidra boundary repair, `0007:5b6f` is no longer a function entry and should be treated only as an internal control-flow label inside the first repaired seg043 routine. | ### seg043 Standalone Boundary Recovery @@ -835,8 +852,15 @@ A scroll/camera management cluster found in the `0007:bxxx–0007:dxxx` range. - `seg043:0090` -> raw `0007:5a90` - `seg043:017a` -> raw `0007:5b7a` - `seg043:021c` -> raw `0007:5c1c` -- The first recovered standalone function spans `0x0090..0x0179`, which means the current raw label at `0007:5b6f` falls inside the tail of that routine and overlaps the true return at raw `0007:5b79`. -- Practical consequence: the missing raw `0007:5a00` seg043 function boundary should not start at segment offset `0x0000`, and the current `0007:5b6f` function object should be treated as a mis-split internal block until Ghidra-side function creation/repair is available. 
+- The first recovered standalone function spans `0x0090..0x0179`, which means raw `0007:5b6f` falls inside the tail of that routine and overlaps the true return at raw `0007:5b79`. +- Repair status: applied in `CRUSADER-RAW.EXE` via the local PyGhidra toolkit. The bad function object at `0007:5b6f` was removed, and three conservative replacement functions were created: + - `0007:5a90` = `seg043_func_0090` with body `0007:5a90..0007:5b79` + - `0007:5b7a` = `entity_set_at_target_update_facing` with body `0007:5b7a..0007:5c1b` + - `0007:5c1c` = `seg043_func_021c` with body `0007:5c1c..0007:5c80` +- Follow-up re-decompilation now supports one real behavioral rename: `0007:5b7a` sets entity `+0x3a` to 1, calls `entity_set_facing_direction`, clears class-detail bit `0x10` at `0x7e1e[type*0x79+0x59]`, then continues into downstream dispatch, so the repaired middle function has been renamed `entity_set_at_target_update_facing`. +- `0007:5a90` now has a stronger structural read from standalone disassembly: it allocates an object when the incoming far pointer is null (literal `0x98`), runs a far setup helper using DS:`0x4b48..0x4b4e` and the second incoming far pointer, writes `0x4c13` at the object base, calls `entity_set_at_target_update_facing` with the third incoming far pointer, then adjusts the nested object at `+0x38` using extents read from the object at `+0x34` before returning the object pointer. +- `0007:5c1c` also has a stronger structural read: it optionally calls a virtual method through `[object->vtable + 0x4c]` when `object+0x44/+0x46` is non-null, passes a local stack word through `entity_class_get_flag20`, then dispatches one or two downstream far helpers using `object+0x48`, gated by a local status byte at `[bp-0xe]`. +- `0007:5a90` and `0007:5c1c` remain intentionally positional because their current decompiles still collapse into unresolved thunk dispatches and do not yet support safe behavioral names. 
### Entity Class Flag Helper @@ -1306,7 +1330,7 @@ Named via systematic analysis of 11,692 NE relocation fixup entries. These are t | Rank | Address | Name | Calls | Description | |------|---------|------|-------|-------------| -| 1 | `000a:44fd` | *(no function in Ghidra)* | 331 | Analysis gap at seg091:00fd. In comutils.c segment near joystick code. Needs manual function creation. | +| 1 | `000a:44fd` | `seg091_func_00fd` | 331 | Recovered boundary. Shares init flag `0x44a4` with `runtime_init_or_abort`; thunk-heavy non-returning wrapper. | | 2 | `0003:ac7e` | `mem_alloc` | 272 | Allocation wrapper → seg082:0000 (`0009:a200`) | | 3 | `0008:dbec` | `entity_word_list_destroy` | 238 | Already named. Frees entity word-list buffer. | | 4 | `0003:a751` | `mem_free` | 207 | Free wrapper → seg082:007a (`0009:a27a` = `mem_free_checked`) | @@ -1372,7 +1396,7 @@ Named via systematic analysis of 11,692 NE relocation fixup entries. These are t |------|---------|------|-------|-------------| | 41 | `000a:7b58` | `nop_return_zero_b` | 56 | Returns 0 (default vtable slot) | | 42 | `000b:3ab2` | `sprite_node_dispatch_event` | 56 | Large event dispatch: checks event type (2/4/8/0x100), updates global focus ptr at [0x4fd0:4fd2], dispatches via vtable methods [+0x14/+0x18/+0x20/+0x24] by event code. Switch table for 16 event types. | -| 43 | `000a:48ff` | *(no function in Ghidra)* | 55 | Analysis gap in comutils.c segment | +| 43 | `000a:48ff` | `rng_next_modulo` | 55 | Advances seg091 RNG state and returns the result modulo the requested bound; returns 0 when bound is 0. 
| | 44 | `000b:3362` | `sprite_tree_unwind_check` | 55 | Validates SS == param_2 (stack segment guard), then decrements global counter at [0x4fd6] | | 45 | `000b:40ee` | `sprite_node_update_and_dispatch` | 55 | If `sprite_node_is_dirty` returns false: marks dirty, calcs accumulated bounds via `sprite_tree_get_accumulated_bounds` (3ed8), then dispatches via thunk | | 46 | `000a:7b5f` | `vtable_stub_trampoline` | 55 | Calls through fixup thunk (forwarder to another function) | @@ -1397,7 +1421,7 @@ Named via systematic analysis of 11,692 NE relocation fixup entries. These are t - The earlier standalone seg001 port hypothesis in this subrange was wrong. - Relocation data places raw `0007:5a00` at `seg043:0000`, and the already-named helper at `0007:5b6f` sits at `seg043:016f`. - Because of that segment placement, standalone seg001 names such as `debris_spawn` (`0x7490`) and `entity_die` (`0x75ff`) should NOT be ported into this raw range. -- `0007:5b6f` currently remains `entity_set_at_target_update_facing` from direct raw analysis; its behavioral name is no longer in conflict with the standalone seg001 `entity_die` note. +- `0007:5b6f` no longer exists as a function after the PyGhidra repair pass. Its old raw-analysis behavior now lines up with the repaired function `0007:5b7a = entity_set_at_target_update_facing`, so `0007:5b6f` should be treated only as an internal control-flow location inside that function. - Additional resolved call targets inside the missing seg043 block were annotated in Ghidra from relocation data: - `0007:5a8a` -> `entity_set_event_type_checked` - `0007:5a98` -> `FUN_0008_cc01` (timer-related flag/event helper; tests `+0x16 & 0x2`, sets `+0x16 |= 0x800`, copies event field `+0x06` to `+0x22`, checks `0x1000`, then conditionally dispatches) @@ -1406,19 +1430,21 @@ Named via systematic analysis of 11,692 NE relocation fixup entries. 
These are t - `0007:5bb8` -> `entity_is_type_match` - `0007:5c49` -> `entity_class_get_flag20` - `0007:5c8b` -> `mem_alloc_far` -- Current boundary caveat: - - Ghidra likely split the real seg043 routine incorrectly. `0007:5b6f` has no inbound xrefs, while relocation-resolved calls exist on both sides of it inside the same segment window. Treat the current `0007:5b6f` label as a behavioral anchor for one internal block, not yet as a proven standalone function boundary. - - Standalone seg043 disassembly now strengthens that conclusion: real prologues are at raw `0007:5a90`, `0007:5b7a`, and `0007:5c1c`, so the current `0007:5b6f` boundary demonstrably overlaps an earlier function. +- Current boundary state: + - The seg043 split has now been repaired in Ghidra. Verified temporary functions exist at raw `0007:5a90`, `0007:5b7a`, and `0007:5c1c`. + - The repaired middle function at `0007:5b7a` has now been promoted from a positional label to `entity_set_at_target_update_facing` based on direct decompile/disassembly behavior. + - The remaining repaired functions at `0007:5a90` and `0007:5c1c` should keep their positional names until a later pass resolves the thunk-heavy bodies more clearly. + - The next pass on this region should continue re-decompiling `seg043_func_0090` and `seg043_func_021c`, resolve the still-unknown far thunks they call, and replace the positional names only when their behavior is directly supported. | Address | NE Segment | Callers | Notes | |---------|-----------|---------|-------| -| `000a:44fd` | seg091:00fd | 331 | #1 most-called target! In comutils.c segment. | +| `000a:44fd` | seg091:00fd | 331 | Recovered as `seg091_func_00fd`; thunk-heavy init wrapper sharing flag `0x44a4`. | | `000b:2e00` | seg109:0000 | 74 | Start of segment 109. | | `0007:5a00` | seg043:0000 | 64 | Start of segment 43. Earlier seg001 `debris_spawn` port was rejected; still needs manual function creation and direct analysis. 
| -| `000a:48ff` | seg091:04ff | 55 | In comutils.c segment near joystick code. | +| `000a:48ff` | seg091:04ff | 55 | Recovered as `rng_next_modulo`; bounded wrapper around seg091 RNG state advance. | | `0003:a880` | seg005:0880 | 49 | In CRT segment near `far_memcpy`. | | `0003:ad75` | seg005:0d75 | 43 | In CRT segment near `mem_alloc`. | -| `000a:454d` | seg091:014d | 32 | In comutils.c segment. | +| `000a:454d` | seg091:014d | 32 | Recovered as `seg091_func_014d`; init/context helper using the `0x45a6` cookie/context global. | ### Tier 4: Ranks 61-80 (29-42 callers) @@ -1438,7 +1464,7 @@ Named via systematic analysis of 11,692 NE relocation fixup entries. These are t | 72 | `0009:c433` | `event_queue_align_index` | 34 | Returns `param_1 & 0xFFF8` — aligns ring index to 8-byte event slot boundary | | 73 | `0009:2156` | `dos_file_get_size` | 33 | Saves file position, does INT 21h AH=42h AL=02 (seek to end), restores position. Returns file size in DX:AX | | 74 | `000a:2c41` | `list_iterate_next` | 33 | Linked list iterator: if *out==0 returns first from obj+2; else follows next at ptr+2/+4. Returns bool (has more) | -| 75 | `000a:454d` | *(no function in Ghidra)* | 32 | Analysis gap in comutils.c segment | +| 75 | `000a:454d` | `seg091_func_014d` | 32 | Recovered boundary. Shares flag `0x44a4`; checks optional long argument against the `0x45a6` cookie/context global. | | 76 | `000b:2446` | `sprite_clear_redraw_flag` | 31 | Clears flag at obj+0x17e, then dispatches via thunk | | 77 | `0005:1238` | `entity_get_class_word` | 30 | Looks up table at [0x7e01] indexed by *param_1 * 2, returns word. Sister of `entity_get_type_word` (which uses [0x7df9]) | | 78 | `000b:1446` | `display_null_check_dispatch` | 30 | Null-checks far ptr params, dispatches to different thunks based on result | @@ -1559,13 +1585,13 @@ Compares two 5-byte `map_position` structs: `{ x:word, y:word, layer:byte }`. 
Re | `0x7df1` | word[] | 2 | Entity base type word | | `0x7e1e` | struct[] | 0x79 | Entity class detail records (121 bytes per class) | -### Analysis Gaps (No Function in Ghidra) +### Recent Manual Boundary Repairs -These high-traffic addresses need manual function creation in Ghidra (Script Manager or UI): +Recent high-traffic addresses recovered with manual function creation in Ghidra/PyGhidra: | Address | NE Segment | Callers | Notes | |---------|-----------|---------|-------| -| `000a:44fd` | seg091:00fd | 331 | #1 most-called target! In comutils.c segment. | +| `000a:48ff` | seg091:04ff | 55 | Recovered as `rng_next_modulo`; manual boundary repair narrowed to `000a:48ff-000a:4912`. | | `000b:2e00` | seg109:0000 | 74 | Start of segment 109. | | `0007:5a00` | seg043:0000 | 64 | Start of segment 43. Earlier seg001 `debris_spawn` port was rejected; still needs manual function creation and direct analysis. | | `0009:a200` | seg082:0000 | - | Target of `mem_alloc`. Start of segment 82. | diff --git a/tools/pyghidra_crusader/__pycache__/__init__.cpython-314.pyc b/tools/pyghidra_crusader/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000..161be06 Binary files /dev/null and b/tools/pyghidra_crusader/__pycache__/__init__.cpython-314.pyc differ diff --git a/tools/pyghidra_crusader/__pycache__/__main__.cpython-314.pyc b/tools/pyghidra_crusader/__pycache__/__main__.cpython-314.pyc new file mode 100644 index 0000000..de996bf Binary files /dev/null and b/tools/pyghidra_crusader/__pycache__/__main__.cpython-314.pyc differ diff --git a/tools/pyghidra_crusader/__pycache__/cli.cpython-311.pyc b/tools/pyghidra_crusader/__pycache__/cli.cpython-311.pyc index d54a6c5..a363335 100644 Binary files a/tools/pyghidra_crusader/__pycache__/cli.cpython-311.pyc and b/tools/pyghidra_crusader/__pycache__/cli.cpython-311.pyc differ diff --git a/tools/pyghidra_crusader/__pycache__/cli.cpython-314.pyc b/tools/pyghidra_crusader/__pycache__/cli.cpython-314.pyc new file mode 
100644 index 0000000..8a88405 Binary files /dev/null and b/tools/pyghidra_crusader/__pycache__/cli.cpython-314.pyc differ diff --git a/tools/pyghidra_crusader/__pycache__/common.cpython-311.pyc b/tools/pyghidra_crusader/__pycache__/common.cpython-311.pyc index acf5897..610f196 100644 Binary files a/tools/pyghidra_crusader/__pycache__/common.cpython-311.pyc and b/tools/pyghidra_crusader/__pycache__/common.cpython-311.pyc differ diff --git a/tools/pyghidra_crusader/__pycache__/common.cpython-314.pyc b/tools/pyghidra_crusader/__pycache__/common.cpython-314.pyc new file mode 100644 index 0000000..d0f6565 Binary files /dev/null and b/tools/pyghidra_crusader/__pycache__/common.cpython-314.pyc differ diff --git a/tools/pyghidra_crusader/cli.py b/tools/pyghidra_crusader/cli.py index 41f62fd..ed938fa 100644 --- a/tools/pyghidra_crusader/cli.py +++ b/tools/pyghidra_crusader/cli.py @@ -12,13 +12,30 @@ from .common import ( DEFAULT_FOLDER_PATH, ProjectConfig, create_function, + decompile_function, + disassemble_function, + format_function_summary, get_function, + get_function_containing, + get_functions_by_exact_name, + get_xrefs_from, + get_xrefs_to, + list_classes, + list_data_items, + list_exports, list_root_files, + list_imports, + list_namespaces, + list_segments, + list_strings, open_program, open_project, + read_region_bytes, remove_function, rename_function, + run_script_file, save_program, + search_functions_by_name, set_comment, transaction, ) @@ -58,6 +75,12 @@ def build_parser() -> argparse.ArgumentParser: action="store_true", help="Restore project tool state while opening the project.", ) + parser.add_argument( + "--format", + choices=["text", "json"], + default="text", + help="Output format.", + ) subparsers = parser.add_subparsers(dest="command", required=True) @@ -66,6 +89,13 @@ def build_parser() -> argparse.ArgumentParser: help="List root-level files in the Ghidra project.", ) + dump_parser = subparsers.add_parser( + "dump-region", + help="Dump instructions 
and resolved call targets for an address range.", + ) + dump_parser.add_argument("--start", required=True, help="Start address.") + dump_parser.add_argument("--end", required=True, help="Inclusive end address.") + create_parser = subparsers.add_parser( "create-function", help="Create a function at an address with an optional explicit body range.", @@ -92,6 +122,15 @@ def build_parser() -> argparse.ArgumentParser: rename_parser.add_argument("--entry", required=True, help="Function entry address.") rename_parser.add_argument("--name", required=True, help="New function name.") + rename_by_address_parser = subparsers.add_parser( + "rename-function-by-address", + help="Rename an existing function by entry address (MCP-style alias).", + ) + rename_by_address_parser.add_argument( + "--entry", required=True, help="Function entry address." + ) + rename_by_address_parser.add_argument("--name", required=True, help="New function name.") + comment_parser = subparsers.add_parser( "set-comment", help="Set a code-unit comment by address.", @@ -105,6 +144,161 @@ def build_parser() -> argparse.ArgumentParser: help="Comment type.", ) + decompiler_comment_parser = subparsers.add_parser( + "set-decompiler-comment", + help="Set a decompiler-visible pre-comment by address.", + ) + decompiler_comment_parser.add_argument("--address", required=True, help="Comment target address.") + decompiler_comment_parser.add_argument("--text", required=True, help="Comment text.") + + disassembly_comment_parser = subparsers.add_parser( + "set-disassembly-comment", + help="Set a disassembly EOL comment by address.", + ) + disassembly_comment_parser.add_argument("--address", required=True, help="Comment target address.") + disassembly_comment_parser.add_argument("--text", required=True, help="Comment text.") + + get_function_parser = subparsers.add_parser( + "get-function-by-address", + help="Show function metadata for an exact entry address.", + ) + get_function_parser.add_argument("--address", 
required=True, help="Function entry address.") + + get_function_containing_parser = subparsers.add_parser( + "get-function-containing", + help="Show function metadata for the function containing an address.", + ) + get_function_containing_parser.add_argument( + "--address", required=True, help="Address inside the desired function body." + ) + + list_functions_parser = subparsers.add_parser( + "list-functions", + help="List all defined functions.", + ) + list_functions_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") + list_functions_parser.add_argument("--limit", type=int, default=100, help="Maximum functions to print.") + + list_segments_parser = subparsers.add_parser( + "list-segments", + help="List memory segments or blocks.", + ) + list_segments_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") + list_segments_parser.add_argument("--limit", type=int, default=100, help="Maximum segments to print.") + + list_data_items_parser = subparsers.add_parser( + "list-data-items", + help="List defined data items.", + ) + list_data_items_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") + list_data_items_parser.add_argument("--limit", type=int, default=100, help="Maximum data items to print.") + + list_classes_parser = subparsers.add_parser( + "list-classes", + help="List class namespaces.", + ) + list_classes_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") + list_classes_parser.add_argument("--limit", type=int, default=100, help="Maximum classes to print.") + + list_strings_parser = subparsers.add_parser( + "list-strings", + help="List defined strings in the program.", + ) + list_strings_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") + list_strings_parser.add_argument("--limit", type=int, default=2000, help="Maximum strings to print.") + list_strings_parser.add_argument("--filter", help="Optional substring filter.") + + 
list_imports_parser = subparsers.add_parser( + "list-imports", + help="List imported external symbols.", + ) + list_imports_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") + list_imports_parser.add_argument("--limit", type=int, default=100, help="Maximum imports to print.") + + list_exports_parser = subparsers.add_parser( + "list-exports", + help="List exported entry points and symbols.", + ) + list_exports_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") + list_exports_parser.add_argument("--limit", type=int, default=100, help="Maximum exports to print.") + + list_namespaces_parser = subparsers.add_parser( + "list-namespaces", + help="List non-global namespaces, classes, and libraries.", + ) + list_namespaces_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") + list_namespaces_parser.add_argument("--limit", type=int, default=100, help="Maximum namespaces to print.") + + search_functions_parser = subparsers.add_parser( + "search-functions-by-name", + help="List functions whose names contain a substring.", + ) + search_functions_parser.add_argument("--query", required=True, help="Substring to match.") + search_functions_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") + search_functions_parser.add_argument("--limit", type=int, default=100, help="Maximum functions to print.") + + decompile_name_parser = subparsers.add_parser( + "decompile-function", + help="Decompile an exact-named function.", + ) + decompile_name_parser.add_argument("--name", required=True, help="Exact function name.") + decompile_name_parser.add_argument("--timeout", type=int, default=30, help="Decompile timeout in seconds.") + + decompile_address_parser = subparsers.add_parser( + "decompile-function-by-address", + help="Decompile a function by entry address.", + ) + decompile_address_parser.add_argument("--address", required=True, help="Function entry address.") + 
decompile_address_parser.add_argument("--timeout", type=int, default=30, help="Decompile timeout in seconds.") + + disassemble_parser = subparsers.add_parser( + "disassemble-function", + help="Disassemble a function body by entry address.", + ) + disassemble_parser.add_argument("--address", required=True, help="Function entry address.") + + read_region_parser = subparsers.add_parser( + "read-region", + help="Dump raw bytes for an inclusive address range.", + ) + read_region_parser.add_argument("--start", required=True, help="Start address.") + read_region_parser.add_argument("--end", required=True, help="Inclusive end address.") + + run_script_parser = subparsers.add_parser( + "run-script", + help="Execute a Python file with project/program context to avoid interactive shell quoting issues.", + ) + run_script_parser.add_argument("--script", required=True, help="Path to the Python script file.") + run_script_parser.add_argument( + "--read-only", + action="store_true", + help="Open the program read-only for script execution.", + ) + + xrefs_to_parser = subparsers.add_parser( + "get-xrefs-to", + help="List references to an address.", + ) + xrefs_to_parser.add_argument("--address", required=True, help="Target address.") + xrefs_to_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") + xrefs_to_parser.add_argument("--limit", type=int, default=100, help="Maximum references to print.") + + xrefs_from_parser = subparsers.add_parser( + "get-xrefs-from", + help="List references from an address.", + ) + xrefs_from_parser.add_argument("--address", required=True, help="Source address.") + xrefs_from_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") + xrefs_from_parser.add_argument("--limit", type=int, default=100, help="Maximum references to print.") + + function_xrefs_parser = subparsers.add_parser( + "get-function-xrefs", + help="List references to a function entry by exact function name.", + ) + 
function_xrefs_parser.add_argument("--name", required=True, help="Exact function name.") + function_xrefs_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") + function_xrefs_parser.add_argument("--limit", type=int, default=100, help="Maximum references to print.") + plan_parser = subparsers.add_parser( "apply-plan", help="Apply a JSON edit plan containing function and comment operations.", @@ -128,13 +322,85 @@ def build_config(args: argparse.Namespace) -> ProjectConfig: folder_path=args.folder_path, restore_project=args.restore_project, ) + + +def _emit(args: argparse.Namespace, payload, text: str | None = None) -> int: + if args.format == "json": + print(json.dumps(payload, indent=2, sort_keys=True)) + return 0 + if text is not None: + print(text) + return 0 + if isinstance(payload, list): + for item in payload: + print(item) + return 0 + if isinstance(payload, dict): + print(json.dumps(payload, indent=2, sort_keys=True)) + return 0 + print(payload) + return 0 + + +def _function_to_dict(function) -> dict[str, str]: + summary_text = format_function_summary(function) + lines = summary_text.splitlines() + body_line = lines[3].split(": ", 1)[1] + body_start, body_end = body_line.split(" - ", 1) + return { + "name": function.getName(), + "signature": lines[1].split(": ", 1)[1], + "entry": str(function.getEntryPoint()), + "body_start": body_start, + "body_end": body_end, + } + + +def _function_line(function) -> str: + return f"{function.getName()} @ {function.getEntryPoint()}" + + +def _text_or_empty(lines: list[str], empty_message: str) -> str: + return "\n".join(lines) if lines else empty_message + + def command_project_files(config: ProjectConfig, _args: argparse.Namespace) -> int: project = open_project(config) try: - for name in list_root_files(project): - print(name) + names = list_root_files(project) finally: project.close() + return _emit(_args, names, "\n".join(names)) + + +def command_dump_region(config: ProjectConfig, args: 
argparse.Namespace) -> int: + from .common import to_address + + with open_program(config, read_only=True) as (_project, program): + listing = program.getListing() + memory = program.getMemory() + start = to_address(program, args.start) + end = to_address(program, args.end) + size = end.subtract(start) + 1 + buf = bytearray(size) + memory.getBytes(start, buf) + print(f"REGION {args.start}..{args.end} BYTES {bytes(buf[:32]).hex()}") + + instruction = listing.getInstructionAt(start) + while instruction is not None and instruction.getAddress().compareTo(end) <= 0: + line = f"{instruction.getAddress()}: {instruction.toString()}" + if instruction.getFlowType().isCall(): + references = instruction.getReferencesFrom() + if references: + target = references[0].getToAddress() + function = program.getFunctionManager().getFunctionAt(target) + if function is not None: + line += f" -> {function.getName()} @ {target}" + else: + line += f" -> {target}" + print(line) + instruction = instruction.getNext() + return 0 @@ -145,8 +411,11 @@ def command_create_function(config: ProjectConfig, args: argparse.Namespace) -> if args.plate_comment: set_comment(program, args.entry, args.plate_comment, "plate") save_program(project, program) - print(f"created {function.getName()} at {args.entry}") - return 0 + return _emit( + args, + {"status": "ok", "entry": args.entry, "name": function.getName(), "action": "create-function"}, + f"created {function.getName()} at {args.entry}", + ) def command_delete_function(config: ProjectConfig, args: argparse.Namespace) -> int: @@ -156,8 +425,11 @@ def command_delete_function(config: ProjectConfig, args: argparse.Namespace) -> if not removed: raise RuntimeError(f"no function removed at {args.entry}") save_program(project, program) - print(f"deleted function at {args.entry}") - return 0 + return _emit( + args, + {"status": "ok", "entry": args.entry, "action": "delete-function"}, + f"deleted function at {args.entry}", + ) def command_rename_function(config: 
ProjectConfig, args: argparse.Namespace) -> int: @@ -165,17 +437,276 @@ def command_rename_function(config: ProjectConfig, args: argparse.Namespace) -> with transaction(program, f"Rename function {args.entry}"): function = rename_function(program, args.entry, args.name) save_program(project, program) - print(f"renamed {args.entry} to {function.getName()}") - return 0 + return _emit( + args, + {"status": "ok", "entry": args.entry, "name": function.getName(), "action": "rename-function"}, + f"renamed {args.entry} to {function.getName()}", + ) + + +def _set_comment_with_type(config: ProjectConfig, args: argparse.Namespace, address: str, text: str, comment_type: str) -> int: + with open_program(config, read_only=False) as (project, program): + with transaction(program, f"Set comment {address}"): + set_comment(program, address, text, comment_type) + save_program(project, program) + return _emit( + args, + {"status": "ok", "address": address, "type": comment_type, "text": text, "action": "set-comment"}, + f"set {comment_type} comment at {address}", + ) def command_set_comment(config: ProjectConfig, args: argparse.Namespace) -> int: - with open_program(config, read_only=False) as (project, program): - with transaction(program, f"Set comment {args.address}"): - set_comment(program, args.address, args.text, args.type) - save_program(project, program) - print(f"set {args.type} comment at {args.address}") - return 0 + return _set_comment_with_type(config, args, args.address, args.text, args.type) + + +def command_set_decompiler_comment(config: ProjectConfig, args: argparse.Namespace) -> int: + return _set_comment_with_type(config, args, args.address, args.text, "pre") + + +def command_set_disassembly_comment(config: ProjectConfig, args: argparse.Namespace) -> int: + return _set_comment_with_type(config, args, args.address, args.text, "eol") + + +def _require_function_by_address(program, address_text: str): + function = get_function(program, address_text) + if function is 
None: + raise RuntimeError(f"no function found at {address_text}") + return function + + +def _require_single_function_by_name(program, name: str): + matches = get_functions_by_exact_name(program, name) + if not matches: + raise RuntimeError(f"no function found with exact name '{name}'") + if len(matches) > 1: + raise RuntimeError( + f"multiple functions match exact name '{name}'; use search-functions-by-name or an address-specific command" + ) + return matches[0] + + +def _print_function_lines(functions) -> None: + for function in functions: + print(f"{function.getName()} @ {function.getEntryPoint()}") + + +def _print_reference_lines(references: list[dict[str, str | int]]) -> None: + for reference in references: + print( + f"{reference['from']} -> {reference['to']} [{reference['type']}] operand={reference['operand_index']}" + ) + + +def command_get_function_by_address(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + function = _require_function_by_address(program, args.address) + payload = _function_to_dict(function) + text = format_function_summary(function) + return _emit(args, payload, text) + + +def command_get_function_containing(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + function = get_function_containing(program, args.address) + if function is None: + raise RuntimeError(f"no containing function found at {args.address}") + payload = _function_to_dict(function) + text = format_function_summary(function) + return _emit(args, payload, text) + + +def command_list_functions(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + functions = search_functions_by_name(program, "", offset=args.offset, limit=args.limit) + payload = [{"name": function.getName(), "entry": str(function.getEntryPoint())} for function in functions] + text = 
_text_or_empty([_function_line(function) for function in functions], "no functions found") + return _emit(args, payload, text) + + +def command_search_functions_by_name(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + functions = search_functions_by_name(program, args.query, offset=args.offset, limit=args.limit) + payload = [{"name": function.getName(), "entry": str(function.getEntryPoint())} for function in functions] + text = _text_or_empty([_function_line(function) for function in functions], "no matching functions found") + return _emit(args, payload, text) + + +def command_list_strings(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + strings = list_strings(program, offset=args.offset, limit=args.limit, filter_text=args.filter) + text = _text_or_empty([f"{entry['address']}: {entry['text']}" for entry in strings], "no strings found") + return _emit(args, strings, text) + + +def command_list_segments(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + segments = list_segments(program, offset=args.offset, limit=args.limit) + text = _text_or_empty( + [ + f"{entry['name']} {entry['start']} - {entry['end']} len={entry['length']}" + f" r={entry['read']} w={entry['write']} x={entry['execute']} init={entry['initialized']}" + for entry in segments + ], + "no segments found", + ) + return _emit(args, segments, text) + + +def command_list_data_items(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + items = list_data_items(program, offset=args.offset, limit=args.limit) + text = _text_or_empty( + [ + f"{entry['address']} {entry['mnemonic']} len={entry['length']}" + + (f" value={entry['value']}" if entry['value'] is not None else "") + for entry in items + ], + "no data 
items found", + ) + return _emit(args, items, text) + + +def command_list_classes(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + classes = list_classes(program, offset=args.offset, limit=args.limit) + text = _text_or_empty( + [ + f"{entry['name']}" + (f" parent={entry['parent']}" if entry['parent'] else "") + for entry in classes + ], + "no classes found", + ) + return _emit(args, classes, text) + + +def command_list_imports(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + imports = list_imports(program, offset=args.offset, limit=args.limit) + text = _text_or_empty([ + f"{entry['library']}!{entry['label'] or ''} @ {entry['address'] or ''}" + for entry in imports + ], "no imports found") + return _emit(args, imports, text) + + +def command_list_exports(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + exports = list_exports(program, offset=args.offset, limit=args.limit) + text = _text_or_empty([ + f"{entry['name'] or ''} @ {entry['address']} [{entry['kind']}]" + for entry in exports + ], "no exports found") + return _emit(args, exports, text) + + +def command_list_namespaces(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + namespaces = list_namespaces(program, offset=args.offset, limit=args.limit) + text = _text_or_empty([ + f"{entry['name']} [{entry['type']}]" + (f" parent={entry['parent']}" if entry['parent'] else "") + for entry in namespaces + ], "no namespaces found") + return _emit(args, namespaces, text) + + +def command_decompile_function_by_address(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + function = _require_function_by_address(program, args.address) + output = 
decompile_function(program, function, args.timeout) + return _emit(args, {"address": args.address, "decompiled": output}, output) + + +def command_decompile_function(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + function = _require_single_function_by_name(program, args.name) + output = decompile_function(program, function, args.timeout) + return _emit(args, {"name": args.name, "decompiled": output}, output) + + +def command_disassemble_function(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + function = _require_function_by_address(program, args.address) + lines = disassemble_function(program, function) + if not lines: + code_unit = program.getListing().getCodeUnitAt(function.getEntryPoint()) + lines = [ + f"no instructions found in body {function.getBody().getMinAddress()} - {function.getBody().getMaxAddress()}; entry code unit = {code_unit}" + ] + return _emit(args, {"address": args.address, "lines": lines}, "\n".join(lines)) + + +def command_read_region(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + data = read_region_bytes(program, args.start, args.end) + text = f"REGION {args.start}..{args.end} BYTES {data.hex()}" + return _emit(args, {"start": args.start, "end": args.end, "bytes": data.hex()}, text) + + +def command_get_xrefs_to(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + references = get_xrefs_to(program, args.address, offset=args.offset, limit=args.limit) + text = _text_or_empty([ + f"{reference['from']} -> {reference['to']} [{reference['type']}] operand={reference['operand_index']}" + for reference in references + ], "no xrefs found") + return _emit(args, references, text) + + +def command_get_xrefs_from(config: ProjectConfig, args: 
argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + references = get_xrefs_from(program, args.address, offset=args.offset, limit=args.limit) + text = _text_or_empty([ + f"{reference['from']} -> {reference['to']} [{reference['type']}] operand={reference['operand_index']}" + for reference in references + ], "no xrefs found") + return _emit(args, references, text) + + +def command_get_function_xrefs(config: ProjectConfig, args: argparse.Namespace) -> int: + with open_program(config, read_only=True) as (_project, program): + function = _require_single_function_by_name(program, args.name) + references = get_xrefs_to( + program, + str(function.getEntryPoint()), + offset=args.offset, + limit=args.limit, + ) + text = _text_or_empty([ + f"{reference['from']} -> {reference['to']} [{reference['type']}] operand={reference['operand_index']}" + for reference in references + ], "no xrefs found") + return _emit(args, references, text) + + +def command_run_script(config: ProjectConfig, args: argparse.Namespace) -> int: + script_path = Path(args.script).resolve() + if not script_path.is_file(): + raise RuntimeError(f"script file not found: {script_path}") + + with open_program(config, read_only=args.read_only) as (project, program): + script_globals = { + "config": config, + "project": project, + "program": program, + "helpers": { + "create_function": create_function, + "decompile_function": decompile_function, + "disassemble_function": disassemble_function, + "format_function_summary": format_function_summary, + "get_function": get_function, + "get_function_containing": get_function_containing, + "get_xrefs_from": get_xrefs_from, + "get_xrefs_to": get_xrefs_to, + "read_region_bytes": read_region_bytes, + "rename_function": rename_function, + "set_comment": set_comment, + }, + } + run_script_file(script_path, script_globals) + if not args.read_only: + save_program(project, program) + return _emit(args, {"status": "ok", "script": 
str(script_path)}, f"ran script {script_path}") def _load_plan(plan_path: str) -> dict: @@ -190,6 +721,9 @@ def _print_plan(plan: dict) -> None: def command_apply_plan(config: ProjectConfig, args: argparse.Namespace) -> int: plan = _load_plan(args.plan) if args.dry_run: + if args.format == "json": + _print_plan(plan) + return 0 _print_plan(plan) return 0 @@ -234,8 +768,7 @@ def command_apply_plan(config: ProjectConfig, args: argparse.Namespace) -> int: save_program(project, program) - print(f"applied plan {args.plan}") - return 0 + return _emit(args, {"status": "ok", "plan": args.plan}, f"applied plan {args.plan}") def main(argv: list[str] | None = None) -> int: @@ -244,11 +777,34 @@ def main(argv: list[str] | None = None) -> int: config = build_config(args) command_map = { + "dump-region": command_dump_region, "project-files": command_project_files, "create-function": command_create_function, "delete-function": command_delete_function, "rename-function": command_rename_function, + "rename-function-by-address": command_rename_function, "set-comment": command_set_comment, + "set-decompiler-comment": command_set_decompiler_comment, + "set-disassembly-comment": command_set_disassembly_comment, + "get-function-by-address": command_get_function_by_address, + "get-function-containing": command_get_function_containing, + "list-functions": command_list_functions, + "list-segments": command_list_segments, + "list-data-items": command_list_data_items, + "list-classes": command_list_classes, + "list-strings": command_list_strings, + "list-imports": command_list_imports, + "list-exports": command_list_exports, + "list-namespaces": command_list_namespaces, + "search-functions-by-name": command_search_functions_by_name, + "decompile-function": command_decompile_function, + "decompile-function-by-address": command_decompile_function_by_address, + "disassemble-function": command_disassemble_function, + "read-region": command_read_region, + "get-xrefs-to": command_get_xrefs_to, + 
"get-xrefs-from": command_get_xrefs_from, + "get-function-xrefs": command_get_function_xrefs, + "run-script": command_run_script, "apply-plan": command_apply_plan, } return command_map[args.command](config, args) diff --git a/tools/pyghidra_crusader/common.py b/tools/pyghidra_crusader/common.py index 48cd972..54279b3 100644 --- a/tools/pyghidra_crusader/common.py +++ b/tools/pyghidra_crusader/common.py @@ -4,6 +4,7 @@ from contextlib import contextmanager from dataclasses import dataclass from pathlib import Path import os +import sys REPO_ROOT = Path(__file__).resolve().parents[2] @@ -31,10 +32,29 @@ def ensure_pyghidra_started(install_dir: Path | None = None): resolved_dir = Path(install_dir or DEFAULT_INSTALL_DIR) if not pyghidra.started(): - pyghidra.start(install_dir=resolved_dir) + with suppress_process_output(): + pyghidra.start(install_dir=resolved_dir) return pyghidra +@contextmanager +def suppress_process_output(): + with open(os.devnull, "w", encoding="utf-8") as devnull: + original_stdout = os.dup(1) + original_stderr = os.dup(2) + try: + sys.stdout.flush() + sys.stderr.flush() + os.dup2(devnull.fileno(), 1) + os.dup2(devnull.fileno(), 2) + yield + finally: + os.dup2(original_stdout, 1) + os.dup2(original_stderr, 2) + os.close(original_stdout) + os.close(original_stderr) + + def parse_address_text(address_text: str) -> int: text = address_text.strip() if ":" in text: @@ -48,6 +68,19 @@ def to_address(program, address_text: str): return address_space.getAddress(parse_address_text(address_text)) +def format_address(address) -> str: + return str(address) + + +def iter_java_items(items): + if hasattr(items, "hasNext") and hasattr(items, "next"): + while items.hasNext(): + yield items.next() + return + for item in items: + yield item + + def format_project_error(config: ProjectConfig, exc: Exception) -> RuntimeError: lock_path = config.project_dir / f"{config.project_name}.lock" details = [ @@ -127,6 +160,141 @@ def get_function(program, entry_text: str): 
return program.getFunctionManager().getFunctionAt(to_address(program, entry_text)) +def get_function_containing(program, address_text: str): + return program.getFunctionManager().getFunctionContaining(to_address(program, address_text)) + + +def read_region_bytes(program, start_text: str, end_text: str) -> bytes: + memory = program.getMemory() + start = to_address(program, start_text) + end = to_address(program, end_text) + size = end.subtract(start) + 1 + if size <= 0: + raise ValueError(f"invalid address range: {start_text}..{end_text}") + + data = bytearray() + current = start + for _ in range(size): + data.append(int(memory.getByte(current)) & 0xFF) + current = current.next() + return bytes(data) + + +def iter_functions(program): + return program.getFunctionManager().getFunctions(True) + + +def function_signature(function) -> str: + return function.getPrototypeString(True, True) + + +def function_body_range(function) -> tuple[str, str]: + body = function.getBody() + return format_address(body.getMinAddress()), format_address(body.getMaxAddress()) + + +def format_function_summary(function) -> str: + body_start, body_end = function_body_range(function) + return ( + f"Function: {function.getName()} at {format_address(function.getEntryPoint())}\n" + f"Signature: {function_signature(function)}\n" + f"Entry: {format_address(function.getEntryPoint())}\n" + f"Body: {body_start} - {body_end}" + ) + + +def list_segments(program, offset: int = 0, limit: int = 100): + memory = program.getMemory() + matches = [] + skipped = 0 + for block in memory.getBlocks(): + if skipped < offset: + skipped += 1 + continue + matches.append( + { + "name": block.getName(), + "start": format_address(block.getStart()), + "end": format_address(block.getEnd()), + "length": int(block.getSize()), + "initialized": bool(block.isInitialized()), + "read": bool(block.isRead()), + "write": bool(block.isWrite()), + "execute": bool(block.isExecute()), + } + ) + if len(matches) >= limit: + break + return
matches + + +def list_data_items(program, offset: int = 0, limit: int = 100): + listing = program.getListing() + matches = [] + skipped = 0 + for data in iter_java_items(listing.getDefinedData(True)): + if skipped < offset: + skipped += 1 + continue + value = data.getValue() + matches.append( + { + "address": format_address(data.getAddress()), + "length": int(data.getLength()), + "mnemonic": data.getMnemonicString(), + "value": None if value is None else str(value), + } + ) + if len(matches) >= limit: + break + return matches + + +def list_classes(program, offset: int = 0, limit: int = 100): + from ghidra.program.model.symbol import SymbolType + + symbol_table = program.getSymbolTable() + matches = [] + skipped = 0 + for symbol in iter_java_items(symbol_table.getDefinedSymbols()): + if symbol.getSymbolType() != SymbolType.CLASS: + continue + namespace = symbol.getObject() + parent = namespace.getParentNamespace() if namespace is not None else None + matches.append( + { + "name": symbol.getName(), + "parent": None if parent is None or parent.isGlobal() else parent.getName(), + } + ) + matches.sort(key=lambda entry: (entry["parent"] or "", entry["name"])) + return matches[offset: offset + limit] + + +def search_functions_by_name(program, query: str, offset: int = 0, limit: int = 100): + lowered = query.lower() + matches = [] + skipped = 0 + for function in iter_java_items(iter_functions(program)): + if lowered not in function.getName().lower(): + continue + if skipped < offset: + skipped += 1 + continue + matches.append(function) + if len(matches) >= limit: + break + return matches + + +def get_functions_by_exact_name(program, name: str): + matches = [] + for function in iter_java_items(iter_functions(program)): + if function.getName() == name: + matches.append(function) + return matches + + def create_function(program, entry_text: str, name: str, body_start: str | None, body_end: str | None): from ghidra.program.model.address import AddressSet from 
ghidra.program.model.symbol import SourceType @@ -157,6 +325,199 @@ def rename_function(program, entry_text: str, new_name: str): return function +def decompile_function(program, function, timeout_seconds: int = 30) -> str: + from ghidra.app.decompiler import DecompInterface + from ghidra.util.task import ConsoleTaskMonitor + + interface = DecompInterface() + interface.openProgram(program) + try: + result = interface.decompileFunction(function, timeout_seconds, ConsoleTaskMonitor()) + if not result.decompileCompleted(): + error_message = result.getErrorMessage() or "decompilation did not complete" + raise RuntimeError(error_message) + decompiled = result.getDecompiledFunction() + if decompiled is None: + raise RuntimeError("decompiler returned no function text") + return decompiled.getC() + finally: + interface.dispose() + + +def disassemble_function(program, function) -> list[str]: + from ghidra.program.model.listing import CodeUnit + + listing = program.getListing() + lines = [] + for instruction in iter_java_items(listing.getInstructions(function.getBody(), True)): + line = f"{format_address(instruction.getAddress())}: {instruction.toString()}" + if instruction.getFlowType().isCall(): + references = instruction.getReferencesFrom() + if references: + target = references[0].getToAddress() + target_function = program.getFunctionManager().getFunctionAt(target) + if target_function is not None: + line += f" -> {target_function.getName()} @ {format_address(target)}" + else: + line += f" -> {format_address(target)}" + comment = instruction.getComment(CodeUnit.EOL_COMMENT) + if comment: + line += f" ; {comment}" + lines.append(line) + return lines + + +def _reference_dict(reference) -> dict[str, str | int]: + return { + "from": format_address(reference.getFromAddress()), + "to": format_address(reference.getToAddress()), + "type": str(reference.getReferenceType()), + "operand_index": int(reference.getOperandIndex()), + } + + +def get_xrefs_to(program, address_text: str, 
offset: int = 0, limit: int = 100) -> list[dict[str, str | int]]: + reference_manager = program.getReferenceManager() + target_address = to_address(program, address_text) + results = [] + skipped = 0 + for reference in iter_java_items(reference_manager.getReferencesTo(target_address)): + if skipped < offset: + skipped += 1 + continue + results.append(_reference_dict(reference)) + if len(results) >= limit: + break + return results + + +def get_xrefs_from(program, address_text: str, offset: int = 0, limit: int = 100) -> list[dict[str, str | int]]: + reference_manager = program.getReferenceManager() + source_address = to_address(program, address_text) + results = [] + skipped = 0 + for reference in iter_java_items(reference_manager.getReferencesFrom(source_address)): + if skipped < offset: + skipped += 1 + continue + results.append(_reference_dict(reference)) + if len(results) >= limit: + break + return results + + +def list_strings(program, offset: int = 0, limit: int = 2000, filter_text: str | None = None): + listing = program.getListing() + matches = [] + skipped = 0 + lowered_filter = filter_text.lower() if filter_text else None + for data in iter_java_items(listing.getDefinedData(True)): + if not data.hasStringValue(): + continue + text = str(data.getValue()) + if lowered_filter and lowered_filter not in text.lower(): + continue + if skipped < offset: + skipped += 1 + continue + matches.append( + { + "address": format_address(data.getAddress()), + "length": int(data.getLength()), + "text": text, + } + ) + if len(matches) >= limit: + break + return matches + + +def list_imports(program, offset: int = 0, limit: int = 100): + external_manager = program.getExternalManager() + matches = [] + skipped = 0 + for library_name in external_manager.getExternalLibraryNames(): + for location in iter_java_items(external_manager.getExternalLocations(library_name)): + if skipped < offset: + skipped += 1 + continue + label = location.getLabel() + address = location.getAddress() + 
matches.append( + { + "library": str(library_name), + "label": str(label) if label is not None else None, + "address": format_address(address) if address is not None else None, + } + ) + if len(matches) >= limit: + return matches + return matches + + +def list_exports(program, offset: int = 0, limit: int = 100): + symbol_table = program.getSymbolTable() + function_manager = program.getFunctionManager() + matches = [] + skipped = 0 + for address in iter_java_items(symbol_table.getExternalEntryPointIterator()): + if skipped < offset: + skipped += 1 + continue + function = function_manager.getFunctionAt(address) + primary_symbol = symbol_table.getPrimarySymbol(address) + matches.append( + { + "address": format_address(address), + "name": function.getName() if function is not None else (primary_symbol.getName() if primary_symbol is not None else None), + "kind": "function" if function is not None else (str(primary_symbol.getSymbolType()) if primary_symbol is not None else "unknown"), + } + ) + if len(matches) >= limit: + break + return matches + + +def list_namespaces(program, offset: int = 0, limit: int = 100): + from ghidra.program.model.symbol import SymbolType + + symbol_table = program.getSymbolTable() + matches = [] + skipped = 0 + for symbol in iter_java_items(symbol_table.getDefinedSymbols()): + symbol_type = symbol.getSymbolType() + if symbol_type not in (SymbolType.NAMESPACE, SymbolType.CLASS, SymbolType.LIBRARY): + continue + namespace = symbol.getObject() + parent = namespace.getParentNamespace() if namespace is not None else None + if parent is not None and parent.isGlobal(): + parent_name = None + else: + parent_name = parent.getName() if parent is not None else None + if skipped < offset: + skipped += 1 + continue + matches.append( + { + "name": symbol.getName(), + "type": str(symbol_type), + "parent": parent_name, + } + ) + if len(matches) >= limit: + break + return matches + + +def run_script_file(script_path: Path, globals_dict: dict): + 
script_globals = dict(globals_dict) + script_globals.setdefault("__name__", "__main__") + script_globals.setdefault("__file__", str(script_path)) + code = compile(script_path.read_text(encoding="utf-8"), str(script_path), "exec") + exec(code, script_globals, script_globals) + return script_globals + + def set_comment(program, address_text: str, comment: str, comment_type: str): from ghidra.program.model.listing import CodeUnit @@ -171,9 +532,14 @@ def set_comment(program, address_text: str, comment: str, comment_type: str): raise ValueError(f"unsupported comment type: {comment_type}") listing = program.getListing() - code_unit = listing.getCodeUnitAt(to_address(program, address_text)) + target_address = to_address(program, address_text) + code_unit = listing.getCodeUnitAt(target_address) if code_unit is None: - raise ValueError(f"no code unit found at {address_text}") + function = program.getFunctionManager().getFunctionAt(target_address) + if function is not None: + function.setComment(comment) + return + raise ValueError(f"no code unit or function found at {address_text}") code_unit.setComment(comment_types[comment_type], comment)
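
Review note: most of the new `list_*` and `get_xrefs_*` helpers in `common.py` repeat the same skip-`offset`-then-collect-up-to-`limit` loop. A minimal sketch of how that pattern could be factored is below. `paginate` and `FakeJavaIterator` are hypothetical names introduced only for illustration and are not part of this diff; `iter_java_items` is copied from the change so the block runs without Ghidra.

```python
from itertools import islice


def iter_java_items(items):
    # Same adapter as in the diff: accept Java-style iterators
    # (hasNext()/next()) as well as ordinary Python iterables.
    if hasattr(items, "hasNext") and hasattr(items, "next"):
        while items.hasNext():
            yield items.next()
        return
    for item in items:
        yield item


def paginate(items, offset=0, limit=100):
    # Hypothetical helper (an assumption, not in the diff): skip `offset`
    # items, then return at most `limit` items as a list.
    return list(islice(iter_java_items(items), offset, offset + limit))


class FakeJavaIterator:
    # Stand-in for a Ghidra iterator, used here only so the sketch
    # can run outside a PyGhidra session.
    def __init__(self, values):
        self._values = list(values)
        self._index = 0

    def hasNext(self):
        return self._index < len(self._values)

    def next(self):
        value = self._values[self._index]
        self._index += 1
        return value


if __name__ == "__main__":
    print(paginate(FakeJavaIterator(range(10)), offset=3, limit=4))
```

This is intended to match the hand-written loops for a non-negative `offset` and a positive `limit`. It differs at the edges: `islice` raises `ValueError` on a negative `offset`, and `limit=0` yields an empty list, whereas the loops in the diff append one item before checking the limit.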