Skip to main content

Programme - Envois MQTT

"""
parc-stub · MQTT sensor simulator
Publishes sine-wave sensor data + controleur connect/disconnect over mTLS.
"""

import json
import logging
import math
import os
import random
import ssl
import threading
import time

import paho.mqtt.client as mqtt

logger = logging.getLogger("stub")

CONFIG_PATH = os.getenv("CONFIG_PATH", "/data/config.json")

# ── Persistence ───────────────────────────────────────────────────────────────
def _save(s: dict):
    """Save interval, bad_rate and serres layout (not log, not running)."""
    try:
        os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)
        payload = {
            "interval":  s["interval"],
            "bad_rate":  s["bad_rate"],
            "serres": [
                {k: v for k, v in serre.items() if k != "connected"}
                for serre in s["serres"]
            ],
        }
        tmp = CONFIG_PATH + ".tmp"
        with open(tmp, "w") as f:
            json.dump(payload, f, indent=2)
        os.replace(tmp, CONFIG_PATH)
    except Exception as e:
        logger.warning(f"Config save failed: {e}")

def _load() -> dict | None:
    try:
        with open(CONFIG_PATH) as f:
            return json.load(f)
    except FileNotFoundError:
        return None
    except Exception as e:
        logger.warning(f"Config load failed: {e}")
        return None


SENSORS = {
    "humiditeAmbiante":    {"min": 0,   "max": 100, "center": 60, "amp": 20},
    "humiditeSol":         {"min": 0,   "max": 100, "center": 55, "amp": 15},
    "temperatureAmbiante": {"min": -40, "max": 80,  "center": 22, "amp": 5},
}

def _default_serre(numero, mac=None, ip=None):
    return {
        "numero": numero,
        "mac": mac or _random_mac(),
        "ip": ip or f"192.168.1.{100 + numero}",
        "disconnect_rate": 0.0,   # probability per cycle of random disconnect
        "connected": False,       # current simulated connection state
        "bacs": [{"numero": 1}],
    }

def _random_mac():
    parts = [random.randint(0x00, 0xFF) for _ in range(6)]
    return ":".join(f"{p:02X}" for p in parts)

def _build_state() -> dict:
    saved = _load()
    if saved:
        serres = saved.get("serres", [])
        for s in serres:
            s["connected"] = False
            s.setdefault("disconnect_rate", 0.0)
        return {
            "running":  False,
            "interval": saved.get("interval", 10),
            "bad_rate": saved.get("bad_rate", 0.05),
            "serres":   serres,
            "log":      [],
        }
    return {
        "running": False,
        "interval": 10,
        "bad_rate": 0.05,
        "serres": [
            _default_serre(1, "24:D7:EB:38:DC:38", "192.168.1.101"),
            _default_serre(2, "24:D7:EB:AA:BB:CC", "192.168.1.102"),
        ],
        "log": [],
    }

state = _build_state()
# Per-serre manual connect/disconnect commands queued by API
# { serre_numero: "connect" | "disconnect" }
_cmd_queue: dict = {}

state_lock = threading.Lock()
_stop_event = threading.Event()
_thread = None


# ── Logging ───────────────────────────────────────────────────────────────────
def _log(msg: str, kind: str = "inf"):
    logger.info(msg)
    with state_lock:
        state["log"].append({"t": time.strftime("%H:%M:%S"), "msg": msg, "kind": kind})
        if len(state["log"]) > 150:
            state["log"].pop(0)


# ── Controleur MQTT helpers ───────────────────────────────────────────────────
def _publish_connect(client: mqtt.Client, serre: dict):
    mac = serre["mac"]
    topic = f"controleur/{mac}"
    payload = json.dumps({
        "topic":  topic,
        "ip":     serre["ip"],
        "status": True,
    })
    client.publish(topic, payload, retain=False)
    _log(f"[CONN] controleur/{mac}  ip={serre['ip']}", "conn")


def _publish_disconnect(client: mqtt.Client, serre: dict):
    mac = serre["mac"]
    lwt_topic = f"controleur/{mac}/disconnect"
    client.publish(lwt_topic, "", retain=True)
    _log(f"[DISC] controleur/{mac}/disconnect (LWT)", "disc")


# ── Sine / bad payload ────────────────────────────────────────────────────────
def _sine_value(sensor: str, t: float, phase: float = 0.0) -> float:
    cfg = SENSORS[sensor]
    val = cfg["center"] + cfg["amp"] * math.sin(2 * math.pi * t / 120 + phase)
    return round(val, 2)

def _bad_payload(_t):
    kind = random.choice(["out_of_range", "unknown_sensor", "malformed", "invalid_value"])
    if kind == "out_of_range":
        return json.dumps({"humiditeAmbiante": 150.0})
    if kind == "unknown_sensor":
        return json.dumps({"co2Level": 400.0})
    if kind == "malformed":
        return b"\xff\xfe not utf-8"
    return json.dumps({"temperatureAmbiante": "hot"})


