#!/usr/bin/env python
"""Reusable MQTT and Zigbee2MQTT diagnostics for this repository."""

from __future__ import annotations

import argparse
import json
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Any, Iterable

REPO_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_ENV_PATH = REPO_ROOT / ".local" / "mqtt-home.env"
DEFAULT_BASE_TOPIC = "zigbee2mqtt_2"


def ensure_paho():
    """Return the ``paho.mqtt.client`` module, installing paho-mqtt if needed.

    When the import fails, ``pip install --quiet paho-mqtt`` is run with the
    current interpreter and the import is retried.

    Raises:
        subprocess.CalledProcessError: If the pip invocation fails.
        ImportError: If the module still cannot be imported afterwards.
    """
    try:
        import paho.mqtt.client as mqtt  # type: ignore
    except ImportError:
        import importlib

        subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "paho-mqtt"])
        # The import system caches directory listings; refresh them so the
        # package installed a moment ago is visible to the retry below.
        importlib.invalidate_caches()
        import paho.mqtt.client as mqtt  # type: ignore
    return mqtt


def load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` lines from *path* into ``os.environ``.

    Blank lines, lines starting with ``#``, lines without ``=`` and lines with
    an empty key are skipped. Variables already present in the environment are
    left untouched (``setdefault``). A leading UTF-8 byte-order mark is
    tolerated.

    Raises:
        FileNotFoundError: If *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(
            f"MQTT env file not found at {path}. Create it first (see .github/copilot-instructions.md)."
        )
    # "utf-8-sig" decodes plain UTF-8 too, but also strips a BOM that would
    # otherwise become part of the first variable name.
    for raw_line in path.read_text(encoding="utf-8-sig").splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue
        key, separator, value = line.partition("=")
        key = key.strip()
        # An empty name (a line such as "=value") is not a legal environment
        # variable and would make os.environ raise.
        if not separator or not key:
            continue
        os.environ.setdefault(key, value.strip())


def mqtt_config(env_path: Path) -> dict[str, Any]:
    """Load *env_path* and return the MQTT connection settings as a dict.

    Returns:
        A dict with the keys ``host``, ``port`` (int), ``username``,
        ``password`` and ``base_topic`` (``MQTT_BASE_TOPIC`` or
        ``DEFAULT_BASE_TOPIC``).

    Raises:
        FileNotFoundError: If the env file does not exist.
        RuntimeError: If a required setting is missing or ``MQTT_PORT`` is not
            an integer.
    """
    load_env_file(env_path)
    required = ("MQTT_HOST", "MQTT_PORT", "MQTT_USERNAME", "MQTT_PASSWORD")
    missing = [key for key in required if not os.environ.get(key)]
    if missing:
        raise RuntimeError(f"Missing MQTT settings in {env_path}: {', '.join(missing)}")
    raw_port = os.environ["MQTT_PORT"]
    try:
        port = int(raw_port)
    except ValueError as err:
        raise RuntimeError(f"MQTT_PORT must be an integer, got {raw_port!r}") from err
    return {
        "host": os.environ["MQTT_HOST"],
        "port": port,
        "username": os.environ["MQTT_USERNAME"],
        "password": os.environ["MQTT_PASSWORD"],
        "base_topic": os.environ.get("MQTT_BASE_TOPIC", DEFAULT_BASE_TOPIC),
    }


def parse_json_if_possible(payload: str) -> Any:
    """Return *payload* decoded as JSON, or *payload* itself if decoding fails."""
    try:
        return json.loads(payload)
    except (TypeError, ValueError, RecursionError):
        return payload


def format_payload(payload: str) -> str:
    """Pretty-print JSON objects/arrays; return any other payload as ``str``."""
    parsed = parse_json_if_possible(payload)
    if isinstance(parsed, (dict, list)):
        return json.dumps(parsed, indent=2, sort_keys=True)
    return str(parsed)


def print_message(topic: str, payload: str) -> None:
    """Print one topic/payload pair followed by a ``---`` separator line."""
    print(f"TOPIC: {topic}")
    print(format_payload(payload))
    print("---")


def normalize_topic(base_topic: str, topic: str) -> str:
    """Prefix *topic* with *base_topic* unless it is already qualified.

    Topics that already start with ``<base_topic>/`` or with ``$`` are
    returned unchanged.
    """
    if topic.startswith((base_topic + "/", "$")):
        return topic
    return f"{base_topic}/{topic}"


class CaptureClient:
    """Connect to the broker, subscribe/publish, and collect messages for a while."""

    # Upper bound on one sleep slice, so a refused connection is noticed
    # promptly instead of after the whole capture window.
    _POLL_INTERVAL = 0.2

    def __init__(
        self,
        config: dict[str, Any],
        subscriptions: Iterable[str],
        publications: Iterable[dict[str, Any]] | None = None,
        duration: float = 5.0,
    ) -> None:
        """Store the capture parameters.

        Args:
            config: Settings dict as returned by ``mqtt_config``.
            subscriptions: Topics to subscribe to; duplicates are dropped while
                keeping the first-seen order.
            publications: Dicts with ``topic`` and ``payload`` keys and
                optional ``qos`` / ``retain`` keys, published after connecting.
            duration: Seconds to keep listening.
        """
        self._mqtt = ensure_paho()
        self.config = config
        self.subscriptions = list(dict.fromkeys(subscriptions))
        self.publications = list(publications or [])
        self.duration = duration
        self.messages: list[tuple[str, str]] = []

    def run(self) -> list[tuple[str, str]]:
        """Run one capture session and return the ``(topic, payload)`` pairs seen.

        Payloads are decoded as UTF-8 with replacement of undecodable bytes.

        Raises:
            ConnectionError: If the broker answers the connect request with a
                failure reason code.
            OSError: If the connection cannot be opened (raised by the client's
                ``connect`` call).
        """
        client = self._mqtt.Client(self._mqtt.CallbackAPIVersion.VERSION2)
        client.username_pw_set(self.config["username"], self.config["password"])
        refusals: list[str] = []

        def on_connect(client, userdata, flags, reason_code, properties=None):
            # A failed CONNACK means nothing below can work; remember why so
            # the caller gets a real error instead of an empty capture.
            if getattr(reason_code, "is_failure", False):
                refusals.append(str(reason_code))
                return
            for topic in self.subscriptions:
                client.subscribe(topic)
            for publication in self.publications:
                client.publish(
                    publication["topic"],
                    publication["payload"],
                    qos=publication.get("qos", 0),
                    retain=publication.get("retain", False),
                )

        def on_message(client, userdata, msg):
            self.messages.append((msg.topic, msg.payload.decode("utf-8", "replace")))

        client.on_connect = on_connect
        client.on_message = on_message
        client.connect(self.config["host"], self.config["port"], 20)
        client.loop_start()
        try:
            deadline = time.monotonic() + self.duration
            while not refusals:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                time.sleep(min(remaining, self._POLL_INTERVAL))
        finally:
            # Always stop the network loop and close the connection, even if
            # the wait is interrupted (for example by Ctrl+C).
            client.loop_stop()
            client.disconnect()
        if refusals:
            raise ConnectionError(f"MQTT broker refused the connection: {refusals[0]}")
        return self.messages


def unique_messages(messages: Iterable[tuple[str, str]]) -> list[tuple[str, str]]:
    """Return *messages* without duplicates, keeping first-seen order."""
    return list(dict.fromkeys(messages))


def capture(
    args: argparse.Namespace,
    subscriptions: list[str],
    publications: list[dict[str, Any]] | None = None,
    config: dict[str, Any] | None = None,
) -> list[tuple[str, str]]:
    """Run a ``CaptureClient`` session and return the de-duplicated messages.

    Args:
        args: Parsed CLI arguments; ``env_file`` and ``duration`` are read.
        subscriptions: Topics to subscribe to.
        publications: Optional messages to publish after connecting.
        config: Already-loaded settings; when omitted they are loaded from
            ``args.env_file``.
    """
    if config is None:
        config = mqtt_config(Path(args.env_file))
    client = CaptureClient(config, subscriptions, publications, duration=args.duration)
    return unique_messages(client.run())


def _print_messages(messages: list[tuple[str, str]], empty_notice: str) -> int:
    """Print every message and return 0, or print *empty_notice* and return 1."""
    if not messages:
        print(empty_notice)
        return 1
    for topic, payload in messages:
        print_message(topic, payload)
    return 0


def cmd_health(args: argparse.Namespace) -> int:
    """Print broker details and the bridge state; return 1 if no state arrived."""
    config = mqtt_config(Path(args.env_file))
    messages = capture(args, [f"{config['base_topic']}/bridge/state"], config=config)
    print(f"Broker: {config['host']}:{config['port']}")
    print(f"Base topic: {config['base_topic']}")
    print("---")
    if not messages:
        # Like the other commands, an empty capture is reported as a failure.
        print("No bridge/state payload received.", file=sys.stderr)
        return 1
    for topic, payload in messages:
        print_message(topic, payload)
    return 0


def cmd_device_record(args: argparse.Namespace) -> int:
    """Print the ``bridge/devices`` records matching ``args.devices``.

    Records are matched on ``friendly_name`` or ``ieee_address``; an empty
    ``args.devices`` selects every record. Returns 0 on success, 1 otherwise.
    """
    config = mqtt_config(Path(args.env_file))
    messages = capture(args, [f"{config['base_topic']}/bridge/devices"], config=config)
    devices_payload = next(
        (payload for topic, payload in messages if topic.endswith("/bridge/devices")),
        None,
    )
    if not devices_payload:
        print("No bridge/devices payload received.", file=sys.stderr)
        return 1
    devices = parse_json_if_possible(devices_payload)
    if not isinstance(devices, list):
        print("Unexpected bridge/devices payload.", file=sys.stderr)
        return 1
    matched = [
        device
        for device in devices
        # Entries that are not JSON objects cannot be matched on their fields.
        if isinstance(device, dict)
        and (
            not args.devices
            or device.get("friendly_name") in args.devices
            or device.get("ieee_address") in args.devices
        )
    ]
    if not matched:
        print("No matching devices found.")
        return 1
    print(json.dumps(matched, indent=2, sort_keys=True))
    return 0


def cmd_state(args: argparse.Namespace) -> int:
    """Print the state (and optionally availability) topics of ``args.devices``."""
    config = mqtt_config(Path(args.env_file))
    base_topic = config["base_topic"]
    subscriptions: list[str] = []
    for device in args.devices:
        subscriptions.append(normalize_topic(base_topic, device))
        if args.include_availability:
            subscriptions.append(normalize_topic(base_topic, f"{device}/availability"))
    messages = capture(args, subscriptions, config=config)
    return _print_messages(messages, "No state payloads received.")


def build_get_payload(keys: list[str]) -> str:
    """Return the JSON body of a ``get`` request for *keys* (``state`` if empty)."""
    if not keys:
        return json.dumps({"state": ""})
    return json.dumps({key: "" for key in keys})


def cmd_get(args: argparse.Namespace) -> int:
    """Publish a ``<device>/get`` request and print the replies."""
    config = mqtt_config(Path(args.env_file))
    base_topic = config["base_topic"]
    subscriptions = [
        normalize_topic(base_topic, args.device),
        normalize_topic(base_topic, f"{args.device}/availability"),
    ]
    publications = [
        {
            "topic": normalize_topic(base_topic, f"{args.device}/get"),
            "payload": build_get_payload(args.keys),
        }
    ]
    messages = capture(args, subscriptions, publications, config=config)
    return _print_messages(messages, "No response received.")


def cmd_configure(args: argparse.Namespace) -> int:
    """Publish a device configure request and print what comes back."""
    config = mqtt_config(Path(args.env_file))
    base_topic = config["base_topic"]
    subscriptions = [
        f"{base_topic}/bridge/response/device/configure",
        f"{base_topic}/bridge/log",
        normalize_topic(base_topic, args.device),
    ]
    publications = [
        {
            "topic": f"{base_topic}/bridge/request/device/configure",
            "payload": json.dumps({"id": args.device}),
        }
    ]
    messages = capture(args, subscriptions, publications, config=config)
    return _print_messages(messages, "No configure response received.")


def cmd_permit_join(args: argparse.Namespace) -> int:
    """Publish a permit-join request and print the response (and log/events)."""
    config = mqtt_config(Path(args.env_file))
    base_topic = config["base_topic"]
    subscriptions = [f"{base_topic}/bridge/response/permit_join"]
    if args.watch:
        subscriptions.extend(
            [
                f"{base_topic}/bridge/log",
                f"{base_topic}/bridge/event",
            ]
        )
    publications = [
        {
            "topic": f"{base_topic}/bridge/request/permit_join",
            "payload": json.dumps({"value": True, "time": args.time}),
        }
    ]
    messages = capture(args, subscriptions, publications, config=config)
    return _print_messages(messages, "No permit_join response received.")


def cmd_pair_watch(args: argparse.Namespace) -> int:
    """Publish a permit-join request and print bridge and device traffic."""
    config = mqtt_config(Path(args.env_file))
    base_topic = config["base_topic"]
    subscriptions = [
        f"{base_topic}/bridge/response/permit_join",
        f"{base_topic}/bridge/response/device/configure",
        f"{base_topic}/bridge/response/device/interview",
        f"{base_topic}/bridge/event",
        f"{base_topic}/bridge/log",
        normalize_topic(base_topic, args.device),
        normalize_topic(base_topic, f"{args.device}/availability"),
    ]
    publications = [
        {
            "topic": f"{base_topic}/bridge/request/permit_join",
            "payload": json.dumps({"value": True, "time": args.time}),
        }
    ]
    messages = capture(args, subscriptions, publications, config=config)
    return _print_messages(messages, "No pair-watch messages received.")


def cmd_compare(args: argparse.Namespace) -> int:
    """Print records and state for two devices, optionally configuring both.

    Returns 0 only when both the record lookup and the state read succeed; the
    result of the optional configure probes does not affect the return code.
    """
    compare_args = argparse.Namespace(**vars(args))
    compare_args.devices = [args.device_a, args.device_b]
    compare_args.include_availability = True
    print("== DEVICE RECORDS ==")
    record_rc = cmd_device_record(compare_args)
    print("== CURRENT STATE ==")
    state_rc = cmd_state(compare_args)
    if args.configure:
        for device in (args.device_a, args.device_b):
            print(f"== CONFIGURE {device} ==")
            configure_args = argparse.Namespace(**vars(args))
            configure_args.device = device
            cmd_configure(configure_args)
    return 0 if record_rc == 0 and state_rc == 0 else 1


def cmd_watch(args: argparse.Namespace) -> int:
    """Print messages seen on ``args.topics`` (plus bridge log/event if asked)."""
    config = mqtt_config(Path(args.env_file))
    base_topic = config["base_topic"]
    subscriptions = [normalize_topic(base_topic, topic) for topic in args.topics]
    if args.include_bridge_log:
        subscriptions.append(f"{base_topic}/bridge/log")
    if args.include_bridge_event:
        subscriptions.append(f"{base_topic}/bridge/event")
    messages = capture(args, subscriptions, config=config)
    return _print_messages(messages, "No watched messages received.")


def cmd_publish(args: argparse.Namespace) -> int:
    """Publish ``args.payload`` to ``args.topic``; with ``--echo`` print replies.

    Returns 2 without connecting when ``--json`` is given and the payload is
    not valid JSON; otherwise 0.
    """
    payload = args.payload
    if args.json:
        # Validate before touching the config or the broker so a typo in the
        # payload yields a short message instead of a traceback.
        try:
            payload = json.dumps(json.loads(payload))
        except ValueError as err:
            print(f"Invalid JSON payload: {err}", file=sys.stderr)
            return 2
    config = mqtt_config(Path(args.env_file))
    topic = normalize_topic(config["base_topic"], args.topic)
    messages = capture(
        args,
        [topic] if args.echo else [],
        [{"topic": topic, "payload": payload, "retain": args.retain, "qos": args.qos}],
        config=config,
    )
    print(f"Published to {topic}")
    if args.echo:
        for message_topic, message_payload in messages:
            print_message(message_topic, message_payload)
    return 0


def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with one sub-command per ``cmd_*`` function."""
    parser = argparse.ArgumentParser(
        description="MQTT and Zigbee2MQTT diagnostics for this repository.",
        epilog=(
            "Examples:\n"
            " python scripts\\mqtt_z2m_diag.py health\n"
            " python scripts\\mqtt_z2m_diag.py device-record nitori_salotto_1\n"
            " python scripts\\mqtt_z2m_diag.py get nitori_salotto_1 --keys state_left state_center state_right\n"
            " python scripts\\mqtt_z2m_diag.py configure nitori_salotto_1\n"
            " python scripts\\mqtt_z2m_diag.py pair-watch nitori_salotto_1 --time 90 --duration 95\n"
            " python scripts\\mqtt_z2m_diag.py compare nitori_salotto_1 Switch_Cucina_Neo --configure\n"
        ),
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--env-file",
        default=str(DEFAULT_ENV_PATH),
        help="Path to the local MQTT env file. Default: .local\\mqtt-home.env",
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    health = subparsers.add_parser("health", help="Check broker connectivity and Zigbee2MQTT bridge state.")
    health.add_argument("--duration", type=float, default=3.0)
    health.set_defaults(func=cmd_health)

    device_record = subparsers.add_parser(
        "device-record",
        help="Read Zigbee2MQTT bridge/devices and print selected device records.",
    )
    device_record.add_argument("devices", nargs="*", help="Friendly names or IEEE addresses.")
    device_record.add_argument("--duration", type=float, default=4.0)
    device_record.set_defaults(func=cmd_device_record)

    state = subparsers.add_parser("state", help="Read retained/live device state topics.")
    state.add_argument("devices", nargs="+", help="Device topics without the base topic prefix.")
    state.add_argument(
        "--include-availability",
        dest="include_availability",
        action="store_true",
        default=True,
        help="Also read each device's availability topic (default).",
    )
    # The flag above defaults to True, so a negative form is the only way to
    # switch availability topics off.
    state.add_argument(
        "--no-include-availability",
        dest="include_availability",
        action="store_false",
        help="Do not read the availability topics.",
    )
    state.add_argument("--duration", type=float, default=4.0)
    state.set_defaults(func=cmd_state)

    get_parser = subparsers.add_parser("get", help="Send a Zigbee2MQTT device get request and wait for replies.")
    get_parser.add_argument("device", help="Device topic without the base topic prefix.")
    get_parser.add_argument("--keys", nargs="*", default=[], help="Keys to request. Default requests state.")
    get_parser.add_argument("--duration", type=float, default=6.0)
    get_parser.set_defaults(func=cmd_get)

    configure = subparsers.add_parser("configure", help="Ask Zigbee2MQTT to configure a device.")
    configure.add_argument("device", help="Device friendly name.")
    configure.add_argument("--duration", type=float, default=10.0)
    configure.set_defaults(func=cmd_configure)

    permit_join = subparsers.add_parser("permit-join", help="Open Zigbee permit-join for a short time.")
    permit_join.add_argument("--time", type=int, default=90, help="Permit-join duration in seconds.")
    permit_join.add_argument("--watch", action="store_true", help="Also watch bridge log/event topics.")
    permit_join.add_argument("--duration", type=float, default=8.0)
    permit_join.set_defaults(func=cmd_permit_join)

    pair_watch = subparsers.add_parser(
        "pair-watch",
        help="Open permit-join and watch a device for announce/interview/configure/state traffic.",
    )
    pair_watch.add_argument("device", help="Device friendly name.")
    pair_watch.add_argument("--time", type=int, default=90, help="Permit-join duration in seconds.")
    pair_watch.add_argument("--duration", type=float, default=95.0)
    pair_watch.set_defaults(func=cmd_pair_watch)

    compare = subparsers.add_parser(
        "compare",
        help="Compare two devices by bridge record and current state, with optional configure probes.",
    )
    compare.add_argument("device_a")
    compare.add_argument("device_b")
    compare.add_argument("--configure", action="store_true", help="Also issue configure requests.")
    compare.add_argument("--duration", type=float, default=6.0)
    compare.set_defaults(func=cmd_compare)

    watch = subparsers.add_parser("watch", help="Watch arbitrary MQTT topics under the configured base topic.")
    watch.add_argument("topics", nargs="+", help="Topics relative to the base topic unless already absolute.")
    watch.add_argument("--include-bridge-log", action="store_true")
    watch.add_argument("--include-bridge-event", action="store_true")
    watch.add_argument("--duration", type=float, default=10.0)
    watch.set_defaults(func=cmd_watch)

    publish = subparsers.add_parser("publish", help="Publish a raw MQTT payload, optionally echoing replies.")
    publish.add_argument("topic", help="Topic relative to the base topic unless already absolute.")
    publish.add_argument("payload", help="Payload to publish.")
    publish.add_argument("--json", action="store_true", help="Validate and normalize payload as JSON before publish.")
    publish.add_argument("--retain", action="store_true")
    publish.add_argument("--qos", type=int, default=0, choices=(0, 1, 2))
    publish.add_argument("--echo", action="store_true", help="Subscribe to the same topic and print any messages seen.")
    publish.add_argument("--duration", type=float, default=4.0)
    publish.set_defaults(func=cmd_publish)

    return parser


def main(argv: list[str] | None = None) -> int:
    """Parse *argv* (``sys.argv[1:]`` when omitted) and run the chosen command.

    Returns:
        The command's return code, or 2 when the command fails with an
        ``OSError`` (missing env file, connection problems) or a
        ``RuntimeError`` (invalid settings).
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    try:
        return args.func(args)
    except (OSError, RuntimeError) as err:
        # Configuration and connection problems are expected operational
        # errors; report them in one line rather than as a traceback.
        print(f"error: {err}", file=sys.stderr)
        return 2


if __name__ == "__main__":
    raise SystemExit(main())