Programme "pont" complet

"""
MQTT → MySQL Bridge
====================
Subscribes to serre/+/bac/+ and controleur/+ topics on a Mosquitto broker
and persists every measurement and controller info into the `parc` database.

Environment variables (injected via docker-compose):
 MQTT_HOST – hostname of the Mosquitto container (default: mosquitto)
 MQTT_PORT – broker port (default: 8883)
 MQTT_KEEPALIVE – keepalive in seconds (default: 60)
 MQTT_USER – optional broker username
 MQTT_PASSWORD – optional broker password

 DB_HOST – MySQL host (default: db)
 DB_PORT – MySQL port (default: 3306)
 DB_NAME – database name (default: parc)
 DB_USER – database user (default: parc)
 DB_PASSWORD – database password

 DB_RETRY_DELAY – seconds between DB reconnection attempts (default: 5)
 MQTT_RETRY_DELAY – seconds between MQTT reconnection attempts (default: 5)
 LOG_LEVEL – DEBUG / INFO / WARNING / ERROR (default: INFO)
"""

import json
import logging
import os
import re
import sys
import time
from datetime import datetime

import paho.mqtt.client as mqtt
import pymysql
import pymysql.cursors

# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(
 stream=sys.stdout,
 level=getattr(logging, LOG_LEVEL, logging.INFO),
 format="%(asctime)s %(levelname)-8s %(name)s %(message)s",
 datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("mqtt-bridge")

# ---------------------------------------------------------------------------
# Configuration from environment
# ---------------------------------------------------------------------------
MQTT_HOST = os.getenv("MQTT_HOST", "mosquitto")
MQTT_PORT = int(os.getenv("MQTT_PORT", "8883"))
MQTT_KEEPALIVE = int(os.getenv("MQTT_KEEPALIVE", "60"))
MQTT_USER = os.getenv("MQTT_USER", None)
MQTT_PASSWORD = os.getenv("MQTT_PASSWORD", None)

DB_HOST = os.getenv("DB_HOST", "db")
DB_PORT = int(os.getenv("DB_PORT", "3306"))
DB_NAME = os.getenv("DB_NAME", "parc")
DB_USER = os.getenv("DB_USER", "parc")
DB_PASSWORD = os.getenv("DB_PASSWORD", "")

DB_RETRY_DELAY = int(os.getenv("DB_RETRY_DELAY", "5"))
MQTT_RETRY_DELAY = int(os.getenv("MQTT_RETRY_DELAY", "5"))

# Fixed capteur IDs as specified in the project brief
CAPTEUR_IDS: dict[str, int] = {
 "humiditeAmbiante": 1,
 "humiditeSol": 2,
 "temperatureAmbiante": 3,
}

# Topic patterns
TOPIC_RE = re.compile(r"^serre/(\d+)/bac/(\d+)$")
CONTROLEUR_RE = re.compile(r"^controleur/([^/]+)$")
DISCONNECT_RE = re.compile(r"^controleur/([^/]+)/disconnect$")

# ---------------------------------------------------------------------------
# Global DB connection
# ---------------------------------------------------------------------------
_db_conn: pymysql.connections.Connection | None = None

# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------

def db_connect() -> pymysql.connections.Connection:
 """Connect to MySQL, retrying indefinitely on failure."""
 while True:
 try:
 conn = pymysql.connect(
 host=DB_HOST,
 port=DB_PORT,
 user=DB_USER,
 password=DB_PASSWORD,
 database=DB_NAME,
 charset="utf8mb4",
 autocommit=True,
 connect_timeout=10,
 cursorclass=pymysql.cursors.DictCursor,
 )
 logger.info("Connected to database %s@%s:%s/%s", DB_USER, DB_HOST, DB_PORT, DB_NAME)
 return conn
 except pymysql.MySQLError as exc:
 logger.error("DB connection failed: %s – retrying in %ss", exc, DB_RETRY_DELAY)
 time.sleep(DB_RETRY_DELAY)

