Programme - Envois MQTT
"""
parc-stub · MQTT sensor simulator
Publishes sine-wave sensor data + controleur connect/disconnect over mTLS.
"""
import json
import logging
import math
import os
import random
import ssl
import threading
import time
import paho.mqtt.client as mqtt
# Module logger; records are emitted under the "stub" logger name.
logger = logging.getLogger("stub")
# Path of the persisted JSON configuration. Taken from the CONFIG_PATH
# environment variable, falling back to /data/config.json.
CONFIG_PATH = os.getenv("CONFIG_PATH", "/data/config.json")
# ── Persistence ───────────────────────────────────────────────────────────────
def _save(s: dict):
    """Persist the restartable part of the simulator state to CONFIG_PATH.

    Only ``interval``, ``bad_rate`` and the serres layout are written. The
    runtime-only ``connected`` flag of every serre is stripped, and the log
    and the running flag are not saved at all.

    The payload is first written to ``CONFIG_PATH + ".tmp"`` and then moved
    over the real file with ``os.replace``, so a reader never sees a
    half-written config.

    This is best-effort: any failure is logged as a warning and swallowed.

    Args:
        s: State dict holding at least ``interval``, ``bad_rate`` and
            ``serres`` (a list of dicts).
    """
    tmp = CONFIG_PATH + ".tmp"
    try:
        directory = os.path.dirname(CONFIG_PATH)
        # A bare file name has no directory component, and os.makedirs("")
        # raises, which used to make every save fail in that setup.
        if directory:
            os.makedirs(directory, exist_ok=True)
        payload = {
            "interval": s["interval"],
            "bad_rate": s["bad_rate"],
            "serres": [
                {k: v for k, v in serre.items() if k != "connected"}
                for serre in s["serres"]
            ],
        }
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        os.replace(tmp, CONFIG_PATH)
    except Exception as e:
        logger.warning(f"Config save failed: {e}")
        # Do not leave a partially written temp file lying around.
        try:
            os.remove(tmp)
        except OSError:
            pass
def _load() -> dict | None:
try:
with open(CONFIG_PATH) as f:
return json.load(f)
except FileNotFoundError:
return None
except Exception as e:
logger.warning(f"Config load failed: {e}")
return None
# Simulated sensors, keyed by the field name used in the published payloads.
# "center" and "amp" are the midpoint and amplitude of the sine wave computed
# by _sine_value(). "min" and "max" are not read by the code shown in this
# module (presumably the valid range enforced by the consumer - TODO confirm).
SENSORS = {
"humiditeAmbiante": {"min": 0, "max": 100, "center": 60, "amp": 20},
"humiditeSol": {"min": 0, "max": 100, "center": 55, "amp": 15},
"temperatureAmbiante": {"min": -40, "max": 80, "center": 22, "amp": 5},
}
def _default_serre(numero, mac=None, ip=None):
    """Build the dict describing one serre in its initial state.

    Args:
        numero: Serre number; also used to derive the fallback IP address.
        mac: MAC address to use. A random one is generated when falsy.
        ip: IP address to use. ``192.168.1.<100 + numero>`` when falsy.

    Returns:
        A new serre dict: disconnected, with a zero random-disconnect rate
        and a single bac numbered 1.
    """
    if not mac:
        mac = _random_mac()
    if not ip:
        ip = f"192.168.1.{100 + numero}"
    serre = {"numero": numero, "mac": mac, "ip": ip}
    # Probability, per publish cycle, of a simulated random disconnect.
    serre["disconnect_rate"] = 0.0
    # Current simulated connection state.
    serre["connected"] = False
    serre["bacs"] = [{"numero": 1}]
    return serre
def _random_mac():
    """Return a random MAC-style string: six upper-case hex bytes joined by ':'."""
    return ":".join(f"{random.randint(0x00, 0xFF):02X}" for _ in range(6))
