Skip to main content

Programme "pont" complet

"""
MQTT → MySQL Bridge
====================
Subscribes to the  serre/+/bac/+ ,  controleur/+  and  controleur/+/disconnect
topics on a Mosquitto broker (over TLS) and persists every measurement and
controller status into the `parc` database.

Environment variables (injected via docker-compose):
  MQTT_HOST          – hostname of the Mosquitto container  (default: mosquitto)
  MQTT_PORT          – broker port                          (default: 8883)
  MQTT_KEEPALIVE     – keepalive in seconds                 (default: 60)
  MQTT_USER          – optional broker username
  MQTT_PASSWORD      – optional broker password

  DB_HOST            – MySQL host                           (default: db)
  DB_PORT            – MySQL port                           (default: 3306)
  DB_NAME            – database name                        (default: parc)
  DB_USER            – database user                        (default: parc)
  DB_PASSWORD        – database password

  DB_RETRY_DELAY     – seconds between DB reconnection attempts (default: 5)
  MQTT_RETRY_DELAY   – seconds between MQTT reconnection attempts (default: 5)
  LOG_LEVEL          – DEBUG / INFO / WARNING / ERROR        (default: INFO)
"""

import json
import logging
import math
import os
import re
import sys
import time
from datetime import datetime

import paho.mqtt.client as mqtt
import pymysql
import pymysql.cursors

# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()

# getattr() on the logging module returns *any* attribute with that name
# (for instance the BASIC_FORMAT string), which would make basicConfig()
# raise at start-up.  Only genuine integer level constants are accepted;
# everything else falls back to INFO.
_log_level = getattr(logging, LOG_LEVEL, None)
_log_level_is_valid = isinstance(_log_level, int) and not isinstance(_log_level, bool)
if not _log_level_is_valid:
    _log_level = logging.INFO

logging.basicConfig(
    stream=sys.stdout,
    level=_log_level,
    format="%(asctime)s  %(levelname)-8s  %(name)s  %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("mqtt-bridge")
if not _log_level_is_valid:
    # Make the misconfiguration visible instead of silently using INFO.
    logger.warning("Unknown LOG_LEVEL %r – falling back to INFO", LOG_LEVEL)

# ---------------------------------------------------------------------------
# Configuration from environment
# ---------------------------------------------------------------------------
# MQTT broker settings (numeric values are parsed eagerly, so a non-numeric
# value aborts start-up with a ValueError).
MQTT_HOST = os.environ.get("MQTT_HOST", "mosquitto")
MQTT_PORT = int(os.environ.get("MQTT_PORT", "8883"))
MQTT_KEEPALIVE = int(os.environ.get("MQTT_KEEPALIVE", "60"))
MQTT_USER = os.environ.get("MQTT_USER")
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD")

# MySQL settings.
DB_HOST = os.environ.get("DB_HOST", "db")
DB_PORT = int(os.environ.get("DB_PORT", "3306"))
DB_NAME = os.environ.get("DB_NAME", "parc")
DB_USER = os.environ.get("DB_USER", "parc")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "")

# Delays, in seconds, between reconnection attempts.
DB_RETRY_DELAY = int(os.environ.get("DB_RETRY_DELAY", "5"))
MQTT_RETRY_DELAY = int(os.environ.get("MQTT_RETRY_DELAY", "5"))

# Payload key -> capteur id; the ids are fixed by the project brief.
CAPTEUR_IDS: dict[str, int] = {
    "humiditeAmbiante": 1,
    "humiditeSol": 2,
    "temperatureAmbiante": 3,
}

# Topic shapes understood by the bridge; the capture groups carry the
# serre/bac numbers or the controller identifier.
TOPIC_RE = re.compile(r"^serre/(\d+)/bac/(\d+)$")
CONTROLEUR_RE = re.compile(r"^controleur/([^/]+)$")
DISCONNECT_RE = re.compile(r"^controleur/([^/]+)/disconnect$")

# ---------------------------------------------------------------------------
# Global DB connection
# ---------------------------------------------------------------------------
# Module-wide connection handle: None until main() assigns the first
# connection; ensure_db() replaces it whenever the ping fails.
_db_conn: pymysql.connections.Connection | None = None

# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------