def ensure_db() -> pymysql.connections.Connection:
 """Return a live DB connection, reconnecting if necessary."""
 global _db_conn
 try:
 if _db_conn is None:
 raise pymysql.MySQLError("no connection yet")
 _db_conn.ping(reconnect=False)
 except pymysql.MySQLError:
 logger.warning("DB connection lost – reconnecting…")
 try:
 if _db_conn is not None:
 _db_conn.close()
 except Exception:
 pass
 _db_conn = db_connect()
 invalidate_caches()
 return _db_conn

def insert_error(type_erreur: str, message: str, valeur: str | None = None) -> None:
 """Write a row to the error table. Never raises – errors are only logged."""
 try:
 conn = ensure_db()
 with conn.cursor() as cur:
 cur.execute(
 "INSERT INTO error (type_erreur, message, valeur, erreur_a) "
 "VALUES (%s, %s, %s, %s)",
 (type_erreur[:64], message, valeur, datetime.now()),
 )
 logger.debug("Error logged → [%s] %s", type_erreur, message)
 except Exception as exc:
 logger.critical("Could not write to error table: %s", exc)

def insert_mesure(bac_id: int, capteur_id: int, value: float) -> None:
 """Insert one measurement row."""
 conn = ensure_db()
 with conn.cursor() as cur:
 cur.execute(
 "INSERT INTO mesure (bac, value, mesure_a, capteur) VALUES (%s, %s, %s, %s)",
 (bac_id, value, datetime.now(), capteur_id),
 )

def upsert_controleur(mac: str, ip: str, status: bool) -> None:
 """
 Insert or update a controller row.
 The MAC address extracted from the topic is used as id_controleur (primary key).
 """
 conn = ensure_db()
 with conn.cursor() as cur:
 cur.execute(
 """
 INSERT INTO controleur (id_controleur, ip, status)
 VALUES (%s, %s, %s)
 ON DUPLICATE KEY UPDATE
 ip = VALUES(ip),
 status = VALUES(status)
 """,
 (mac, ip, int(status)),
 )
 logger.info(
 "Contrôleur enregistré mac=%s ip=%s status=%s",
 mac, ip, status,
 )

def set_controleur_offline(mac: str) -> None:
 """Set a controller status to false (offline)."""
 try:
 conn = ensure_db()
 with conn.cursor() as cur:
 cur.execute(
 "UPDATE controleur SET status = 0 WHERE id_controleur = %s",
 (mac,),
 )
 logger.info("Contrôleur %s passé hors ligne", mac)
 except Exception as exc:
 msg = f"Erreur DB lors de la mise hors ligne du contrôleur {mac}: {exc}"
 logger.error(msg)
 insert_error("DB_ERROR", msg, None)

def resolve_bac(serre_numero: int, bac_numero: int) -> int | None:
 """
 Return id_bac for (serre.numero=serre_numero, bac.numero=bac_numero).
 Returns None when no matching row is found.
 """
 conn = ensure_db()
 with conn.cursor() as cur:
 cur.execute(
 """
 SELECT b.id_bac
 FROM bac b
 JOIN serre s ON s.id_serre = b.serre
 WHERE s.numero = %s
 AND b.numero = %s
 LIMIT 1
 """,
 (serre_numero, bac_numero),
 )
 row = cur.fetchone()
 return row["id_bac"] if row else None

def get_capteur_limits(capteur_id: int) -> tuple[float | None, float | None]:
 """
 Return (valeurMinCapteur, valeurMaxCapteur) for a given capteur ID.
 Returns (None, None) when the capteur is not found.
 """
 conn = ensure_db()
 with conn.cursor() as cur:
 cur.execute(
 "SELECT valeurMinCapteur, valeurMaxCapteur FROM capteur WHERE id_capteur = %s",
 (capteur_id,),
 )
 row = cur.fetchone()
 if row:
 return row["valeurMinCapteur"], row["valeurMaxCapteur"]
 return None, None

# ---------------------------------------------------------------------------
# Cache
# ---------------------------------------------------------------------------
_bac_cache: dict[tuple[int, int], int | None] = {}
_capteur_cache: dict[int, tuple[float | None, float | None]] = {}

