How to receive MQTT data using Subscribe and callback

I am having trouble figuring out how to implement the subscribe and callback feature of MQTT in MicroPython. The examples I've found aren't helping me figure out how to do it. I'm a new guy.

I am using a Pi Pico W, HiveMQ and Node-RED. I can send data to Node-RED and display it on a gauge, but I can't seem to get data like a switch toggle back. The MQTT send node says it's connected. I see the data coming through HiveMQ and something is getting back to the Pi Pico, but it crashes the program with the following error:

Traceback (most recent call last):
  File "<stdin>", line 77, in <module>
  File "umqtt/simple.py", line 216, in check_msg
  File "umqtt/simple.py", line 202, in wait_msg
TypeError: function takes 1 positional arguments but 2 were given

Line 77 is client.check_msg()

The callback is:

def sub_cb(topic):
    print(topic)

client.set_callback(sub_cb)
client.subscribe(SUBSCRIBE_TOPIC)

SUBSCRIBE_TOPIC = "badflash.picow/onoff"

Help!

Can you explain a bit more about your setup?
Maybe a screen capture of the Node-RED flow would help.

I can't understand how Node-RED fits in with your attempt to publish or subscribe in MicroPython on a Raspberry Pi Pico, nor how the Pico fits in with subscribing and publishing from Node-RED.

They are just using NR to publish/subscribe to a broker.
Their issue is with MicroPython subscribing on the Pico W.
It's not really a NR issue.

Yes, cymplecy has it right. I don't know the correct MicroPython structure for subscribing and setting up the callback.

Here is my code:

import network
import utime
from umqtt.simple import MQTTClient
import machine, onewire, ds18x20, time
 
SUBSCRIBE_TOPIC = "badflash.picow/onoff"

ds_pin = machine.Pin(5)
 
ds_sensor = ds18x20.DS18X20(onewire.OneWire(ds_pin))
 
roms = ds_sensor.scan()
 
print('Found DS devices: ', roms)
from secrets import SSID, PW
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.config(pm = 0xa11140)
wlan.connect(SSID, PW)
max_wait = 10
while max_wait > 0:
    if wlan.status() < 0 or wlan.status() >= 3:
        break
    max_wait -= 1
    print('waiting for connection...')
    utime.sleep(1)
    
# Handle connection error
if wlan.status() != 3:
    raise RuntimeError('network connection failed')
else:
    print('Connected')
    status = wlan.ifconfig()
    print( 'ip = ' + status[0] )
    

def connectMQTT():
    client= MQTTClient(client_id=b"badflash.picow",
                       server=b"'***********************.s1.eu.hivemq.cloud",
                       port=0,
                       user=b"badflash.picow",
                       password=b"**********",
                       keepalive=7200,
                       ssl=True,
                       ssl_params={'server_hostname':'***********************.s1.eu.hivemq.cloud'}
                       )
    client.connect()
    return client

client = connectMQTT()

def publish(topic, value):
    print(topic)
    print(value)
    client.publish(topic, value)
    print("publish Done")
    
def sub_cb(topic):
    print(topic)
    

    
while True:
    ds_sensor.convert_temp()
 
    time.sleep_ms(750)
 
    for rom in roms:
        print(rom)
        temperature= (ds_sensor.read_temp(rom))*9/5+32
        print(ds_sensor.read_temp(rom))     
    publish('badflash.picow/temperature', str(temperature))
    client.set_callback(sub_cb)
    client.subscribe(SUBSCRIBE_TOPIC)
    utime.sleep(5)
# Non-blocking wait for message
    client.check_msg()
                       
    time.sleep(1.25)
   

This is what is coming back through HiveMQ:

Here's the basic structure I sometimes use. To keep it simple I've left out error checking, including checks for a lost connection to the WiFi or the MQTT broker. Normally I'd use a non-blocking delay in 'main' rather than utime.sleep(5), as the sleep introduces a fixed five-second delay before the code calls 'check_msg()'.

import utime
import network
from umqtt.simple import MQTTClient

# Wi-Fi configuration
WIFI_SSID = "YOUR_WIFI_SSID"
WIFI_PASSWORD = "YOUR_WIFI_PASSWORD"

# MQTT configuration
MQTT_BROKER = "mqtt.example.com"
MQTT_PORT = 1883  # Change this to the port your MQTT broker is running on
MQTT_CLIENT_ID = "ESP32"
MQTT_TOPIC_SUBSCRIBE = "topic/sub"
MQTT_TOPIC_PUBLISH = "topic/pub"

# Connect to Wi-Fi
def connect_to_wifi():
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print("Connecting to Wi-Fi...")
        wlan.connect(WIFI_SSID, WIFI_PASSWORD)
        while not wlan.isconnected():
            pass
    print("Wi-Fi connected:", wlan.ifconfig())

# MQTT message callback
def mqtt_callback(topic, msg):
    print("Received message on topic {}: {}".format(topic, msg.decode()))

# Connect to MQTT broker
def connect_to_mqtt():
    client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT)
    client.set_callback(mqtt_callback)
    client.connect()
    client.subscribe(MQTT_TOPIC_SUBSCRIBE)
    print("Connected to MQTT broker")

    return client

# Main function
def main():
    connect_to_wifi()
    mqtt_client = connect_to_mqtt()

    # Publish data
    while True:
        data = "Hello, MQTT!"
        mqtt_client.publish(MQTT_TOPIC_PUBLISH, data)
        print("Published data:", data)
        utime.sleep(5)

        # Check for incoming messages
        mqtt_client.check_msg()

if __name__ == "__main__":
    main()
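
The traceback in the original question ("function takes 1 positional arguments but 2 were given") is consistent with the library calling the callback with two arguments, the topic and the message, while the original `sub_cb(topic)` accepts only one. The sketch below illustrates that difference on its own; `deliver` is a hypothetical stand-in for what `check_msg()` does internally, not part of umqtt, so the snippet runs without a broker.

```python
received = []

def sub_cb(topic, msg):
    # Two parameters: the topic and the payload, both bytes objects.
    received.append((topic, msg))
    print("topic:", topic, "payload:", msg)

def old_sub_cb(topic):
    # Signature from the original question: one parameter only.
    print(topic)

def deliver(callback, topic, msg):
    # Hypothetical stand-in for the library delivering a message.
    callback(topic, msg)

deliver(sub_cb, b"badflash.picow/onoff", b"1")

try:
    deliver(old_sub_cb, b"badflash.picow/onoff", b"1")
except TypeError as exc:
    print("one-argument callback fails:", exc)
```

In the real script, `client.set_callback(sub_cb)` and `client.subscribe(...)` would normally be called once after `connect()`, rather than on every pass through the loop.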

Here's a version with a non-blocking delay in 'main'. Because 'check_msg()' now runs every 100 ms instead of once per 5-second sleep, the code in 'mqtt_callback' responds much sooner when a message arrives on the subscribe topic.

import utime
import network
from umqtt.simple import MQTTClient

# Wi-Fi configuration
WIFI_SSID = "YOUR_WIFI_SSID"
WIFI_PASSWORD = "YOUR_WIFI_PASSWORD"

# MQTT configuration
MQTT_BROKER = "mqtt.example.com"
MQTT_PORT = 1883  # Change this to the port your MQTT broker is running on
MQTT_CLIENT_ID = "ESP32"
MQTT_TOPIC_SUBSCRIBE = "topic/sub"
MQTT_TOPIC_PUBLISH = "topic/pub"
MQTT_USERNAME = "your_username"
MQTT_PASSWORD = "your_password"

# Connect to Wi-Fi
def connect_to_wifi():
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print("Connecting to Wi-Fi...")
        wlan.connect(WIFI_SSID, WIFI_PASSWORD)
        while not wlan.isconnected():
            pass
    print("Wi-Fi connected:", wlan.ifconfig())

# MQTT message callback
def mqtt_callback(topic, msg):
    print("Received message on topic {}: {}".format(topic, msg.decode()))

# Connect to MQTT broker
def connect_to_mqtt():
    client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, port=MQTT_PORT, user=MQTT_USERNAME, password=MQTT_PASSWORD)
    client.set_callback(mqtt_callback)
    client.connect()
    client.subscribe(MQTT_TOPIC_SUBSCRIBE)
    print("Connected to MQTT broker")

    return client

# Main function
def main():
    connect_to_wifi()
    mqtt_client = connect_to_mqtt()
    
    # Publish data
    last_publish_time = utime.time()
    publish_interval  = 5  # Publish every 5 seconds

    while True:
        current_time = utime.time()
        # Non-blocking delay for the publish interval
        if current_time - last_publish_time >= publish_interval:
            data = "Hello, MQTT!"
            mqtt_client.publish(MQTT_TOPIC_PUBLISH, data)
            print("Published data:", data)
            last_publish_time = current_time

        # Check for incoming messages
        mqtt_client.check_msg()

        utime.sleep_ms(100)  # Sleep for 100 milliseconds

if __name__ == "__main__":
    main()


Thank You! That is exactly what I needed.
I needed to format my publish statement:

data = str(data).encode()
mqtt_client.publish(MQTT_TOPIC_PUBLISH, data)

Having a "DUH" moment. More peeling the onion. I can't figure out how to trap the data that comes back in the callback. It seems to come in as some sort of byte array, and I have been unable to match the topic or msg using an if statement.

I was trying to use things like:
if topic=="badflash.pico/onoff" and msg==1:
    PWRFLG=True

Not catching it.

Thanks in advance
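
A likely reason the test never matches: the callback receives `topic` and `msg` as bytes objects, so comparing them with a str such as `"badflash.pico/onoff"` or with the integer `1` is always false. Note also that the test above uses `badflash.pico/onoff`, while the subscription earlier in the thread used `badflash.picow/onoff`. A minimal sketch of a bytes-to-bytes comparison follows; the `state` dict is a hypothetical replacement for the `PWRFLG` global so the snippet stands alone.

```python
SUBSCRIBE_TOPIC = b"badflash.picow/onoff"   # bytes, matching what the callback receives

state = {"power": False}   # hypothetical holder for the PWRFLG flag

def mqtt_callback(topic, msg):
    # Compare bytes with bytes; decoding first (msg.decode() == "1") also works.
    if topic == SUBSCRIBE_TOPIC:
        if msg == b"1":
            state["power"] = True
        elif msg == b"0":
            state["power"] = False

mqtt_callback(b"badflash.picow/onoff", b"1")
print(state["power"])
```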

Try this...

Here's the Node-RED flow to send a directive...

[{"id":"332f5e2948396c68","type":"tab","label":"Flow 4","disabled":false,"info":"","env":[]},{"id":"0f465be5ce4f96e5","type":"mqtt out","z":"332f5e2948396c68","name":"","topic":"topic/sub","qos":"","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"7002d750.c4462","x":700,"y":260,"wires":[]},{"id":"ca67fabdf76b653b","type":"inject","z":"332f5e2948396c68","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":320,"y":260,"wires":[["5310c8f218f69d9f"]]},{"id":"5310c8f218f69d9f","type":"function","z":"332f5e2948396c68","name":"prepare directive","func":"msg.payload = { \"request_report\": \"anything\" };\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":510,"y":260,"wires":[["0f465be5ce4f96e5"]]},{"id":"7002d750.c4462","type":"mqtt-broker","name":"","broker":"192.168.1.156","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]

And here's the modified MicroPython script to detect the directive sent from Node-RED.
Note: You need to import ujson.

import utime
import network
import ujson
from umqtt.simple import MQTTClient

# Wi-Fi configuration
WIFI_SSID = "***"
WIFI_PASSWORD = "***"

# MQTT configuration
MQTT_BROKER = "**************"
MQTT_CLIENT_ID = "ESP32"
MQTT_TOPIC_SUBSCRIBE = "topic/sub"
MQTT_TOPIC_PUBLISH = "topic/pub"



# Connect to Wi-Fi
def connect_to_wifi():
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print("Connecting to Wi-Fi...")
        wlan.connect(WIFI_SSID, WIFI_PASSWORD)
        while not wlan.isconnected():
            pass
    print("Wi-Fi connected:", wlan.ifconfig())

# MQTT message callback
def mqtt_callback(topic, msg):
    print("Received message on topic {}: {}".format(topic, msg.decode()))
    
    mqtt_directives = dict(ujson.loads(msg.decode("utf-8")))
    print("Recd directive is:", mqtt_directives)
    #Then decode the directive that has been sent

# Connect to MQTT broker
def connect_to_mqtt():
    client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER)
    client.set_callback(mqtt_callback)
    client.connect()
    client.subscribe(MQTT_TOPIC_SUBSCRIBE)
    print("Connected to MQTT broker")

    return client

# Main function
def main():
    connect_to_wifi()
    mqtt_client = connect_to_mqtt()
    
    # Publish data
    last_publish_time = utime.time()
    publish_interval  = 5  # Publish every 5 seconds

    while True:
        current_time = utime.time()
        # Non-blocking delay
        if current_time - last_publish_time >= publish_interval:
            data = "Hello, MQTT!"
            mqtt_client.publish(MQTT_TOPIC_PUBLISH, data)
            print("Published data:", data)
            last_publish_time = current_time

        # Check for incoming messages
        mqtt_client.check_msg()

        utime.sleep_ms(100)  # Sleep for 100 milliseconds

if __name__ == "__main__":
    main()

Thanks again, but two steps forward, one step back.
I dropped

    mqtt_directives = dict(ujson.loads(msg.decode("utf-8")))
    print("Recd directive is:", mqtt_directives)

into the mqtt_callback. When I run it, it errors and points to the
mqtt_directives line and says:

  File "<stdin>", line 81, in mqtt_callback
TypeError: 'int' object isn't iterable

Is that mqtt_directives code line 81? If not, what is it?

Without your code, it is near impossible to know what is going on.

It's from a post just above.

def mqtt_callback(topic, msg):
    print("Received message on topic {}: {}".format(topic, msg.decode()))
    
    mqtt_directives = dict(ujson.loads(msg.decode("utf-8")))
    print("Recd directive is:", mqtt_directives)
    #Then decode the directive that has been sent

This is the response I get when I send this payload (shown below) from a function node connected to MQTT-Out in Node-RED (as per the flow I posted above).

msg.payload = { "request_report": "anything" };
return msg;

Received message on topic b'topic/sub': {"request_report":"anything"}
Recd directive is: {'request_report': 'anything'}
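
The earlier "'int' object isn't iterable" error probably comes from the payload rather than from the code: if the switch sends a bare `1` instead of a JSON object, `loads` returns the integer 1, and `dict(1)` raises that TypeError. The sketch below is one way to guard against it, assuming the callback should simply ignore anything that is not a JSON object; `parse_directive` is a hypothetical helper name.

```python
import json  # on MicroPython, ujson provides the same loads() call

def parse_directive(msg):
    """Return a dict for JSON-object payloads, or None for anything else."""
    try:
        value = json.loads(msg.decode("utf-8"))
    except ValueError:
        # Payload was not valid JSON at all.
        return None
    # A bare number or string is valid JSON but is not a directive.
    return value if isinstance(value, dict) else None

print(parse_directive(b'{"request_report":"anything"}'))
print(parse_directive(b"1"))
```

Inside 'mqtt_callback' the result can be checked with `if directive is not None:` before any keys are looked up.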

I'm not at a point where I understand what you are doing, but I did find something that works.
I added a couple of lines to the callback to capture the topic and message:

def mqtt_callback(topic, msg):
    global Topic,Msg
    print("Received message on topic {}: {}".format(topic, msg.decode()))
    Topic=topic
    Msg=msg

Then in the main program I decoded the messages:

try:
    if Msg.decode()=="1":
        PWRFLG=True
        led.value(1)
    if Msg.decode()=="0":
        PWRFLG=False
        led.value(0)
    if Msg.decode() >"60":
        SP=float(Msg.decode())

except:
    pass

It looks like the message sticks in the queue until something else takes its place.
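
The message appears to "stick" because the global `Msg` keeps its last value until the callback overwrites it, so the main loop acts on the same payload every pass. A minimal sketch of consuming each message once is shown below. It assumes that payloads other than "1" and "0" are meant to be numeric setpoints above 60, and it compares them as numbers, since the string test `> "60"` compares character by character (so "100" would not pass). The `state` dict and `handle_pending` are hypothetical names.

```python
state = {"msg": None, "power": False, "setpoint": None}

def mqtt_callback(topic, msg):
    # Only store the payload here; the main loop decides what to do with it.
    state["msg"] = msg

def handle_pending():
    msg = state["msg"]
    if msg is None:
        return                 # nothing new since the last pass
    state["msg"] = None        # consume it so it is handled only once
    text = msg.decode()
    if text == "1":
        state["power"] = True
    elif text == "0":
        state["power"] = False
    else:
        try:
            value = float(text)
        except ValueError:
            return             # ignore payloads that are not numbers
        if value > 60:
            state["setpoint"] = value

mqtt_callback(b"badflash.picow/onoff", b"1")
handle_pending()
print(state)
```

In the main loop, `handle_pending()` would be called right after `check_msg()`.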

Anyway, thanks for the help. It got me thinking at least.

Let me try to explain my approach or strategy.

The msg.payload I'm sending from Node-RED is a JavaScript object that contains one or more name/value pairs. I called it a directive as it is going to "direct" your physical node to do something. The example I used was to request a report. I have a physical node in my greenhouse that reports temperature, humidity and pressure on a regular basis (say every 15 minutes). By sending this directive, I can force the node to send a report to Node-RED immediately.

msg.payload = { "request_report": "anything" };

If you had an LED fitted to the node, then this "directive" could have been used to turn it On or OFF.

msg.payload = { "led": "on" };
msg.payload = { "led": "off" };
msg.payload = { "led": 0 };
msg.payload = { "led": 1 };

If you had more than one LED you could compose directives like this...

msg.payload = { "led0": "on" };
msg.payload = { "led1": "off" };
msg.payload = { "led2": 0 };
msg.payload = { "led": 1 };

All these directives would be decoded in 'mqtt_callback', so it would be up to you to work out a strategy suitable for your application.
For example, you could combine a number of name/value pairs in the payload to simplify the above.

msg.payload = { "led0": "on", "led1": "off", "led2": 0, "led": 1 };

If you wanted to turn all the LEDs on or off, you could use something like...

msg.payload = { "all": "on"};

or

msg.payload = { "all": "off" };

For the last situation (shown above) you would have to write a series of MicroPython statements in 'mqtt_callback' to turn each LED on or off to achieve the desired effect.
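
One way to handle the "all" case without repeating code for every LED is to expand it into per-LED entries first. This is only a sketch: the names in `LED_NAMES` and the helper `expand_directive` are assumptions for illustration, not part of the scripts above.

```python
LED_NAMES = ("led0", "led1", "led2")   # hypothetical list of LEDs on the node

def expand_directive(directive):
    """Turn {"all": x} into one entry per LED; otherwise keep only known LEDs."""
    if "all" in directive:
        return {name: directive["all"] for name in LED_NAMES}
    return {name: directive[name] for name in LED_NAMES if name in directive}

print(expand_directive({"all": "on"}))
print(expand_directive({"led1": "off", "request_report": "anything"}))
```

The callback can then loop over the returned dict and set each pin, whichever form of directive was sent.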

If you would like to read more about this strategy, I've published some of my MicroPython projects on the following website...

https://teamwork-int.com/share_my_projects/

Click the folder labelled C_single_sensor_node as that contains a detailed install guide.

EDIT:
I'm currently working on a MicroPython script that operates in 'cyclic mode', so the microcontroller never goes to sleep and will react to "directives" sent to it from Node-RED along the lines described above.
If you PM your email address to me I'll let you know when the script is complete and working.

Just had a few spare minutes to show how I would perform the decoding in 'mqtt_callback'. Basically it's a series of tests to check the contents of the received message.

# MQTT message callback
def mqtt_callback(topic, msg):
    print("Received message on topic {}: {}".format(topic, msg.decode()))
    
    mqtt_directives = dict(ujson.loads(msg.decode("utf-8")))
    print("Recd directive is:", mqtt_directives)
    
    if mqtt_directives.get("led0") is not None:
        led0_value = mqtt_directives["led0"]
        print("Set LED0 to: ", led0_value)
        #Set the gpio-pin to led0_value
        
    if mqtt_directives.get("led1") is not None:
        led1_value = mqtt_directives["led1"]
        print("Set LED1 to: ", led1_value)
        #Set the gpio-pin to led1_value
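
Since the example directives use both "on"/"off" and 0/1, the value needs converting before it is written to a pin. A minimal sketch of that conversion follows; `to_level` is a hypothetical helper, and the result would be passed to something like `led0.value(level)` on a `machine.Pin` object.

```python
def to_level(value):
    """Map "on"/"off" or 1/0 to a pin level of 1 or 0; None if unrecognised."""
    if isinstance(value, str):
        value = value.lower()
        if value == "on":
            return 1
        if value == "off":
            return 0
        return None
    if value in (0, 1):
        return int(value)
    return None

for sample in ("on", "off", 0, 1, "maybe"):
    print(sample, "->", to_level(sample))
```

Returning None for unrecognised values lets the callback skip the pin write instead of guessing.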

I truly appreciate the help. Are you using a Pi Pico? I just can't seem to get the directive thing to work.

I have been successful snagging the topic and message sent back by Node-RED, and am using that in if statements to capture the power on/off and setpoint commands. It is working pretty well.

I ended up having to put most of the code together in an explicit while True loop. Wrapping the main code in a def main() call was causing local variable declaration errors I could not resolve. I could grab the commands, but they would not execute reliably. Getting rid of this fixed that:

if __name__ == "__main__":
    main()

Here is my incubator dashboard in operation.
![Screenshot 2024-03-22 090100|283x500](upload://hQGKBT4tBXIQP5Y5MRr1y1rZJtE.png)

At the moment I'm using a MicroPython script on an ESP32-C3-SuperMini. I have a few Pico Ws and will try the code on one of them over the weekend. As I said in my previous post, it might be better to take our discussion to a private chat, as your questions are more related to Python than to Node-RED.

PS: The link to your 'png' doesn't work.