Skip to main content

02 - Application "pont" BDD

Parc MySQL icon – MQTT mosquitto icon → App "pont" python icon

Un programme fiable et efficace qui fait le pont entre le broker MQTT Mosquitto et la base de données MySQL du projet.

Programme génére à l'aide de ClaudeCode claude icon


Architecture

[IoT sensors / controleurs]
        │  MQTT  serre/X/bac/Y
        ▼
 ┌─────────────┐         ┌──────────────────────┐
 │  Mosquitto  │ ───────▶│  bridge.py (Python)  │ ──▶  MySQL (external)
 │  (broker)   │         │  subscribe + persist │
 └─────────────┘         └──────────────────────┘
 Docker container             Docker container

Les 2 conteneurs partages le même réseau docker (parc-net). Le serveur MySQL n'est pas géré sur cette machine.


Application docker

Lors du lancement du docker compose, l'application est construite à partir du DockerFile suivant.

FROM python:3.12-slim

# Dépendences système (mise à jours + installation de pip)
RUN apt-get update && apt-get install -y --no-install-recommends \
        gcc \
        default-libmysqlclient-dev \
        pkg-config \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY bridge.py .

# Lancé en non-root
RUN useradd -r -u 1001 bridge
USER bridge

CMD ["python", "-u", "bridge.py"]

MQTT topics

Format topic MQTT Description
serre/<x>/bac/<y> Mesure du bac y dans la serre x

Format du payload (JSON)

{ "temperatureAmbiante": 28.5 }
{ "humiditeAmbiante": 65.3, "temperatureAmbiante": 22.1 }
{ "humiditeAmbiante": 18.7, "temperatureAmbiante": 23, "humiditeSol": 58.6 }

Chaque payload peut contenir une seule ou plusieurs mesures.

Capteurs

Clé JSON id_capteur
humiditeAmbiante 1
humiditeSol 2
temperatureAmbiante 3

Support des erreurs

L'intégralité des erreurs sont enregistré dans la table error avec chacune une valeure dans la table type_erreur:

Code Déclencheur
BAC_NOT_FOUND Numéro de bac ou serre non présent en base de données
UNKNOWN_SENSOR La clé du capteur n'est pas dans la liste connu
INVALID_PAYLOAD Erreur de format du JSON ou contenu du payload incorrect
INVALID_VALUE Valeur non numérique reçu
VALUE_OUT_OF_RANGE Valeur hors limite des valeurs min/max du capteur
INVALID_ENCODING Non-UTF-8 payload
MQTT_DISCONNECT Déconnexion du broker
DB_ERROR Communications avec la base de données échoué
DB_INSERT_ERROR Echec d'insertion d'une mesure en base de donnée
UNEXPECTED_ERROR Erreur générale (cas non prévu)

Quand une valeur est hors plage du capteur, le minimum / maximum est écrit en base de donné et un ligne d'erreur est inséré.


Variables d'environment

Variable valeur par défaut Description
DB_HOST (requis) hébergeur MySQL
DB_PORT 3306 port du serveur MySQL
DB_NAME parc nom de la base de données
DB_USER (requis) utilisateur MySQL
DB_PASSWORD (requis) mot de passe MySQL
MQTT_HOST mosquitto nom de l'hébergeur Broker
MQTT_PORT 1883 port du Broker
MQTT_KEEPALIVE 60 MQTT keepalive (seconde)
MQTT_USER (vide) utilisateur du Broker (optionel)
MQTT_PASSWORD (vide) mot de passe du Broker (optionel)
DB_RETRY_DELAY 5 secondes entre les essais d'écriture en bdd
MQTT_RETRY_DELAY 5 secondes entre les essais MQTT
LOG_LEVEL INFO DEBUG/INFO/WARNING/ERROR

Arborescence des fichiers

.
└── mqtt-bridge
    ├── bridge.py
    ├── docker-compose.yml
    ├── Dockerfile
    ├── mosquitto
    │   ├── client-certs
    │   │   ├── client.crt
    │   │   ├── client.csr
    │   │   └── client.key
    │   ├── clients-certs.sh
    │   ├── config
    │   │   ├── mosquitto.conf
    │   │   └── mosquitto.conf.bak
    │   ├── server-certs
    │   │   ├── ca.crt
    │   │   ├── ca.key
    │   │   ├── ca.srl
    │   │   ├── server.crt
    │   │   ├── server.csr
    │   │   └── server.key
    │   ├── server-certs.sh
    │   └── v3.ext
    ├── README.md
    └── requirements.txt

Programme complet

import json
import logging
import re
from datetime import datetime

