from __future__ import annotations import argparse import json from pathlib import Path from .common import ( DEFAULT_INSTALL_DIR, DEFAULT_PROJECT_DIR, DEFAULT_PROJECT_NAME, DEFAULT_PROGRAM_NAME, DEFAULT_FOLDER_PATH, ProjectConfig, create_function, decompile_function, disassemble_function, format_function_summary, function_signature, get_function, get_function_containing, get_functions_by_exact_name, get_xrefs_from, get_xrefs_to, list_classes, list_data_items, list_exports, list_root_files, list_imports, list_namespaces, list_segments, list_strings, open_program, open_project, read_region_bytes, remove_function, rename_function, run_script_file, save_program, search_functions_by_name, set_comment, transaction, ) OUTPUT_SCHEMA_VERSION = "1.0" FUNCTION_SCHEMA = { "type": "object", "required": ["name", "signature", "entry", "body_start", "body_end"], "properties": { "name": {"type": "string"}, "signature": {"type": "string"}, "entry": {"type": "string"}, "body_start": {"type": "string"}, "body_end": {"type": "string"}, }, } REFERENCE_SCHEMA = { "type": "object", "required": ["from", "to", "type", "operand_index"], "properties": { "from": {"type": "string"}, "to": {"type": "string"}, "type": {"type": "string"}, "operand_index": {"type": "integer"}, }, } STATUS_SCHEMA = { "type": "object", "required": ["status", "action"], "properties": { "status": {"type": "string"}, "action": {"type": "string"}, "entry": {"type": "string"}, "name": {"type": "string"}, "address": {"type": "string"}, "type": {"type": "string"}, "text": {"type": "string"}, "script": {"type": "string"}, "plan": {"type": "string"}, }, } STRING_SCHEMA = { "type": "object", "required": ["address", "length", "text"], "properties": { "address": {"type": "string"}, "length": {"type": "integer"}, "text": {"type": "string"}, }, } SEGMENT_SCHEMA = { "type": "object", "required": ["name", "start", "end", "length", "initialized", "read", "write", "execute"], "properties": { "name": {"type": "string"}, "start": 
{"type": "string"}, "end": {"type": "string"}, "length": {"type": "integer"}, "initialized": {"type": "boolean"}, "read": {"type": "boolean"}, "write": {"type": "boolean"}, "execute": {"type": "boolean"}, }, } DATA_ITEM_SCHEMA = { "type": "object", "required": ["address", "length", "mnemonic", "value"], "properties": { "address": {"type": "string"}, "length": {"type": "integer"}, "mnemonic": {"type": "string"}, "value": {"type": ["string", "null"]}, }, } IMPORT_SCHEMA = { "type": "object", "required": ["library", "label", "address"], "properties": { "library": {"type": "string"}, "label": {"type": ["string", "null"]}, "address": {"type": ["string", "null"]}, }, } EXPORT_SCHEMA = { "type": "object", "required": ["address", "name", "kind"], "properties": { "address": {"type": "string"}, "name": {"type": ["string", "null"]}, "kind": {"type": "string"}, }, } NAMESPACE_SCHEMA = { "type": "object", "required": ["name", "type", "parent"], "properties": { "name": {"type": "string"}, "type": {"type": "string"}, "parent": {"type": ["string", "null"]}, }, } CLASS_SCHEMA = { "type": "object", "required": ["name", "parent"], "properties": { "name": {"type": "string"}, "parent": {"type": ["string", "null"]}, }, } JSON_SCHEMAS = { "project-files": {"type": "array", "items": {"type": "string"}}, "dump-region": { "type": "object", "required": ["start", "end", "preview_bytes", "lines"], "properties": { "start": {"type": "string"}, "end": {"type": "string"}, "preview_bytes": {"type": "string"}, "lines": {"type": "array", "items": {"type": "string"}}, }, }, "create-function": STATUS_SCHEMA, "delete-function": STATUS_SCHEMA, "rename-function": STATUS_SCHEMA, "rename-function-by-address": STATUS_SCHEMA, "set-comment": STATUS_SCHEMA, "set-decompiler-comment": STATUS_SCHEMA, "set-disassembly-comment": STATUS_SCHEMA, "get-function-by-address": FUNCTION_SCHEMA, "get-function-containing": FUNCTION_SCHEMA, "list-functions": {"type": "array", "items": FUNCTION_SCHEMA}, "list-methods": {"type": 
"array", "items": FUNCTION_SCHEMA}, "search-functions-by-name": {"type": "array", "items": FUNCTION_SCHEMA}, "decompile-function": { "type": "object", "required": ["name", "decompiled"], "properties": {"name": {"type": "string"}, "decompiled": {"type": "string"}}, }, "decompile-function-by-address": { "type": "object", "required": ["address", "decompiled"], "properties": {"address": {"type": "string"}, "decompiled": {"type": "string"}}, }, "disassemble-function": { "type": "object", "required": ["address", "lines"], "properties": {"address": {"type": "string"}, "lines": {"type": "array", "items": {"type": "string"}}}, }, "read-region": { "type": "object", "required": ["start", "end", "bytes"], "properties": {"start": {"type": "string"}, "end": {"type": "string"}, "bytes": {"type": "string"}}, }, "get-xrefs-to": {"type": "array", "items": REFERENCE_SCHEMA}, "get-xrefs-from": {"type": "array", "items": REFERENCE_SCHEMA}, "get-function-xrefs": {"type": "array", "items": REFERENCE_SCHEMA}, "list-strings": {"type": "array", "items": STRING_SCHEMA}, "list-imports": {"type": "array", "items": IMPORT_SCHEMA}, "list-exports": {"type": "array", "items": EXPORT_SCHEMA}, "list-namespaces": {"type": "array", "items": NAMESPACE_SCHEMA}, "list-segments": {"type": "array", "items": SEGMENT_SCHEMA}, "list-data-items": {"type": "array", "items": DATA_ITEM_SCHEMA}, "list-classes": {"type": "array", "items": CLASS_SCHEMA}, "run-script": STATUS_SCHEMA, "apply-plan": STATUS_SCHEMA, "annotate-usecode": { "type": "object", "required": ["annotated", "skipped", "files"], "properties": { "annotated": {"type": "integer"}, "skipped": {"type": "integer"}, "files": {"type": "integer"}, }, }, } def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="PyGhidra helpers for the Crusader project." 
) parser.add_argument( "--install-dir", default=str(DEFAULT_INSTALL_DIR), help="Ghidra install directory.", ) parser.add_argument( "--project-dir", default=str(DEFAULT_PROJECT_DIR), help="Directory containing the Ghidra project.", ) parser.add_argument( "--project-name", default=DEFAULT_PROJECT_NAME, help="Ghidra project name.", ) parser.add_argument( "--program-name", default=DEFAULT_PROGRAM_NAME, help="Program name inside the project.", ) parser.add_argument( "--folder-path", default=DEFAULT_FOLDER_PATH, help="Project folder path containing the program.", ) parser.add_argument( "--restore-project", action="store_true", help="Restore project tool state while opening the project.", ) parser.add_argument( "--format", choices=["text", "json"], default="text", help="Output format.", ) subparsers = parser.add_subparsers(dest="command", required=True) subparsers.add_parser( "project-files", aliases=["project_files"], help="List root-level files in the Ghidra project.", ) dump_parser = subparsers.add_parser( "dump-region", aliases=["dump_region"], help="Dump instructions and resolved call targets for an address range.", ) dump_parser.add_argument("--start", required=True, help="Start address.") dump_parser.add_argument("--end", required=True, help="Inclusive end address.") create_parser = subparsers.add_parser( "create-function", aliases=["create_function"], help="Create a function at an address with an optional explicit body range.", ) create_parser.add_argument("--entry", required=True, help="Function entry address.") create_parser.add_argument("--name", required=True, help="New function name.") create_parser.add_argument("--body-start", help="Function body start address.") create_parser.add_argument("--body-end", help="Function body end address.") create_parser.add_argument( "--plate-comment", help="Optional plate comment to set at the entry address after creation.", ) delete_parser = subparsers.add_parser( "delete-function", aliases=["delete_function"], help="Delete 
a function at an address.", ) delete_parser.add_argument("--entry", required=True, help="Function entry address.") rename_parser = subparsers.add_parser( "rename-function", aliases=["rename_function"], help="Rename an existing function by entry address.", ) rename_parser.add_argument("--entry", required=True, help="Function entry address.") rename_parser.add_argument("--name", required=True, help="New function name.") rename_by_address_parser = subparsers.add_parser( "rename-function-by-address", aliases=["rename_function_by_address"], help="Rename an existing function by entry address (MCP-style alias).", ) rename_by_address_parser.add_argument( "--entry", "--function-address", dest="entry", required=True, help="Function entry address." ) rename_by_address_parser.add_argument("--name", "--new-name", dest="name", required=True, help="New function name.") comment_parser = subparsers.add_parser( "set-comment", aliases=["set_comment"], help="Set a code-unit comment by address.", ) comment_parser.add_argument("--address", required=True, help="Comment target address.") comment_parser.add_argument("--text", required=True, help="Comment text.") comment_parser.add_argument( "--type", choices=["pre", "plate", "eol", "repeatable", "post"], default="plate", help="Comment type.", ) decompiler_comment_parser = subparsers.add_parser( "set-decompiler-comment", aliases=["set_decompiler_comment"], help="Set a decompiler-visible pre-comment by address.", ) decompiler_comment_parser.add_argument("--address", required=True, help="Comment target address.") decompiler_comment_parser.add_argument("--text", "--comment", dest="text", required=True, help="Comment text.") disassembly_comment_parser = subparsers.add_parser( "set-disassembly-comment", aliases=["set_disassembly_comment"], help="Set a disassembly EOL comment by address.", ) disassembly_comment_parser.add_argument("--address", required=True, help="Comment target address.") disassembly_comment_parser.add_argument("--text", 
"--comment", dest="text", required=True, help="Comment text.") get_function_parser = subparsers.add_parser( "get-function-by-address", aliases=["get_function_by_address"], help="Show function metadata for an exact entry address.", ) get_function_parser.add_argument("--address", required=True, help="Function entry address.") get_function_containing_parser = subparsers.add_parser( "get-function-containing", aliases=["get_function_containing"], help="Show function metadata for the function containing an address.", ) get_function_containing_parser.add_argument( "--address", required=True, help="Address inside the desired function body." ) list_functions_parser = subparsers.add_parser( "list-functions", aliases=["list_functions"], help="List all defined functions.", ) list_functions_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") list_functions_parser.add_argument("--limit", type=int, default=100, help="Maximum functions to print.") list_methods_parser = subparsers.add_parser( "list-methods", aliases=["list_methods"], help="List defined function names and entries with signatures/body ranges in JSON mode.", ) list_methods_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") list_methods_parser.add_argument("--limit", type=int, default=100, help="Maximum methods to print.") list_segments_parser = subparsers.add_parser( "list-segments", aliases=["list_segments"], help="List memory segments or blocks.", ) list_segments_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") list_segments_parser.add_argument("--limit", type=int, default=100, help="Maximum segments to print.") list_data_items_parser = subparsers.add_parser( "list-data-items", aliases=["list_data_items"], help="List defined data items.", ) list_data_items_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") list_data_items_parser.add_argument("--limit", type=int, default=100, help="Maximum data items 
to print.") list_classes_parser = subparsers.add_parser( "list-classes", aliases=["list_classes"], help="List class namespaces.", ) list_classes_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") list_classes_parser.add_argument("--limit", type=int, default=100, help="Maximum classes to print.") list_strings_parser = subparsers.add_parser( "list-strings", aliases=["list_strings"], help="List defined strings in the program.", ) list_strings_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") list_strings_parser.add_argument("--limit", type=int, default=2000, help="Maximum strings to print.") list_strings_parser.add_argument("--filter", "--filter-text", dest="filter", help="Optional substring filter.") list_imports_parser = subparsers.add_parser( "list-imports", aliases=["list_imports"], help="List imported external symbols.", ) list_imports_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") list_imports_parser.add_argument("--limit", type=int, default=100, help="Maximum imports to print.") list_exports_parser = subparsers.add_parser( "list-exports", aliases=["list_exports"], help="List exported entry points and symbols.", ) list_exports_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") list_exports_parser.add_argument("--limit", type=int, default=100, help="Maximum exports to print.") list_namespaces_parser = subparsers.add_parser( "list-namespaces", aliases=["list_namespaces"], help="List non-global namespaces, classes, and libraries.", ) list_namespaces_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") list_namespaces_parser.add_argument("--limit", type=int, default=100, help="Maximum namespaces to print.") search_functions_parser = subparsers.add_parser( "search-functions-by-name", aliases=["search_functions_by_name"], help="List functions whose names contain a substring.", ) 
search_functions_parser.add_argument("--query", required=True, help="Substring to match.") search_functions_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") search_functions_parser.add_argument("--limit", type=int, default=100, help="Maximum functions to print.") decompile_name_parser = subparsers.add_parser( "decompile-function", aliases=["decompile_function"], help="Decompile an exact-named function.", ) decompile_name_parser.add_argument("--name", required=True, help="Exact function name.") decompile_name_parser.add_argument("--timeout", type=int, default=30, help="Decompile timeout in seconds.") decompile_address_parser = subparsers.add_parser( "decompile-function-by-address", aliases=["decompile_function_by_address"], help="Decompile a function by entry address.", ) decompile_address_parser.add_argument("--address", required=True, help="Function entry address.") decompile_address_parser.add_argument("--timeout", type=int, default=30, help="Decompile timeout in seconds.") disassemble_parser = subparsers.add_parser( "disassemble-function", aliases=["disassemble_function"], help="Disassemble a function body by entry address.", ) disassemble_parser.add_argument("--address", required=True, help="Function entry address.") read_region_parser = subparsers.add_parser( "read-region", aliases=["read_region"], help="Dump raw bytes for an inclusive address range.", ) read_region_parser.add_argument("--start", required=True, help="Start address.") read_region_parser.add_argument("--end", required=True, help="Inclusive end address.") run_script_parser = subparsers.add_parser( "run-script", aliases=["run_script"], help="Execute a Python file with project/program context to avoid interactive shell quoting issues.", ) run_script_parser.add_argument("--script", required=True, help="Path to the Python script file.") run_script_parser.add_argument( "--read-only", action="store_true", help="Open the program read-only for script execution.", ) 
xrefs_to_parser = subparsers.add_parser( "get-xrefs-to", aliases=["get_xrefs_to"], help="List references to an address.", ) xrefs_to_parser.add_argument("--address", required=True, help="Target address.") xrefs_to_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") xrefs_to_parser.add_argument("--limit", type=int, default=100, help="Maximum references to print.") xrefs_from_parser = subparsers.add_parser( "get-xrefs-from", aliases=["get_xrefs_from"], help="List references from an address.", ) xrefs_from_parser.add_argument("--address", required=True, help="Source address.") xrefs_from_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") xrefs_from_parser.add_argument("--limit", type=int, default=100, help="Maximum references to print.") function_xrefs_parser = subparsers.add_parser( "get-function-xrefs", aliases=["get_function_xrefs"], help="List references to a function entry by exact function name.", ) function_xrefs_parser.add_argument("--name", required=True, help="Exact function name.") function_xrefs_parser.add_argument("--offset", type=int, default=0, help="Pagination offset.") function_xrefs_parser.add_argument("--limit", type=int, default=100, help="Maximum references to print.") plan_parser = subparsers.add_parser( "apply-plan", aliases=["apply_plan"], help="Apply a JSON edit plan containing function and comment operations.", ) plan_parser.add_argument("--plan", required=True, help="Path to the JSON plan file.") plan_parser.add_argument( "--dry-run", action="store_true", help="Validate and print the plan without modifying the project.", ) annotate_usecode_parser = subparsers.add_parser( "annotate-usecode", aliases=["annotate_usecode"], help="Import USECODE IR JSON annotation hints as Ghidra comments on compiled anchors.", ) annotate_usecode_parser.add_argument( "--ir-file", dest="ir_files", metavar="IR_FILE", action="append", required=True, help="Path to an IR JSON file produced by 
poc_crusader_usecode_parser.py. May be given multiple times.", ) annotate_usecode_parser.add_argument( "--comment-type", choices=["pre", "plate", "eol", "repeatable", "post"], default="plate", help="Ghidra comment type to use for anchor annotations (default: plate).", ) annotate_usecode_parser.add_argument( "--dry-run", action="store_true", help="Print what would be annotated without modifying the project.", ) return parser def build_config(args: argparse.Namespace) -> ProjectConfig: return ProjectConfig( install_dir=Path(args.install_dir), project_dir=Path(args.project_dir), project_name=args.project_name, program_name=args.program_name, folder_path=args.folder_path, restore_project=args.restore_project, ) def _canonical_command_name(command_name: str) -> str: return command_name.replace("_", "-") def _emit(args: argparse.Namespace, payload, text: str | None = None) -> int: if args.format == "json": command_name = _canonical_command_name(args.command) response = { "schema_version": OUTPUT_SCHEMA_VERSION, "command": command_name, "ok": True, "schema": JSON_SCHEMAS.get(command_name, {"type": "object"}), "data": payload, } print(json.dumps(response, indent=2, sort_keys=True)) return 0 if text is not None: print(text) return 0 if isinstance(payload, list): for item in payload: print(item) return 0 if isinstance(payload, dict): print(json.dumps(payload, indent=2, sort_keys=True)) return 0 print(payload) return 0 def _function_to_dict(function) -> dict[str, str]: return { "name": function.getName(), "signature": function_signature(function), "entry": str(function.getEntryPoint()), "body_start": str(function.getBody().getMinAddress()), "body_end": str(function.getBody().getMaxAddress()), } def _function_line(function) -> str: return f"{function.getName()} @ {function.getEntryPoint()}" def _text_or_empty(lines: list[str], empty_message: str) -> str: return "\n".join(lines) if lines else empty_message def command_project_files(config: ProjectConfig, _args: 
argparse.Namespace) -> int: project = open_project(config) try: names = list_root_files(project) finally: project.close() return _emit(_args, names, "\n".join(names)) def command_dump_region(config: ProjectConfig, args: argparse.Namespace) -> int: from .common import to_address with open_program(config, read_only=True) as (_project, program): listing = program.getListing() memory = program.getMemory() start = to_address(program, args.start) end = to_address(program, args.end) size = end.subtract(start) + 1 buf = bytearray(size) memory.getBytes(start, buf) print(f"REGION {args.start}..{args.end} BYTES {bytes(buf[:32]).hex()}") instruction = listing.getInstructionAt(start) while instruction is not None and instruction.getAddress().compareTo(end) <= 0: line = f"{instruction.getAddress()}: {instruction.toString()}" if instruction.getFlowType().isCall(): references = instruction.getReferencesFrom() if references: target = references[0].getToAddress() function = program.getFunctionManager().getFunctionAt(target) if function is not None: line += f" -> {function.getName()} @ {target}" else: line += f" -> {target}" print(line) instruction = instruction.getNext() return 0 def command_create_function(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=False) as (project, program): with transaction(program, f"Create function {args.entry}"): function = create_function(program, args.entry, args.name, args.body_start, args.body_end) if args.plate_comment: set_comment(program, args.entry, args.plate_comment, "plate") save_program(project, program) return _emit( args, {"status": "ok", "entry": args.entry, "name": function.getName(), "action": "create-function"}, f"created {function.getName()} at {args.entry}", ) def command_delete_function(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=False) as (project, program): with transaction(program, f"Delete function {args.entry}"): removed = 
remove_function(program, args.entry) if not removed: raise RuntimeError(f"no function removed at {args.entry}") save_program(project, program) return _emit( args, {"status": "ok", "entry": args.entry, "action": "delete-function"}, f"deleted function at {args.entry}", ) def command_rename_function(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=False) as (project, program): with transaction(program, f"Rename function {args.entry}"): function = rename_function(program, args.entry, args.name) save_program(project, program) return _emit( args, {"status": "ok", "entry": args.entry, "name": function.getName(), "action": "rename-function"}, f"renamed {args.entry} to {function.getName()}", ) def _set_comment_with_type(config: ProjectConfig, args: argparse.Namespace, address: str, text: str, comment_type: str) -> int: with open_program(config, read_only=False) as (project, program): with transaction(program, f"Set comment {address}"): set_comment(program, address, text, comment_type) save_program(project, program) return _emit( args, {"status": "ok", "address": address, "type": comment_type, "text": text, "action": "set-comment"}, f"set {comment_type} comment at {address}", ) def command_set_comment(config: ProjectConfig, args: argparse.Namespace) -> int: return _set_comment_with_type(config, args, args.address, args.text, args.type) def command_set_decompiler_comment(config: ProjectConfig, args: argparse.Namespace) -> int: return _set_comment_with_type(config, args, args.address, args.text, "pre") def command_set_disassembly_comment(config: ProjectConfig, args: argparse.Namespace) -> int: return _set_comment_with_type(config, args, args.address, args.text, "eol") def _require_function_by_address(program, address_text: str): function = get_function(program, address_text) if function is None: raise RuntimeError(f"no function found at {address_text}") return function def _require_single_function_by_name(program, name: str): matches 
= get_functions_by_exact_name(program, name) if not matches: raise RuntimeError(f"no function found with exact name '{name}'") if len(matches) > 1: raise RuntimeError( f"multiple functions match exact name '{name}'; use search-functions-by-name or an address-specific command" ) return matches[0] def _print_function_lines(functions) -> None: for function in functions: print(f"{function.getName()} @ {function.getEntryPoint()}") def _print_reference_lines(references: list[dict[str, str | int]]) -> None: for reference in references: print( f"{reference['from']} -> {reference['to']} [{reference['type']}] operand={reference['operand_index']}" ) def command_get_function_by_address(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): function = _require_function_by_address(program, args.address) payload = _function_to_dict(function) text = format_function_summary(function) return _emit(args, payload, text) def command_get_function_containing(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): function = get_function_containing(program, args.address) if function is None: raise RuntimeError(f"no containing function found at {args.address}") payload = _function_to_dict(function) text = format_function_summary(function) return _emit(args, payload, text) def command_list_functions(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): functions = search_functions_by_name(program, "", offset=args.offset, limit=args.limit) payload = [_function_to_dict(function) for function in functions] text = _text_or_empty([_function_line(function) for function in functions], "no functions found") return _emit(args, payload, text) def command_list_methods(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): functions = 
search_functions_by_name(program, "", offset=args.offset, limit=args.limit) payload = [_function_to_dict(function) for function in functions] text = _text_or_empty([_function_line(function) for function in functions], "no methods found") return _emit(args, payload, text) def command_search_functions_by_name(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): functions = search_functions_by_name(program, args.query, offset=args.offset, limit=args.limit) payload = [_function_to_dict(function) for function in functions] text = _text_or_empty([_function_line(function) for function in functions], "no matching functions found") return _emit(args, payload, text) def command_list_strings(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): strings = list_strings(program, offset=args.offset, limit=args.limit, filter_text=args.filter) text = _text_or_empty([f"{entry['address']}: {entry['text']}" for entry in strings], "no strings found") return _emit(args, strings, text) def command_list_segments(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): segments = list_segments(program, offset=args.offset, limit=args.limit) text = _text_or_empty( [ f"{entry['name']} {entry['start']} - {entry['end']} len={entry['length']}" f" r={entry['read']} w={entry['write']} x={entry['execute']} init={entry['initialized']}" for entry in segments ], "no segments found", ) return _emit(args, segments, text) def command_list_data_items(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): items = list_data_items(program, offset=args.offset, limit=args.limit) text = _text_or_empty( [ f"{entry['address']} {entry['mnemonic']} len={entry['length']}" + (f" value={entry['value']}" if entry['value'] is not None else "") for 
entry in items ], "no data items found", ) return _emit(args, items, text) def command_list_classes(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): classes = list_classes(program, offset=args.offset, limit=args.limit) text = _text_or_empty( [ f"{entry['name']}" + (f" parent={entry['parent']}" if entry['parent'] else "") for entry in classes ], "no classes found", ) return _emit(args, classes, text) def command_list_imports(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): imports = list_imports(program, offset=args.offset, limit=args.limit) text = _text_or_empty([ f"{entry['library']}!{entry['label'] or ''} @ {entry['address'] or ''}" for entry in imports ], "no imports found") return _emit(args, imports, text) def command_list_exports(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): exports = list_exports(program, offset=args.offset, limit=args.limit) text = _text_or_empty([ f"{entry['name'] or ''} @ {entry['address']} [{entry['kind']}]" for entry in exports ], "no exports found") return _emit(args, exports, text) def command_list_namespaces(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): namespaces = list_namespaces(program, offset=args.offset, limit=args.limit) text = _text_or_empty([ f"{entry['name']} [{entry['type']}]" + (f" parent={entry['parent']}" if entry['parent'] else "") for entry in namespaces ], "no namespaces found") return _emit(args, namespaces, text) def command_decompile_function_by_address(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): function = _require_function_by_address(program, args.address) output = decompile_function(program, function, args.timeout) return _emit(args, 
{"address": args.address, "decompiled": output}, output) def command_decompile_function(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): function = _require_single_function_by_name(program, args.name) output = decompile_function(program, function, args.timeout) return _emit(args, {"name": args.name, "decompiled": output}, output) def command_disassemble_function(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): function = _require_function_by_address(program, args.address) lines = disassemble_function(program, function) if not lines: code_unit = program.getListing().getCodeUnitAt(function.getEntryPoint()) lines = [ f"no instructions found in body {function.getBody().getMinAddress()} - {function.getBody().getMaxAddress()}; entry code unit = {code_unit}" ] return _emit(args, {"address": args.address, "lines": lines}, "\n".join(lines)) def command_read_region(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): data = read_region_bytes(program, args.start, args.end) text = f"REGION {args.start}..{args.end} BYTES {data.hex()}" return _emit(args, {"start": args.start, "end": args.end, "bytes": data.hex()}, text) def command_get_xrefs_to(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): references = get_xrefs_to(program, args.address, offset=args.offset, limit=args.limit) text = _text_or_empty([ f"{reference['from']} -> {reference['to']} [{reference['type']}] operand={reference['operand_index']}" for reference in references ], "no xrefs found") return _emit(args, references, text) def command_get_xrefs_from(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): references = get_xrefs_from(program, args.address, 
offset=args.offset, limit=args.limit) text = _text_or_empty([ f"{reference['from']} -> {reference['to']} [{reference['type']}] operand={reference['operand_index']}" for reference in references ], "no xrefs found") return _emit(args, references, text) def command_get_function_xrefs(config: ProjectConfig, args: argparse.Namespace) -> int: with open_program(config, read_only=True) as (_project, program): function = _require_single_function_by_name(program, args.name) references = get_xrefs_to( program, str(function.getEntryPoint()), offset=args.offset, limit=args.limit, ) text = _text_or_empty([ f"{reference['from']} -> {reference['to']} [{reference['type']}] operand={reference['operand_index']}" for reference in references ], "no xrefs found") return _emit(args, references, text) def command_run_script(config: ProjectConfig, args: argparse.Namespace) -> int: script_path = Path(args.script).resolve() if not script_path.is_file(): raise RuntimeError(f"script file not found: {script_path}") with open_program(config, read_only=args.read_only) as (project, program): script_globals = { "config": config, "project": project, "program": program, "helpers": { "create_function": create_function, "decompile_function": decompile_function, "disassemble_function": disassemble_function, "format_function_summary": format_function_summary, "get_function": get_function, "get_function_containing": get_function_containing, "get_xrefs_from": get_xrefs_from, "get_xrefs_to": get_xrefs_to, "read_region_bytes": read_region_bytes, "rename_function": rename_function, "set_comment": set_comment, }, } run_script_file(script_path, script_globals) if not args.read_only: save_program(project, program) return _emit(args, {"status": "ok", "script": str(script_path)}, f"ran script {script_path}") def _load_plan(plan_path: str) -> dict: with open(plan_path, "r", encoding="utf-8") as handle: return json.load(handle) def _print_plan(plan: dict) -> None: print(json.dumps(plan, indent=2, sort_keys=True)) 
def command_annotate_usecode(config: ProjectConfig, args: argparse.Namespace) -> int:
    """Import USECODE IR JSON files and set Ghidra comments on compiled anchor addresses.

    Every file in ``args.ir_files`` is parsed as JSON. For each entry of
    ``annotation_hints.compiled_anchors`` that has a non-empty ``address``, one
    comment is built from the file's ``class`` and ``event`` sections. Missing
    or ``null`` sections and fields fall back to defaults (``"?"`` for the
    class name, ``0`` for numeric fields).

    With ``args.dry_run`` the planned comments are only listed; the program is
    not opened. Otherwise each comment is written with ``args.comment_type``
    inside one transaction; a failing address is reported and counted as
    skipped, and the program is saved afterwards.

    Raises:
        RuntimeError: if any IR path is not an existing file.
    """
    import sys  # local import: only needed for diagnostic routing below

    # In text mode diagnostics stay on stdout as before. In JSON mode they go
    # to stderr so they do not interleave with the document written by _emit
    # (presumably to stdout -- confirm against _emit).
    diagnostics = sys.stderr if getattr(args, "format", None) == "json" else sys.stdout

    # Collect all annotation records from every IR file: (address, comment_text).
    pending: list[tuple[str, str]] = []
    for ir_path_str in args.ir_files:
        ir_path = Path(ir_path_str).resolve()
        if not ir_path.is_file():
            raise RuntimeError(f"IR file not found: {ir_path}")
        ir = json.loads(ir_path.read_text(encoding="utf-8"))

        # ``or`` also covers explicit JSON nulls, not just missing keys.
        cls = ir.get("class") or {}
        evt = ir.get("event") or {}
        hints = ir.get("annotation_hints") or {}

        class_name = cls.get("class_name") or "?"
        slot = evt.get("slot") or 0
        event_name_hint = evt.get("event_name_hint") or f"slot_0x{slot:02X}"
        body_start = evt.get("derived_body_start") or 0
        body_end = evt.get("derived_body_end") or 0
        raw_word = evt.get("raw_event_entry_word") or 0

        for anchor in hints.get("compiled_anchors") or []:
            address = anchor.get("address", "")
            role = anchor.get("role", "")
            if not address:
                continue
            comment = (
                f"POC USECODE: {class_name} slot=0x{slot:02X} [{event_name_hint}]"
                f" body=0x{body_start:04X}..0x{body_end:04X}"
                f" raw_word=0x{raw_word:04X} | {role}"
            )
            pending.append((address, comment))

    file_count = len(args.ir_files)

    if args.dry_run:
        for address, comment in pending:
            print(f"DRY-RUN {address} {comment}", file=diagnostics)
        return _emit(
            args,
            {"annotated": 0, "skipped": len(pending), "files": file_count},
            f"dry-run: {len(pending)} annotations would be written from {file_count} file(s)",
        )

    annotated = 0
    skipped = 0
    with open_program(config, read_only=False) as (project, program):
        with transaction(program, "USECODE annotation import"):
            for address, comment in pending:
                try:
                    set_comment(program, address, comment, args.comment_type)
                except Exception as exc:
                    # Deliberate best effort: one bad address must not abort
                    # the whole import, so report it and carry on.
                    print(f"SKIP {address}: {exc}", file=diagnostics)
                    skipped += 1
                else:
                    annotated += 1
        # Save only after the transaction block has been left.
        save_program(project, program)

    return _emit(
        args,
        {"annotated": annotated, "skipped": skipped, "files": file_count},
        f"annotated {annotated} anchors ({skipped} skipped) from {file_count} file(s)",
    )