# ── MQTT client ───────────────────────────────────────────────────────────────
def _build_client() -> mqtt.Client:
    host     = os.getenv("MQTT_HOST", "mosquitto")
    port     = int(os.getenv("MQTT_PORT", 8883))
    ca       = os.getenv("CA_CERT",     "/certs/ca.crt")
    certfile = os.getenv("CLIENT_CERT", "/certs/client.crt")
    keyfile  = os.getenv("CLIENT_KEY",  "/certs/client.key")

    client = mqtt.Client(client_id="parc-stub", protocol=mqtt.MQTTv5)
    client.tls_set(ca_certs=ca, certfile=certfile, keyfile=keyfile,
                   tls_version=ssl.PROTOCOL_TLS_CLIENT)
    client.on_connect    = lambda c, u, f, rc, p: _log(f"MQTT connecté (rc={rc})", "inf")
    client.on_disconnect = lambda c, u, rc, p:    _log(f"MQTT déconnecté (rc={rc})", "inf")
    _log(f"Connexion à {host}:{port} …")
    client.connect(host, port, keepalive=60)
    return client


# ── Publisher loop ────────────────────────────────────────────────────────────
def _publish_loop():
    try:
        client = _build_client()
    except Exception as e:
        _log(f"Échec connexion MQTT : {e}", "err")
        return

    client.loop_start()
    t0 = time.time()
    phases: dict = {}

    while not _stop_event.is_set():
        with state_lock:
            interval = state["interval"]
            bad_rate = state["bad_rate"]
            serres   = [dict(s) for s in state["serres"]]
            cmds     = dict(_cmd_queue)
            _cmd_queue.clear()

        t = time.time() - t0

        for serre in serres:
            sn  = serre["numero"]
            mac = serre["mac"]

            # ── handle manual commands ────────────────────────────────────────
            cmd = cmds.get(sn)
            if cmd == "connect" and not serre["connected"]:
                _publish_connect(client, serre)
                with state_lock:
                    for s in state["serres"]:
                        if s["numero"] == sn:
                            s["connected"] = True
                serre["connected"] = True
            elif cmd == "disconnect" and serre["connected"]:
                _publish_disconnect(client, serre)
                with state_lock:
                    for s in state["serres"]:
                        if s["numero"] == sn:
                            s["connected"] = False
                serre["connected"] = False

            # ── random disconnect ─────────────────────────────────────────────
            if serre["connected"] and serre["disconnect_rate"] > 0:
                if random.random() < serre["disconnect_rate"]:
                    _publish_disconnect(client, serre)
                    with state_lock:
                        for s in state["serres"]:
                            if s["numero"] == sn:
                                s["connected"] = False
                    serre["connected"] = False

            # ── sensor data — only when connected ────────────────────────────
            if not serre["connected"]:
                continue

            for bac in serre.get("bacs", []):
                bn  = bac["numero"]
                key = (sn, bn)
                if key not in phases:
                    phases[key] = {s: random.uniform(0, 2 * math.pi) for s in SENSORS}

                topic = f"serre/{sn}/bac/{bn}"
                if random.random() < bad_rate:
                    payload = _bad_payload(t)
                    _log(f"[BAD]  {topic} → {payload!r}", "bad")
                else:
                    data    = {s: _sine_value(s, t, phases[key][s]) for s in SENSORS}
                    payload = json.dumps(data)
                    _log(f"[OK]   {topic} → {payload}", "ok")
                try:
                    client.publish(topic, payload)
                except Exception as e:
                    _log(f"Erreur publish : {e}", "err")

        _stop_event.wait(interval)

    client.loop_stop()
    client.disconnect()
    _log("Stub arrêté.")


# ── Public API ────────────────────────────────────────────────────────────────
def start():
    global _thread
    if _thread and _thread.is_alive():
        return
    _stop_event.clear()
    _thread = threading.Thread(target=_publish_loop, daemon=True)
    _thread.start()
    with state_lock:
        state["running"] = True
        _save(state)
    _log("Stub démarré.")

def stop():
    _stop_event.set()
    with state_lock:
        state["running"] = False
    _log("Arrêt demandé.")

def connect_serre(numero: int):
    with state_lock:
        _cmd_queue[numero] = "connect"
    _log(f"[CMD] connexion serre {numero} demandée", "inf")

def disconnect_serre(numero: int):
    with state_lock:
        _cmd_queue[numero] = "disconnect"
    _log(f"[CMD] déconnexion serre {numero} demandée", "inf")

def save_config():
    """Called by Flask after any config/serres mutation."""
    with state_lock:
        _save(state)