def cached_resolve_bac(serre_numero: int, bac_numero: int) -> int | None:
 key = (serre_numero, bac_numero)
 if key not in _bac_cache:
 _bac_cache[key] = resolve_bac(serre_numero, bac_numero)
 return _bac_cache[key]

def cached_capteur_limits(capteur_id: int) -> tuple[float | None, float | None]:
 if capteur_id not in _capteur_cache:
 _capteur_cache[capteur_id] = get_capteur_limits(capteur_id)
 return _capteur_cache[capteur_id]

def invalidate_caches() -> None:
 """Called after a DB reconnection so stale entries don't linger."""
 _bac_cache.clear()
 _capteur_cache.clear()
 logger.debug("Caches invalidated after DB reconnect")

# ---------------------------------------------------------------------------
# Message processing — mesures
# ---------------------------------------------------------------------------

def process_message(topic: str, raw_payload: str) -> None:
 """
 Full processing pipeline for one sensor MQTT message.

 1. Validate topic format
 2. Resolve serre / bac from DB
 3. Parse JSON payload
 4. For each measurement in the payload:
 a. Validate sensor name
 b. Validate value type
 c. Clamp value if out of bounds (and raise an error)
 d. Persist to mesure table
 """
 logger.debug("RECV topic=%s payload=%s", topic, raw_payload)

 # 1 – topic validation
 match = TOPIC_RE.match(topic)
 if not match:
 logger.warning("Ignored non-matching topic: %s", topic)
 return

 serre_numero = int(match.group(1))
 bac_numero = int(match.group(2))

 # 2 – bac resolution
 try:
 bac_id = cached_resolve_bac(serre_numero, bac_numero)
 except Exception as exc:
 logger.error("DB error resolving bac for topic %s: %s", topic, exc)
 insert_error(
 "DB_ERROR",
 f"Impossible de résoudre bac pour le topic {topic}: {exc}",
 raw_payload,
 )
 return

 if bac_id is None:
 msg = (
 f"Topic {topic} correspond à la serre n°{serre_numero} / bac n°{bac_numero} "
 f"qui n'existe pas dans la base de données."
 )
 logger.warning(msg)
 insert_error("BAC_NOT_FOUND", msg, raw_payload)
 return

 # 3 – payload parsing
 try:
 payload = json.loads(raw_payload)
 if not isinstance(payload, dict):
 raise ValueError("Le payload JSON n'est pas un objet")
 except (json.JSONDecodeError, ValueError) as exc:
 msg = f"Payload JSON invalide sur le topic {topic}: {exc}"
 logger.warning(msg)
 insert_error("INVALID_PAYLOAD", msg, raw_payload)
 return

 # 4 – iterate measurements
 for sensor_name, raw_value in payload.items():
 _process_single_measure(topic, bac_id, sensor_name, raw_value, raw_payload)