def command_apply_plan(config: ProjectConfig, args: argparse.Namespace) -> int:
    """Apply the JSON plan at ``args.plan`` to the program in one transaction.

    Plan sections are processed in this order: ``remove_functions``,
    ``rename_functions``, ``create_functions`` (each with an optional comment),
    ``comments``, and finally ``assert_functions``. The transaction is named by
    the plan's ``transaction`` key, defaulting to ``"Apply plan <path>"``.

    With ``args.dry_run`` the plan is printed as JSON and nothing is opened or
    modified; the function then returns ``0``.

    Raises:
        RuntimeError: if a listed function could not be removed, or an
            asserted function does not exist after the changes.
    """
    plan = _load_plan(args.plan)

    if args.dry_run:
        # The plan is printed as JSON regardless of the selected output format.
        _print_plan(plan)
        return 0

    transaction_name = plan.get("transaction", f"Apply plan {args.plan}")
    with open_program(config, read_only=False) as (project, program):
        with transaction(program, transaction_name):
            for entry in plan.get("remove_functions", []):
                if not remove_function(program, entry):
                    raise RuntimeError(f"no function removed at {entry}")

            for entry in plan.get("rename_functions", []):
                rename_function(program, entry["entry"], entry["name"])

            for entry in plan.get("create_functions", []):
                create_function(
                    program,
                    entry["entry"],
                    entry["name"],
                    entry.get("body_start"),
                    entry.get("body_end"),
                )
                if entry.get("comment"):
                    set_comment(
                        program,
                        entry["entry"],
                        entry["comment"],
                        entry.get("comment_type", "plate"),
                    )

            for entry in plan.get("comments", []):
                set_comment(
                    program,
                    entry["address"],
                    entry["text"],
                    entry.get("type", "plate"),
                )

            # Post-conditions are checked while the transaction is still open.
            # NOTE(review): assumes transaction() discards changes when an
            # exception escapes -- confirm against its implementation.
            for entry in plan.get("assert_functions", []):
                if get_function(program, entry) is None:
                    raise RuntimeError(f"expected function missing at {entry}")
        save_program(project, program)

    return _emit(args, {"status": "ok", "plan": args.plan}, f"applied plan {args.plan}")


