Pobieramy z serwera API dane o zanieczyszczeniu powietrza używając RPZW i wysyłamy do lokalnego serwera MQTT na RPZ2W

Projekt air_mqtt_rpzw to lekki skrypt działający na Raspberry Pi Zero W, który cyklicznie pobiera dane jakości powietrza z publicznego API GIOŚ, wybiera najważniejsze wskaźniki jakości powietrza i wysyła je do brokera MQTT w Twojej lokalnej sieci. Skrypt działa jako usługa systemowa (systemd), automatycznie uruchamia się po starcie systemu i reaguje miganiem diody LED:
– pojedynczy błysk sygnalizuje udany odczyt,
– pięć szybkich błysków oznacza błąd pobierania lub połączenia.

Projekt jest przeznaczony do integracji z systemem IoT (np. Home Assistant, Node-RED, EPD, wyświetlacze MQTT itd.) i ma być możliwie prosty, niezawodny i energooszczędny.

SMOG
Obrazek AI Gemini, prompt: Narysuj obrazek nowoczesnego, zapylonego miasta ze smogiem w formie ilustracji komiksowej.


Oto krótki tutorial pokazujący tworzenie projektu air_mqtt_rpzw na Raspberry Pi Zero W. Bardzo pomógł mi w tym ChatGPT wersja szybka.

1. Utwórz katalog projektu

Zaloguj się na RPZW i przejdź do katalogu domowego:

cd ~
mkdir -p pi-project
cd pi-project

2. Utwórz strukturę katalogów

Zorganizuj projekt w logiczne podkatalogi:

mkdir config
mkdir systemd
  • config – pliki konfiguracyjne (np. config.json)
  • systemd – pliki usług systemd

3. Przygotuj plik konfiguracyjny

W katalogu config utwórz plik config.json z ustawieniami:

  • ID stacji pomiarowej
  • Adres serwera MQTT
  • Temat MQTT
  • Interwał pobierania danych
  • Numer pinu LED

Przykładowy plik konfiguracyjny:

{
    "STATION_ID": 732,
    "MQTT_SERVER": "192.168.1.199",
    "MQTT_TOPIC": "city/air",
    "INTERVAL": 600,        // 600 sekund = 10 minut
    "LED_PIN": 17
}

4. Utwórz główny skrypt

W katalogu głównym pi-project utwórz skrypt air_mqtt_rpzw.py:

  • import bibliotek: requests, paho-mqtt, RPi.GPIO, time, json, os
  • wczytywanie konfiguracji z config.json
  • konfiguracja GPIO dla diody
  • funkcje: pobranie danych z API, filtrowanie danych, wysyłanie do MQTT, miganie diodą
  • główna pętla z obsługą wyjątków, z interwałem zdefiniowanym w konfiguracji
  • sprzątanie GPIO przy zakończeniu

Przykładowy skrypt: (edytowany 11.12.2025 Dodane sprawdzanie połączenia z WiFi, w razie braku próba ponownego łączenia. Po wyłączeniu zasilania z routera raspberry nie powstał stąd dodana funkcja)

import requests
import json
import paho.mqtt.client as mqtt
import time
import RPi.GPIO as GPIO
import os
import subprocess

# ===== Wczytanie konfiguracji =====
CONFIG_FILE = os.path.join(os.path.dirname(__file__), "config", "config.json")

with open(CONFIG_FILE, "r", encoding="utf-8") as f:
    config = json.load(f)

STATION_ID = config.get("STATION_ID", 732)
MQTT_SERVER = config.get("MQTT_SERVER", "192.168.1.199")
MQTT_TOPIC = config.get("MQTT_TOPIC", "city/air")
INTERVAL = config.get("INTERVAL", 600)  # w sekundach
LED_PIN = config.get("LED_PIN", 17)

# ===== Konfiguracja GPIO =====
GPIO.setmode(GPIO.BCM)
GPIO.setup(LED_PIN, GPIO.OUT)

# PWM – stałe świecenie 30%
led_pwm = GPIO.PWM(LED_PIN, 1000)   # 1 kHz
led_pwm.start(0)  # zaczynamy od zgaszonej

# ===== Funkcje =====

