Hi. I have created my custom mqtt broker using python. Now I have issue that my node-red mqtt-in node cannot receive data from my custom broker. My scenario is I have one esp32 which connected with dht 22 sensor coding by arduino. the sensor sends data to my custom broker and my custom broker can received the data. but when my node-red mqtt in subscribe to my custom broker and get data, it cannot get any data.
import socket
import json
import threading
import traceback
import logging
Initialize the logging module
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(name)
Dictionary to store subscribed topics for each client
client_subscriptions = {}
Dictionary to store last message for each topic
last_messages = {}
Lock for thread safety
lock = threading.Lock()
def decode_string(data, start):
length = (data[start] << 8) + data[start + 1]
topic = data[start + 2:start + 2 + length].decode('utf-8')
return topic, start + 2 + length
def decode_length(data, start):
multiplier = 1
value = 0
while True:
digit = data[start]
value += (digit & 127) * multiplier
multiplier *= 128
start += 1
if (digit & 128) == 0:
break
return value, start
def send_connack(client_socket):
connack = b'\x20\x02\x00\x00' # CONNACK packet
client_socket.sendall(connack)
def decode_string_subscribe(data):
try:
topic_length = (data[2] << 8) + data[3]
topic_bytes = data[4:4 + topic_length]
print(f"Raw bytes of extracted topic: {topic_bytes}")
decoded_topic = topic_bytes.decode('utf-8', errors='replace')
return decoded_topic, 4 + topic_length
except UnicodeDecodeError as ude:
print(f"[!] UnicodeDecodeError in decode_string_subscribe: {ude}")
return "", 0
except Exception as ex:
print(f"[!] Exception in decode_string_subscribe: {ex}")
return "", 0
def handle_subscribe(client_socket, raw_packet):
try:
topic, index = decode_string_subscribe(raw_packet)
# Extract the QoS level from the SUBSCRIBE packet
qos = raw_packet[index]
with lock:
if client_socket not in client_subscriptions:
client_subscriptions[client_socket] = set()
client_subscriptions[client_socket].add(topic)
print(f"[*] Received SUBSCRIBE packet. Decoded topic: {topic}")
print(f"[*] Subscriber QoS level: {qos}")
print(f"[*] Client subscribed to topic: {client_subscriptions}")
except Exception as ex:
print(f"[!] Exception in handle_subscribe: {ex}")
print(f"[*] Raw packet in case of exception: {raw_packet}")
def handle_publish(client_socket, topic, payload):
try:
with lock:
last_messages[topic] = payload
with lock:
for client, subscribed_topics in client_subscriptions.items():
if topic in subscribed_topics:
# Forward the message to subscribed clients
logger.debug(f"[*] Forwarding message to client {client}: {payload}")
client.sendall(payload)
# Additional print statements for verification
logger.info(f"[*] Published message to topic {topic} with payload: {payload}")
# Print QoS level
qos_level = (payload[0] & 0b00000110) >> 1
logger.info(f"[*] QoS level: {qos_level}")
except Exception as ex:
# Log the exception without exposing payload content
logger.error(f"[!] Exception in handle_publish: {ex}")
traceback.print_exc() # Print the traceback for detailed information
# Alternatively, you can log the exception to a file or use a logging library
# logging.error(f"Exception in handle_publish: {ex}")
# logging.exception("Detailed exception information:")
logger.info(f"[*] Topic: {topic}, Payload: [Error occurred, check logs for details]")
def handle_client(client_socket):
try:
data = client_socket.recv(1024)
while data:
packet_type = (data[0] & 0xF0) >> 4
if packet_type == 1: # CONNECT packet
mqtt_version = data[5] if (data[0] & 0x80) == 0x80 else data[6]
print(f"[*] Raw CONNECT packet: {data}")
print(f"[*] Client connected with MQTT version: {mqtt_version}")
send_connack(client_socket)
elif packet_type == 8: # SUBSCRIBE packet
topic, index = decode_string(data, 2)
print(f"[*] Received SUBSCRIBE packet. Raw packet: {data}")
print(f"[*] Received SUBSCRIBE packet. Topic (decoded): {topic}")
handle_subscribe(client_socket, data[2:])
elif packet_type == 3: # PUBLISH packet
topic, index = decode_string(data, 2)
payload_length, index = decode_length(data, index)
payload = data[index:index + payload_length]
payload_str = payload.decode('utf-8')
if payload_str[0] != '{':
payload_str = '{' + payload_str
print(f"[*] Received raw payload: {payload_str}")
try:
payload_dict = json.loads(payload_str)
temperature = payload_dict.get("temperature")
print(f"Temperature: {temperature}")
handle_publish(client_socket, topic, payload_str.encode('utf-8'))
except json.JSONDecodeError as e:
print(f"[!] JSON Decode Error: {e}")
except Exception as ex:
print(f"[!] Exception: {ex}")
data = client_socket.recv(1024)
except Exception as e:
print(f"[!] Error in handle_client: {e}")
finally:
with lock:
if client_socket in client_subscriptions:
del client_subscriptions[client_socket]
client_socket.close()
if name == "main":
broker_address = "192.168.100.5"
broker_port =1883
broker_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
broker_socket.bind((broker_address, broker_port))
broker_socket.listen(5)
print(f"[*] Broker socket bound to {broker_socket.getsockname()}")
print(f"[*] MQTT Broker listening on {broker_address}:{broker_port}")
try:
while True:
client_socket, client_address = broker_socket.accept()
print(f"[*] Accepted connection from {client_address}")
# Assuming you allow any client to connect without authentication
client_handler = threading.Thread(target=handle_client, args=(client_socket,))
client_handler.start()
except KeyboardInterrupt:
print("[*] Broker shutting down.")
broker_socket.close()
This is my broker code