def db_connect() -> pymysql.connections.Connection:
    """Open a new MySQL connection, blocking until one attempt succeeds.

    Each failed attempt is logged and followed by a pause of
    ``DB_RETRY_DELAY`` seconds.  The connection uses autocommit and returns
    rows as dictionaries (``DictCursor``).
    """
    settings = {
        "host": DB_HOST,
        "port": DB_PORT,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "database": DB_NAME,
        "charset": "utf8mb4",
        "autocommit": True,
        "connect_timeout": 10,
        "cursorclass": pymysql.cursors.DictCursor,
    }
    while True:
        try:
            connection = pymysql.connect(**settings)
        except pymysql.MySQLError as exc:
            logger.error("DB connection failed: %s – retrying in %ss", exc, DB_RETRY_DELAY)
            time.sleep(DB_RETRY_DELAY)
            continue
        logger.info("Connected to database %s@%s:%s/%s", DB_USER, DB_HOST, DB_PORT, DB_NAME)
        return connection


def ensure_db() -> pymysql.connections.Connection:
    """Return the shared DB connection, replacing it when it is not usable.

    The current connection is pinged (without automatic reconnect).  When
    there is no connection yet, or the ping raises a MySQL error, the old
    handle is closed on a best-effort basis, a new connection is obtained
    from ``db_connect()`` and the lookup caches are cleared.
    """
    global _db_conn

    if _db_conn is not None:
        try:
            _db_conn.ping(reconnect=False)
        except pymysql.MySQLError:
            pass  # fall through to the reconnection path below
        else:
            return _db_conn

    logger.warning("DB connection lost – reconnecting…")
    if _db_conn is not None:
        try:
            _db_conn.close()
        except Exception:
            # The handle is being discarded anyway; a failing close() is harmless.
            pass
    _db_conn = db_connect()
    invalidate_caches()
    return _db_conn


def insert_error(type_erreur: str, message: str, valeur: str | None = None) -> None:
    """Record one row in the ``error`` table, stamped with the current time.

    ``type_erreur`` is truncated to 64 characters.  This function never
    raises: any failure is reported through the logger at CRITICAL level.
    """
    statement = (
        "INSERT INTO error (type_erreur, message, valeur, erreur_a) "
        "VALUES (%s, %s, %s, %s)"
    )
    try:
        connection = ensure_db()
        with connection.cursor() as cursor:
            row = (type_erreur[:64], message, valeur, datetime.now())
            cursor.execute(statement, row)
        logger.debug("Error logged → [%s] %s", type_erreur, message)
    except Exception as exc:
        logger.critical("Could not write to error table: %s", exc)


def insert_mesure(bac_id: int, capteur_id: int, value: float) -> None:
    """Store one measurement for the given bac and capteur, timestamped now.

    Database errors are not handled here; they propagate to the caller.
    """
    statement = "INSERT INTO mesure (bac, value, mesure_a, capteur) VALUES (%s, %s, %s, %s)"
    connection = ensure_db()
    with connection.cursor() as cursor:
        row = (bac_id, value, datetime.now(), capteur_id)
        cursor.execute(statement, row)


def upsert_controleur(mac: str, ip: str, status: bool) -> None:
    """Create the controller row, or refresh its ip/status if it exists.

    ``mac`` is written to ``id_controleur`` and ``status`` is stored as an
    integer.  Database errors propagate to the caller.
    """
    connection = ensure_db()
    with connection.cursor() as cursor:
        cursor.execute(
            """
            INSERT INTO controleur (id_controleur, ip, status)
            VALUES (%s, %s, %s)
            ON DUPLICATE KEY UPDATE
                ip     = VALUES(ip),
                status = VALUES(status)
            """,
            (mac, ip, int(status)),
        )
    logger.info("Contrôleur enregistré  mac=%s  ip=%s  status=%s", mac, ip, status)


def set_controleur_offline(mac: str) -> None:
    """Mark the controller identified by ``mac`` as offline (status = 0).

    Never raises: a failure is logged and recorded in the error table.
    """
    try:
        with ensure_db().cursor() as cursor:
            cursor.execute(
                "UPDATE controleur SET status = 0 WHERE id_controleur = %s",
                (mac,),
            )
    except Exception as exc:
        msg = f"Erreur DB lors de la mise hors ligne du contrôleur {mac}: {exc}"
        logger.error(msg)
        insert_error("DB_ERROR", msg, None)
    else:
        logger.info("Contrôleur %s passé hors ligne", mac)


