Node-red-contrib-msg-queue - Only storing not forwarding w/ active connection

Thanks. It didn't occur to me that I had to 'state' the status explicitly - I thought queue node would read it automatically from mqtt out node.

Added the status node to queue and it started working fine. I thought there is nothing much to it hence no recent update. Thanks for pointing it out.

I checked out the queue-gate that is active with recent updates. This gives good flexibility and control. Well thought out queue-gate. Will look this option too while I would like it to be persistent.

the queue-gate node can be persistent.

The flow you are using is not perfect, in case you did not realise. If the queue node passes on a message to the MQTT node, but before it gets sent the connection fails, then that message may get lost.

True! I have updated the flow using q-gate, controlling the flow using the connection status as below. But the problem is at the time of checking the connection is alive but when it reaches the mqtt node the connection fails (based on keep alive) - then this message will be lost - quite similar to what you have mentioned with the previous flow. Is there a way handle this failure and add such messages also in the queue? Can this be eliminated without controlling mqtt-conneciton-keep-alive?

Regarding persistence, I tried enabling contextStorage in settings.js - but still I couldn't see any options under "Restore from saved state in" dropdown. Do I have to do anything different to enable persistence (beyond memory)?

contextStorage: {
        default: {

Normally one would configure settings.js to provide a persistent store and a non-persistent one. I guess that because you have only configured the persistent one then it doesn't offer you the dropdown, since there would only be that one to choose from.

Not only quite similar but identical. A simple flow like that cannot solve that problem. Do you need persistence across a reboot or just across node-red restart? If only across node-red restart there is a simpler solution.

Thanks again Colin. I have got both default & custom option listed now with two config modules under ContextStorage. May be standard practice and is implicit not quite obvious with the documentation. Context Store API : Node-RED

With regards to likely loss of data in this approach also: One possible approach I could think of is to
A. Allow the message always to persist
B. Allow the node to read from the persistent store based on the pointer and publish to mqtt.
C. Only when the message is successfully sent, update/move the pointer.

This way it can sustain restarts (of node or m/c).
Plz let me know whether this sounds round about/incorrect or there is already a easy way out implemented?

C. An alternate to pointer could be to delete when successful and always take the latest like how queue implementation is.

The problem is, how do you know when it has been successfully sent? Or, in fact, and this is the tricky bit, how do you know when it hasn't been successfully sent?

Give this a go. It uses the Guaranteed Delivery subflow from here with the logic to make it work with MQTT. Configure the MQTT node as required and send the messages to be sent in via the top left link node. It uses a Complete node attached to the MQTT node to know when the message has been sent successfully but there is no way to know when it has failed, so it uses a 5 second timeout to indicate that. When it fails it retries every 60 seconds. That can be configured in the subflow settings. Be aware though that you may get repeated messages when MQTT reconnects as the MQTT node may buffer up some messages and send them when the connection recovers. If repeated messages are a problem for you then you will have to deal with that at the receiving end. Also you may get warning messages about unexpected OK messages after a recovery. You can safely ignore those (or edit the function in the subflow to stop the warning).

[{"id":"149380c1.63e107","type":"subflow","name":"Delivery subflow master copy","info":"","category":"","in":[{"x":60,"y":80,"wires":[{"id":"6a3f78ab.f6b8e"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"6a3f78ab.f6b8e","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"ed779289.25b5d8","port":0}]}},{"id":"6a3f78ab.f6b8e","type":"function","z":"149380c1.63e107","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n    handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n    handleFAIL(stat)\n} else if (control === \"__trigger\") {\n    handleTrigger(stat)\n} else {\n    // no valid control value so must be incoming message\n    handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n    case \"initial\":\n    case \"waitingForMsg\":\n        sendMsg(stat)   // send next message if any\n        break;\n        \n    case \"waitingForTrigger\":\n    case \"waitingForOKFail\":\n        // do nothing\n        break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n    //node.warn(\"handleMessage\")\n    // push a clone onto the queue\n    stat.queue.push(RED.util.cloneMessage(msg))\n    // limit number in queue\n    const max = Number(env.get(\"maxQueue\"))\n    if (!isNaN(max) && max > 0) {\n        // max length hit, remove oldest\n        if (stat.queue.length > max) stat.queue.shift()\n    }\n    // Simulate a trigger event to handle any state change needed\n    handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n    //node.warn(\"sendMsg\")\n    let thisMsg = stat.queue[0]\n    if (thisMsg) {\n        // send a clone\n        //node.warn(\"sending\")\n        node.send(RED.util.cloneMessage(thisMsg))\n        stat.state = \"waitingForOKFail\"\n    } else {\n        // nothing in queue\n        stat.state = \"waitingForMsg\"\n    }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n    //node.warn(\"handleOK\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // OK received so drop the top message \n        stat.queue.shift()\n        // set the state to waiting for message, which will allow the next one to be sent\n        stat.state = \"waitingForMsg\"\n    } else {\n        node.warn(\"Ignoring unnexpected OK\")\n    }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n    //node.warn(\"handleFAIL\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // FAIL received so go to waiting for trigger state\n        stat.state = \"waitingForTrigger\"\n    } else {\n        node.warn(\"Ignoring unnexpected FAIL\")\n    }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n    //node.warn(\"handleTrigger\")\n    if (stat.state === \"waitingForTrigger\") {\n        //node.warn(\"state to waiting\")\n        // set it to watitingForMsg in order to trigger send \n        stat.state = \"waitingForMsg\"\n    }\n    // ignore for other states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","x":280,"y":80,"wires":[[]]},{"id":"602725f6.15eee4","type":"inject","z":"149380c1.63e107","name":"Retry ","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":110,"y":180,"wires":[["6a3f78ab.f6b8e"]]},{"id":"ed779289.25b5d8","type":"status","z":"149380c1.63e107","name":"","scope":["6a3f78ab.f6b8e"],"x":300,"y":160,"wires":[[]]},{"id":"87d89aae.9b0db","type":"change","z":"85aecebd.19674","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":360,"wires":[["81c21dfd.874868"]]},{"id":"a12804d0.bbd0a","type":"change","z":"85aecebd.19674","name":"OK","rules":[{"t":"set","p":"control","pt":"msg","to":"OK","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":460,"wires":[["81c21dfd.874868"]]},{"id":"81c21dfd.874868","type":"link out","z":"85aecebd.19674","name":"","links":["7ae5a5e5.be2f6c"],"x":775,"y":400,"wires":[]},{"id":"7ae5a5e5.be2f6c","type":"link in","z":"85aecebd.19674","name":"","links":["81c21dfd.874868"],"x":135,"y":320,"wires":[["7267f4ba.8104e4"]]},{"id":"fbd0d6c6.8eb88","type":"link in","z":"85aecebd.19674","name":"Email delivery","links":["2090b139.480446"],"x":75,"y":260,"wires":[["7267f4ba.8104e4"]]},{"id":"7267f4ba.8104e4","type":"subflow:149380c1.63e107","z":"85aecebd.19674","name":"Guaranteed delivery","env":[],"x":280,"y":280,"wires":[["3ba7e731.3b7e88","2df5a331.164ed4"]]},{"id":"1a0a6921.df8e9f","type":"comment","z":"85aecebd.19674","name":"Send MQTT messages to this link","info":"","x":150,"y":220,"wires":[]},{"id":"3ba7e731.3b7e88","type":"mqtt out","z":"85aecebd.19674","name":"","topic":"","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"e3d45312.a3103","x":750,"y":280,"wires":[]},{"id":"2df5a331.164ed4","type":"trigger","z":"85aecebd.19674","name":"Timeout 5 secs","op1":"","op2":"0","op1type":"nul","op2type":"num","duration":"5","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":500,"y":360,"wires":[["87d89aae.9b0db"]]},{"id":"4255ad9.f2b01d4","type":"comment","z":"85aecebd.19674","name":"Adjust timeout time if necessary","info":"","x":550,"y":320,"wires":[]},{"id":"55d227e5.4520b","type":"complete","z":"85aecebd.19674","name":"MQTT Complete","scope":["3ba7e731.3b7e88"],"uncaught":false,"x":180,"y":460,"wires":[["a838d587.28a058","a12804d0.bbd0a"]]},{"id":"a838d587.28a058","type":"change","z":"85aecebd.19674","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":270,"y":380,"wires":[["2df5a331.164ed4"]]},{"id":"e3d45312.a3103","type":"mqtt-broker","name":"","broker":"owl2.local","port":"1883","clientid":"tigger","usetls":false,"compatmode":true,"keepalive":"60","cleansession":false,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

You may have found a bug in the q-gate node, but I have not been able to reproduce it. Assuming you have the latest version (1.5.2), what you should see when you deploy a new instance from the Palette is that the checkbox is not selected and the dropdown field contains the label "default". When you check the box, the dropdown will not be active since there are no other options, and the label will remain "default" since you have not assigned a name to the storage module. If you see anything different, please describe it as fully as you can so I can try to troubleshoot.

Thanks Colin - that's a quite detailed implementation and much obliged.

I took sometime to figure out what is going on and still have few pieces that I couldn't put together.

The subflow 'Guaranteed delivery' doesn't push/send message(payload) to the mqtt node - not sure I'd configured things right. Also I don't see the need for OK being connected to link out which is looped back to Guaranteed Delivery node again (Shouldn't OK be a terminus?). Btw, I am still trying to understanding the way MQTT Complete + msg.reset + timeout works (and I don't want to bother you for my ignorance here - let me figure it out with some reading)

Don't see any issue/bug with this. Once I added default & custom in the contextStorage, the dropdown showed both of 'em. Yes - I use 1.5.2.

1 Like

Attach a debug node showing what is going to the MQTT node, also add one showing what is coming out of the FAIL node and another showing what is coming out of the OK node. Give them names and set them to show Complete message. Then you will be able to see what is happening.

The guaranteed deilivery subflow passes one message at a time to the MQTT node and waits for an OK or FAIL message to be linked back to it so that it knows whether to pass the next one on or wait for a bit and then retry the first message again. Read the description in the link to the example I posted for more explanation. The fact that the subflow is showing waiting for OK/FAIL means that it has passed one message on and is waiting for the OK/FAIL to be passed back.
Did you change the settings for the subflow? You should not have changed the control property or the ok/fail values.

I see that it now has 300 messages queued, you might be better to disable the repeat on your Inject node until you have it working so you can more easily check it.

While playing with it an Inject node sending a message to the FAIL node might be useful, then if it gets stuck you can hit that to trigger another FAIL message to get it going again. Otherwise you will have to do a Restart Flows (which is at the bottom of the Deploy button dropdown) to restart it.


Got the issue resolved(based on your pointers). For some reason, the messages were stuck and it was keeping the rest of the messages in the queue from proceedings further. Even restart flow couldn't cleanup those message - may be it is in-memory. I change 'Context store to use' to use a fileStore and now it started to work. I am able to successfully reproduce this when I change it back to 'default' (in memory). I moved back to custom (filestore) as in-memory will never be my use-case.

I had debug messages - just not to confuse the original I removed all of them while taking screenshot. Obviously, I didn't modify anything else in the subflow earlier.

My config for storage - in case there is some problem ...

contextStorage: {
        default: {
                        cache: true,
                        flushInterval: 10

It should not matter which you use, though I realise that Restart Flows may not clear the queue and restart everything. If you change the context and then restart node-red then, with the debug nodes enabled feed in one message and see what happens. If that goes through correctly then send in another. If it gets stuck show me the debug output.

I enabled default (in-memory) and tried it was getting stuck. Post clearing (restarting), when I tried I could not reproduce. I tried few other things such as changing max queue size with items in queue adding few more when the target node is down etc...but it works as designed. Could not find out why the first in-memory queue was stuck. For now we can ignore this as there is no reproducer.

I have few questions regarding this

  • Is there a way the messages can go out of sequence with the timeout being 5 seconds and time between two messages will be considerable greater than 5 seconds (say >10s).? I see that it is queue/FIFO but still.
  • Can I set size as a limit instead of messages - (I can roughly calculate the size of each message and still deal with message count). Is there a way I can avoid setting a limit (say -1 to ignore ceiling)
  • Does it work any efficient if we have a DB/Sqlite kind of store if the queue grows?


No, (assuming I have code it right). It only releases one message at a time and waits for an OK response before removing that one from the queue and sending the next one. Any messages arriving during the timeout are added to the back end of the queue.

Not as currently coded. There is nothing to stop you amending the code to make it do that of course.

That is mentioned in the page on the flows site I originally linked to. Set it to 0 for no limit. Of course it if gets up to GigaBytes (or less on a machine with not so much memory) then you may run out of memory.

What do you mean by 'efficient'?
How many items are you thinking you might need to queue up and how big are they?

Colin - You are (embarrassingly) fast in responding and I'm quite obliged. _/_

On limits, it is quite unambiguous on your documentation. I missed. My bad.

For our IoT applications, the edge devices is in remote location and in an industrial environment where network connectivity and handling is a challenge. There are times where we have two-three days of no connectivity in which case it could be of order of few GBs(extreme case and support estimation).

In that case possibly an sqlite backed queue would be better so that it is not all in memory at once. The current implementation clones the complete message and puts it in the queue, which is a simple javascript array. Since you know you only need the payload and topic (possibly not even topic) someone who knows about sqlite and knows a bit of javascript should have no problem modifying the code to use a database backed queue instead.

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.