def wifi_ok():
    """Sprawdza, czy Raspberry jest połączone z Wi-Fi."""
    try:
        out = subprocess.check_output(["iwgetid"], stderr=subprocess.STDOUT).decode()
        return "ESSID" in out
    except:
        return False

def wifi_reconnect():
    """Restart interfejsu wlan0 w razie utraty Wi-Fi."""
    print("Restart WiFi...")
    os.system("sudo ifconfig wlan0 down")
    time.sleep(3)
    os.system("sudo ifconfig wlan0 up")
    time.sleep(5)

def fetch_air_quality():
    url = f"https://api.gios.gov.pl/pjp-api/v1/rest/aqindex/getIndex/{STATION_ID}"
    r = requests.get(url, timeout=8)
    r.raise_for_status()
    return r.json()

def send_to_mqtt(data):
    client = mqtt.Client()
    client.connect(MQTT_SERVER)
    payload = json.dumps(data, ensure_ascii=False)
    client.publish(MQTT_TOPIC, payload)
    client.disconnect()
    print("Dane wysłane do MQTT:", payload)

def get_filtered_data():
    data = fetch_air_quality()
    aq = data.get("AqIndex", {})

    # klucze do pobrania
    keys = [
        "Data wykonania obliczeń indeksu",
        "Nazwa kategorii indeksu",
        "Nazwa kategorii indeksu dla wskażnika PM2.5",
        "Nazwa kategorii indeksu dla wskażnika PM10",
        "Nazwa kategorii indeksu dla wskażnika O3"
    ]

    # pobranie danych bez KeyError
    filtered = {key: aq.get(key, "Brak danych") for key in keys}
    return filtered

# ===== Główna pętla =====
try:
    while True:

        # ---- Monitoring WiFi ----
        if not wifi_ok():
            print("WiFi rozłączone – próbuję połączyć ponownie...")
            led_pwm.ChangeDutyCycle(0)  # LED zgaszony = problem

            wifi_reconnect()
            time.sleep(10)
            continue  # wróć do pętli, nie próbuj pobierać danych

        # ---- Główna logika ----
        try:
            filtered_data = get_filtered_data()
            print("Dane do wysłania:", filtered_data)

            send_to_mqtt(filtered_data)

            # LED świeci, bo jest OK  (pwm na 3, bo dioda zielona jaśniej świeci)
            led_pwm.ChangeDutyCycle(3)

        except requests.RequestException as e:
            print("Błąd pobierania danych:", e)
            led_pwm.ChangeDutyCycle(0)  # problem = wyłącz LED

        except Exception as e:
            print("Inny błąd:", e)
            led_pwm.ChangeDutyCycle(0)  # problem = wyłącz LED

        time.sleep(INTERVAL)

finally:
    led_pwm.stop()
    GPIO.cleanup()

5. Zainstaluj wymagane pakiety przez apt

sudo apt update
sudo apt install -y python3-requests python3-paho-mqtt python3-rpi.gpio

Nie używamy pip3, aby nie było problemów z wersjami bibliotek.

6. Utwórz plik systemd

W katalogu systemd utwórz plik air_mqtt.service z konfiguracją:

  • nazwa usługi
  • użytkownik, katalog roboczy
  • ścieżka do skryptu
  • restart po awarii
  • logowanie do journal

Przykładowy plik systemowy:

[Unit]
Description=Air Quality MQTT Script for RPZW
After=network.target

[Service]
User=minimj
WorkingDirectory=/home/minimj/pi-project
ExecStart=/usr/bin/python3 /home/minimj/pi-project/air_mqtt_rpzw.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target

7. Skopiuj plik systemd do katalogu systemowego

sudo cp ~/pi-project/systemd/air_mqtt.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable air_mqtt.service

8. Uruchom i sprawdź działanie

  • Start usługi:
sudo systemctl start air_mqtt.service
  • Sprawdzenie statusu:
sudo systemctl status air_mqtt.service
sudo journalctl -u air_mqtt.service -f
  • Dioda LED miga po pobraniu danych (1 błysk = sukces, 5 błysków = problem).
  • Dane są wysyłane do brokera MQTT.

9. Testowanie interwału

  • Interwał pobierania i wysyłki ustawiany jest w pliku config.json
  • Po zmianie interwału zrestartuj usługę:
sudo systemctl restart air_mqtt.service

Dodaj komentarz