def resolve_bac(serre_numero: int, bac_numero: int) -> int | None:
    """Look up ``id_bac`` from a greenhouse number and a bac number.

    Returns the id of the first matching row, or ``None`` when the query
    yields nothing.  Database errors propagate to the caller.
    """
    connection = ensure_db()
    with connection.cursor() as cursor:
        cursor.execute(
            """
            SELECT b.id_bac
            FROM   bac  b
            JOIN   serre s ON s.id_serre = b.serre
            WHERE  s.numero = %s
              AND  b.numero = %s
            LIMIT  1
            """,
            (serre_numero, bac_numero),
        )
        found = cursor.fetchone()
    if not found:
        return None
    return found["id_bac"]


def get_capteur_limits(capteur_id: int) -> tuple[float | None, float | None]:
    """Fetch the ``(valeurMinCapteur, valeurMaxCapteur)`` pair of a capteur.

    Returns ``(None, None)`` when no row has this id.  Database errors
    propagate to the caller.
    """
    query = "SELECT valeurMinCapteur, valeurMaxCapteur FROM capteur WHERE id_capteur = %s"
    connection = ensure_db()
    with connection.cursor() as cursor:
        cursor.execute(query, (capteur_id,))
        found = cursor.fetchone()
    if not found:
        return None, None
    return found["valeurMinCapteur"], found["valeurMaxCapteur"]

# ---------------------------------------------------------------------------
# Cache
# ---------------------------------------------------------------------------
# (serre number, bac number) -> value returned by resolve_bac().
_bac_cache:     dict[tuple[int, int], int | None]           = {}
# capteur id -> (min, max) pair returned by get_capteur_limits().
_capteur_cache: dict[int, tuple[float | None, float | None]] = {}


def cached_resolve_bac(serre_numero: int, bac_numero: int) -> int | None:
    """Return ``id_bac`` for a (serre, bac) pair, memoising successful lookups.

    Only hits are cached.  A miss (``None``) is looked up again on the next
    call, so a bac created in the database after the first message for it is
    picked up without waiting for a DB reconnection or a restart.  Database
    errors raised by ``resolve_bac()`` propagate to the caller.
    """
    key = (serre_numero, bac_numero)
    bac_id = _bac_cache.get(key)
    if bac_id is None:
        bac_id = resolve_bac(serre_numero, bac_numero)
        if bac_id is not None:
            _bac_cache[key] = bac_id
    return bac_id


def cached_capteur_limits(capteur_id: int) -> tuple[float | None, float | None]:
    """Return the (min, max) limits of a capteur, querying the DB only once.

    Whatever ``get_capteur_limits()`` returns is stored and served from the
    cache until the caches are invalidated.
    """
    if capteur_id in _capteur_cache:
        return _capteur_cache[capteur_id]
    limits = get_capteur_limits(capteur_id)
    _capteur_cache[capteur_id] = limits
    return limits


def invalidate_caches() -> None:
    """Empty both lookup caches (used after the DB connection is replaced)."""
    for cache in (_bac_cache, _capteur_cache):
        cache.clear()
    logger.debug("Caches invalidated after DB reconnect")

# ---------------------------------------------------------------------------
# Message processing — mesures
# ---------------------------------------------------------------------------

def process_message(topic: str, raw_payload: str) -> None:
    """Handle one sensor message published on ``serre/<n>/bac/<n>``.

    Steps, each of which stops processing on failure:

    1. Check the topic shape and extract the serre / bac numbers.
    2. Resolve the bac id (via the cache); an unknown bac or a DB error is
       written to the error table.
    3. Decode the payload, which must be a JSON object.
    4. Hand every key/value pair to ``_process_single_measure``.
    """
    logger.debug("RECV  topic=%s  payload=%s", topic, raw_payload)

    # Step 1 – topic shape
    topic_match = TOPIC_RE.match(topic)
    if topic_match is None:
        logger.warning("Ignored non-matching topic: %s", topic)
        return
    serre_numero, bac_numero = (int(part) for part in topic_match.groups())

    # Step 2 – bac lookup
    try:
        bac_id = cached_resolve_bac(serre_numero, bac_numero)
    except Exception as exc:
        logger.error("DB error resolving bac for topic %s: %s", topic, exc)
        detail = f"Impossible de résoudre bac pour le topic {topic}: {exc}"
        insert_error("DB_ERROR", detail, raw_payload)
        return

    if bac_id is None:
        msg = (
            f"Topic {topic} correspond à la serre n°{serre_numero} / bac n°{bac_numero} "
            f"qui n'existe pas dans la base de données."
        )
        logger.warning(msg)
        insert_error("BAC_NOT_FOUND", msg, raw_payload)
        return

    # Step 3 – payload decoding (JSONDecodeError is a ValueError subclass)
    decode_error = None
    payload = None
    try:
        payload = json.loads(raw_payload)
    except ValueError as exc:
        decode_error = exc
    else:
        if not isinstance(payload, dict):
            decode_error = ValueError("Le payload JSON n'est pas un objet")

    if decode_error is not None:
        msg = f"Payload JSON invalide sur le topic {topic}: {decode_error}"
        logger.warning(msg)
        insert_error("INVALID_PAYLOAD", msg, raw_payload)
        return

    # Step 4 – one call per measurement
    for key, reading in payload.items():
        _process_single_measure(topic, bac_id, key, reading, raw_payload)