def _build_state() -> dict:
    """Create the initial simulator state.

    When ``_load()`` returns a non-empty config, its serres are reused: every
    serre is forced to disconnected and given a ``disconnect_rate`` of 0.0 if
    it has none, and ``interval`` / ``bad_rate`` fall back to 10 / 0.05 when
    absent. Otherwise the state is built from defaults with two predefined
    serres.

    Returns:
        A dict with the keys ``running``, ``interval``, ``bad_rate``,
        ``serres`` and ``log``; ``running`` is always False and ``log``
        always empty.
    """
    saved = _load()
    if not saved:
        return {
            "running": False,
            "interval": 10,
            "bad_rate": 0.05,
            "serres": [
                _default_serre(1, "24:D7:EB:38:DC:38", "192.168.1.101"),
                _default_serre(2, "24:D7:EB:AA:BB:CC", "192.168.1.102"),
            ],
            "log": [],
        }

    restored = saved.get("serres", [])
    for entry in restored:
        # The connection state is runtime-only: always start disconnected.
        entry.update(connected=False)
        entry.setdefault("disconnect_rate", 0.0)
    return dict(
        running=False,
        interval=saved.get("interval", 10),
        bad_rate=saved.get("bad_rate", 0.05),
        serres=restored,
        log=[],
    )
# Shared simulator state, built once when the module is imported.
state = _build_state()
# Per-serre manual connect/disconnect commands queued by API
# { serre_numero: "connect" | "disconnect" }
# Filled by connect_serre()/disconnect_serre() and drained by the publisher
# loop at the start of each cycle.
_cmd_queue: dict = {}
# Held around accesses to `state` and `_cmd_queue`. It is a plain
# (non-reentrant) Lock, so it must not be held while calling _log().
state_lock = threading.Lock()
# Set by stop() to end the publisher loop; the loop also waits on it as an
# interruptible sleep between cycles.
_stop_event = threading.Event()
# Publisher thread created by start(); None until the first start.
_thread = None
# ── Logging ───────────────────────────────────────────────────────────────────
def _log(msg: str, kind: str = "inf"):
    """Send *msg* to the module logger and append it to the in-memory log.

    The in-memory log (``state["log"]``) keeps at most 150 entries; the
    oldest entry is dropped once that size is exceeded. ``state_lock`` is
    acquired here, so callers must not already hold it.

    Args:
        msg: Text of the log line.
        kind: Short category tag stored with the entry.
    """
    logger.info(msg)
    with state_lock:
        entries = state["log"]
        entries.append(dict(t=time.strftime("%H:%M:%S"), msg=msg, kind=kind))
        if len(entries) > 150:
            del entries[0]
# ── Controleur MQTT helpers ───────────────────────────────────────────────────
def _publish_connect(client: mqtt.Client, serre: dict):
    """Announce a controleur as connected.

    Publishes a non-retained JSON message on ``controleur/<mac>`` carrying
    the topic itself, the serre's IP address and ``status: true``, then
    records the event in the log.

    Args:
        client: MQTT client used to publish.
        serre: Serre dict; its ``mac`` and ``ip`` entries are read.
    """
    mac, ip = serre["mac"], serre["ip"]
    topic = f"controleur/{mac}"
    message = {"topic": topic, "ip": ip, "status": True}
    client.publish(topic, json.dumps(message), retain=False)
    _log(f"[CONN] controleur/{mac} ip={ip}", "conn")
def _publish_disconnect(client: mqtt.Client, serre: dict):
    """Announce a controleur as disconnected.

    Publishes an empty, retained message on ``controleur/<mac>/disconnect``
    (the topic the log line labels as the LWT) and records the event in the
    log.

    Args:
        client: MQTT client used to publish.
        serre: Serre dict; its ``mac`` entry is read.
    """
    mac = serre["mac"]
    client.publish(f"controleur/{mac}/disconnect", "", retain=True)
    _log(f"[DISC] controleur/{mac}/disconnect (LWT)", "disc")
# ── Sine / bad payload ────────────────────────────────────────────────────────
def _sine_value(sensor: str, t: float, phase: float = 0.0) -> float:
    """Return the simulated reading of *sensor* at time *t*.

    The value follows ``center + amp * sin(2*pi*t/120 + phase)``, using the
    sensor's ``center`` and ``amp`` from SENSORS, rounded to two decimals.

    Args:
        sensor: Key into SENSORS.
        t: Time value; the wave repeats every 120 units of *t*.
        phase: Phase offset in radians.

    Raises:
        KeyError: If *sensor* is not in SENSORS.
    """
    spec = SENSORS[sensor]
    angle = 2 * math.pi * t / 120 + phase
    return round(spec["center"] + spec["amp"] * math.sin(angle), 2)
