Projekt air_mqtt_rpzw to lekki skrypt działający na Raspberry Pi Zero W, który cyklicznie pobiera dane jakości powietrza z publicznego API GIOŚ, wybiera najważniejsze wskaźniki jakości powietrza i wysyła je do brokera MQTT w Twojej lokalnej sieci. Skrypt działa jako usługa systemowa (systemd), automatycznie uruchamia się po starcie systemu i reaguje miganiem diody LED:
– pojedynczy błysk sygnalizuje udany odczyt,
– pięć szybkich błysków oznacza błąd pobierania lub połączenia.
Projekt jest przeznaczony do integracji z systemem IoT (np. Home Assistant, Node-RED, EPD, wyświetlacze MQTT itd.) i ma być możliwie prosty, niezawodny i energooszczędny.

Oto krótki tutorial pokazujący tworzenie projektu air_mqtt_rpzw na Raspberry Pi Zero W. Bardzo pomógł mi w tym ChatGPT wersja szybka.
1. Utwórz katalog projektu
Zaloguj się na RPZW i przejdź do katalogu domowego:
cd ~
mkdir -p pi-project
cd pi-project
2. Utwórz strukturę katalogów
Zorganizuj projekt w logiczne podkatalogi:
mkdir config
mkdir systemd
config– pliki konfiguracyjne (np.config.json)systemd– pliki usług systemd
3. Przygotuj plik konfiguracyjny
W katalogu config utwórz plik config.json z ustawieniami:
- ID stacji pomiarowej
- Adres serwera MQTT
- Temat MQTT
- Interwał pobierania danych
- Numer pinu LED
Przykładowy plik konfiguracyjny:
{
"STATION_ID": 732,
"MQTT_SERVER": "192.168.1.199",
"MQTT_TOPIC": "city/air",
"INTERVAL": 600, // 600 sekund = 10 minut
"LED_PIN": 17
}
4. Utwórz główny skrypt
W katalogu głównym pi-project utwórz skrypt air_mqtt_rpzw.py:
- import bibliotek:
requests,paho-mqtt,RPi.GPIO,time,json,os - wczytywanie konfiguracji z
config.json - konfiguracja GPIO dla diody
- funkcje: pobranie danych z API, filtrowanie danych, wysyłanie do MQTT, miganie diodą
- główna pętla z obsługą wyjątków, z interwałem zdefiniowanym w konfiguracji
- sprzątanie GPIO przy zakończeniu
Przykładowy skrypt:
import requests
import json
import paho.mqtt.client as mqtt
import time
import RPi.GPIO as GPIO
import os
# ===== Wczytanie konfiguracji =====
CONFIG_FILE = os.path.join(os.path.dirname(__file__), "config", "config.json")
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
config = json.load(f)
STATION_ID = config.get("STATION_ID", 732)
MQTT_SERVER = config.get("MQTT_SERVER", "192.168.1.199")
MQTT_TOPIC = config.get("MQTT_TOPIC", "city/air")
INTERVAL = config.get("INTERVAL", 600) # w sekundach
LED_PIN = config.get("LED_PIN", 17)
# ===== Konfiguracja GPIO =====
GPIO.setmode(GPIO.BCM)
GPIO.setup(LED_PIN, GPIO.OUT)
def blink_led(times, duration=0.2):
"""Błyśnięcie diodą: times razy"""
for _ in range(times):
GPIO.output(LED_PIN, GPIO.HIGH)
time.sleep(duration)
GPIO.output(LED_PIN, GPIO.LOW)
time.sleep(duration)
# ===== Funkcje =====
def fetch_air_quality():
url = f"https://api.gios.gov.pl/pjp-api/v1/rest/aqindex/getIndex/{STATION_ID}"
r = requests.get(url, timeout=5)
r.raise_for_status()
return r.json()
def send_to_mqtt(data):
client = mqtt.Client()
client.connect(MQTT_SERVER)
payload = json.dumps(data, ensure_ascii=False)
client.publish(MQTT_TOPIC, payload)
client.disconnect()
print("Dane wysłane do MQTT:", payload)
def get_filtered_data():
data = fetch_air_quality()
aq = data.get("AqIndex", {})
# klucze do pobrania
keys = [
"Data wykonania obliczeń indeksu",
"Nazwa kategorii indeksu",
"Nazwa kategorii indeksu dla wskażnika PM2.5",
"Nazwa kategorii indeksu dla wskażnika PM10",
"Nazwa kategorii indeksu dla wskażnika O3"
]
# pobranie danych bez KeyError
filtered = {key: aq.get(key, "Brak danych") for key in keys}
return filtered
# ===== Główna pętla =====
try:
while True:
try:
filtered_data = get_filtered_data()
print("Dane do wysłania:", filtered_data)
send_to_mqtt(filtered_data)
blink_led(1) # 1 błysk = sukces
except requests.RequestException as e:
print("Błąd pobierania danych:", e)
blink_led(5) # 5 błysków = problem z API
except Exception as e:
print("Inny błąd:", e)
blink_led(5) # 5 błysków = inny błąd
time.sleep(INTERVAL)
finally:
GPIO.cleanup()
5. Zainstaluj wymagane pakiety przez apt
sudo apt update
sudo apt install -y python3-requests python3-paho-mqtt python3-rpi.gpio
Nie używamy pip3, aby nie było problemów z wersjami bibliotek.
6. Utwórz plik systemd
W katalogu systemd utwórz plik air_mqtt.service z konfiguracją:
- nazwa usługi
- użytkownik, katalog roboczy
- ścieżka do skryptu
- restart po awarii
- logowanie do journal
Przykładowy plik systemowy:
[Unit]
Description=Air Quality MQTT Script for RPZW
After=network.target
[Service]
User=minimj
WorkingDirectory=/home/minimj/pi-project
ExecStart=/usr/bin/python3 /home/minimj/pi-project/air_mqtt_rpzw.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
7. Skopiuj plik systemd do katalogu systemowego
sudo cp ~/pi-project/systemd/air_mqtt.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable air_mqtt.service
8. Uruchom i sprawdź działanie
- Start usługi:
sudo systemctl start air_mqtt.service
- Sprawdzenie statusu:
sudo systemctl status air_mqtt.service
sudo journalctl -u air_mqtt.service -f
- Dioda LED miga po pobraniu danych (1 błysk = sukces, 5 błysków = problem).
- Dane są wysyłane do brokera MQTT.
9. Testowanie interwału
- Interwał pobierania i wysyłki ustawiany jest w pliku
config.json - Po zmianie interwału zrestartuj usługę:
sudo systemctl restart air_mqtt.service