- Introduced `scripts/mqtt_z2m_diag.py` for reusable MQTT and Zigbee2MQTT diagnostics. - Added `copilot-instructions.md` section for MQTT/Zigbee diagnostics tool usage. - Created `docs/mqtt-broker-broad-analysis.md` for comprehensive MQTT broker analysis. - Documented Salotto Overview Switch investigation in `docs/salotto-overview-switch-investigation.md`.
460 lines
17 KiB
Python
460 lines
17 KiB
Python
#!/usr/bin/env python
|
|
"""Reusable MQTT and Zigbee2MQTT diagnostics for this repository."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Iterable
|
|
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parents[1]
|
|
DEFAULT_ENV_PATH = REPO_ROOT / ".local" / "mqtt-home.env"
|
|
DEFAULT_BASE_TOPIC = "zigbee2mqtt_2"
|
|
|
|
|
|
def ensure_paho():
|
|
try:
|
|
import paho.mqtt.client as mqtt # type: ignore
|
|
except Exception:
|
|
subprocess.check_call([sys.executable, "-m", "pip", "install", "--quiet", "paho-mqtt"])
|
|
import paho.mqtt.client as mqtt # type: ignore
|
|
return mqtt
|
|
|
|
|
|
def load_env_file(path: Path) -> None:
|
|
if not path.exists():
|
|
raise FileNotFoundError(
|
|
f"MQTT env file not found at {path}. Create it first (see .github/copilot-instructions.md)."
|
|
)
|
|
for raw_line in path.read_text(encoding="utf-8").splitlines():
|
|
line = raw_line.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
key, value = line.split("=", 1)
|
|
os.environ.setdefault(key.strip(), value.strip())
|
|
|
|
|
|
def mqtt_config(env_path: Path) -> dict[str, Any]:
|
|
load_env_file(env_path)
|
|
missing = [
|
|
key
|
|
for key in ("MQTT_HOST", "MQTT_PORT", "MQTT_USERNAME", "MQTT_PASSWORD")
|
|
if not os.environ.get(key)
|
|
]
|
|
if missing:
|
|
raise RuntimeError(f"Missing MQTT settings in {env_path}: {', '.join(missing)}")
|
|
return {
|
|
"host": os.environ["MQTT_HOST"],
|
|
"port": int(os.environ["MQTT_PORT"]),
|
|
"username": os.environ["MQTT_USERNAME"],
|
|
"password": os.environ["MQTT_PASSWORD"],
|
|
"base_topic": os.environ.get("MQTT_BASE_TOPIC", DEFAULT_BASE_TOPIC),
|
|
}
|
|
|
|
|
|
def parse_json_if_possible(payload: str) -> Any:
|
|
try:
|
|
return json.loads(payload)
|
|
except Exception:
|
|
return payload
|
|
|
|
|
|
def format_payload(payload: str) -> str:
|
|
parsed = parse_json_if_possible(payload)
|
|
if isinstance(parsed, (dict, list)):
|
|
return json.dumps(parsed, indent=2, sort_keys=True)
|
|
return str(parsed)
|
|
|
|
|
|
def print_message(topic: str, payload: str) -> None:
|
|
print(f"TOPIC: {topic}")
|
|
print(format_payload(payload))
|
|
print("---")
|
|
|
|
|
|
def normalize_topic(base_topic: str, topic: str) -> str:
|
|
if topic.startswith(base_topic + "/") or topic.startswith("$"):
|
|
return topic
|
|
return f"{base_topic}/{topic}"
|
|
|
|
|
|
class CaptureClient:
|
|
def __init__(
|
|
self,
|
|
config: dict[str, Any],
|
|
subscriptions: Iterable[str],
|
|
publications: Iterable[dict[str, Any]] | None = None,
|
|
duration: float = 5.0,
|
|
) -> None:
|
|
self._mqtt = ensure_paho()
|
|
self.config = config
|
|
self.subscriptions = list(dict.fromkeys(subscriptions))
|
|
self.publications = list(publications or [])
|
|
self.duration = duration
|
|
self.messages: list[tuple[str, str]] = []
|
|
|
|
def run(self) -> list[tuple[str, str]]:
|
|
client = self._mqtt.Client(self._mqtt.CallbackAPIVersion.VERSION2)
|
|
client.username_pw_set(self.config["username"], self.config["password"])
|
|
|
|
def on_connect(client, userdata, flags, reason_code, properties=None):
|
|
for topic in self.subscriptions:
|
|
client.subscribe(topic)
|
|
for publication in self.publications:
|
|
client.publish(
|
|
publication["topic"],
|
|
publication["payload"],
|
|
qos=publication.get("qos", 0),
|
|
retain=publication.get("retain", False),
|
|
)
|
|
|
|
def on_message(client, userdata, msg):
|
|
self.messages.append((msg.topic, msg.payload.decode("utf-8", "replace")))
|
|
|
|
client.on_connect = on_connect
|
|
client.on_message = on_message
|
|
client.connect(self.config["host"], self.config["port"], 20)
|
|
client.loop_start()
|
|
time.sleep(self.duration)
|
|
client.loop_stop()
|
|
client.disconnect()
|
|
return self.messages
|
|
|
|
|
|
def unique_messages(messages: Iterable[tuple[str, str]]) -> list[tuple[str, str]]:
|
|
seen: set[tuple[str, str]] = set()
|
|
result: list[tuple[str, str]] = []
|
|
for message in messages:
|
|
if message in seen:
|
|
continue
|
|
seen.add(message)
|
|
result.append(message)
|
|
return result
|
|
|
|
|
|
def capture(args, subscriptions: list[str], publications: list[dict[str, Any]] | None = None) -> list[tuple[str, str]]:
|
|
config = mqtt_config(Path(args.env_file))
|
|
client = CaptureClient(config, subscriptions, publications, duration=args.duration)
|
|
return unique_messages(client.run())
|
|
|
|
|
|
def cmd_health(args) -> int:
|
|
config = mqtt_config(Path(args.env_file))
|
|
messages = CaptureClient(config, [f"{config['base_topic']}/bridge/state"], duration=args.duration).run()
|
|
print(f"Broker: {config['host']}:{config['port']}")
|
|
print(f"Base topic: {config['base_topic']}")
|
|
print("---")
|
|
for topic, payload in unique_messages(messages):
|
|
print_message(topic, payload)
|
|
return 0
|
|
|
|
|
|
def cmd_device_record(args) -> int:
|
|
config = mqtt_config(Path(args.env_file))
|
|
messages = CaptureClient(config, [f"{config['base_topic']}/bridge/devices"], duration=args.duration).run()
|
|
devices_payload = next((payload for topic, payload in messages if topic.endswith("/bridge/devices")), None)
|
|
if not devices_payload:
|
|
print("No bridge/devices payload received.", file=sys.stderr)
|
|
return 1
|
|
devices = parse_json_if_possible(devices_payload)
|
|
if not isinstance(devices, list):
|
|
print("Unexpected bridge/devices payload.", file=sys.stderr)
|
|
return 1
|
|
matched = [
|
|
device
|
|
for device in devices
|
|
if not args.devices
|
|
or device.get("friendly_name") in args.devices
|
|
or device.get("ieee_address") in args.devices
|
|
]
|
|
if not matched:
|
|
print("No matching devices found.")
|
|
return 1
|
|
print(json.dumps(matched, indent=2, sort_keys=True))
|
|
return 0
|
|
|
|
|
|
def cmd_state(args) -> int:
|
|
config = mqtt_config(Path(args.env_file))
|
|
subscriptions: list[str] = []
|
|
for device in args.devices:
|
|
subscriptions.append(normalize_topic(config["base_topic"], device))
|
|
if args.include_availability:
|
|
subscriptions.append(normalize_topic(config["base_topic"], f"{device}/availability"))
|
|
messages = capture(args, subscriptions)
|
|
if not messages:
|
|
print("No state payloads received.")
|
|
return 1
|
|
for topic, payload in messages:
|
|
print_message(topic, payload)
|
|
return 0
|
|
|
|
|
|
def build_get_payload(keys: list[str]) -> str:
|
|
if not keys:
|
|
return json.dumps({"state": ""})
|
|
return json.dumps({key: "" for key in keys})
|
|
|
|
|
|
def cmd_get(args) -> int:
|
|
config = mqtt_config(Path(args.env_file))
|
|
subscriptions = [
|
|
normalize_topic(config["base_topic"], args.device),
|
|
normalize_topic(config["base_topic"], f"{args.device}/availability"),
|
|
]
|
|
publications = [
|
|
{
|
|
"topic": normalize_topic(config["base_topic"], f"{args.device}/get"),
|
|
"payload": build_get_payload(args.keys),
|
|
}
|
|
]
|
|
messages = capture(args, subscriptions, publications)
|
|
if not messages:
|
|
print("No response received.")
|
|
return 1
|
|
for topic, payload in messages:
|
|
print_message(topic, payload)
|
|
return 0
|
|
|
|
|
|
def cmd_configure(args) -> int:
|
|
config = mqtt_config(Path(args.env_file))
|
|
messages = capture(
|
|
args,
|
|
[
|
|
f"{config['base_topic']}/bridge/response/device/configure",
|
|
f"{config['base_topic']}/bridge/log",
|
|
normalize_topic(config["base_topic"], args.device),
|
|
],
|
|
[
|
|
{
|
|
"topic": f"{config['base_topic']}/bridge/request/device/configure",
|
|
"payload": json.dumps({"id": args.device}),
|
|
}
|
|
],
|
|
)
|
|
if not messages:
|
|
print("No configure response received.")
|
|
return 1
|
|
for topic, payload in messages:
|
|
print_message(topic, payload)
|
|
return 0
|
|
|
|
|
|
def cmd_permit_join(args) -> int:
|
|
config = mqtt_config(Path(args.env_file))
|
|
subscriptions = [f"{config['base_topic']}/bridge/response/permit_join"]
|
|
if args.watch:
|
|
subscriptions.extend(
|
|
[
|
|
f"{config['base_topic']}/bridge/log",
|
|
f"{config['base_topic']}/bridge/event",
|
|
]
|
|
)
|
|
messages = capture(
|
|
args,
|
|
subscriptions,
|
|
[
|
|
{
|
|
"topic": f"{config['base_topic']}/bridge/request/permit_join",
|
|
"payload": json.dumps({"value": True, "time": args.time}),
|
|
}
|
|
],
|
|
)
|
|
if not messages:
|
|
print("No permit_join response received.")
|
|
return 1
|
|
for topic, payload in messages:
|
|
print_message(topic, payload)
|
|
return 0
|
|
|
|
|
|
def cmd_pair_watch(args) -> int:
|
|
config = mqtt_config(Path(args.env_file))
|
|
subscriptions = [
|
|
f"{config['base_topic']}/bridge/response/permit_join",
|
|
f"{config['base_topic']}/bridge/response/device/configure",
|
|
f"{config['base_topic']}/bridge/response/device/interview",
|
|
f"{config['base_topic']}/bridge/event",
|
|
f"{config['base_topic']}/bridge/log",
|
|
normalize_topic(config["base_topic"], args.device),
|
|
normalize_topic(config["base_topic"], f"{args.device}/availability"),
|
|
]
|
|
messages = capture(
|
|
args,
|
|
subscriptions,
|
|
[
|
|
{
|
|
"topic": f"{config['base_topic']}/bridge/request/permit_join",
|
|
"payload": json.dumps({"value": True, "time": args.time}),
|
|
}
|
|
],
|
|
)
|
|
if not messages:
|
|
print("No pair-watch messages received.")
|
|
return 1
|
|
for topic, payload in messages:
|
|
print_message(topic, payload)
|
|
return 0
|
|
|
|
|
|
def cmd_compare(args) -> int:
|
|
compare_args = argparse.Namespace(**vars(args))
|
|
compare_args.devices = [args.device_a, args.device_b]
|
|
compare_args.include_availability = True
|
|
print("== DEVICE RECORDS ==")
|
|
record_rc = cmd_device_record(compare_args)
|
|
print("== CURRENT STATE ==")
|
|
state_rc = cmd_state(compare_args)
|
|
if args.configure:
|
|
for device in (args.device_a, args.device_b):
|
|
print(f"== CONFIGURE {device} ==")
|
|
configure_args = argparse.Namespace(**vars(args))
|
|
configure_args.device = device
|
|
cmd_configure(configure_args)
|
|
return 0 if record_rc == 0 and state_rc == 0 else 1
|
|
|
|
|
|
def cmd_watch(args) -> int:
|
|
config = mqtt_config(Path(args.env_file))
|
|
subscriptions = [normalize_topic(config["base_topic"], topic) for topic in args.topics]
|
|
if args.include_bridge_log:
|
|
subscriptions.append(f"{config['base_topic']}/bridge/log")
|
|
if args.include_bridge_event:
|
|
subscriptions.append(f"{config['base_topic']}/bridge/event")
|
|
messages = capture(args, subscriptions)
|
|
if not messages:
|
|
print("No watched messages received.")
|
|
return 1
|
|
for topic, payload in messages:
|
|
print_message(topic, payload)
|
|
return 0
|
|
|
|
|
|
def cmd_publish(args) -> int:
|
|
config = mqtt_config(Path(args.env_file))
|
|
topic = normalize_topic(config["base_topic"], args.topic)
|
|
payload = args.payload
|
|
if args.json:
|
|
payload = json.dumps(json.loads(payload))
|
|
messages = capture(
|
|
args,
|
|
[topic] if args.echo else [],
|
|
[{"topic": topic, "payload": payload, "retain": args.retain, "qos": args.qos}],
|
|
)
|
|
print(f"Published to {topic}")
|
|
if args.echo:
|
|
for message_topic, message_payload in messages:
|
|
print_message(message_topic, message_payload)
|
|
return 0
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
|
|
parser = argparse.ArgumentParser(
|
|
description="MQTT and Zigbee2MQTT diagnostics for this repository.",
|
|
epilog=(
|
|
"Examples:\n"
|
|
" python scripts\\mqtt_z2m_diag.py health\n"
|
|
" python scripts\\mqtt_z2m_diag.py device-record nitori_salotto_1\n"
|
|
" python scripts\\mqtt_z2m_diag.py get nitori_salotto_1 --keys state_left state_center state_right\n"
|
|
" python scripts\\mqtt_z2m_diag.py configure nitori_salotto_1\n"
|
|
" python scripts\\mqtt_z2m_diag.py pair-watch nitori_salotto_1 --time 90 --duration 95\n"
|
|
" python scripts\\mqtt_z2m_diag.py compare nitori_salotto_1 Switch_Cucina_Neo --configure\n"
|
|
),
|
|
formatter_class=argparse.RawTextHelpFormatter,
|
|
)
|
|
parser.add_argument(
|
|
"--env-file",
|
|
default=str(DEFAULT_ENV_PATH),
|
|
help="Path to the local MQTT env file. Default: .local\\mqtt-home.env",
|
|
)
|
|
subparsers = parser.add_subparsers(dest="command", required=True)
|
|
|
|
health = subparsers.add_parser("health", help="Check broker connectivity and Zigbee2MQTT bridge state.")
|
|
health.add_argument("--duration", type=float, default=3.0)
|
|
health.set_defaults(func=cmd_health)
|
|
|
|
device_record = subparsers.add_parser(
|
|
"device-record",
|
|
help="Read Zigbee2MQTT bridge/devices and print selected device records.",
|
|
)
|
|
device_record.add_argument("devices", nargs="*", help="Friendly names or IEEE addresses.")
|
|
device_record.add_argument("--duration", type=float, default=4.0)
|
|
device_record.set_defaults(func=cmd_device_record)
|
|
|
|
state = subparsers.add_parser("state", help="Read retained/live device state topics.")
|
|
state.add_argument("devices", nargs="+", help="Device topics without the base topic prefix.")
|
|
state.add_argument("--include-availability", action="store_true", default=True)
|
|
state.add_argument("--duration", type=float, default=4.0)
|
|
state.set_defaults(func=cmd_state)
|
|
|
|
get_parser = subparsers.add_parser("get", help="Send a Zigbee2MQTT device get request and wait for replies.")
|
|
get_parser.add_argument("device", help="Device topic without the base topic prefix.")
|
|
get_parser.add_argument("--keys", nargs="*", default=[], help="Keys to request. Default requests state.")
|
|
get_parser.add_argument("--duration", type=float, default=6.0)
|
|
get_parser.set_defaults(func=cmd_get)
|
|
|
|
configure = subparsers.add_parser("configure", help="Ask Zigbee2MQTT to configure a device.")
|
|
configure.add_argument("device", help="Device friendly name.")
|
|
configure.add_argument("--duration", type=float, default=10.0)
|
|
configure.set_defaults(func=cmd_configure)
|
|
|
|
permit_join = subparsers.add_parser("permit-join", help="Open Zigbee permit-join for a short time.")
|
|
permit_join.add_argument("--time", type=int, default=90, help="Permit-join duration in seconds.")
|
|
permit_join.add_argument("--watch", action="store_true", help="Also watch bridge log/event topics.")
|
|
permit_join.add_argument("--duration", type=float, default=8.0)
|
|
permit_join.set_defaults(func=cmd_permit_join)
|
|
|
|
pair_watch = subparsers.add_parser(
|
|
"pair-watch",
|
|
help="Open permit-join and watch a device for announce/interview/configure/state traffic.",
|
|
)
|
|
pair_watch.add_argument("device", help="Device friendly name.")
|
|
pair_watch.add_argument("--time", type=int, default=90, help="Permit-join duration in seconds.")
|
|
pair_watch.add_argument("--duration", type=float, default=95.0)
|
|
pair_watch.set_defaults(func=cmd_pair_watch)
|
|
|
|
compare = subparsers.add_parser(
|
|
"compare",
|
|
help="Compare two devices by bridge record and current state, with optional configure probes.",
|
|
)
|
|
compare.add_argument("device_a")
|
|
compare.add_argument("device_b")
|
|
compare.add_argument("--configure", action="store_true", help="Also issue configure requests.")
|
|
compare.add_argument("--duration", type=float, default=6.0)
|
|
compare.set_defaults(func=cmd_compare)
|
|
|
|
watch = subparsers.add_parser("watch", help="Watch arbitrary MQTT topics under the configured base topic.")
|
|
watch.add_argument("topics", nargs="+", help="Topics relative to the base topic unless already absolute.")
|
|
watch.add_argument("--include-bridge-log", action="store_true")
|
|
watch.add_argument("--include-bridge-event", action="store_true")
|
|
watch.add_argument("--duration", type=float, default=10.0)
|
|
watch.set_defaults(func=cmd_watch)
|
|
|
|
publish = subparsers.add_parser("publish", help="Publish a raw MQTT payload, optionally echoing replies.")
|
|
publish.add_argument("topic", help="Topic relative to the base topic unless already absolute.")
|
|
publish.add_argument("payload", help="Payload to publish.")
|
|
publish.add_argument("--json", action="store_true", help="Validate and normalize payload as JSON before publish.")
|
|
publish.add_argument("--retain", action="store_true")
|
|
publish.add_argument("--qos", type=int, default=0, choices=(0, 1, 2))
|
|
publish.add_argument("--echo", action="store_true", help="Subscribe to the same topic and print any messages seen.")
|
|
publish.add_argument("--duration", type=float, default=4.0)
|
|
publish.set_defaults(func=cmd_publish)
|
|
|
|
return parser
|
|
|
|
|
|
def main() -> int:
|
|
parser = build_parser()
|
|
args = parser.parse_args()
|
|
return args.func(args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|