{"id":5958,"date":"2023-11-28T21:41:38","date_gmt":"2023-11-28T20:41:38","guid":{"rendered":"https:\/\/arduino.net.pl\/?p=5958"},"modified":"2023-11-28T22:11:19","modified_gmt":"2023-11-28T21:11:19","slug":"wspolpraca-nrf24l01-z-micropython","status":"publish","type":"post","link":"https:\/\/arduino.net.pl\/index.php\/wspolpraca-nrf24l01-z-micropython\/","title":{"rendered":"Wsp\u00f3\u0142praca nRF24L01 z MicroPython"},"content":{"rendered":"\n<p>Poni\u017cej znajdziesz przyk\u0142adowy kod w j\u0119zyku MicroPython dla dw\u00f3ch mikroprocesor\u00f3w ESP8266, kt\u00f3re komunikuj\u0105 si\u0119 ze sob\u0105 za pomoc\u0105 modu\u0142\u00f3w nRF24L01. Do tego celu u\u017cy\u0142em biblioteki &#8222;rf24&#8221; dost\u0119pnej dla MicroPython.<\/p>\n\n\n\n<p>Kod biblioteki nrf24l01.py<\/p>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: plain; light: false; title: Kod:; toolbar: true; notranslate\" title=\"Kod:\">\n&quot;&quot;&quot;NRF24L01 driver for MicroPython\n&quot;&quot;&quot;\n\nfrom micropython import const\nimport utime\n\n# nRF24L01+ registers\nCONFIG = const(0x00)\nEN_RXADDR = const(0x02)\nSETUP_AW = const(0x03)\nSETUP_RETR = const(0x04)\nRF_CH = const(0x05)\nRF_SETUP = const(0x06)\nSTATUS = const(0x07)\nRX_ADDR_P0 = const(0x0A)\nTX_ADDR = const(0x10)\nRX_PW_P0 = const(0x11)\nFIFO_STATUS = const(0x17)\nDYNPD = const(0x1C)\n\n# CONFIG register\nEN_CRC = const(0x08)  # enable CRC\nCRCO = const(0x04)  # CRC encoding scheme; 0=1 byte, 1=2 bytes\nPWR_UP = const(0x02)  # 1=power up, 0=power down\nPRIM_RX = const(0x01)  # RX\/TX control; 0=PTX, 1=PRX\n\n# RF_SETUP register\nPOWER_0 = const(0x00)  # -18 dBm\nPOWER_1 = const(0x02)  # -12 dBm\nPOWER_2 = const(0x04)  # -6 dBm\nPOWER_3 = const(0x06)  # 0 dBm\nSPEED_1M = const(0x00)\nSPEED_2M = const(0x08)\nSPEED_250K = const(0x20)\n\n# STATUS register\nRX_DR = const(0x40)  # RX data ready; write 1 to clear\nTX_DS = const(0x20)  # TX data sent; write 1 to clear\nMAX_RT = const(0x10)  # max 
retransmits reached; write 1 to clear\n\n# FIFO_STATUS register\nRX_EMPTY = const(0x01)  # 1 if RX FIFO is empty\n\n# constants for instructions\nR_RX_PL_WID = const(0x60)  # read RX payload width\nR_RX_PAYLOAD = const(0x61)  # read RX payload\nW_TX_PAYLOAD = const(0xA0)  # write TX payload\nFLUSH_TX = const(0xE1)  # flush TX FIFO\nFLUSH_RX = const(0xE2)  # flush RX FIFO\nNOP = const(0xFF)  # use to read STATUS register\n\n\nclass NRF24L01:\n    def __init__(self, spi, cs, ce, channel=46, payload_size=16):\n        assert payload_size &lt;= 32\n\n        self.buf = bytearray(1)\n\n        # store the pins\n        self.spi = spi\n        self.cs = cs\n        self.ce = ce\n\n        # init the SPI bus and pins\n        self.init_spi(4000000)\n\n        # reset everything\n        ce.init(ce.OUT, value=0)\n        cs.init(cs.OUT, value=1)\n\n        self.payload_size = payload_size\n        self.pipe0_read_addr = None\n        utime.sleep_ms(5)\n\n        # set address width to 5 bytes and check for device present\n        self.reg_write(SETUP_AW, 0b11)\n        if self.reg_read(SETUP_AW) != 0b11:\n            raise OSError(&quot;nRF24L01+ Hardware not responding&quot;)\n\n        # disable dynamic payloads\n        self.reg_write(DYNPD, 0)\n\n        # auto retransmit delay: 1750us\n        # auto retransmit count: 8\n        self.reg_write(SETUP_RETR, (6 &lt;&lt; 4) | 8)\n\n        # set rf power and speed\n        self.set_power_speed(POWER_3, SPEED_250K)  # Best for point to point links\n\n        # init CRC\n        self.set_crc(2)\n\n        # clear status flags\n        self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)\n\n        # set channel\n        self.set_channel(channel)\n\n        # flush buffers\n        self.flush_rx()\n        self.flush_tx()\n\n    def init_spi(self, baudrate):\n        try:\n            master = self.spi.MASTER\n        except AttributeError:\n            self.spi.init(baudrate=baudrate, polarity=0, phase=0)\n        
else:\n            self.spi.init(master, baudrate=baudrate, polarity=0, phase=0)\n\n    def reg_read(self, reg):\n        self.cs(0)\n        self.spi.readinto(self.buf, reg)\n        self.spi.readinto(self.buf)\n        self.cs(1)\n        return self.buf&#x5B;0]\n\n    def reg_write_bytes(self, reg, buf):\n        self.cs(0)\n        self.spi.readinto(self.buf, 0x20 | reg)\n        self.spi.write(buf)\n        self.cs(1)\n        return self.buf&#x5B;0]\n\n    def reg_write(self, reg, value):\n        self.cs(0)\n        self.spi.readinto(self.buf, 0x20 | reg)\n        ret = self.buf&#x5B;0]\n        self.spi.readinto(self.buf, value)\n        self.cs(1)\n        return ret\n\n    def flush_rx(self):\n        self.cs(0)\n        self.spi.readinto(self.buf, FLUSH_RX)\n        self.cs(1)\n\n    def flush_tx(self):\n        self.cs(0)\n        self.spi.readinto(self.buf, FLUSH_TX)\n        self.cs(1)\n\n    # power is one of POWER_x defines; speed is one of SPEED_x defines\n    def set_power_speed(self, power, speed):\n        setup = self.reg_read(RF_SETUP) &amp; 0b11010001\n        self.reg_write(RF_SETUP, setup | power | speed)\n\n    # length in bytes: 0, 1 or 2\n    def set_crc(self, length):\n        config = self.reg_read(CONFIG) &amp; ~(CRCO | EN_CRC)\n        if length == 0:\n            pass\n        elif length == 1:\n            config |= EN_CRC\n        else:\n            config |= EN_CRC | CRCO\n        self.reg_write(CONFIG, config)\n\n    def set_channel(self, channel):\n        self.reg_write(RF_CH, min(channel, 125))\n\n    # address should be a bytes object 5 bytes long\n    def open_tx_pipe(self, address):\n        assert len(address) == 5\n        self.reg_write_bytes(RX_ADDR_P0, address)\n        self.reg_write_bytes(TX_ADDR, address)\n        self.reg_write(RX_PW_P0, self.payload_size)\n\n    # address should be a bytes object 5 bytes long\n    # pipe 0 and 1 have 5 byte address\n    # pipes 2-5 use same 4 most-significant bytes as 
pipe 1, plus 1 extra byte\n    def open_rx_pipe(self, pipe_id, address):\n        assert len(address) == 5\n        assert 0 &lt;= pipe_id &lt;= 5\n        if pipe_id == 0:\n            self.pipe0_read_addr = address\n        if pipe_id &lt; 2:\n            self.reg_write_bytes(RX_ADDR_P0 + pipe_id, address)\n        else:\n            self.reg_write(RX_ADDR_P0 + pipe_id, address&#x5B;0])\n        self.reg_write(RX_PW_P0 + pipe_id, self.payload_size)\n        self.reg_write(EN_RXADDR, self.reg_read(EN_RXADDR) | (1 &lt;&lt; pipe_id))\n\n    def start_listening(self):\n        self.reg_write(CONFIG, self.reg_read(CONFIG) | PWR_UP | PRIM_RX)\n        self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)\n\n        if self.pipe0_read_addr is not None:\n            self.reg_write_bytes(RX_ADDR_P0, self.pipe0_read_addr)\n\n        self.flush_rx()\n        self.flush_tx()\n        self.ce(1)\n        utime.sleep_us(130)\n\n    def stop_listening(self):\n        self.ce(0)\n        self.flush_tx()\n        self.flush_rx()\n\n    # returns True if any data available to recv\n    def any(self):\n        return not bool(self.reg_read(FIFO_STATUS) &amp; RX_EMPTY)\n\n    def recv(self):\n        # get the data\n        self.cs(0)\n        self.spi.readinto(self.buf, R_RX_PAYLOAD)\n        buf = self.spi.read(self.payload_size)\n        self.cs(1)\n        # clear RX ready flag\n        self.reg_write(STATUS, RX_DR)\n\n        return buf\n\n    # blocking wait for tx complete\n    def send(self, buf, timeout=500):\n        self.send_start(buf)\n        start = utime.ticks_ms()\n        result = None\n        while result is None and utime.ticks_diff(utime.ticks_ms(), start) &lt; timeout:\n            result = self.send_done()  # 1 == success, 2 == fail\n        if result == 2:\n            raise OSError(&quot;send failed&quot;)\n\n    # non-blocking tx\n    def send_start(self, buf):\n        # power up\n        self.reg_write(CONFIG, (self.reg_read(CONFIG) 
| PWR_UP) &amp; ~PRIM_RX)\n        utime.sleep_us(150)\n        # send the data\n        self.cs(0)\n        self.spi.readinto(self.buf, W_TX_PAYLOAD)\n        self.spi.write(buf)\n        if len(buf) &lt; self.payload_size:\n            self.spi.write(b&quot;\\x00&quot; * (self.payload_size - len(buf)))  # pad out data\n        self.cs(1)\n\n        # enable the chip so it can send the data\n        self.ce(1)\n        utime.sleep_us(15)  # needs to be &gt;10us\n        self.ce(0)\n\n    # returns None if send still in progress, 1 for success, 2 for fail\n    def send_done(self):\n        if not (self.reg_read(STATUS) &amp; (TX_DS | MAX_RT)):\n            return None  # tx not finished\n\n        # either finished or failed: get and clear status flags, power down\n        status = self.reg_write(STATUS, RX_DR | TX_DS | MAX_RT)\n        self.reg_write(CONFIG, self.reg_read(CONFIG) &amp; ~PWR_UP)\n        return 1 if status &amp; TX_DS else 2\n\n<\/pre><\/div>\n\n\n<h3 class=\"wp-block-heading\">ESP8266 #1 (Nadawca):<\/h3>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: plain; light: false; title: Kod:; toolbar: true; notranslate\" title=\"Kod:\">\nfrom machine import Pin, SPI\nfrom nrf24l01 import NRF24L01\n\n# Konfiguracja pin\u00f3w\nce_pin = Pin(5)  # D1\ncs_pin = Pin(4)  # D2\n\n# Konfiguracja SPI\nspi = SPI(1, baudrate=1000000, polarity=0, phase=0, sck=Pin(14), mosi=Pin(13), miso=Pin(12))\n\n# Inicjalizacja modu\u0142u nRF24L01\nradio = NRF24L01(spi, cs_pin, ce_pin, payload_size=32)  # ta sama warto\u015b\u0107 po obu stronach\nradio.open_tx_pipe(b&#039;NODE1&#039;)  # adres musi mie\u0107 dok\u0142adnie 5 bajt\u00f3w\n\n# Funkcja do wysy\u0142ania wiadomo\u015bci\ndef send_message(message):\n    radio.send(message.encode(&#039;utf-8&#039;))\n    print(&quot;Sent message:&quot;, message)\n\n# P\u0119tla g\u0142\u00f3wna\nwhile True:\n    send_message(&quot;Hello from Node 1!&quot;)\n\n<\/pre><\/div>\n\n\n<h3 class=\"wp-block-heading\">ESP8266 #2 (Odbiorca):<\/h3>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre 
class=\"brush: plain; light: false; title: Kod:; toolbar: true; notranslate\" title=\"Kod:\">\nfrom machine import Pin, SPI\nfrom nrf24l01 import NRF24L01\n\n# Konfiguracja pin\u00f3w\nce_pin = Pin(5)  # D1\ncs_pin = Pin(4)  # D2\n\n# Konfiguracja SPI\nspi = SPI(1, baudrate=1000000, polarity=0, phase=0, sck=Pin(14), mosi=Pin(13), miso=Pin(12))\n\n# Inicjalizacja modu\u0142u nRF24L01\nradio = NRF24L01(spi, cs_pin, ce_pin, payload_size=32)  # ta sama warto\u015b\u0107 po obu stronach\nradio.open_rx_pipe(1, b&#039;NODE1&#039;)  # adres musi mie\u0107 dok\u0142adnie 5 bajt\u00f3w\nradio.start_listening()  # prze\u0142\u0105cz modu\u0142 w tryb odbioru\n\n# Funkcja do odbierania wiadomo\u015bci\ndef receive_message():\n    if radio.any():\n        message = radio.recv().rstrip(b&#039;\\x00&#039;).decode(&#039;utf-8&#039;)  # usu\u0144 dope\u0142nienie zerami\n        print(&quot;Received message:&quot;, message)\n\n# P\u0119tla g\u0142\u00f3wna\nwhile True:\n    receive_message()\n\n<\/pre><\/div>\n\n\n<ul class=\"wp-block-list\">\n<li><strong>ESP8266 #1 (Nadawca):<\/strong>\n<ul class=\"wp-block-list\">\n<li>VCC -&gt; 3.3V<\/li>\n\n\n\n<li>GND -&gt; GND<\/li>\n\n\n\n<li>CE -&gt; D1 (GPIO5)<\/li>\n\n\n\n<li>CSN -&gt; D2 (GPIO4)<\/li>\n\n\n\n<li>SCK -&gt; D5 (GPIO14)<\/li>\n\n\n\n<li>MOSI -&gt; D7 (GPIO13)<\/li>\n\n\n\n<li>MISO -&gt; D6 (GPIO12)<\/li>\n<\/ul>\n<\/li>\n\n\n\n<li><strong>ESP8266 #2 (Odbiorca):<\/strong>\n<ul class=\"wp-block-list\">\n<li>VCC -&gt; 3.3V<\/li>\n\n\n\n<li>GND -&gt; GND<\/li>\n\n\n\n<li>CE -&gt; D1 (GPIO5)<\/li>\n\n\n\n<li>CSN -&gt; D2 (GPIO4)<\/li>\n\n\n\n<li>SCK -&gt; D5 (GPIO14)<\/li>\n\n\n\n<li>MOSI -&gt; D7 (GPIO13)<\/li>\n\n\n\n<li>MISO -&gt; D6 (GPIO12)<\/li>\n<\/ul>\n<\/li>\n<\/ul>\n","protected":false},"excerpt":{"rendered":"<p>Poni\u017cej znajdziesz przyk\u0142adowy kod w j\u0119zyku MicroPython dla dw\u00f3ch mikroprocesor\u00f3w ESP8266, kt\u00f3re komunikuj\u0105 si\u0119 ze sob\u0105 za pomoc\u0105 modu\u0142\u00f3w nRF24L01. Do tego celu u\u017cy\u0142em biblioteki &#8222;rf24&#8221; dost\u0119pnej dla MicroPython. 
Kod&#8230;<\/p>\n","protected":false},"author":3,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"_monsterinsights_skip_tracking":false,"_monsterinsights_sitenote_active":false,"_monsterinsights_sitenote_note":"","_monsterinsights_sitenote_category":0,"footnotes":""},"categories":[2],"tags":[],"class_list":["post-5958","post","type-post","status-publish","format-standard","hentry","category-arduino"],"aioseo_notices":[],"_links":{"self":[{"href":"https:\/\/arduino.net.pl\/index.php\/wp-json\/wp\/v2\/posts\/5958","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/arduino.net.pl\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/arduino.net.pl\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/arduino.net.pl\/index.php\/wp-json\/wp\/v2\/users\/3"}],"replies":[{"embeddable":true,"href":"https:\/\/arduino.net.pl\/index.php\/wp-json\/wp\/v2\/comments?post=5958"}],"version-history":[{"count":7,"href":"https:\/\/arduino.net.pl\/index.php\/wp-json\/wp\/v2\/posts\/5958\/revisions"}],"predecessor-version":[{"id":5966,"href":"https:\/\/arduino.net.pl\/index.php\/wp-json\/wp\/v2\/posts\/5958\/revisions\/5966"}],"wp:attachment":[{"href":"https:\/\/arduino.net.pl\/index.php\/wp-json\/wp\/v2\/media?parent=5958"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/arduino.net.pl\/index.php\/wp-json\/wp\/v2\/categories?post=5958"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/arduino.net.pl\/index.php\/wp-json\/wp\/v2\/tags?post=5958"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}