def _process_single_measure(
    topic: str,
    bac_id: int,
    sensor_name: str,
    raw_value,
    raw_payload: str,
) -> None:
    """Validate, clamp and persist one sensor key/value pair from a payload.

    Args:
        topic: MQTT topic the payload arrived on (already validated by the
            caller; used for messages and for the serre/bac numbers logged).
        bac_id: Database id of the bac the measurement belongs to.
        sensor_name: Payload key; must be one of ``CAPTEUR_IDS``.
        raw_value: Payload value; must convert to a finite float.
        raw_payload: Original payload text, stored with some error rows.

    Every rejection or failure is logged and written to the error table;
    nothing is raised for the failure modes handled here.
    """

    # 4a – validate sensor name
    capteur_id = CAPTEUR_IDS.get(sensor_name)
    if capteur_id is None:
        msg = (
            f"Capteur inconnu « {sensor_name} » reçu sur le topic {topic}. "
            f"Capteurs attendus : {list(CAPTEUR_IDS.keys())}."
        )
        logger.warning(msg)
        insert_error("UNKNOWN_SENSOR", msg, raw_payload)
        return

    # 4b – validate value type
    try:
        # float(True) is 1.0, so JSON booleans must be refused explicitly.
        if isinstance(raw_value, bool):
            raise TypeError("boolean is not a measurement")
        value = float(raw_value)
        # NaN compares false against both limits and would slip past the
        # clamping below; infinities are not measurements either.
        if not math.isfinite(value):
            raise ValueError("non-finite measurement")
    except (TypeError, ValueError, OverflowError):
        # OverflowError: a JSON integer too large to be represented as a float.
        msg = f"Valeur non numérique pour {sensor_name} sur {topic}: {raw_value!r}"
        logger.warning(msg)
        insert_error("INVALID_VALUE", msg, str(raw_value))
        return

    # 4c – clamp to capteur limits if needed
    try:
        v_min, v_max = cached_capteur_limits(capteur_id)
    except Exception as exc:
        logger.error("DB error fetching capteur limits for %s: %s", sensor_name, exc)
        insert_error(
            "DB_ERROR",
            f"Impossible de récupérer les limites du capteur {sensor_name}: {exc}",
            str(raw_value),
        )
        return

    clamped_value = value

    if v_min is not None and value < v_min:
        msg = (
            f"Valeur {value} pour {sensor_name} (id={capteur_id}) sur le topic {topic} "
            f"est inférieure au minimum autorisé ({v_min}). Valeur enregistrée : {v_min}."
        )
        logger.warning(msg)
        insert_error("VALUE_OUT_OF_RANGE", msg, str(value))
        clamped_value = v_min

    elif v_max is not None and value > v_max:
        msg = (
            f"Valeur {value} pour {sensor_name} (id={capteur_id}) sur le topic {topic} "
            f"est supérieure au maximum autorisé ({v_max}). Valeur enregistrée : {v_max}."
        )
        logger.warning(msg)
        insert_error("VALUE_OUT_OF_RANGE", msg, str(value))
        clamped_value = v_max

    # 4d – persist
    try:
        insert_mesure(bac_id, capteur_id, clamped_value)
        logger.info(
            "Mesure enregistrée  serre=%s  bac=%s (id=%s)  capteur=%s (id=%s)  valeur=%s",
            topic.split("/")[1],
            topic.split("/")[3],
            bac_id,
            sensor_name,
            capteur_id,
            clamped_value,
        )
    except Exception as exc:
        msg = f"Erreur DB lors de l'insertion de la mesure ({sensor_name}={value}) sur {topic}: {exc}"
        logger.error(msg)
        insert_error("DB_INSERT_ERROR", msg, str(value))

# ---------------------------------------------------------------------------
# Message processing — contrôleurs
# ---------------------------------------------------------------------------

