Projekt air_mqtt_rpzw to lekki skrypt działający na Raspberry Pi Zero W, który cyklicznie pobiera dane jakości powietrza z publicznego API GIOŚ, wybiera najważniejsze wskaźniki jakości powietrza i wysyła je do brokera MQTT w Twojej lokalnej sieci. Skrypt działa jako usługa systemowa (systemd), automatycznie uruchamia się po starcie systemu i reaguje miganiem diody LED:
– pojedynczy błysk sygnalizuje udany odczyt,
– pięć szybkich błysków oznacza błąd pobierania lub połączenia.
Projekt jest przeznaczony do integracji z systemem IoT (np. Home Assistant, Node-RED, EPD, wyświetlacze MQTT itd.) i ma być możliwie prosty, niezawodny i energooszczędny.

Oto krótki tutorial pokazujący tworzenie projektu air_mqtt_rpzw na Raspberry Pi Zero W. Bardzo pomógł mi w tym ChatGPT wersja szybka.
1. Utwórz katalog projektu
Zaloguj się na RPZW i przejdź do katalogu domowego:
cd ~
mkdir -p pi-project
cd pi-project
2. Utwórz strukturę katalogów
Zorganizuj projekt w logiczne podkatalogi:
mkdir config
mkdir systemd
config– pliki konfiguracyjne (np.config.json)systemd– pliki usług systemd
3. Przygotuj plik konfiguracyjny
W katalogu config utwórz plik config.json z ustawieniami:
- ID stacji pomiarowej
- Adres serwera MQTT
- Temat MQTT
- Interwał pobierania danych
- Numer pinu LED
Przykładowy plik konfiguracyjny:
{
"STATION_ID": 732,
"MQTT_SERVER": "192.168.1.199",
"MQTT_TOPIC": "city/air",
"INTERVAL": 600, // 600 sekund = 10 minut
"LED_PIN": 17
}
4. Utwórz główny skrypt
W katalogu głównym pi-project utwórz skrypt air_mqtt_rpzw.py:
- import bibliotek:
requests,paho-mqtt,RPi.GPIO,time,json,os - wczytywanie konfiguracji z
config.json - konfiguracja GPIO dla diody
- funkcje: pobranie danych z API, filtrowanie danych, wysyłanie do MQTT, miganie diodą
- główna pętla z obsługą wyjątków, z interwałem zdefiniowanym w konfiguracji
- sprzątanie GPIO przy zakończeniu
Przykładowy skrypt: (edytowany 11.12.2025 Dodane sprawdzanie połączenia z WiFi, w razie braku próba ponownego łączenia. Po wyłączeniu zasilania z routera raspberry nie powstał stąd dodana funkcja)
import requests
import json
import paho.mqtt.client as mqtt
import time
import RPi.GPIO as GPIO
import os
import subprocess
# ===== Wczytanie konfiguracji =====
CONFIG_FILE = os.path.join(os.path.dirname(__file__), "config", "config.json")
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
config = json.load(f)
STATION_ID = config.get("STATION_ID", 732)
MQTT_SERVER = config.get("MQTT_SERVER", "192.168.1.199")
MQTT_TOPIC = config.get("MQTT_TOPIC", "city/air")
INTERVAL = config.get("INTERVAL", 600) # w sekundach
LED_PIN = config.get("LED_PIN", 17)
# ===== Konfiguracja GPIO =====
GPIO.setmode(GPIO.BCM)
GPIO.setup(LED_PIN, GPIO.OUT)
# PWM – stałe świecenie 30%
led_pwm = GPIO.PWM(LED_PIN, 1000) # 1 kHz
led_pwm.start(0) # zaczynamy od zgaszonej
# ===== Funkcje =====
def wifi_ok():
"""Sprawdza, czy Raspberry jest połączone z Wi-Fi."""
try:
out = subprocess.check_output(["iwgetid"], stderr=subprocess.STDOUT).decode()
return "ESSID" in out
except:
return False
def wifi_reconnect():
"""Restart interfejsu wlan0 w razie utraty Wi-Fi."""
print("Restart WiFi...")
os.system("sudo ifconfig wlan0 down")
time.sleep(3)
os.system("sudo ifconfig wlan0 up")
time.sleep(5)
def fetch_air_quality():
url = f"https://api.gios.gov.pl/pjp-api/v1/rest/aqindex/getIndex/{STATION_ID}"
r = requests.get(url, timeout=8)
r.raise_for_status()
return r.json()
def send_to_mqtt(data):
client = mqtt.Client()
client.connect(MQTT_SERVER)
payload = json.dumps(data, ensure_ascii=False)
client.publish(MQTT_TOPIC, payload)
client.disconnect()
print("Dane wysłane do MQTT:", payload)
def get_filtered_data():
data = fetch_air_quality()
aq = data.get("AqIndex", {})
# klucze do pobrania
keys = [
"Data wykonania obliczeń indeksu",
"Nazwa kategorii indeksu",
"Nazwa kategorii indeksu dla wskażnika PM2.5",
"Nazwa kategorii indeksu dla wskażnika PM10",
"Nazwa kategorii indeksu dla wskażnika O3"
]
# pobranie danych bez KeyError
filtered = {key: aq.get(key, "Brak danych") for key in keys}
return filtered
# ===== Główna pętla =====
try:
while True:
# ---- Monitoring WiFi ----
if not wifi_ok():
print("WiFi rozłączone – próbuję połączyć ponownie...")
led_pwm.ChangeDutyCycle(0) # LED zgaszony = problem
wifi_reconnect()
time.sleep(10)
continue # wróć do pętli, nie próbuj pobierać danych
# ---- Główna logika ----
try:
filtered_data = get_filtered_data()
print("Dane do wysłania:", filtered_data)
send_to_mqtt(filtered_data)
# LED świeci, bo jest OK (pwm na 3, bo dioda zielona jaśniej świeci)
led_pwm.ChangeDutyCycle(3)
except requests.RequestException as e:
print("Błąd pobierania danych:", e)
led_pwm.ChangeDutyCycle(0) # problem = wyłącz LED
except Exception as e:
print("Inny błąd:", e)
led_pwm.ChangeDutyCycle(0) # problem = wyłącz LED
time.sleep(INTERVAL)
finally:
led_pwm.stop()
GPIO.cleanup()
5. Zainstaluj wymagane pakiety przez apt
sudo apt update
sudo apt install -y python3-requests python3-paho-mqtt python3-rpi.gpio
Nie używamy pip3, aby nie było problemów z wersjami bibliotek.
6. Utwórz plik systemd
W katalogu systemd utwórz plik air_mqtt.service z konfiguracją:
- nazwa usługi
- użytkownik, katalog roboczy
- ścieżka do skryptu
- restart po awarii
- logowanie do journal
Przykładowy plik systemowy:
[Unit]
Description=Air Quality MQTT Script for RPZW
After=network.target
[Service]
User=minimj
WorkingDirectory=/home/minimj/pi-project
ExecStart=/usr/bin/python3 /home/minimj/pi-project/air_mqtt_rpzw.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
7. Skopiuj plik systemd do katalogu systemowego
sudo cp ~/pi-project/systemd/air_mqtt.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable air_mqtt.service
8. Uruchom i sprawdź działanie
- Start usługi:
sudo systemctl start air_mqtt.service
- Sprawdzenie statusu:
sudo systemctl status air_mqtt.service
sudo journalctl -u air_mqtt.service -f
- Dioda LED miga po pobraniu danych (1 błysk = sukces, 5 błysków = problem).
- Dane są wysyłane do brokera MQTT.
9. Testowanie interwału
- Interwał pobierania i wysyłki ustawiany jest w pliku
config.json - Po zmianie interwału zrestartuj usługę:
sudo systemctl restart air_mqtt.service