My mqtt-in node cannot receive data from my broker

Hi. I have written my own custom MQTT broker in Python. My problem is that the Node-RED mqtt-in node cannot receive data from it. My setup is one ESP32 connected to a DHT22 sensor, programmed with Arduino. The sensor sends data to my custom broker, and the broker receives it. But when the Node-RED mqtt-in node subscribes to my custom broker, it does not get any data.

import socket
import json
import threading
import traceback
import logging

# Initialize the logging module
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Dictionary to store subscribed topics for each client
client_subscriptions = {}
# Dictionary to store last message for each topic
last_messages = {}
# Lock for thread safety
lock = threading.Lock()

def decode_string(data, start):
    length = (data[start] << 8) + data[start + 1]
    topic = data[start + 2:start + 2 + length].decode('utf-8')
    return topic, start + 2 + length

def decode_length(data, start):
    multiplier = 1
    value = 0
    while True:
        digit = data[start]
        value += (digit & 127) * multiplier
        multiplier *= 128
        start += 1
        if (digit & 128) == 0:
            break
    return value, start

def send_connack(client_socket):
    connack = b'\x20\x02\x00\x00'  # CONNACK packet
    client_socket.sendall(connack)

def decode_string_subscribe(data):
    try:
        topic_length = (data[2] << 8) + data[3]
        topic_bytes = data[4:4 + topic_length]
        print(f"Raw bytes of extracted topic: {topic_bytes}")
        decoded_topic = topic_bytes.decode('utf-8', errors='replace')
        return decoded_topic, 4 + topic_length

    except UnicodeDecodeError as ude:
        print(f"[!] UnicodeDecodeError in decode_string_subscribe: {ude}")
        return "", 0
    except Exception as ex:
        print(f"[!] Exception in decode_string_subscribe: {ex}")
        return "", 0

def handle_subscribe(client_socket, raw_packet):
    try:
        topic, index = decode_string_subscribe(raw_packet)

        # Extract the QoS level from the SUBSCRIBE packet
        qos = raw_packet[index]

        with lock:
            if client_socket not in client_subscriptions:
                client_subscriptions[client_socket] = set()

            client_subscriptions[client_socket].add(topic)

        print(f"[*] Received SUBSCRIBE packet. Decoded topic: {topic}")
        print(f"[*] Subscriber QoS level: {qos}")
        print(f"[*] Client subscribed to topic: {client_subscriptions}")

    except Exception as ex:
        print(f"[!] Exception in handle_subscribe: {ex}")
        print(f"[*] Raw packet in case of exception: {raw_packet}")

def handle_publish(client_socket, topic, payload):
    try:
        with lock:
            last_messages[topic] = payload

        with lock:
            for client, subscribed_topics in client_subscriptions.items():
                if topic in subscribed_topics:
                    # Forward the message to subscribed clients
                    logger.debug(f"[*] Forwarding message to client {client}: {payload}")
                    client.sendall(payload)

        # Additional print statements for verification
        logger.info(f"[*] Published message to topic {topic} with payload: {payload}")

        # Print QoS level
        qos_level = (payload[0] & 0b00000110) >> 1
        logger.info(f"[*] QoS level: {qos_level}")

    except Exception as ex:
        # Log the exception without exposing payload content
        logger.error(f"[!] Exception in handle_publish: {ex}")

        traceback.print_exc()  # Print the traceback for detailed information

        # Alternatively, you can log the exception to a file or use a logging library
        # logging.error(f"Exception in handle_publish: {ex}")
        # logging.exception("Detailed exception information:")

        logger.info(f"[*] Topic: {topic}, Payload: [Error occurred, check logs for details]")

def handle_client(client_socket):
    try:
        data = client_socket.recv(1024)

        while data:
            packet_type = (data[0] & 0xF0) >> 4

            if packet_type == 1:  # CONNECT packet
                mqtt_version = data[5] if (data[0] & 0x80) == 0x80 else data[6]

                print(f"[*] Raw CONNECT packet: {data}")
                print(f"[*] Client connected with MQTT version: {mqtt_version}")

                send_connack(client_socket)

            elif packet_type == 8:  # SUBSCRIBE packet
                topic, index = decode_string(data, 2)
                print(f"[*] Received SUBSCRIBE packet. Raw packet: {data}")
                print(f"[*] Received SUBSCRIBE packet. Topic (decoded): {topic}")
                handle_subscribe(client_socket, data[2:])

            elif packet_type == 3:  # PUBLISH packet
                topic, index = decode_string(data, 2)
                payload_length, index = decode_length(data, index)
                payload = data[index:index + payload_length]

                payload_str = payload.decode('utf-8')

                if payload_str[0] != '{':
                    payload_str = '{' + payload_str

                print(f"[*] Received raw payload: {payload_str}")

                try:
                    payload_dict = json.loads(payload_str)
                    temperature = payload_dict.get("temperature")
                    print(f"Temperature: {temperature}")

                    handle_publish(client_socket, topic, payload_str.encode('utf-8'))

                except json.JSONDecodeError as e:
                    print(f"[!] JSON Decode Error: {e}")
                except Exception as ex:
                    print(f"[!] Exception: {ex}")

            data = client_socket.recv(1024)

    except Exception as e:
        print(f"[!] Error in handle_client: {e}")

    finally:
        with lock:
            if client_socket in client_subscriptions:
                del client_subscriptions[client_socket]
        client_socket.close()

if __name__ == "__main__":
    broker_address = "192.168.100.5"
    broker_port = 1883

    broker_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    broker_socket.bind((broker_address, broker_port))
    broker_socket.listen(5)
    print(f"[*] Broker socket bound to {broker_socket.getsockname()}")
    print(f"[*] MQTT Broker listening on {broker_address}:{broker_port}")

    try:
        while True:
            client_socket, client_address = broker_socket.accept()
            print(f"[*] Accepted connection from {client_address}")

            # Assuming you allow any client to connect without authentication
            client_handler = threading.Thread(target=handle_client, args=(client_socket,))
            client_handler.start()

    except KeyboardInterrupt:
        print("[*] Broker shutting down.")
        broker_socket.close()

This is my broker code

Hi @CCF

I'm not sure we're really in a position to debug your python code.

Have you tested other mqtt clients with your broker?
What have you done to try to debug this yourself? Do you see the TCP connection arrive at the broker? If so, at what point does it fail?

I have tested with another MQTT client. It can send messages to the broker, but the broker cannot publish the data.

this is my whole custom broker code


This is my output for broker

Have you tested any client (not Node-RED) can receive a message?

If you are going to share code in the forum, please format it properly using the </> button in the toolbar.

I cannot see any code where you are constructing the proper mqtt publish packet when you send to the subscribers. It looks like you are just writing the payload to the socket.
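To make that concrete, here is a minimal sketch of what building a QoS 0 PUBLISH packet could look like under MQTT 3.1.1: a fixed header byte (0x30), the variable-length "remaining length", the topic with a 2-byte length prefix, then the payload. The function names and the topic `sensors/dht22` are made up for illustration and are not taken from the posted broker.

```python
def encode_remaining_length(length: int) -> bytes:
    """Encode an integer with the MQTT variable-length scheme (7 bits per byte)."""
    if not 0 <= length <= 268_435_455:
        raise ValueError("remaining length out of range")
    encoded = bytearray()
    while True:
        digit = length % 128
        length //= 128
        if length > 0:
            digit |= 0x80  # continuation bit: more length bytes follow
        encoded.append(digit)
        if length == 0:
            return bytes(encoded)


def build_publish_packet(topic: str, payload: bytes) -> bytes:
    """Build a QoS 0 PUBLISH packet (no DUP, no RETAIN, no packet identifier)."""
    topic_bytes = topic.encode("utf-8")
    # Variable header: 2-byte big-endian topic length followed by the topic.
    variable_header = len(topic_bytes).to_bytes(2, "big") + topic_bytes
    remaining_length = len(variable_header) + len(payload)
    # Fixed header: packet type 3 (PUBLISH) in the high nibble, flags 0.
    fixed_header = b"\x30" + encode_remaining_length(remaining_length)
    return fixed_header + variable_header + payload


# Hypothetical topic and payload, for illustration only.
packet = build_publish_packet("sensors/dht22", b'{"temperature": 25.5}')
print(packet.hex())
```

In the posted `handle_publish`, the call `client.sendall(payload)` would then presumably become something like `client.sendall(build_publish_packet(topic, payload))`.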

Other clients cannot receive the messages either.

import socket
import json
import threading
import traceback
import logging

# Initialize the logging module
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Dictionary to store subscribed topics for each client
client_subscriptions = {}
# Dictionary to store last message for each topic
last_messages = {}
# Lock for thread safety
lock = threading.Lock()

def decode_string(data, start):
    length = (data[start] << 8) + data[start + 1]
    topic = data[start + 2:start + 2 + length].decode('utf-8')
    return topic, start + 2 + length

def decode_length(data, start):
    multiplier = 1
    value = 0
    while True:
        digit = data[start]
        value += (digit & 127) * multiplier
        multiplier *= 128
        start += 1
        if (digit & 128) == 0:
            break
    return value, start

def send_connack(client_socket):
    connack = b'\x20\x02\x00\x00'  # CONNACK packet
    client_socket.sendall(connack)

def decode_string_subscribe(data):
    try:
        topic_length = (data[2] << 8) + data[3]
        topic_bytes = data[4:4 + topic_length]
        print(f"Raw bytes of extracted topic: {topic_bytes}")
        decoded_topic = topic_bytes.decode('utf-8', errors='replace')
        return decoded_topic, 4 + topic_length

    except UnicodeDecodeError as ude:
        print(f"[!] UnicodeDecodeError in decode_string_subscribe: {ude}")
        return "", 0
    except Exception as ex:
        print(f"[!] Exception in decode_string_subscribe: {ex}")
        return "", 0

def handle_subscribe(client_socket, raw_packet):
    try:
        topic, index = decode_string_subscribe(raw_packet)
        
        # Extract the QoS level from the SUBSCRIBE packet
        qos = raw_packet[index]

        with lock:
            if client_socket not in client_subscriptions:
                client_subscriptions[client_socket] = set()
                
            client_subscriptions[client_socket].add(topic)

        print(f"[*] Received SUBSCRIBE packet. Decoded topic: {topic}")
        print(f"[*] Subscriber QoS level: {qos}")
        print(f"[*] Client subscribed to topic: {client_subscriptions}")

    except Exception as ex:
        print(f"[!] Exception in handle_subscribe: {ex}")
        print(f"[*] Raw packet in case of exception: {raw_packet}")


def handle_publish(client_socket, topic, payload):
    try:
        with lock:
            last_messages[topic] = payload

        with lock:
            for client, subscribed_topics in client_subscriptions.items():
                if topic in subscribed_topics:
                    # Forward the message to subscribed clients
                    logger.debug(f"[*] Forwarding message to client {client}: {payload}")
                    client.sendall(payload)

        # Additional print statements for verification
        logger.info(f"[*] Published message to topic {topic} with payload: {payload}")
        
        # Print QoS level
        qos_level = (payload[0] & 0b00000110) >> 1
        logger.info(f"[*] QoS level: {qos_level}")

    except Exception as ex:
        # Log the exception without exposing payload content
        logger.error(f"[!] Exception in handle_publish: {ex}")

        traceback.print_exc()  # Print the traceback for detailed information

        # Alternatively, you can log the exception to a file or use a logging library
        # logging.error(f"Exception in handle_publish: {ex}")
        # logging.exception("Detailed exception information:")

        logger.info(f"[*] Topic: {topic}, Payload: [Error occurred, check logs for details]")


def handle_client(client_socket):
    try:
        data = client_socket.recv(1024)

        while data:
            packet_type = (data[0] & 0xF0) >> 4

            if packet_type == 1:  # CONNECT packet
                mqtt_version = data[5] if (data[0] & 0x80) == 0x80 else data[6]

                print(f"[*] Raw CONNECT packet: {data}")
                print(f"[*] Client connected with MQTT version: {mqtt_version}")

                send_connack(client_socket)


            elif packet_type == 8:  # SUBSCRIBE packet
                topic, index = decode_string(data, 2)
                print(f"[*] Received SUBSCRIBE packet. Raw packet: {data}")
                print(f"[*] Received SUBSCRIBE packet. Topic (decoded): {topic}")
                handle_subscribe(client_socket, data[2:])

            elif packet_type == 3:  # PUBLISH packet
                topic, index = decode_string(data, 2)
                payload_length, index = decode_length(data, index)
                payload = data[index:index + payload_length]

                payload_str = payload.decode('utf-8')

                if payload_str[0] != '{':
                    payload_str = '{' + payload_str

                print(f"[*] Received raw payload: {payload_str}")

                try:
                    payload_dict = json.loads(payload_str)
                    temperature = payload_dict.get("temperature")
                    print(f"Temperature: {temperature}")

                    handle_publish(client_socket, topic, payload_str.encode('utf-8'))

                except json.JSONDecodeError as e:
                    print(f"[!] JSON Decode Error: {e}")
                except Exception as ex:
                    print(f"[!] Exception: {ex}")

            data = client_socket.recv(1024)

    except Exception as e:
        print(f"[!] Error in handle_client: {e}")

    finally:
        with lock:
            if client_socket in client_subscriptions:
                del client_subscriptions[client_socket]
        client_socket.close()


if __name__ == "__main__":
    broker_address = "192.168.100.5"
    broker_port =1883
    
    broker_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    broker_socket.bind((broker_address, broker_port))
    broker_socket.listen(5)
    print(f"[*] Broker socket bound to {broker_socket.getsockname()}")
    print(f"[*] MQTT Broker listening on {broker_address}:{broker_port}")

    try:
        while True:
            client_socket, client_address = broker_socket.accept()
            print(f"[*] Accepted connection from {client_address}")

            # Assuming you allow any client to connect without authentication
            client_handler = threading.Thread(target=handle_client, args=(client_socket,))
            client_handler.start()

    except KeyboardInterrupt:
        print("[*] Broker shutting down.")
        broker_socket.close()

Do you have any idea how to get the data published to the Node-RED mqtt-in node? I really have no idea, haha.

@CCF,

I feel you will be better off discussing in a python orientated community/forum

Most folk around here are JavaScript developers around Node RED.

It’s not really a forum for python - so help around python might be limited on these forums

You can edit old posts rather than keep posting your code in every reply.

You need to format the publish packet properly otherwise no mqtt client will receive it.

Alternatively, just use mosquitto for your broker and don't try to implement your own.

Because my project is not supposed to use any outside software, haha. So I cannot use it.

In which case, you've done a good job parsing the incoming publish packet. You need to apply what you've learnt doing that to reconstruct a proper publish packet to send to your clients.
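For reference, the layout being parsed looks roughly like this in MQTT 3.1.1: one fixed header byte, a variable-length "remaining length", the topic with a 2-byte length prefix, a 2-byte packet identifier only when QoS is above 0, and then the payload, which fills the rest of the packet and has no length prefix of its own. The sketch below assumes a complete packet is already in memory, and `parse_publish` is an illustrative name. Note that the posted handler calls `decode_length` at the start of the payload, which would consume the first payload byte, and that may be why it has to put the leading `{` back.

```python
def parse_publish(packet: bytes):
    """Parse one complete MQTT 3.1.1 PUBLISH packet into (topic, qos, payload)."""
    if packet[0] >> 4 != 3:
        raise ValueError("not a PUBLISH packet")
    qos = (packet[0] >> 1) & 0x03

    # Remaining length: 1 to 4 bytes, 7 data bits each, high bit means "more follow".
    remaining_length, multiplier, index = 0, 1, 1
    while True:
        digit = packet[index]
        remaining_length += (digit & 0x7F) * multiplier
        multiplier *= 128
        index += 1
        if (digit & 0x80) == 0:
            break
    end = index + remaining_length

    # Topic name: 2-byte big-endian length, then UTF-8 bytes.
    topic_length = int.from_bytes(packet[index:index + 2], "big")
    index += 2
    topic = packet[index:index + topic_length].decode("utf-8")
    index += topic_length

    # A 2-byte packet identifier is present only for QoS 1 and 2.
    if qos > 0:
        index += 2

    # Everything left inside the remaining length is the payload.
    return topic, qos, packet[index:end]


# Hand-built QoS 0 packet: remaining length 10 = 2 + len("a/b") + len("hello").
raw = bytes([0x30, 10, 0, 3]) + b"a/b" + b"hello"
print(parse_publish(raw))
```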

In the broker I have already successfully published my data to the topic, but it still does not work, haha.

What do you mean by 'successfully'? You said no clients are receiving it.

As I said, a quick look at your code shows you are just writing the payload to the socket. It is missing all of the mqtt header information. You have already implemented the parsing of the incoming mqtt packets. You need to do the reverse to create the correct mqtt header before sending it.
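As a rough illustration of "doing the reverse" for the header itself: every MQTT control packet starts with a fixed header made of the packet type in the high nibble, four flag bits in the low nibble, and the remaining length encoded 7 bits at a time (the inverse of the posted `decode_length`). The helper name `build_fixed_header` is an assumption for this sketch, not something from the posted code.

```python
def build_fixed_header(packet_type: int, flags: int, remaining_length: int) -> bytes:
    """Build an MQTT fixed header: type/flags byte plus encoded remaining length."""
    header = bytearray([((packet_type & 0x0F) << 4) | (flags & 0x0F)])
    while True:
        digit = remaining_length % 128
        remaining_length //= 128
        if remaining_length > 0:
            digit |= 0x80  # more length bytes follow
        header.append(digit)
        if remaining_length == 0:
            break
    return bytes(header)


# Control packet type numbers from the MQTT 3.1.1 specification.
CONNACK, PUBLISH, SUBACK = 2, 3, 9

# Same first two bytes as the hard-coded CONNACK in the posted broker.
print(build_fixed_header(CONNACK, 0, 2))

# A QoS 0 PUBLISH whose topic field plus payload total 200 bytes
# needs a two-byte remaining length.
print(build_fixed_header(PUBLISH, 0, 200))
```

The topic (with its 2-byte length prefix) and the payload would then be appended after this header before writing to the subscriber's socket.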

Do you have any idea how to do the reverse and create the correct MQTT header before sending it? Haha.

I've given you all the hints I can at the moment as I'm heading out for the day.

Assuming this is your code to begin with, you have already figured out parsing the packet. So you know what the packet should look like.

ok thanks a lot