import paho.mqtt.client as mqtt
import mysql.connector
from mysql.connector import Error

# --- Configuration ---
MQTT_BROKER = "mosquitto"
MQTT_PORT = 1883
MQTT_TOPIC = "serre/+/bac/+"

DB_CONFIG = {
    "host": "mosquitto_db",
    "database": "parc",
    "user": "root",         #changer le mot de passe et l'utilisateur pour la production
    "password": "root",
}

SENSOR_MAP = {
    "humiditeAmb": 1,   # ambiantMoisture
    "tempAmb":     2,   # ambiantTemperature
    "humiditeSol": 3,   # soilMoisture
}

TOPIC_PATTERN = re.compile(r"^serre/(\d+)/bac/(\d+)$")

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)


# --- DB helpers ---

def get_connection():
    return mysql.connector.connect(**DB_CONFIG)


def get_bac_id(cursor, serre_numero: int, bac_numero: int):
    cursor.execute(
        """
        SELECT b.id FROM bac b
        JOIN serre s ON b.serre = s.id
        WHERE s.numero = %s AND b.numero = %s
        LIMIT 1
        """,
        (serre_numero, bac_numero),
    )
    row = cursor.fetchone()
    return row[0] if row else None


def insert_mesure(cursor, bac_id: int, capteur_id: int, value: float):
    cursor.execute(
        """
        INSERT INTO mesure (bac, value, mesure_a, capteur)
        VALUES (%s, %s, %s, %s)
        """,
        (bac_id, value, datetime.now(), capteur_id),
    )


def insert_error(cursor, error_type: str, message: str, value: str):
    cursor.execute(
        """
        INSERT INTO error (type_erreur, message, valeur, erreur_a)
        VALUES (%s, %s, %s, %s)
        """,
        (error_type, message, value, datetime.now()),
    )


def log_and_store_error(conn, error_type: str, message: str, value: str):
    logging.error("error: %s, value: %s", message, value)
    try:
        cursor = conn.cursor()
        insert_error(cursor, error_type, message, value)
        conn.commit()
        cursor.close()
    except Error as db_err:
        logging.error("Failed to write error to DB: %s", db_err)


# --- MQTT callbacks ---

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        logging.info("Connected to MQTT broker, subscribing to %s", MQTT_TOPIC)
        client.subscribe(MQTT_TOPIC)
    else:
        logging.error("MQTT connection failed with code %d", rc)


def on_message(client, userdata, msg):
    conn = userdata["conn"]
    topic = msg.topic

    # Parse topic
    match = TOPIC_PATTERN.match(topic)
    if not match:
        logging.warning("Unexpected topic format: %s", topic)
        return

    serre_numero = int(match.group(1))
    bac_numero   = int(match.group(2))

    # Parse payload
    try:
        payload = json.loads(msg.payload.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        log_and_store_error(
            conn,
            "INVALID_PAYLOAD",
            f"Failed to parse JSON on topic {topic}: {e}",
            msg.payload.decode("utf-8", errors="replace"),
        )
        return

    cursor = conn.cursor()
    try:
        # Resolve bac
        bac_id = get_bac_id(cursor, serre_numero, bac_numero)
        if bac_id is None:
            log_and_store_error(
                conn,
                "BAC_NOT_FOUND",
                f"No bac found for serre {serre_numero} bac {bac_numero}",
                str(payload),
            )
            return

        # Process each key in the payload
        for key, value in payload.items():
            capteur_id = SENSOR_MAP.get(key)
            if capteur_id is None:
                log_and_store_error(
                    conn,
                    "UNKNOWN_SENSOR",
                    f"Unknown sensor type '{key}' on topic {topic}",
                    str(value),
                )
                continue

            insert_mesure(cursor, bac_id, capteur_id, float(value))
            logging.info(
                "Inserted mesure: serre=%d bac=%d capteur=%d value=%s",
                serre_numero, bac_numero, capteur_id, value,
            )

        conn.commit()

    except Error as db_err:
        logging.error("DB error while processing message: %s", db_err)
        conn.rollback()
    finally:
        cursor.close()


# --- Entry point ---

def main():
    try:
        conn = get_connection()
        logging.info("Connected to database")
    except Error as e:
        logging.critical("Cannot connect to database: %s", e)
        raise SystemExit(1)

    client = mqtt.Client(userdata={"conn": conn})
    client.on_connect = on_connect
    client.on_message = on_message

    try:
        client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
        client.loop_forever()
    except KeyboardInterrupt:
        logging.info("Shutting down")
    finally:
        conn.close()
        client.disconnect()


if __name__ == "__main__":
    main()