def main(argv: list[str] | None = None) -> int:
    """Parse ``argv`` and dispatch to the handler of the selected command.

    Args:
        argv: Argument list to parse; ``None`` is passed through to
            ``ArgumentParser.parse_args``.

    Returns:
        The integer returned by the selected command handler.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    args.command = _canonical_command_name(args.command)
    config = build_config(args)
    command_map = {
        "dump-region": command_dump_region,
        "project-files": command_project_files,
        "create-function": command_create_function,
        "delete-function": command_delete_function,
        "rename-function": command_rename_function,
        "rename-function-by-address": command_rename_function,
        "set-comment": command_set_comment,
        "set-decompiler-comment": command_set_decompiler_comment,
        "set-disassembly-comment": command_set_disassembly_comment,
        "get-function-by-address": command_get_function_by_address,
        "get-function-containing": command_get_function_containing,
        "list-functions": command_list_functions,
        "list-methods": command_list_methods,
        "list-segments": command_list_segments,
        "list-data-items": command_list_data_items,
        "list-classes": command_list_classes,
        "list-strings": command_list_strings,
        "list-imports": command_list_imports,
        "list-exports": command_list_exports,
        "list-namespaces": command_list_namespaces,
        "search-functions-by-name": command_search_functions_by_name,
        "decompile-function": command_decompile_function,
        "decompile-function-by-address": command_decompile_function_by_address,
        "disassemble-function": command_disassemble_function,
        "read-region": command_read_region,
        "get-xrefs-to": command_get_xrefs_to,
        "get-xrefs-from": command_get_xrefs_from,
        "get-function-xrefs": command_get_function_xrefs,
        "run-script": command_run_script,
        "apply-plan": command_apply_plan,
        "annotate-usecode": command_annotate_usecode,
    }
    handler = command_map.get(args.command)
    if handler is None:
        # A command known to the parser but missing from the table is reported
        # as a usage error (exit status 2) instead of a KeyError traceback.
        parser.error(f"unknown command: {args.command}")
    return handler(config, args)


if __name__ == "__main__":
    raise SystemExit(main())