def _bad_payload(_t):
    """Return one deliberately invalid payload, picked at random.

    The four variants are: a value outside the sensor range, an unknown
    sensor name, bytes that are not valid UTF-8, and a non-numeric value.

    Args:
        _t: Unused.

    Returns:
        A JSON string, or raw ``bytes`` for the malformed variant.
    """
    kind = random.choice(["out_of_range", "unknown_sensor", "malformed", "invalid_value"])
    variants = {
        "out_of_range": json.dumps({"humiditeAmbiante": 150.0}),
        "unknown_sensor": json.dumps({"co2Level": 400.0}),
        "malformed": b"\xff\xfe not utf-8",
    }
    # Anything not listed above is the "invalid_value" case.
    return variants.get(kind, json.dumps({"temperatureAmbiante": "hot"}))
# ── MQTT client ───────────────────────────────────────────────────────────────
def _build_client() -> mqtt.Client:
    """Create the MQTT client, configure TLS and connect to the broker.

    Settings come from the environment, with these defaults:
    ``MQTT_HOST`` (mosquitto), ``MQTT_PORT`` (8883), ``CA_CERT``
    (/certs/ca.crt), ``CLIENT_CERT`` (/certs/client.crt) and ``CLIENT_KEY``
    (/certs/client.key).

    The client uses the id ``parc-stub`` and MQTT v5, and logs every
    connect and disconnect callback. The network loop is NOT started here.

    Returns:
        The client, after ``connect()`` has been called.

    Raises:
        Whatever ``int()``, ``tls_set()`` or ``connect()`` raise; nothing is
        caught here.
    """
    env = os.getenv
    host = env("MQTT_HOST", "mosquitto")
    port = int(env("MQTT_PORT", 8883))
    tls_files = {
        "ca_certs": env("CA_CERT", "/certs/ca.crt"),
        "certfile": env("CLIENT_CERT", "/certs/client.crt"),
        "keyfile": env("CLIENT_KEY", "/certs/client.key"),
    }

    def _on_connect(c, u, f, rc, p):
        _log(f"MQTT connecté (rc={rc})", "inf")

    def _on_disconnect(c, u, rc, p):
        _log(f"MQTT déconnecté (rc={rc})", "inf")

    client = mqtt.Client(client_id="parc-stub", protocol=mqtt.MQTTv5)
    client.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT, **tls_files)
    client.on_connect = _on_connect
    client.on_disconnect = _on_disconnect

    _log(f"Connexion à {host}:{port} …")
    client.connect(host, port, keepalive=60)
    return client
# ── Publisher loop ────────────────────────────────────────────────────────────
def _set_connected(client, serre: dict, connected: bool) -> None:
    """Publish a connect/disconnect message for *serre* and record the new state.

    The flag is updated both in the matching entry of the shared ``state``
    (under ``state_lock``) and in *serre*, the loop's private snapshot.
    """
    if connected:
        _publish_connect(client, serre)
    else:
        _publish_disconnect(client, serre)
    with state_lock:
        for s in state["serres"]:
            if s["numero"] == serre["numero"]:
                s["connected"] = connected
    serre["connected"] = connected