def _parse_controleur_status(raw) -> bool:
    """Interpret the ``status`` field of a controller payload as a boolean.

    Accepted: JSON booleans, the integers 0 / 1, and the strings
    "true" / "false" / "1" / "0" (case-insensitive, surrounding blanks
    ignored).  Anything else raises ``ValueError``; a plain ``bool(raw)``
    would silently turn "false" or "0" into ``True``.
    """
    if isinstance(raw, bool):
        return raw
    if isinstance(raw, int) and raw in (0, 1):
        return bool(raw)
    if isinstance(raw, str):
        text = raw.strip().lower()
        if text in ("true", "1"):
            return True
        if text in ("false", "0"):
            return False
    raise ValueError(f"status invalide: {raw!r}")


def process_controleur_message(topic: str, raw_payload: str) -> None:
    """
    Process a message received on controleur/<mac>.

    The MAC address from the topic IS the id_controleur (primary key).
    Expected payload: {"ip": "192.168.x.x", "status": true}

    Topics that do not match ``CONTROLEUR_RE`` are ignored.  An invalid
    payload (not a JSON object, missing field, ``ip`` that is not a
    non-empty string, ``status`` that cannot be read as a boolean) is logged
    and written to the error table, as is a database failure.  Nothing is
    raised for those cases.
    """
    match = CONTROLEUR_RE.match(topic)
    if not match:
        return
    mac = match.group(1)

    # Parse JSON
    try:
        payload = json.loads(raw_payload)
        if not isinstance(payload, dict):
            raise ValueError("Le payload JSON n'est pas un objet")
    except (json.JSONDecodeError, ValueError) as exc:
        msg = f"Payload JSON invalide sur le topic {topic}: {exc}"
        logger.warning(msg)
        insert_error("INVALID_PAYLOAD", msg, raw_payload)
        return

    # Validate required fields
    missing = [k for k in ("ip", "status") if k not in payload]
    if missing:
        msg = f"Champs manquants dans le payload contrôleur {topic}: {missing}"
        logger.warning(msg)
        insert_error("INVALID_PAYLOAD", msg, raw_payload)
        return

    # Validate types
    try:
        raw_ip = payload["ip"]
        # str(None) would store the literal text "None" as an address.
        if not isinstance(raw_ip, str) or not raw_ip.strip():
            raise ValueError(f"ip invalide: {raw_ip!r}")
        ip = raw_ip.strip()
        status = _parse_controleur_status(payload["status"])
    except (TypeError, ValueError) as exc:
        msg = f"Type de champ invalide dans le payload contrôleur {topic}: {exc}"
        logger.warning(msg)
        insert_error("INVALID_VALUE", msg, raw_payload)
        return

    # Persist
    try:
        upsert_controleur(mac, ip, status)
    except Exception as exc:
        msg = f"Erreur DB lors de l'enregistrement du contrôleur {mac}: {exc}"
        logger.error(msg)
        insert_error("DB_INSERT_ERROR", msg, raw_payload)

# ---------------------------------------------------------------------------
# MQTT callbacks
# ---------------------------------------------------------------------------

def on_connect(client: mqtt.Client, userdata, flags, reason_code, properties=None):
    """Connection callback: subscribe to the bridge topics on success.

    A non-zero ``reason_code`` is only logged as an error.  On success the
    three topic filters are subscribed with QoS 1.
    """
    succeeded = reason_code == 0
    if not succeeded:
        logger.error("Échec de la connexion MQTT, code retour : %s", reason_code)
        return

    logger.info("MQTT connecté à %s:%s", MQTT_HOST, MQTT_PORT)
    for topic_filter in ("serre/+/bac/+", "controleur/+", "controleur/+/disconnect"):
        client.subscribe(topic_filter, qos=1)
    logger.info(
        "Abonné aux topics  serre/+/bac/+  |  controleur/+  |  controleur/+/disconnect"
    )


def on_disconnect(client: mqtt.Client, userdata, flags, reason_code, properties=None):
    """Disconnection callback: log the event and record unexpected ones.

    A zero ``reason_code`` is treated as a clean disconnect.  Any other code
    is logged as a warning and written to the error table.
    """
    clean = reason_code == 0
    if clean:
        logger.info("Déconnexion MQTT propre")
        return

    logger.warning(
        "Déconnexion MQTT inattendue (code=%s) – le client va retenter automatiquement",
        reason_code,
    )
    detail = f"Déconnexion MQTT inattendue (code={reason_code}). Reconnexion en cours…"
    insert_error("MQTT_DISCONNECT", detail, None)


