HomeNetwork/scripts/mqtt_z2m_diag.py

460 lines
17 KiB
Python
Raw Normal View History

#!/usr/bin/env python
"""Reusable MQTT and Zigbee2MQTT diagnostics for this repository."""
from __future__ import annotations
import argparse
import json
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Any, Iterable
REPO_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_ENV_PATH = REPO_ROOT / ".local" / "mqtt-home.env"
DEFAULT_BASE_TOPIC = "zigbee2mqtt_2"
def ensure_paho():
try:
import paho.mqtt.client as mqtt # type: ignore
except Exception:
subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "paho-mqtt"])
import paho.mqtt.client as mqtt # type: ignore
return mqtt
def load_env_file(path: Path) -> None:
if not path.exists():
raise FileNotFoundError(
f"MQTT env file not found at {path}. Create it first (see .github/copilot-instructions.md)."
)
for raw_line in path.read_text(encoding="utf-8").splitlines():
line = raw_line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, value = line.split("=", 1)
os.environ.setdefault(key.strip(), value.strip())
def mqtt_config(env_path: Path) -> dict[str, Any]:
load_env_file(env_path)
missing = [
key
for key in ("MQTT_HOST", "MQTT_PORT", "MQTT_USERNAME", "MQTT_PASSWORD")
if not os.environ.get(key)
]
if missing:
raise RuntimeError(f"Missing MQTT settings in {env_path}: {', '.join(missing)}")
return {
"host": os.environ["MQTT_HOST"],
"port": int(os.environ["MQTT_PORT"]),
"username": os.environ["MQTT_USERNAME"],
"password": os.environ["MQTT_PASSWORD"],
"base_topic": os.environ.get("MQTT_BASE_TOPIC", DEFAULT_BASE_TOPIC),
}
def parse_json_if_possible(payload: str) -> Any:
try:
return json.loads(payload)
except Exception:
return payload
def format_payload(payload: str) -> str:
parsed = parse_json_if_possible(payload)
if isinstance(parsed, (dict, list)):
return json.dumps(parsed, indent=2, sort_keys=True)
return str(parsed)
def print_message(topic: str, payload: str) -> None:
print(f"TOPIC: {topic}")
print(format_payload(payload))
print("---")
def normalize_topic(base_topic: str, topic: str) -> str:
if topic.startswith(base_topic + "/") or topic.startswith("$"):
return topic
return f"{base_topic}/{topic}"
class CaptureClient:
def __init__(
self,
config: dict[str, Any],
subscriptions: Iterable[str],
publications: Iterable[dict[str, Any]] | None = None,
duration: float = 5.0,
) -> None:
self._mqtt = ensure_paho()
self.config = config
self.subscriptions = list(dict.fromkeys(subscriptions))
self.publications = list(publications or [])
self.duration = duration
self.messages: list[tuple[str, str]] = []
def run(self) -> list[tuple[str, str]]:
client = self._mqtt.Client(self._mqtt.CallbackAPIVersion.VERSION2)
client.username_pw_set(self.config["username"], self.config["password"])
def on_connect(client, userdata, flags, reason_code, properties=None):
for topic in self.subscriptions:
client.subscribe(topic)
for publication in self.publications:
client.publish(
publication["topic"],
publication["payload"],
qos=publication.get("qos", 0),
retain=publication.get("retain", False),
)
def on_message(client, userdata, msg):
self.messages.append((msg.topic, msg.payload.decode("utf-8", "replace")))
client.on_connect = on_connect
client.on_message = on_message
client.connect(self.config["host"], self.config["port"], 20)
client.loop_start()
time.sleep(self.duration)
client.loop_stop()
client.disconnect()
return self.messages
def unique_messages(messages: Iterable[tuple[str, str]]) -> list[tuple[str, str]]:
seen: set[tuple[str, str]] = set()
result: list[tuple[str, str]] = []
for message in messages:
if message in seen:
continue
seen.add(message)
result.append(message)
return result
def capture(args, subscriptions: list[str], publications: list[dict[str, Any]] | None = None) -> list[tuple[str, str]]:
config = mqtt_config(Path(args.env_file))
client = CaptureClient(config, subscriptions, publications, duration=args.duration)
return unique_messages(client.run())
def cmd_health(args) -> int:
config = mqtt_config(Path(args.env_file))
messages = CaptureClient(config, [f"{config['base_topic']}/bridge/state"], duration=args.duration).run()
print(f"Broker: {config['host']}:{config['port']}")
print(f"Base topic: {config['base_topic']}")
print("---")
for topic, payload in unique_messages(messages):
print_message(topic, payload)
return 0
def cmd_device_record(args) -> int:
config = mqtt_config(Path(args.env_file))
messages = CaptureClient(config, [f"{config['base_topic']}/bridge/devices"], duration=args.duration).run()
devices_payload = next((payload for topic, payload in messages if topic.endswith("/bridge/devices")), None)
if not devices_payload:
print("No bridge/devices payload received.", file=sys.stderr)
return 1
devices = parse_json_if_possible(devices_payload)
if not isinstance(devices, list):
print("Unexpected bridge/devices payload.", file=sys.stderr)
return 1
matched = [
device
for device in devices
if not args.devices
or device.get("friendly_name") in args.devices
or device.get("ieee_address") in args.devices
]
if not matched:
print("No matching devices found.")
return 1
print(json.dumps(matched, indent=2, sort_keys=True))
return 0
def cmd_state(args) -> int:
config = mqtt_config(Path(args.env_file))
subscriptions: list[str] = []
for device in args.devices:
subscriptions.append(normalize_topic(config["base_topic"], device))
if args.include_availability:
subscriptions.append(normalize_topic(config["base_topic"], f"{device}/availability"))
messages = capture(args, subscriptions)
if not messages:
print("No state payloads received.")
return 1
for topic, payload in messages:
print_message(topic, payload)
return 0
def build_get_payload(keys: list[str]) -> str:
if not keys:
return json.dumps({"state": ""})
return json.dumps({key: "" for key in keys})
def cmd_get(args) -> int:
config = mqtt_config(Path(args.env_file))
subscriptions = [
normalize_topic(config["base_topic"], args.device),
normalize_topic(config["base_topic"], f"{args.device}/availability"),
]
publications = [
{
"topic": normalize_topic(config["base_topic"], f"{args.device}/get"),
"payload": build_get_payload(args.keys),
}
]
messages = capture(args, subscriptions, publications)
if not messages:
print("No response received.")
return 1
for topic, payload in messages:
print_message(topic, payload)
return 0
def cmd_configure(args) -> int:
config = mqtt_config(Path(args.env_file))
messages = capture(
args,
[
f"{config['base_topic']}/bridge/response/device/configure",
f"{config['base_topic']}/bridge/log",
normalize_topic(config["base_topic"], args.device),
],
[
{
"topic": f"{config['base_topic']}/bridge/request/device/configure",
"payload": json.dumps({"id": args.device}),
}
],
)
if not messages:
print("No configure response received.")
return 1
for topic, payload in messages:
print_message(topic, payload)
return 0
def cmd_permit_join(args) -> int:
config = mqtt_config(Path(args.env_file))
subscriptions = [f"{config['base_topic']}/bridge/response/permit_join"]
if args.watch:
subscriptions.extend(
[
f"{config['base_topic']}/bridge/log",
f"{config['base_topic']}/bridge/event",
]
)
messages = capture(
args,
subscriptions,
[
{
"topic": f"{config['base_topic']}/bridge/request/permit_join",
"payload": json.dumps({"value": True, "time": args.time}),
}
],
)
if not messages:
print("No permit_join response received.")
return 1
for topic, payload in messages:
print_message(topic, payload)
return 0
def cmd_pair_watch(args) -> int:
config = mqtt_config(Path(args.env_file))
subscriptions = [
f"{config['base_topic']}/bridge/response/permit_join",
f"{config['base_topic']}/bridge/response/device/configure",
f"{config['base_topic']}/bridge/response/device/interview",
f"{config['base_topic']}/bridge/event",
f"{config['base_topic']}/bridge/log",
normalize_topic(config["base_topic"], args.device),
normalize_topic(config["base_topic"], f"{args.device}/availability"),
]
messages = capture(
args,
subscriptions,
[
{
"topic": f"{config['base_topic']}/bridge/request/permit_join",
"payload": json.dumps({"value": True, "time": args.time}),
}
],
)
if not messages:
print("No pair-watch messages received.")
return 1
for topic, payload in messages:
print_message(topic, payload)
return 0
def cmd_compare(args) -> int:
compare_args = argparse.Namespace(**vars(args))
compare_args.devices = [args.device_a, args.device_b]
compare_args.include_availability = True
print("== DEVICE RECORDS ==")
record_rc = cmd_device_record(compare_args)
print("== CURRENT STATE ==")
state_rc = cmd_state(compare_args)
if args.configure:
for device in (args.device_a, args.device_b):
print(f"== CONFIGURE {device} ==")
configure_args = argparse.Namespace(**vars(args))
configure_args.device = device
cmd_configure(configure_args)
return 0 if record_rc == 0 and state_rc == 0 else 1
def cmd_watch(args) -> int:
config = mqtt_config(Path(args.env_file))
subscriptions = [normalize_topic(config["base_topic"], topic) for topic in args.topics]
if args.include_bridge_log:
subscriptions.append(f"{config['base_topic']}/bridge/log")
if args.include_bridge_event:
subscriptions.append(f"{config['base_topic']}/bridge/event")
messages = capture(args, subscriptions)
if not messages:
print("No watched messages received.")
return 1
for topic, payload in messages:
print_message(topic, payload)
return 0
def cmd_publish(args) -> int:
config = mqtt_config(Path(args.env_file))
topic = normalize_topic(config["base_topic"], args.topic)
payload = args.payload
if args.json:
payload = json.dumps(json.loads(payload))
messages = capture(
args,
[topic] if args.echo else [],
[{"topic": topic, "payload": payload, "retain": args.retain, "qos": args.qos}],
)
print(f"Published to {topic}")
if args.echo:
for message_topic, message_payload in messages:
print_message(message_topic, message_payload)
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="MQTT and Zigbee2MQTT diagnostics for this repository.",
epilog=(
"Examples:\n"
" python scripts\\mqtt_z2m_diag.py health\n"
" python scripts\\mqtt_z2m_diag.py device-record nitori_salotto_1\n"
" python scripts\\mqtt_z2m_diag.py get nitori_salotto_1 --keys state_left state_center state_right\n"
" python scripts\\mqtt_z2m_diag.py configure nitori_salotto_1\n"
" python scripts\\mqtt_z2m_diag.py pair-watch nitori_salotto_1 --time 90 --duration 95\n"
" python scripts\\mqtt_z2m_diag.py compare nitori_salotto_1 Switch_Cucina_Neo --configure\n"
),
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"--env-file",
default=str(DEFAULT_ENV_PATH),
help="Path to the local MQTT env file. Default: .local\\mqtt-home.env",
)
subparsers = parser.add_subparsers(dest="command", required=True)
health = subparsers.add_parser("health", help="Check broker connectivity and Zigbee2MQTT bridge state.")
health.add_argument("--duration", type=float, default=3.0)
health.set_defaults(func=cmd_health)
device_record = subparsers.add_parser(
"device-record",
help="Read Zigbee2MQTT bridge/devices and print selected device records.",
)
device_record.add_argument("devices", nargs="*", help="Friendly names or IEEE addresses.")
device_record.add_argument("--duration", type=float, default=4.0)
device_record.set_defaults(func=cmd_device_record)
state = subparsers.add_parser("state", help="Read retained/live device state topics.")
state.add_argument("devices", nargs="+", help="Device topics without the base topic prefix.")
state.add_argument("--include-availability", action="store_true", default=True)
state.add_argument("--duration", type=float, default=4.0)
state.set_defaults(func=cmd_state)
get_parser = subparsers.add_parser("get", help="Send a Zigbee2MQTT device get request and wait for replies.")
get_parser.add_argument("device", help="Device topic without the base topic prefix.")
get_parser.add_argument("--keys", nargs="*", default=[], help="Keys to request. Default requests state.")
get_parser.add_argument("--duration", type=float, default=6.0)
get_parser.set_defaults(func=cmd_get)
configure = subparsers.add_parser("configure", help="Ask Zigbee2MQTT to configure a device.")
configure.add_argument("device", help="Device friendly name.")
configure.add_argument("--duration", type=float, default=10.0)
configure.set_defaults(func=cmd_configure)
permit_join = subparsers.add_parser("permit-join", help="Open Zigbee permit-join for a short time.")
permit_join.add_argument("--time", type=int, default=90, help="Permit-join duration in seconds.")
permit_join.add_argument("--watch", action="store_true", help="Also watch bridge log/event topics.")
permit_join.add_argument("--duration", type=float, default=8.0)
permit_join.set_defaults(func=cmd_permit_join)
pair_watch = subparsers.add_parser(
"pair-watch",
help="Open permit-join and watch a device for announce/interview/configure/state traffic.",
)
pair_watch.add_argument("device", help="Device friendly name.")
pair_watch.add_argument("--time", type=int, default=90, help="Permit-join duration in seconds.")
pair_watch.add_argument("--duration", type=float, default=95.0)
pair_watch.set_defaults(func=cmd_pair_watch)
compare = subparsers.add_parser(
"compare",
help="Compare two devices by bridge record and current state, with optional configure probes.",
)
compare.add_argument("device_a")
compare.add_argument("device_b")
compare.add_argument("--configure", action="store_true", help="Also issue configure requests.")
compare.add_argument("--duration", type=float, default=6.0)
compare.set_defaults(func=cmd_compare)
watch = subparsers.add_parser("watch", help="Watch arbitrary MQTT topics under the configured base topic.")
watch.add_argument("topics", nargs="+", help="Topics relative to the base topic unless already absolute.")
watch.add_argument("--include-bridge-log", action="store_true")
watch.add_argument("--include-bridge-event", action="store_true")
watch.add_argument("--duration", type=float, default=10.0)
watch.set_defaults(func=cmd_watch)
publish = subparsers.add_parser("publish", help="Publish a raw MQTT payload, optionally echoing replies.")
publish.add_argument("topic", help="Topic relative to the base topic unless already absolute.")
publish.add_argument("payload", help="Payload to publish.")
publish.add_argument("--json", action="store_true", help="Validate and normalize payload as JSON before publish.")
publish.add_argument("--retain", action="store_true")
publish.add_argument("--qos", type=int, default=0, choices=(0, 1, 2))
publish.add_argument("--echo", action="store_true", help="Subscribe to the same topic and print any messages seen.")
publish.add_argument("--duration", type=float, default=4.0)
publish.set_defaults(func=cmd_publish)
return parser
def main() -> int:
parser = build_parser()
args = parser.parse_args()
return args.func(args)
if __name__ == "__main__":
raise SystemExit(main())