def _process_single_measure(
 topic: str,
 bac_id: int,
 sensor_name: str,
 raw_value,
 raw_payload: str,
) -> None:
 """Process one sensor key/value pair from a payload."""

 # 4a – validate sensor name
 capteur_id = CAPTEUR_IDS.get(sensor_name)
 if capteur_id is None:
 msg = (
 f"Capteur inconnu « {sensor_name} » reçu sur le topic {topic}. "
 f"Capteurs attendus : {list(CAPTEUR_IDS.keys())}."
 )
 logger.warning(msg)
 insert_error("UNKNOWN_SENSOR", msg, raw_payload)
 return

 # 4b – validate value type
 try:
 value = float(raw_value)
 except (TypeError, ValueError):
 msg = f"Valeur non numérique pour {sensor_name} sur {topic}: {raw_value!r}"
 logger.warning(msg)
 insert_error("INVALID_VALUE", msg, str(raw_value))
 return

 # 4c – clamp to capteur limits if needed
 try:
 v_min, v_max = cached_capteur_limits(capteur_id)
 except Exception as exc:
 logger.error("DB error fetching capteur limits for %s: %s", sensor_name, exc)
 insert_error(
 "DB_ERROR",
 f"Impossible de récupérer les limites du capteur {sensor_name}: {exc}",
 str(raw_value),
 )
 return

 clamped_value = value

 if v_min is not None and value < v_min:
 msg = (
 f"Valeur {value} pour {sensor_name} (id={capteur_id}) sur le topic {topic} "
 f"est inférieure au minimum autorisé ({v_min}). Valeur enregistrée : {v_min}."
 )
 logger.warning(msg)
 insert_error("VALUE_OUT_OF_RANGE", msg, str(value))
 clamped_value = v_min

 elif v_max is not None and value > v_max:
 msg = (
 f"Valeur {value} pour {sensor_name} (id={capteur_id}) sur le topic {topic} "
 f"est supérieure au maximum autorisé ({v_max}). Valeur enregistrée : {v_max}."
 )
 logger.warning(msg)
 insert_error("VALUE_OUT_OF_RANGE", msg, str(value))
 clamped_value = v_max

 # 4d – persist
 try:
 insert_mesure(bac_id, capteur_id, clamped_value)
 logger.info(
 "Mesure enregistrée serre=%s bac=%s (id=%s) capteur=%s (id=%s) valeur=%s",
 topic.split("/")[1],
 topic.split("/")[3],
 bac_id,
 sensor_name,
 capteur_id,
 clamped_value,
 )
 except Exception as exc:
 msg = f"Erreur DB lors de l'insertion de la mesure ({sensor_name}={value}) sur {topic}: {exc}"
 logger.error(msg)
 insert_error("DB_INSERT_ERROR", msg, str(value))

# ---------------------------------------------------------------------------
# Message processing — contrôleurs
# ---------------------------------------------------------------------------

def process_controleur_message(topic: str, raw_payload: str) -> None:
 """
 Process a message received on controleur/<mac>.

 The MAC address from the topic IS the id_controleur (primary key).
 Expected payload: {"ip": "192.168.x.x", "status": true}
 """
 match = CONTROLEUR_RE.match(topic)
 if not match:
 return
 mac = match.group(1)

 # Parse JSON
 try:
 payload = json.loads(raw_payload)
 if not isinstance(payload, dict):
 raise ValueError("Le payload JSON n'est pas un objet")
 except (json.JSONDecodeError, ValueError) as exc:
 msg = f"Payload JSON invalide sur le topic {topic}: {exc}"
 logger.warning(msg)
 insert_error("INVALID_PAYLOAD", msg, raw_payload)
 return

 # Validate required fields
 missing = [k for k in ("ip", "status") if k not in payload]
 if missing:
 msg = f"Champs manquants dans le payload contrôleur {topic}: {missing}"
 logger.warning(msg)
 insert_error("INVALID_PAYLOAD", msg, raw_payload)
 return

 # Validate types
 try:
 ip = str(payload["ip"])
 status = bool(payload["status"])
 except (TypeError, ValueError) as exc:
 msg = f"Type de champ invalide dans le payload contrôleur {topic}: {exc}"
 logger.warning(msg)
 insert_error("INVALID_VALUE", msg, raw_payload)
 return

 # Persist
 try:
 upsert_controleur(mac, ip, status)
 except Exception as exc:
 msg = f"Erreur DB lors de l'enregistrement du contrôleur {mac}: {exc}"
 logger.error(msg)
 insert_error("DB_INSERT_ERROR", msg, raw_payload)

# ---------------------------------------------------------------------------
# MQTT callbacks
# ---------------------------------------------------------------------------

def on_connect(client: mqtt.Client, userdata, flags, reason_code, properties=None):
 if reason_code == 0:
 logger.info("MQTT connecté à %s:%s", MQTT_HOST, MQTT_PORT)
 client.subscribe("serre/+/bac/+", qos=1)
 client.subscribe("controleur/+", qos=1)
 client.subscribe("controleur/+/disconnect", qos=1)
 logger.info(
 "Abonné aux topics serre/+/bac/+ | controleur/+ | controleur/+/disconnect"
 )
 else:
 logger.error("Échec de la connexion MQTT, code retour : %s", reason_code)

