Pobieramy z serwera API dane o zanieczyszczeniu powietrza używając RPZW i wysyłamy do lokalnego serwera MQTT na RPZ2W

Projekt air_mqtt_rpzw to lekki skrypt działający na Raspberry Pi Zero W, który cyklicznie pobiera dane jakości powietrza z publicznego API GIOŚ, wybiera najważniejsze wskaźniki jakości powietrza i wysyła je do brokera MQTT w Twojej lokalnej sieci. Skrypt działa jako usługa systemowa (systemd), automatycznie uruchamia się po starcie systemu i reaguje miganiem diody LED:
– pojedynczy błysk sygnalizuje udany odczyt,
– pięć szybkich błysków oznacza błąd pobierania lub połączenia.

Projekt jest przeznaczony do integracji z systemem IoT (np. Home Assistant, Node-RED, EPD, wyświetlacze MQTT itd.) i ma być możliwie prosty, niezawodny i energooszczędny.

SMOG
Obrazek AI Gemini, prompt: Narysuj obrazek nowoczesnego, zapylonego miasta ze smogiem w formie ilustracji komiksowej.


Oto krótki tutorial pokazujący tworzenie projektu air_mqtt_rpzw na Raspberry Pi Zero W. Bardzo pomógł mi w tym ChatGPT wersja szybka.

1. Utwórz katalog projektu

Zaloguj się na RPZW i przejdź do katalogu domowego:

cd ~
mkdir -p pi-project
cd pi-project

2. Utwórz strukturę katalogów

Zorganizuj projekt w logiczne podkatalogi:

mkdir config
mkdir systemd
  • config – pliki konfiguracyjne (np. config.json)
  • systemd – pliki usług systemd

3. Przygotuj plik konfiguracyjny

W katalogu config utwórz plik config.json z ustawieniami:

  • ID stacji pomiarowej
  • Adres serwera MQTT
  • Temat MQTT
  • Interwał pobierania danych
  • Numer pinu LED

Przykładowy plik konfiguracyjny:

{
    "STATION_ID": 732,
    "MQTT_SERVER": "192.168.1.199",
    "MQTT_TOPIC": "city/air",
    "INTERVAL": 600,        // 600 sekund = 10 minut
    "LED_PIN": 17
}

4. Utwórz główny skrypt

W katalogu głównym pi-project utwórz skrypt air_mqtt_rpzw.py:

  • import bibliotek: requests, paho-mqtt, RPi.GPIO, time, json, os
  • wczytywanie konfiguracji z config.json
  • konfiguracja GPIO dla diody
  • funkcje: pobranie danych z API, filtrowanie danych, wysyłanie do MQTT, miganie diodą
  • główna pętla z obsługą wyjątków, z interwałem zdefiniowanym w konfiguracji
  • sprzątanie GPIO przy zakończeniu

Przykładowy skrypt:

import requests
import json
import paho.mqtt.client as mqtt
import time
import RPi.GPIO as GPIO
import os

# ===== Wczytanie konfiguracji =====
CONFIG_FILE = os.path.join(os.path.dirname(__file__), "config", "config.json")

with open(CONFIG_FILE, "r", encoding="utf-8") as f:
    config = json.load(f)

STATION_ID = config.get("STATION_ID", 732)
MQTT_SERVER = config.get("MQTT_SERVER", "192.168.1.199")
MQTT_TOPIC = config.get("MQTT_TOPIC", "city/air")
INTERVAL = config.get("INTERVAL", 600)  # w sekundach
LED_PIN = config.get("LED_PIN", 17)

# ===== Konfiguracja GPIO =====
GPIO.setmode(GPIO.BCM)
GPIO.setup(LED_PIN, GPIO.OUT)

def blink_led(times, duration=0.2):
    """Błyśnięcie diodą: times razy"""
    for _ in range(times):
        GPIO.output(LED_PIN, GPIO.HIGH)
        time.sleep(duration)
        GPIO.output(LED_PIN, GPIO.LOW)
        time.sleep(duration)

# ===== Funkcje =====
def fetch_air_quality():
    url = f"https://api.gios.gov.pl/pjp-api/v1/rest/aqindex/getIndex/{STATION_ID}"
    r = requests.get(url, timeout=5)
    r.raise_for_status()
    return r.json()

def send_to_mqtt(data):
    client = mqtt.Client()
    client.connect(MQTT_SERVER)
    payload = json.dumps(data, ensure_ascii=False)
    client.publish(MQTT_TOPIC, payload)
    client.disconnect()
    print("Dane wysłane do MQTT:", payload)

def get_filtered_data():
    data = fetch_air_quality()
    aq = data.get("AqIndex", {})

    # klucze do pobrania
    keys = [
        "Data wykonania obliczeń indeksu",
        "Nazwa kategorii indeksu",
        "Nazwa kategorii indeksu dla wskażnika PM2.5",
        "Nazwa kategorii indeksu dla wskażnika PM10",
        "Nazwa kategorii indeksu dla wskażnika O3"
    ]

    # pobranie danych bez KeyError
    filtered = {key: aq.get(key, "Brak danych") for key in keys}
    return filtered

# ===== Główna pętla =====
try:
    while True:
        try:
            filtered_data = get_filtered_data()
            print("Dane do wysłania:", filtered_data)
            send_to_mqtt(filtered_data)
            blink_led(1)  # 1 błysk = sukces
        except requests.RequestException as e:
            print("Błąd pobierania danych:", e)
            blink_led(5)  # 5 błysków = problem z API
        except Exception as e:
            print("Inny błąd:", e)
            blink_led(5)  # 5 błysków = inny błąd

        time.sleep(INTERVAL)

finally:
    GPIO.cleanup()

5. Zainstaluj wymagane pakiety przez apt

sudo apt update
sudo apt install -y python3-requests python3-paho-mqtt python3-rpi.gpio

Nie używamy pip3, aby nie było problemów z wersjami bibliotek.

6. Utwórz plik systemd

W katalogu systemd utwórz plik air_mqtt.service z konfiguracją:

  • nazwa usługi
  • użytkownik, katalog roboczy
  • ścieżka do skryptu
  • restart po awarii
  • logowanie do journal

Przykładowy plik systemowy:

[Unit]
Description=Air Quality MQTT Script for RPZW
After=network.target

[Service]
User=minimj
WorkingDirectory=/home/minimj/pi-project
ExecStart=/usr/bin/python3 /home/minimj/pi-project/air_mqtt_rpzw.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target

7. Skopiuj plik systemd do katalogu systemowego

sudo cp ~/pi-project/systemd/air_mqtt.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable air_mqtt.service

8. Uruchom i sprawdź działanie

  • Start usługi:
sudo systemctl start air_mqtt.service
  • Sprawdzenie statusu:
sudo systemctl status air_mqtt.service
sudo journalctl -u air_mqtt.service -f
  • Dioda LED miga po pobraniu danych (1 błysk = sukces, 5 błysków = problem).
  • Dane są wysyłane do brokera MQTT.

9. Testowanie interwału

  • Interwał pobierania i wysyłki ustawiany jest w pliku config.json
  • Po zmianie interwału zrestartuj usługę:
sudo systemctl restart air_mqtt.service

Dodaj komentarz