def on_message(client: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
    """Message callback: decode the payload and route it by topic.

    Routing order: ``controleur/<mac>/disconnect`` marks the controller
    offline, ``controleur/<mac>`` registers or updates it, and every other
    topic goes to the sensor pipeline.  A payload that is not valid UTF-8,
    or any exception raised by a handler, is logged and written to the
    error table.
    """
    # Payloads arrive as bytes; everything downstream works on text.
    try:
        payload_str = msg.payload.decode("utf-8")
    except UnicodeDecodeError as exc:
        logger.warning("Payload non-UTF8 sur %s : %s", msg.topic, exc)
        detail = f"Payload non-UTF8 sur {msg.topic}: {exc}"
        insert_error("INVALID_ENCODING", detail, repr(msg.payload))
        return

    try:
        topic = msg.topic
        if DISCONNECT_RE.match(topic) is not None:
            # controleur/<mac>/disconnect – the second segment is the MAC
            set_controleur_offline(topic.split("/")[1])
        elif CONTROLEUR_RE.match(topic) is not None:
            # controleur/<mac>
            process_controleur_message(topic, payload_str)
        else:
            # serre/X/bac/Y (other shapes are rejected by the handler)
            process_message(topic, payload_str)
    except Exception as exc:
        logger.exception(
            "Erreur inattendue lors du traitement du message %s: %s", msg.topic, exc
        )
        try:
            insert_error("UNEXPECTED_ERROR", str(exc), payload_str)
        except Exception:
            # Last line of defence: the callback must not raise from here.
            pass


def on_log(client, userdata, level, buf):
    """Log callback: relay the client's error-level lines at DEBUG level."""
    if level != mqtt.MQTT_LOG_ERR:
        return
    logger.debug("MQTT internal: %s", buf)

# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def main() -> None:
    """Start the bridge and run the MQTT loop forever.

    Sequence: connect to the database (blocking until it succeeds), load the
    capteur limits into the cache, configure the MQTT client (callbacks,
    optional credentials, TLS, reconnect delay), connect to the broker with
    retries, then enter ``loop_forever``.
    """
    global _db_conn

    logger.info("=== MQTT Bridge démarrage ===")
    logger.info(
        "Config MQTT : %s:%s  |  Config DB : %s@%s:%s/%s",
        MQTT_HOST, MQTT_PORT,
        DB_USER, DB_HOST, DB_PORT, DB_NAME,
    )

    # Blocks until the database accepts a connection.
    _db_conn = db_connect()

    # Load the limits of every known capteur up front and show them in the log.
    for sensor_name, sensor_id in CAPTEUR_IDS.items():
        low, high = cached_capteur_limits(sensor_id)
        logger.info(
            "Capteur %-22s (id=%s) : min=%-6s max=%s", sensor_name, sensor_id, low, high
        )

    # MQTT client and its callbacks.
    bridge = mqtt.Client(
        mqtt.CallbackAPIVersion.VERSION2,
        client_id="mqtt-bridge",
        clean_session=True,
    )
    bridge.on_connect = on_connect
    bridge.on_disconnect = on_disconnect
    bridge.on_message = on_message
    bridge.on_log = on_log

    if MQTT_USER:
        bridge.username_pw_set(MQTT_USER, MQTT_PASSWORD)

    # TLS with a client certificate; the paths are fixed inside the container.
    bridge.tls_set(
        ca_certs="/certs/ca.crt",
        certfile="/certs/client.crt",
        keyfile="/certs/client.key",
    )

    bridge.reconnect_delay_set(min_delay=MQTT_RETRY_DELAY, max_delay=60)

    # First connection: retry until the broker answers.
    # ConnectionRefusedError is an OSError subclass, so OSError covers both.
    connected = False
    while not connected:
        logger.info("Connexion au broker MQTT %s:%s…", MQTT_HOST, MQTT_PORT)
        try:
            bridge.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE)
        except OSError as exc:
            logger.error(
                "Broker MQTT injoignable : %s – retry dans %ss", exc, MQTT_RETRY_DELAY
            )
            time.sleep(MQTT_RETRY_DELAY)
        else:
            connected = True

    logger.info("Entrée dans la boucle MQTT principale")
    bridge.loop_forever(retry_first_connection=True)


# Start the bridge only when this file is executed as a script.
if __name__ == "__main__":
    main()