Publish 1000+ sensors data from Node-Red to ThingsBoard using MQTT

I have about 1000 sensors connected to NR. They are pushed to Thingsboard from NR by using MQTT (I am new to both platforms). Sensor data contains its device token. To connect to Thingsboard with MQTT we have to provide device token of TB in NR in MQTT Broker --> Server --> username.

I think there are 2 approaches. First, create 1000 MQTT Brokers with fixed device token.

Another approach is that use dynamic MQTT connections (Node-Red MQTT Dynamic Connections). I can successfully run below flow.

  1. ParseInput parse out device token and msg, store them in global variables.
  2. DynamicConnMQTT read device token from global variable and connect MQTT Broker.
  3. Once DynamicConnMQTT is done, SendMQTT read msg from global variable and send it. I have tried to merge 2) and 3) in one node, looks like: node.send (connect_mqtt); node.send (msg), but failed. I think connecting MQTT Broker is asynchronous action.

I was wondering which one is normal approach, creating fixed 1000 MQTT Brokers or using one Dynamic MQTT Broker?
For Dynamic MQTT Broker, how to perform node.send (msg) util node.send (connect_mqtt) is done without delay?

[
    {
        "id": "4a535d81376112c5",
        "type": "inject",
        "z": "cf96efa9aa607c83",
        "name": "disconnect",
        "props": [
            {
                "p": "action",
                "v": "disconnect",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "x": 640,
        "y": 600,
        "wires": [
            [
                "717f24e5cafad4d9"
            ]
        ]
    },
    {
        "id": "d43ecdecfe6b8d86",
        "type": "inject",
        "z": "cf96efa9aa607c83",
        "name": "Token1",
        "props": [
            {
                "p": "payload"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "GW_ID:02345670,Token:m7IDjaC1k3zt7uWLq7CL,STAT:01000011,T:30.8,H:50.9%,ST:5M,V:3.61v,SN:234,RSSI:-52dBm,S:22.5769,E:113.9712,Time:0-0-0 0:0:0,T_RSSI:-99dBm",
        "payloadType": "str",
        "x": 130,
        "y": 600,
        "wires": [
            [
                "d8452361b330bb7d"
            ]
        ]
    },
    {
        "id": "d8452361b330bb7d",
        "type": "function",
        "z": "cf96efa9aa607c83",
        "name": "ParseInput",
        "func": "if ( msg.payload.substr(0, 5) === \"GW_ID\") {\n    const pairs = msg.payload.split(\",\");\n    const obj = pairs.map(e => {\n    const parts = e.split(\":\");\n    return [parts[0],parts[1]]\n    })\n    \n    msg.topic = \"v1/devices/me/telemetry\";\n    msg.payload = Object.fromEntries(obj);\n\n    global.set(\"dev_token\", msg.payload.Token);\n    global.set(\"dev_msg\", msg);\n    \n    return msg\n} else {\n    return;\n}",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 350,
        "y": 660,
        "wires": [
            [
                "7fcbf72d88c124a2"
            ]
        ]
    },
    {
        "id": "717f24e5cafad4d9",
        "type": "mqtt out",
        "z": "cf96efa9aa607c83",
        "name": "MQTT Broker",
        "topic": "",
        "qos": "",
        "retain": "",
        "respTopic": "",
        "contentType": "",
        "userProps": "",
        "correl": "",
        "expiry": "",
        "broker": "6ce3518c429a810e",
        "x": 920,
        "y": 680,
        "wires": []
    },
    {
        "id": "2ed8020662220727",
        "type": "inject",
        "z": "cf96efa9aa607c83",
        "name": "Token2",
        "props": [
            {
                "p": "payload"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "GW_ID:02345670,Token:6gRUUBqw9137ja1auF8K,STAT:01000011,T:30.8,H:50.9%,ST:5M,V:3.61v,SN:234,RSSI:-52dBm,S:22.5769,E:113.9712,Time:0-0-0 0:0:0,T_RSSI:-99dBm",
        "payloadType": "str",
        "x": 130,
        "y": 660,
        "wires": [
            [
                "d8452361b330bb7d"
            ]
        ]
    },
    {
        "id": "7fcbf72d88c124a2",
        "type": "function",
        "z": "cf96efa9aa607c83",
        "name": "DynamicConnMQTT",
        "func": "var pstr = global.get(\"dev_token\");\nvar msg_mqtt = {\n    topic: \"v1/devices/me/telemetry\",\n    action: \"connect\",\n    broker: {\n        \"broker\": \"192.168.1.8\",\n        \"port\": 1883,\n        \"force\": true,\n        \"username\": pstr,\n        \"protocolVersion\": 5\n    }\n};\nnode.send (msg_mqtt);\n",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 600,
        "y": 660,
        "wires": [
            [
                "717f24e5cafad4d9"
            ]
        ]
    },
    {
        "id": "eb1fe95242c4cf15",
        "type": "inject",
        "z": "cf96efa9aa607c83",
        "name": "Token3",
        "props": [
            {
                "p": "payload"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "GW_ID:02345670,Token:9saUF0WTot3LbCIamqhM,STAT:01000011,T:30.8,H:50.9%,ST:5M,V:3.61v,SN:234,RSSI:-52dBm,S:22.5769,E:113.9712,Time:0-0-0 0:0:0,T_RSSI:-99dBm",
        "payloadType": "str",
        "x": 130,
        "y": 720,
        "wires": [
            [
                "d8452361b330bb7d"
            ]
        ]
    },
    {
        "id": "c92576880cfc93cd",
        "type": "function",
        "z": "cf96efa9aa607c83",
        "name": "SendMQTT",
        "func": "msg = global.get(\"dev_msg\");\nreturn msg;\n",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 510,
        "y": 740,
        "wires": [
            [
                "ad967cdb73abfdc8"
            ]
        ]
    },
    {
        "id": "1c8ed8bfebe81ddf",
        "type": "complete",
        "z": "cf96efa9aa607c83",
        "name": "",
        "scope": [
            "7fcbf72d88c124a2"
        ],
        "uncaught": false,
        "x": 330,
        "y": 740,
        "wires": [
            [
                "c92576880cfc93cd"
            ]
        ]
    },
    {
        "id": "ad967cdb73abfdc8",
        "type": "delay",
        "z": "cf96efa9aa607c83",
        "name": "",
        "pauseType": "delay",
        "timeout": "100",
        "timeoutUnits": "milliseconds",
        "rate": "1",
        "nbRateUnits": "1",
        "rateUnits": "second",
        "randomFirst": "1",
        "randomLast": "5",
        "randomUnits": "seconds",
        "drop": false,
        "allowrate": false,
        "outputs": 1,
        "x": 710,
        "y": 740,
        "wires": [
            [
                "717f24e5cafad4d9"
            ]
        ]
    },
    {
        "id": "6ce3518c429a810e",
        "type": "mqtt-broker",
        "name": "TH3",
        "broker": "192.168.1.8",
        "port": "1883",
        "clientid": "",
        "autoConnect": false,
        "usetls": false,
        "protocolVersion": "5",
        "keepalive": "0",
        "cleansession": true,
        "birthTopic": "",
        "birthQos": "0",
        "birthPayload": "",
        "birthMsg": {},
        "closeTopic": "",
        "closeQos": "0",
        "closePayload": "",
        "closeMsg": {},
        "willTopic": "",
        "willQos": "0",
        "willPayload": "",
        "willMsg": {},
        "sessionExpiry": ""
    }
]

Thank you in advance for your help.

Hi.

I am not familiar with thingsboard but my first thought is what on earth? You have to specify a different user name for every device?

Usually, there is one broker (with appropriate security set) and you have 1000 topics that differentiate the sensors.

Sorry if that is of no help but it seems incredulous that is an actual requirement!

Thank you Steve, Thingsboard use device token differentiate sensor. And topic is fixed "v1/devices/me/telemetry".

Thingsboard calling format:
mosquitto_pub -d -q 1 -h "mqtt.thingsboard.cloud" -p "1883" -t "v1/devices/me/telemetry" -u "YOUR_ACCESS_TOKEN" -m {"temperature":25}

I dont see anything there about an access token per publish? Only one access token for your connection.

A single mqtt connection can have 1 or 1 million different topics sent WITHOUT using alternative credentials. Once the connection (with token) is established, just keep sending it data on 1 or more topics. Thats the norm!

Perhaps I am misunderstanding your usage? or even misunderstanding the thinkboard way of doing things?

1 Like

Thingsboard's access token is actually device token, not user token. Every publish associates with one device.

From you explanation I think it is proper approach with "A single mqtt connection handle 1 or 1 mill…” in Node-Red.

To simplify my problem is that NR use configurable topic and “fixed” MQTT Broker. But Thingsboard use “fixed” topic and configurable device token. Unfortunately, TB's device token mapped to MQTT Broker’s username in NR.

In order to avoid connection/disconnection before/after each MQTT messages, the MQTT nodes in nodered are maintaining the established connection.
So either at deployment or upon receiving the "connect" order MQTT out node will establish a unique connection where all messages with the same "constraints" (broker, user, pwd).
In this case, as the "user" will be different, there is no way for MQTT nodes in NR to maintain a unique connection.
So, it is either 1000 MQTT out nodes or an exec call to mosquitto_pub. Much less efficient but simpler considering the requirements.

1 Like

greengolfer, thank you for clarifying of NR MQTT connection. Finally by studying Thingsboard I found the solution. I can include sensor's ID in payload, thus only one MQTT out can solve it.

2 Likes

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.