02 - Application "pont" BDD
Parc
– MQTT
→ App "pont" 
Un programme fiable et efficace qui fait le pont entre le broker MQTT Mosquitto et la base de données MySQL du projet.
Programme génére à l'aide de ClaudeCode ![]()
Architecture
[IoT sensors / controleurs]
│ MQTT serre/X/bac/Y
▼
┌─────────────┐ ┌──────────────────────┐
│ Mosquitto │ ───────▶│ bridge.py (Python) │ ──▶ MySQL (external)
│ (broker) │ │ subscribe + persist │
└─────────────┘ └──────────────────────┘
Docker container Docker container
Les 2 conteneurs partages le même réseau docker (parc-net).
Le serveur MySQL n'est pas géré sur cette machine.
Application docker
Lors du lancement du docker compose, l'application est construite à partir du DockerFile suivant.
FROM python:3.12-slim
# Dépendences système (mise à jours + installation de pip)
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc \
default-libmysqlclient-dev \
pkg-config \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY bridge.py .
# Lancé en non-root
RUN useradd -r -u 1001 bridge
USER bridge
CMD ["python", "-u", "bridge.py"]
MQTT topics
| Format topic MQTT | Description |
|---|---|
serre/<x>/bac/<y> |
Mesure du bac y dans la serre x |
Format du payload (JSON)
{ "temperatureAmbiante": 28.5 }
{ "humiditeAmbiante": 65.3, "temperatureAmbiante": 22.1 }
{ "humiditeAmbiante": 18.7, "temperatureAmbiante": 23, "humiditeSol": 58.6 }
Chaque payload peut contenir une seule ou plusieurs mesures.
Capteurs
| Clé JSON | id_capteur |
|---|---|
humiditeAmbiante |
1 |
humiditeSol |
2 |
temperatureAmbiante |
3 |
Support des erreurs
L'intégralité des erreurs sont enregistré dans la table error avec chacune une valeure dans la table type_erreur:
| Code | Déclencheur |
|---|---|
BAC_NOT_FOUND |
Numéro de bac ou serre non présent en base de données |
UNKNOWN_SENSOR |
La clé du capteur n'est pas dans la liste connu |
INVALID_PAYLOAD |
Erreur de format du JSON ou contenu du payload incorrect |
INVALID_VALUE |
Valeur non numérique reçu |
VALUE_OUT_OF_RANGE |
Valeur hors limite des valeurs min/max du capteur |
INVALID_ENCODING |
Non-UTF-8 payload |
MQTT_DISCONNECT |
Déconnexion du broker |
DB_ERROR |
Communications avec la base de données échoué |
DB_INSERT_ERROR |
Echec d'insertion d'une mesure en base de donnée |
UNEXPECTED_ERROR |
Erreur générale (cas non prévu) |
Quand une valeur est hors plage du capteur, le minimum / maximum est écrit en base de donné et un ligne d'erreur est inséré.
Variables d'environment
| Variable | valeur par défaut | Description |
|---|---|---|
DB_HOST |
(requis) | hébergeur MySQL |
DB_PORT |
3306 |
port du serveur MySQL |
DB_NAME |
parc |
nom de la base de données |
DB_USER |
(requis) | utilisateur MySQL |
DB_PASSWORD |
(requis) | mot de passe MySQL |
MQTT_HOST |
mosquitto |
nom de l'hébergeur Broker |
MQTT_PORT |
1883 |
port du Broker |
MQTT_KEEPALIVE |
60 |
MQTT keepalive (seconde) |
MQTT_USER |
(vide) | utilisateur du Broker (optionel) |
MQTT_PASSWORD |
(vide) | mot de passe du Broker (optionel) |
DB_RETRY_DELAY |
5 |
secondes entre les essais d'écriture en bdd |
MQTT_RETRY_DELAY |
5 |
secondes entre les essais MQTT |
LOG_LEVEL |
INFO |
DEBUG/INFO/WARNING/ERROR |
Arborescence des fichiers
.
└── mqtt-bridge
├── bridge.py
├── docker-compose.yml
├── Dockerfile
├── mosquitto
│ ├── client-certs
│ │ ├── client.crt
│ │ ├── client.csr
│ │ └── client.key
│ ├── clients-certs.sh
│ ├── config
│ │ ├── mosquitto.conf
│ │ └── mosquitto.conf.bak
│ ├── server-certs
│ │ ├── ca.crt
│ │ ├── ca.key
│ │ ├── ca.srl
│ │ ├── server.crt
│ │ ├── server.csr
│ │ └── server.key
│ ├── server-certs.sh
│ └── v3.ext
├── README.md
└── requirements.txt
Programme complet
import json
import logging
import re
from datetime import datetime
import paho.mqtt.client as mqtt
import mysql.connector
from mysql.connector import Error
# --- Configuration ---
MQTT_BROKER = "mosquitto"
MQTT_PORT = 1883
MQTT_TOPIC = "serre/+/bac/+"
DB_CONFIG = {
"host": "mosquitto_db",
"database": "parc",
"user": "root", #changer le mot de passe et l'utilisateur pour la production
"password": "root",
}
SENSOR_MAP = {
"humiditeAmb": 1, # ambiantMoisture
"tempAmb": 2, # ambiantTemperature
"humiditeSol": 3, # soilMoisture
}
TOPIC_PATTERN = re.compile(r"^serre/(\d+)/bac/(\d+)$")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
# --- DB helpers ---
def get_connection():
return mysql.connector.connect(**DB_CONFIG)
def get_bac_id(cursor, serre_numero: int, bac_numero: int):
cursor.execute(
"""
SELECT b.id FROM bac b
JOIN serre s ON b.serre = s.id
WHERE s.numero = %s AND b.numero = %s
LIMIT 1
""",
(serre_numero, bac_numero),
)
row = cursor.fetchone()
return row[0] if row else None
def insert_mesure(cursor, bac_id: int, capteur_id: int, value: float):
cursor.execute(
"""
INSERT INTO mesure (bac, value, mesure_a, capteur)
VALUES (%s, %s, %s, %s)
""",
(bac_id, value, datetime.now(), capteur_id),
)
def insert_error(cursor, error_type: str, message: str, value: str):
cursor.execute(
"""
INSERT INTO error (type_erreur, message, valeur, erreur_a)
VALUES (%s, %s, %s, %s)
""",
(error_type, message, value, datetime.now()),
)
def log_and_store_error(conn, error_type: str, message: str, value: str):
logging.error("error: %s, value: %s", message, value)
try:
cursor = conn.cursor()
insert_error(cursor, error_type, message, value)
conn.commit()
cursor.close()
except Error as db_err:
logging.error("Failed to write error to DB: %s", db_err)
# --- MQTT callbacks ---
def on_connect(client, userdata, flags, rc):
if rc == 0:
logging.info("Connected to MQTT broker, subscribing to %s", MQTT_TOPIC)
client.subscribe(MQTT_TOPIC)
else:
logging.error("MQTT connection failed with code %d", rc)
def on_message(client, userdata, msg):
conn = userdata["conn"]
topic = msg.topic
# Parse topic
match = TOPIC_PATTERN.match(topic)
if not match:
logging.warning("Unexpected topic format: %s", topic)
return
serre_numero = int(match.group(1))
bac_numero = int(match.group(2))
# Parse payload
try:
payload = json.loads(msg.payload.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError) as e:
log_and_store_error(
conn,
"INVALID_PAYLOAD",
f"Failed to parse JSON on topic {topic}: {e}",
msg.payload.decode("utf-8", errors="replace"),
)
return
cursor = conn.cursor()
try:
# Resolve bac
bac_id = get_bac_id(cursor, serre_numero, bac_numero)
if bac_id is None:
log_and_store_error(
conn,
"BAC_NOT_FOUND",
f"No bac found for serre {serre_numero} bac {bac_numero}",
str(payload),
)
return
# Process each key in the payload
for key, value in payload.items():
capteur_id = SENSOR_MAP.get(key)
if capteur_id is None:
log_and_store_error(
conn,
"UNKNOWN_SENSOR",
f"Unknown sensor type '{key}' on topic {topic}",
str(value),
)
continue
insert_mesure(cursor, bac_id, capteur_id, float(value))
logging.info(
"Inserted mesure: serre=%d bac=%d capteur=%d value=%s",
serre_numero, bac_numero, capteur_id, value,
)
conn.commit()
except Error as db_err:
logging.error("DB error while processing message: %s", db_err)
conn.rollback()
finally:
cursor.close()
# --- Entry point ---
def main():
try:
conn = get_connection()
logging.info("Connected to database")
except Error as e:
logging.critical("Cannot connect to database: %s", e)
raise SystemExit(1)
client = mqtt.Client(userdata={"conn": conn})
client.on_connect = on_connect
client.on_message = on_message
try:
client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
client.loop_forever()
except KeyboardInterrupt:
logging.info("Shutting down")
finally:
conn.close()
client.disconnect()
if __name__ == "__main__":
main()