def _publish_loop():
    """Thread body: publish simulated sensor data until ``_stop_event`` is set.

    Each cycle takes a snapshot of the settings, the serres and the queued
    manual commands under ``state_lock`` (clearing the queue), then for each
    serre:

    1. applies a queued "connect"/"disconnect" command when it changes the
       current state;
    2. possibly disconnects it at random, with probability
       ``disconnect_rate``;
    3. if it is still connected, publishes one payload per bac on
       ``serre/<n>/bac/<m>``: a deliberately bad payload with probability
       ``bad_rate``, otherwise the sine-wave readings of all SENSORS.

    The loop then waits ``interval`` seconds, or less if a stop is requested.

    If the client cannot be built or connected, the error is logged and the
    function returns. On every exit path ``state["running"]`` is cleared;
    once the loop has started, the MQTT client is also shut down even when
    the loop raises (the exception is logged and re-raised).
    """
    try:
        client = _build_client()
    except Exception as e:
        _log(f"Échec connexion MQTT : {e}", "err")
        # Nothing is running, so do not leave the stub flagged as running.
        with state_lock:
            state["running"] = False
        return
    client.loop_start()
    t0 = time.time()
    # Random phase per (serre, bac) and per sensor, so the curves differ.
    phases: dict = {}
    try:
        while not _stop_event.is_set():
            with state_lock:
                interval = state["interval"]
                bad_rate = state["bad_rate"]
                serres = [dict(s) for s in state["serres"]]
                cmds = dict(_cmd_queue)
                _cmd_queue.clear()
            t = time.time() - t0
            for serre in serres:
                sn = serre["numero"]
                # ── handle manual commands ────────────────────────────────
                cmd = cmds.get(sn)
                if cmd == "connect" and not serre["connected"]:
                    _set_connected(client, serre, True)
                elif cmd == "disconnect" and serre["connected"]:
                    _set_connected(client, serre, False)
                # ── random disconnect ─────────────────────────────────────
                rate = serre.get("disconnect_rate", 0.0)
                if serre["connected"] and rate > 0 and random.random() < rate:
                    _set_connected(client, serre, False)
                # ── sensor data, only when connected ──────────────────────
                if not serre["connected"]:
                    continue
                for bac in serre.get("bacs", []):
                    bn = bac["numero"]
                    key = (sn, bn)
                    if key not in phases:
                        phases[key] = {s: random.uniform(0, 2 * math.pi) for s in SENSORS}
                    topic = f"serre/{sn}/bac/{bn}"
                    if random.random() < bad_rate:
                        payload = _bad_payload(t)
                        _log(f"[BAD] {topic} → {payload!r}", "bad")
                    else:
                        data = {s: _sine_value(s, t, phases[key][s]) for s in SENSORS}
                        payload = json.dumps(data)
                        _log(f"[OK] {topic} → {payload}", "ok")
                    try:
                        client.publish(topic, payload)
                    except Exception as e:
                        _log(f"Erreur publish : {e}", "err")
            # Doubles as an interruptible sleep: returns early on stop().
            _stop_event.wait(interval)
    except Exception as e:
        _log(f"Erreur boucle de publication : {e}", "err")
        raise
    finally:
        # Disconnect while the network loop is still running so that it can
        # process the disconnect, then stop the loop thread.
        client.disconnect()
        client.loop_stop()
        with state_lock:
            state["running"] = False
        _log("Stub arrêté.")
# ── Public API ────────────────────────────────────────────────────────────────
def start():
    """Start the publisher thread unless it is already running.

    If the previous thread is still alive only because a stop was just
    requested, wait briefly for it to finish, so that a stop() followed
    immediately by start() restarts the stub instead of being silently
    ignored. When the old thread does not exit in time, the start is
    abandoned and logged.

    Also marks the state as running and persists the configuration.
    """
    global _thread
    if _thread and _thread.is_alive():
        if not _stop_event.is_set():
            return  # already running
        # Stop requested but not finished yet: give the old thread a moment.
        _thread.join(timeout=5)
        if _thread.is_alive():
            _log("Arrêt en cours, démarrage ignoré.", "err")
            return
    _stop_event.clear()
    # Set the flag before launching the thread, so that a later assignment
    # here can never overwrite something the thread itself records.
    with state_lock:
        state["running"] = True
        _save(state)
    _thread = threading.Thread(target=_publish_loop, daemon=True)
    _thread.start()
    _log("Stub démarré.")
def stop():
    """Ask the publisher loop to stop and mark the state as not running.

    Only signals the stop event; it does not wait for the thread to exit.
    """
    _stop_event.set()
    with state_lock:
        state.update(running=False)
    _log("Arrêt demandé.")
def connect_serre(numero: int):
    """Queue a manual "connect" command for serre *numero*.

    The command replaces any command already queued for that serre and is
    picked up by the publisher loop on its next cycle.
    """
    with state_lock:
        _cmd_queue.update({numero: "connect"})
    _log(f"[CMD] connexion serre {numero} demandée", "inf")
def disconnect_serre(numero: int):
    """Queue a manual "disconnect" command for serre *numero*.

    The command replaces any command already queued for that serre and is
    picked up by the publisher loop on its next cycle.
    """
    with state_lock:
        _cmd_queue.update({numero: "disconnect"})
    _log(f"[CMD] déconnexion serre {numero} demandée", "inf")
def save_config():
    """Persist the current configuration to disk.

    Meant to be called by the Flask layer after any change to the settings
    or to the serres. ``state_lock`` is held while the state is written out.
    """
    with state_lock:
        current = state
        _save(current)
No comments to display
No comments to display