def on_disconnect(client: mqtt.Client, userdata, flags, reason_code, properties=None):
 if reason_code == 0:
 logger.info("Déconnexion MQTT propre")
 else:
 logger.warning(
 "Déconnexion MQTT inattendue (code=%s) – le client va retenter automatiquement",
 reason_code,
 )
 insert_error(
 "MQTT_DISCONNECT",
 f"Déconnexion MQTT inattendue (code={reason_code}). Reconnexion en cours…",
 None,
 )

def on_message(client: mqtt.Client, userdata, msg: mqtt.MQTTMessage):
 # Decode payload
 try:
 payload_str = msg.payload.decode("utf-8")
 except UnicodeDecodeError as exc:
 logger.warning("Payload non-UTF8 sur %s : %s", msg.topic, exc)
 insert_error(
 "INVALID_ENCODING",
 f"Payload non-UTF8 sur {msg.topic}: {exc}",
 repr(msg.payload),
 )
 return

 try:
 # Contrôleur déconnecté : controleur/<mac>/disconnect
 if DISCONNECT_RE.match(msg.topic):
 mac = msg.topic.split("/")[1]
 set_controleur_offline(mac)

 # Contrôleur connecté : controleur/<mac>
 elif CONTROLEUR_RE.match(msg.topic):
 process_controleur_message(msg.topic, payload_str)

 # Mesure capteur : serre/X/bac/Y
 else:
 process_message(msg.topic, payload_str)

 except Exception as exc:
 logger.exception(
 "Erreur inattendue lors du traitement du message %s: %s", msg.topic, exc
 )
 try:
 insert_error("UNEXPECTED_ERROR", str(exc), payload_str)
 except Exception:
 pass

def on_log(client, userdata, level, buf):
 if level == mqtt.MQTT_LOG_ERR:
 logger.debug("MQTT internal: %s", buf)

# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def main() -> None:
 logger.info("=== MQTT Bridge démarrage ===")
 logger.info(
 "Config MQTT : %s:%s | Config DB : %s@%s:%s/%s",
 MQTT_HOST, MQTT_PORT,
 DB_USER, DB_HOST, DB_PORT, DB_NAME,
 )

 # Initial DB connection – blocks until successful
 global _db_conn
 _db_conn = db_connect()

 # Pre-warm capteur cache
 for name, cid in CAPTEUR_IDS.items():
 limits = cached_capteur_limits(cid)
 logger.info("Capteur %-22s (id=%s) : min=%-6s max=%s", name, cid, limits[0], limits[1])

 # MQTT client setup
 client = mqtt.Client(
 mqtt.CallbackAPIVersion.VERSION2,
 client_id="mqtt-bridge",
 clean_session=True,
 )
 client.on_connect = on_connect
 client.on_disconnect = on_disconnect
 client.on_message = on_message
 client.on_log = on_log

 if MQTT_USER:
 client.username_pw_set(MQTT_USER, MQTT_PASSWORD)

 # TLS setup
 client.tls_set(
 ca_certs="/certs/ca.crt",
 certfile="/certs/client.crt",
 keyfile="/certs/client.key",
 )

 client.reconnect_delay_set(min_delay=MQTT_RETRY_DELAY, max_delay=60)

 # Connect loop – keep trying until the broker is up
 while True:
 try:
 logger.info("Connexion au broker MQTT %s:%s…", MQTT_HOST, MQTT_PORT)
 client.connect(MQTT_HOST, MQTT_PORT, MQTT_KEEPALIVE)
 break
 except (OSError, ConnectionRefusedError) as exc:
 logger.error(
 "Broker MQTT injoignable : %s – retry dans %ss", exc, MQTT_RETRY_DELAY
 )
 time.sleep(MQTT_RETRY_DELAY)

 logger.info("Entrée dans la boucle MQTT principale")
 client.loop_forever(retry_first_connection=True)

if __name__ == "__main__":
 main()