Detecting disconnection of MQTT out node

Did you read the description of the flow? If it can't send a message, it automatically saves it in context. All you have to do is add the timestamp to the payload and pass it in; it will either send it straight away or store it and send it later.
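A minimal Function node in front of the subflow might look something like this (just a sketch, assuming the reading arrives as a comma-separated string in msg.payload):

// Prepend the epoch timestamp (in seconds) to the payload so that a queued
// message keeps the time the reading was taken, not the time it is delivered
const ts = Math.floor(Date.now() / 1000)
msg.payload = `${ts},${msg.payload}`
return msg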

I was not sure what "in context" means. I will give it a try, as I said. Anyway, when I copy the JSON, the delivery subflow is not copied.

Do you mean that when you import it, it is not imported?

Yes, I meant that, but I noticed that it can be accessed via "edit subflow template".

You don't need to touch that unless you want to see how it works. The parameters are set up by opening the instance of the subflow that you see in the editor.

Presumably you are a newcomer to Node-RED. In that case I recommend watching this playlist: Node-RED Essentials. The videos are made by the developers of Node-RED. They're nice & short and to the point. You will understand a whole lot more in about an hour. A small investment for a lot of gain.

In particular, for context, see Working with context : Node-RED
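The short version, as it appears in a Function node (a minimal sketch):

// Count the messages this node has seen, kept in node context
let count = context.get("count") || 0  // defaults to 0 on first use
count += 1
context.set("count", count)
msg.count = count
return msg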

Thanks for your help and your time. Now it is working and, at least, it has been able to cover a three-minute cut. I will check a typical cut of 30 minutes. Let's see, but I think I must change the limit of 100 messages in the queue.
That doesn't help me know what has changed, but it seems to make it work again.

You can set it to whatever you like. If you set it to zero then the queue is not limited (until you run out of memory, which will take a very long time at three messages/minute).

Three messages per second

Yes, that's what I meant.

It works perfectly. As a matter of curiosity, every time I cut the connection it recovers almost all the data, but 75-95 seconds are lost. It's not critical, but...
Another issue is the behaviour on recovery: sometimes it is immediate; other times, after the connection is up again, I see the "waitingForOKFail" count keep increasing for a long time before it starts to decrease.

Do you mean it is losing messages? That should not happen. Do you see the number of messages shown under the subflow increasing as soon as the network is disconnected?

What have you set the Retry Period to in the subflow settings?

What have you set Max Messages in queue to?

What timeout have you got in the Trigger node?

Have you changed anything in the subflow?

I don't understand what you mean by that.

First of all, bad news: it is not enough to cover a 15-hour cutoff. Analysing the local data, it seems the Raspberry runs out of memory about 12 hours in.

Your questions:

After a cutoff I can check the data available on the remote server, and it recovers the whole gap except for those first seconds.

60 seconds in each node. Should I change it in the input node of the Delivery subflow?

0 in each "Guaranteed delivery" node. It remains at 100 in the subflow template.

5 seconds

Nothing at all


In the image you can see the message "0 waitingForMsg". After a cutoff I can see "423 waitingForOKFail", which usually decreases quickly, but sometimes this number keeps growing for minutes even though the connection has recovered (I am accessing it remotely).

You did not answer the question: Do you see the number of messages shown under the subflow increasing as soon as the network is disconnected?
The number shown is the number of messages in the queue.
Is that number increasing at the expected rate, 2 per second?

The flow does save the complete message, so if there is other stuff in the message, that may be a factor in how long it lasts. Try adding a Function node containing this in front of the subflow (or incorporate it into your existing function 11 etc.):

return {topic: msg.topic, payload: msg.payload}

That will ensure that only what you need is in the message. In fact, if you are setting the topic in the MQTT out node then you don't even need msg.topic.
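In that case the function can be reduced to just this (a sketch, assuming the topic really is configured in the node):

// Topic comes from the MQTT out node config, so only the payload is needed
return {payload: msg.payload}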

What sort of Pi is it? How much RAM?
Can you show me the Node-RED startup log please? Assuming that you installed Node-RED using the Pi install script, you can get the log by running
node-red-stop
node-red-start
Copy/paste the log here rather than screenshot please.

I've never checked that. I simply switch off the connections for a while, and when they are back again and I connect through Chrome on my computer, I see a number other than 0, so I assume it works. I've just tried it with a Chrome tab open locally and the number did not increase at all in four minutes, so four minutes of data were lost. I repeated it with no Chrome tab open locally and, once it was back again, only around 90 seconds were lost and two and a half minutes were recovered.

I need the complete message. It is a string of 70 numerical values plus a timestamp.

Raspberry Pi 4 with 2 GB RAM

gesinne@raspberrypi:~ $ node-red-stop
Stop Node-RED

Use   node-red-start   to start Node-RED again

gesinne@raspberrypi:~ $ node-red-start

Start Node-RED

Once Node-RED has started, point a browser at http://192.168.0.129:1880
On Pi Node-RED works better with the Firefox or Chrome browser

Use   node-red-stop                          to stop Node-RED
Use   node-red-start                         to start Node-RED again
Use   node-red-log                           to view the recent log output
Use   sudo systemctl enable nodered.service  to autostart Node-RED at every boot
Use   sudo systemctl disable nodered.service to disable autostart on boot

To find more nodes and example flows - go to http://flows.nodered.org

Starting as a systemd service.
11 Jan 10:56:56 - [info]
Welcome to Node-RED
===================
11 Jan 10:56:56 - [info] Node-RED version: v3.1.3
11 Jan 10:56:56 - [info] Node.js  version: v18.19.0
11 Jan 10:56:56 - [info] Linux 6.1.21-v8+ arm LE
11 Jan 10:56:58 - [info] Loading palette nodes
11 Jan 10:57:00 - [info] Dashboard version 3.6.2 started at /ui
11 Jan 10:57:01 - [info] Settings file  : /home/gesinne/.node-red/settings.js
11 Jan 10:57:01 - [info] Context store  : 'default' [module=memory]
11 Jan 10:57:01 - [info] User directory : /home/gesinne/.node-red
11 Jan 10:57:01 - [warn] Projects disabled : editorTheme.projects.enabled=false
11 Jan 10:57:01 - [info] Flows file     : /home/gesinne/.node-red/flows.json
11 Jan 10:57:01 - [info] Server now running at http://127.0.0.1:1880/
11 Jan 10:57:01 - [info] Starting flows
11 Jan 10:57:02 - [info] Started flows
11 Jan 10:57:03 - [warn] [modbus-flex-getter:Tarjeta1] Flex-Getter -> Inject while node is not ready for input.
11 Jan 10:57:03 - [info] [mqtt-broker:Elastic] Connected to broker: mqtt://52.169.183.190:1883
11 Jan 10:57:03 - [warn] [modbus-flex-getter:Tarjeta2] Flex-Getter -> Inject while node is not ready for input.
11 Jan 10:57:03 - [warn] [modbus-flex-getter:Tarjeta3] Flex-Getter -> Inject while node is not ready for input.

My try: I've just increased --max-old-space-size to 1024 and set the contextStorage module to "localfilesystem".
Let's see.
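For the record, the contextStorage change follows the documented form in settings.js, roughly like this (the store names are my own choice):

// in ~/.node-red/settings.js
contextStorage: {
    default: { module: "localfilesystem" },  // context (and the queue) survives restarts
    memoryOnly: { module: "memory" }         // optional volatile store
},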

Are you confident that you have linked each MQTT Complete node to the correct MQTT node?

Well, that is not right. Please add a debug node connected to the OK node and another connected to the FAIL node. Name them appropriately to make it easier to follow what happens. Configure them both to output to the console (which means they will appear in the Node-RED log). Run node-red-log so that you can see the log output, and you should see the messages coming from the OK node. Then disconnect the network and see what happens. Watch what the queue count does too. After 10 seconds, connect the network back up again.
Then copy the section of the log from just before the network disconnection to after it has recovered and paste it here.

Also please select the Guaranteed Delivery node, Export it and paste it here.

You need the complete payload, which is that string. You don't need any other message properties that the message may have picked up along the way. To see what else is there, feed the message into a Debug node and set it to output the complete message. You may be surprised.

Yes, I am.
First tests: I've attached, as you suggested, six debug nodes to the six Change nodes. The ones attached to the OK Change nodes (debug 25, 26 and 27) simply show the MQTT published string. The ones attached to FAIL (debug 22, 23 and 24) show "0" about a minute after disconnection. Times for the next log: disconnection at 17:21:00, reconnection at 17:22:24, remote access to get the log at 17:22:58. It "realises" it is connected again at 17:23:20.

11 Jan 17:22:12 - [info] [debug:debug 25] 1704990132,00001,140,0,2,0,0,21798,23440,5000,276,171,181,0,59084,0,11796,0,60250,98,1,195,550,0,0,0,0,0,0,0,0,0,0,0,43981,2,2180,1,516,0,0,0,0,0,47818,360,2400,2310,3500,230,2,22,1,200,15,140,263,30,64,58512,2190,550,100,0,0,0,0,179,179,5,0
11 Jan 17:22:12 - [info] [debug:debug 26] 1704990133,00001,140,0,2,0,0,21800,23763,5000,99,90,65,0,14345,0,16183,0,21626,66,1,200,550,0,0,0,0,0,0,0,0,0,0,0,43981,2,2180,1,287,0,0,0,0,0,0,166,2400,2310,3500,230,2,22,2,200,15,140,263,30,52,59328,53696,550,100,0,61072,0,0,179,179,5,0
11 Jan 17:22:12 - [info] [debug:debug 27] 1704990133,00001,140,0,2,0,0,21795,23671,5000,47,50,30,0,9717,0,3243,0,10244,94,1,230,550,0,0,0,0,0,0,0,0,0,0,0,43981,2,2180,1,331,0,0,0,0,0,0,137,2400,2310,3500,230,2,22,3,200,15,140,263,30,103,2,2190,550,100,0,0,0,0,179,179,5,0
11 Jan 17:22:13 - [info] [debug:debug 25] 1704990133,00001,140,0,2,0,0,21798,23440,5000,276,171,181,0,59084,0,11796,0,60250,98,1,195,550,0,0,0,0,0,0,0,0,0,0,0,43981,2,2180,1,523,0,0,0,0,0,47818,360,2400,2310,3500,230,2,22,1,200,15,140,263,30,64,58512,2190,550,100,0,0,0,0,179,179,5,0
11 Jan 17:22:13 - [info] [debug:debug 26] 1704990133,00001,140,0,2,0,0,21800,23755,5000,99,90,65,0,14345,0,16183,0,21626,66,1,200,550,0,0,0,0,0,0,0,0,0,0,0,43981,2,2180,1,296,0,0,0,0,0,0,166,2400,2310,3500,230,2,22,2,200,15,140,263,30,52,59328,53696,550,100,0,61072,0,0,179,179,5,0
11 Jan 17:22:13 - [info] [debug:debug 27] 1704990134,00001,140,0,2,0,0,21795,23671,5000,47,50,30,0,9717,0,3243,0,10244,94,1,230,550,0,0,0,0,0,0,0,0,0,0,0,43981,2,2180,1,331,0,0,0,0,0,0,137,2400,2310,3500,230,2,22,3,200,15,140,263,30,103,2,2190,550,100,0,0,0,0,179,179,5,0
11 Jan 17:22:18 - [info] [debug:debug 22] 0
11 Jan 17:22:19 - [info] [debug:debug 23] 0
11 Jan 17:22:19 - [info] [debug:debug 24] 0
11 Jan 17:22:24 - [info] [debug:debug 22] 0
11 Jan 17:22:24 - [info] [debug:debug 23] 0
11 Jan 17:22:24 - [info] [debug:debug 24] 0
11 Jan 17:22:29 - [info] [debug:debug 22] 0
11 Jan 17:22:30 - [info] [debug:debug 23] 0
11 Jan 17:22:30 - [info] [debug:debug 24] 0
11 Jan 17:22:31 - [warn] [modbus-flex-getter:Tarjeta3] Flex-Getter -> Inject while node is not ready for input.
11 Jan 17:22:35 - [info] [debug:debug 22] 0
11 Jan 17:22:35 - [info] [debug:debug 23] 0
11 Jan 17:22:35 - [info] [debug:debug 24] 0
11 Jan 17:22:36 - [warn] [modbus-flex-getter:Tarjeta1] Flex-Getter -> Inject while node is not ready for input.
11 Jan 17:22:36 - [warn] [modbus-flex-getter:Tarjeta2] Flex-Getter -> Inject while node is not ready for input.
11 Jan 17:22:40 - [info] [debug:debug 22] 0
11 Jan 17:22:40 - [info] [debug:debug 23] 0
11 Jan 17:22:41 - [info] [debug:debug 24] 0
11 Jan 17:22:46 - [info] [debug:debug 22] 0
11 Jan 17:22:46 - [info] [debug:debug 23] 0
11 Jan 17:22:47 - [info] [debug:debug 24] 0
11 Jan 17:22:51 - [info] [debug:debug 22] 0
11 Jan 17:22:51 - [info] [debug:debug 23] 0
11 Jan 17:22:52 - [info] [debug:debug 24] 0
11 Jan 17:22:57 - [info] [debug:debug 22] 0
11 Jan 17:22:57 - [info] [debug:debug 23] 0
11 Jan 17:22:58 - [info] [debug:debug 24] 0
11 Jan 17:23:02 - [info] [debug:debug 22] 0
11 Jan 17:23:02 - [info] [mqtt-broker:Elastic] Disconnected from broker: mqtt://52.169.183.190:1883
11 Jan 17:23:02 - [info] [debug:debug 23] 0
11 Jan 17:23:03 - [warn] [modbus-flex-getter:Tarjeta1] Flex-Getter -> Inject while node is not ready for input.
11 Jan 17:23:03 - [warn] [modbus-flex-getter:Tarjeta2] Flex-Getter -> Inject while node is not ready for input.
11 Jan 17:23:03 - [info] [debug:debug 24] 0
11 Jan 17:23:09 - [info] [debug:debug 22] 0
11 Jan 17:23:09 - [info] [debug:debug 23] 0
11 Jan 17:23:09 - [info] [debug:debug 24] 0
11 Jan 17:23:14 - [info] [debug:debug 22] 0
11 Jan 17:23:14 - [info] [debug:debug 23] 0
11 Jan 17:23:15 - [info] [debug:debug 24] 0
11 Jan 17:23:17 - [info] [mqtt-broker:Elastic] Connected to broker: mqtt://52.169.183.190:1883
11 Jan 17:23:20 - [info] [debug:debug 22] 0
11 Jan 17:23:20 - [info] [debug:debug 23] 0
11 Jan 17:23:20 - [info] [debug:debug 24] 0
11 Jan 17:23:20 - [info] [debug:debug 25] 1704990134,00001,140

[
    {
        "id": "149380c1.63e107",
        "type": "subflow",
        "name": "Delivery subflow",
        "info": "",
        "category": "",
        "in": [
            {
                "x": 50,
                "y": 81,
                "wires": [
                    {
                        "id": "62560ae6ab8737dc"
                    }
                ]
            }
        ],
        "out": [
            {
                "x": 420,
                "y": 80,
                "wires": [
                    {
                        "id": "62560ae6ab8737dc",
                        "port": 0
                    }
                ]
            },
            {
                "x": 420,
                "y": 160,
                "wires": [
                    {
                        "id": "ed779289.25b5d8",
                        "port": 0
                    }
                ]
            }
        ],
        "env": [
            {
                "name": "controlProperty",
                "type": "str",
                "value": "control",
                "ui": {
                    "label": {
                        "en-US": "Property for OK or FAIL"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "str",
                            "env"
                        ]
                    }
                }
            },
            {
                "name": "OKValue",
                "type": "str",
                "value": "OK",
                "ui": {
                    "label": {
                        "en-US": "Value of success"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "str",
                            "num",
                            "bool",
                            "env"
                        ]
                    }
                }
            },
            {
                "name": "FAILValue",
                "type": "str",
                "value": "FAIL",
                "ui": {
                    "label": {
                        "en-US": "Value for failure"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "str",
                            "num",
                            "bool",
                            "env"
                        ]
                    }
                }
            },
            {
                "name": "retrySecs",
                "type": "num",
                "value": "60",
                "ui": {
                    "label": {
                        "en-US": "Retry period (secs)"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "num",
                            "env"
                        ]
                    }
                }
            },
            {
                "name": "maxQueue",
                "type": "num",
                "value": "100",
                "ui": {
                    "label": {
                        "en-US": "Max messages in queue"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "str",
                            "num",
                            "env"
                        ]
                    }
                }
            },
            {
                "name": "contextStore",
                "type": "str",
                "value": "default",
                "ui": {
                    "label": {
                        "en-US": "Context Store to use"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "str",
                            "env"
                        ]
                    }
                }
            }
        ],
        "color": "#DDAA99",
        "status": {
            "x": 420,
            "y": 160,
            "wires": [
                {
                    "id": "ed779289.25b5d8",
                    "port": 0
                }
            ]
        }
    },
    {
        "id": "602725f6.15eee4",
        "type": "inject",
        "z": "149380c1.63e107",
        "name": "Retry ",
        "props": [
            {
                "p": "${controlProperty}",
                "v": "__trigger",
                "vt": "str"
            }
        ],
        "repeat": "${retrySecs}",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "x": 100,
        "y": 181,
        "wires": [
            [
                "62560ae6ab8737dc"
            ]
        ]
    },
    {
        "id": "ed779289.25b5d8",
        "type": "status",
        "z": "149380c1.63e107",
        "name": "",
        "scope": [
            "62560ae6ab8737dc"
        ],
        "x": 300,
        "y": 160,
        "wires": [
            []
        ]
    },
    {
        "id": "62560ae6ab8737dc",
        "type": "function",
        "z": "149380c1.63e107",
        "name": "State machine",
        "func": "let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n    handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n    handleFAIL(stat)\n} else if (control === \"__trigger\") {\n    handleTrigger(stat)\n} else {\n    // no valid control value so must be incoming message\n    handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n    case \"initial\":\n    case \"waitingForMsg\":\n        sendMsg(stat)   // send next message if any\n        break;\n        \n    case \"waitingForTrigger\":\n    case \"waitingForOKFail\":\n        // do nothing\n        break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n    //node.warn(\"handleMessage\")\n    // push a clone onto the queue\n    stat.queue.push(RED.util.cloneMessage(msg))\n    // limit number in queue\n    const max = Number(env.get(\"maxQueue\"))\n    if (!isNaN(max) && max > 0) {\n        // max length hit, remove oldest\n        if (stat.queue.length > max) stat.queue.shift()\n    }\n    // Simulate a trigger event to handle any state change needed\n    handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n    //node.warn(\"sendMsg\")\n    let thisMsg = stat.queue[0]\n    if (thisMsg) {\n        // send a clone\n        //node.warn(\"sending\")\n        node.send(RED.util.cloneMessage(thisMsg))\n        stat.state = \"waitingForOKFail\"\n    } else {\n        // nothing in queue\n        stat.state = \"waitingForMsg\"\n    }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n    //node.warn(\"handleOK\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // OK received so drop the top message \n        stat.queue.shift()\n        // set the state to waiting for message, which will allow the next one to be sent\n        stat.state = \"waitingForMsg\"\n    } else {\n        node.warn(\"Ignoring unnexpected OK\")\n    }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n    //node.warn(\"handleFAIL\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // FAIL received so go to waiting for trigger state\n        stat.state = \"waitingForTrigger\"\n    } else {\n        node.warn(\"Ignoring unnexpected FAIL\")\n    }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n    //node.warn(\"handleTrigger\")\n    if (stat.state === \"waitingForTrigger\") {\n        //node.warn(\"state to waiting\")\n        // set it to watitingForMsg in order to trigger send \n        stat.state = \"waitingForMsg\"\n    }\n    // ignore for other states\n}",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "x": 280,
        "y": 80,
        "wires": [
            []
        ]
    },
    {
        "id": "34b76192fa82bed4",
        "type": "subflow:149380c1.63e107",
        "z": "6125fd6fe9bb084b",
        "name": "Guaranteed delivery",
        "env": [
            {
                "name": "maxQueue",
                "value": "0",
                "type": "num"
            }
        ],
        "x": 1660,
        "y": 960,
        "wires": [
            [
                "0866631e9e6da383",
                "436b59be231781a9",
                "f8a6fb773e53581a"
            ],
            []
        ]
    }
]

I am truly surprised :woozy_face: topic, message ID, payload, modbus requests, arrays....
So instead of return msg I should try return {topic: msg.topic, payload: msg.payload} to keep the message "clean", shouldn't I?

Yes.

I have replicated the problem here. It appears that an update to the MQTT node has broken the flow; there seems to be a change to the way it signals Done when the network is disconnected.

I will investigate further.