Possible to listen to debug console and wait until no more messages?

Hi everyone,
I am sending a bunch of MQTT messages using Node-RED and would like to make sure that each new batch is delayed until the previous batch is finished.

I tried with manual delays:

msg.payload = "";
msg.topic = "";
msg.selectRange = "Ax:Ay";

// Output to output 1
msg.setting = "ip";
node.send([msg, null, null, null, null, null, null, null, null, null]);

// Delay and output to output 2
setTimeout(() => {
    msg.setting = "template";
    node.send([null, msg, null, null, null, null, null, null, null]);
}, 10000); // Delay in milliseconds (10 seconds)

// Delay and output to output 3
setTimeout(() => {
    msg.setting = "settings";
    node.send([null, null, msg, null, null, null, null, null]);
}, 20000); // Delay in milliseconds (20 seconds)

// Delay and output to output 4
setTimeout(() => {
    msg.setting = "timers";
    node.send([null, null, null, msg, null, null, null, null]);
}, 30000); // Delay in milliseconds (30 seconds)

// Delay and output to output 5
setTimeout(() => {
    msg.setting = "rule1";
    node.send([null, null, null, null, msg, null, null, null]);
}, 35000); // Delay in milliseconds (35 seconds)

// Delay and output to output 6
setTimeout(() => {
    msg.setting = "rule2";
    node.send([null, null, null, null, null, msg, null, null]);
}, 40000); // Delay in milliseconds (40 seconds)

// Delay and output to output 7
setTimeout(() => {
    msg.setting = "rule3";
    node.send([null, null, null, null, null, null, msg, null]);
}, 45000); // Delay in milliseconds (45 seconds)

// Delay and output to output 8
setTimeout(() => {
    msg.setting = "rule_commands";
    node.send([null, null, null, null, null, null, null, msg]);
}, 50000); // Delay in milliseconds (50 seconds)

// Delay and output to output 9
setTimeout(() => {
    msg.setting = "custom";
    node.send([null, null, null, null, null, null, null, null, msg]);
}, 55000); // Delay in milliseconds (55 seconds)

// // Delay and output to output 10
// setTimeout(() => {
//     msg.setting = "custom";
//     node.send([null, null, null, null, null, null, null, null, null, msg]);
// }, 65000); // Delay in milliseconds (65 seconds)

But depending on how many steps there are to process during each batch, this delay can be too long (wasting time) or too short (recipient might not be listening).

Is it possible to somehow delay until 5 seconds after the last debug message was observed? So if there are no new debug messages for 5 seconds, send the next msg to the next output?
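In other words, I'm after something like this resettable "quiet period" logic (just a sketch to illustrate the behaviour I want, not a real Node-RED API):

```javascript
// Illustrative sketch only: a watchdog that reports "quiet" once no
// message has been observed for `quietMs` milliseconds. In a flow this
// would be driven by the debug messages and, when quiet, release the
// next msg to the next output.
class SilenceWatchdog {
    constructor(quietMs) {
        this.quietMs = quietMs;
        this.lastSeen = null;
    }
    onMessage(timestamp) {
        // record the arrival time of the latest message
        this.lastSeen = timestamp;
    }
    isQuiet(now) {
        // true once the quiet period has elapsed since the last message
        return this.lastSeen !== null && now - this.lastSeen >= this.quietMs;
    }
}
```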

Thank you all
Alex

If I have your request right, you can use my Semaphore Node, which allows exclusivity inside a flow plus some advanced controls (like message priority).

@dceejay & @Colin also created a variant (before mine) using native nodes (I just can never remember where to find it).

Not sure if the two possibilities are what you need, but still - it seems like they might help.

Note: I don't do MQTT (much), so you know... my suggestions might not be on par with what you need.


This post shows how to run tasks sequentially using standard nodes.


I like using the default nodes, if possible 🙂

I tried the linked node combination and it looks very promising.
The only downside is that I cannot decide that certain topics are handled faster.

[{"id":"b6630ded2db7d680","type":"inject","z":"bdd7be38.d3b55","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":140,"y":480,"wires":[["ed63ee4225312b40"]]},{"id":"ed63ee4225312b40","type":"delay","z":"bdd7be38.d3b55","name":"Queue","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"minute","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":310,"y":480,"wires":[["d4d479e614e82a49","7eb760e019b512dc"]]},{"id":"a82c03c3d34f683c","type":"delay","z":"bdd7be38.d3b55","name":"Some more stuff to do","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":800,"y":480,"wires":[["7c6253e5d34769ac","b23cea1074943d4d"]]},{"id":"2128a855234c1016","type":"link in","z":"bdd7be38.d3b55","name":"link in 1","links":["7c6253e5d34769ac"],"x":95,"y":560,"wires":[["3a9faf0a95b4a9bb"]]},{"id":"7c6253e5d34769ac","type":"link out","z":"bdd7be38.d3b55","name":"link out 1","mode":"link","links":["2128a855234c1016"],"x":665,"y":560,"wires":[]},{"id":"b23cea1074943d4d","type":"debug","z":"bdd7be38.d3b55","name":"OUT","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":670,"y":420,"wires":[]},{"id":"d4d479e614e82a49","type":"debug","z":"bdd7be38.d3b55","name":"IN","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":450,"y":420,"wires":[]},{"id":"3a9faf0a95b4a9bb","type":"function","z":"bdd7be38.d3b55","name":"Flush","func":"return {flush: 
1}","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":190,"y":560,"wires":[["ed63ee4225312b40"]]},{"id":"7eb760e019b512dc","type":"function","z":"bdd7be38.d3b55","name":"Some functions to be performed","func":"\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":550,"y":480,"wires":[["a82c03c3d34f683c"]]},{"id":"e35f37deeae94860","type":"comment","z":"bdd7be38.d3b55","name":"Set the queue timeout to larger than you ever expect the process to take","info":"This is a simple flow which allows a sequence of nodes to be \nprotected so that only one message is allowed in at a time. \nIt uses a Delay node in Rate Limit mode to queue them, but \nreleases them, using the Flush mechanism, as soon as the \nprevious one is complete. Set the timeout in the delay node to \na value greater than the maximum time you expect it ever to take. \nIf for some reason the flow locks up (a message fails to indicate \ncompletion) then the next message will be released after that time.\n\nMake sure that you trap any errors and feed back to the Flush \nnode when you have handled the error. Also make sure only one \nmessage is fed back for each one in, even in the case of errors.","x":270,"y":360,"wires":[]}]

So if you look at these nodes, I can either trigger each topic separately or all at once.

I would like to send all messages of the same topic without queuing and only queue if the topic changes.

So basically, all messages output on output 1 of the "start all" function node should be run in parallel but output 2 should be queued, output 3 queued etc.
And then each output's messages should be sent once the previous output has been executed completely.

Not sure if that was clear or not.

Example:
Output 1 sends 5 messages. Then all 5 should be completed before output 2 messages are allowed to pass. Once that is allowed, output 3 must wait until all 5 of output 2 have completed. And so on.

What are all these operations that have to be queued like that?

Each topic changes a different config setting of a device. So "ip" will change the ip address of every device connected via MQTT.
As that will cause the device to reboot, the next change (e.g. "template" or "settings") must not be executed for around 10 seconds (otherwise the device will not have finished rebooting and won't receive the command).
On the other hand, all the different payloads bundled as ip will target different devices, so within "ip" or "settings" there is no need to wait.

So all "ip" can run in parallel, all "settings" can run in parallel etc.
But "settings" must not run until all "ip" have finished (plus an additional 10 second delay).

This waits 10 seconds when the incoming topic changes, but not when the topic stays the same. You could make the function node a little more complex if you only wanted to wait after ip messages rather than whenever the topic changes.

[{"id":"45153a8be9284a10","type":"group","z":"bdd7be38.d3b55","name":"Sequential Operation with delay after topic change","style":{"label":true},"nodes":["420251879eef0c8c","e7233d5998956aa0","fb0ce3b38c2ecf10","1aa333ad433925d1","9c6ce60eee43f7e9","93ceb941a4ee7079","06fae80a2c362937","0157a86787276e3f","b168e6f45e4f7f44","92e5726d16da6451"],"x":14,"y":3719,"w":1292,"h":262},{"id":"420251879eef0c8c","type":"inject","z":"bdd7be38.d3b55","g":"45153a8be9284a10","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"topic/1","payload":"topic 1 value","payloadType":"str","x":150,"y":3820,"wires":[["e7233d5998956aa0"]]},{"id":"e7233d5998956aa0","type":"delay","z":"bdd7be38.d3b55","g":"45153a8be9284a10","name":"Queue","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"minute","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":330,"y":3860,"wires":[["0157a86787276e3f"]]},{"id":"fb0ce3b38c2ecf10","type":"delay","z":"bdd7be38.d3b55","g":"45153a8be9284a10","name":"Topic changed, wait 10 secs before sending to MQTT","pauseType":"delay","timeout":"10","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":920,"y":3900,"wires":[["9c6ce60eee43f7e9","93ceb941a4ee7079"]]},{"id":"1aa333ad433925d1","type":"link in","z":"bdd7be38.d3b55","g":"45153a8be9284a10","name":"link in 14","links":["9c6ce60eee43f7e9"],"x":75,"y":3940,"wires":[["06fae80a2c362937"]]},{"id":"9c6ce60eee43f7e9","type":"link out","z":"bdd7be38.d3b55","g":"45153a8be9284a10","name":"link out 35","mode":"link","links":["1aa333ad433925d1"],"x":1205,"y":3940,"wires":[]},{"id":"93ceb941a4ee7079","type":"debug","z":"bdd7be38.d3b55","g":"45153a8be9284a10","name":"Send to 
MQTT","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":1180,"y":3840,"wires":[]},{"id":"06fae80a2c362937","type":"function","z":"bdd7be38.d3b55","g":"45153a8be9284a10","name":"Flush","func":"return {flush: 1}","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":170,"y":3940,"wires":[["e7233d5998956aa0"]]},{"id":"0157a86787276e3f","type":"function","z":"bdd7be38.d3b55","g":"45153a8be9284a10","name":"Topic same as last time?","func":"let lastTopic = context.get(\"lastTopic\") ?? msg.topic\n// default to sending msg to output 1\nlet messages = [msg, null]\nif (msg.topic !== lastTopic) {\n    // topic change so send to output 2\n    messages = [null, msg]\n}\ncontext.set(\"lastTopic\", msg.topic)\nreturn messages;","outputs":2,"timeout":"","noerr":0,"initialize":"","finalize":"","libs":[],"x":550,"y":3860,"wires":[["93ceb941a4ee7079","9c6ce60eee43f7e9"],["fb0ce3b38c2ecf10"]]},{"id":"b168e6f45e4f7f44","type":"comment","z":"bdd7be38.d3b55","g":"45153a8be9284a10","name":"Set the queue timeout to larger than you ever expect the process to take","info":"This is a simple flow which allows a sequence of nodes to be \nprotected so that only one message is allowed in at a time. \nIt uses a Delay node in Rate Limit mode to queue them, but \nreleases them, using the Flush mechanism, as soon as the \nprevious one is complete. Set the timeout in the delay node to \na value greater than the maximum time you expect it ever to take. \nIf for some reason the flow locks up (a message fails to indicate \ncompletion) then the next message will be released after that time.\n\nMake sure that you trap any errors and feed back to the Flush \nnode when you have handled the error. 
Also make sure only one \nmessage is fed back for each one in, even in the case of errors.","x":290,"y":3760,"wires":[]},{"id":"92e5726d16da6451","type":"inject","z":"bdd7be38.d3b55","g":"45153a8be9284a10","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"topic/2","payload":"topic 2 value","payloadType":"str","x":150,"y":3860,"wires":[["e7233d5998956aa0"]]}]
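For anyone reading along, the "Topic same as last time?" Function node in that flow boils down to this routing logic (sketch with a plain Map standing in for Node-RED's node context):

```javascript
// Route a message to output 1 while the topic is unchanged, or to
// output 2 (the 10 s delay) when it changes. `store` stands in for
// Node-RED's node context.
function route(msg, store) {
    const lastTopic = store.get("lastTopic") ?? msg.topic;
    // default: topic unchanged -> output 1
    let messages = [msg, null];
    if (msg.topic !== lastTopic) {
        // topic changed -> output 2
        messages = [null, msg];
    }
    store.set("lastTopic", msg.topic);
    return messages;
}
```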

That looks very cool, thank you.

However, it does not work if multiple different topics are being sent at the same time. When that happens, they all get sent to the "topic changed" node and then, after 10 s, all of them get released in parallel.

So,

  • topic 1 is sent to output 1 => MQTT
  • topic 2 is sent to output 2 => 10 s delay
  • topic 3 is sent to output 2 => 10 s delay
  • topics 4, 5, 6, ... are sent to output 2 => 10 s delay
    Hence, topic 2, 3... are all then sent to MQTT after 10 s.

[gif: chrome_RDCYE03gh6]

There is no such thing as 'at the same time'. You said

which is what that flow does. You could batch the messages together so all the messages with one topic are sent before the messages for the next topic. You could do that in your Start All function by just having one output and sending all the topic1 messages, then the topic2 messages and so on.
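For example, a single-output Start All could look something like this (a sketch; `send` stands in for `node.send`, and the setting names are the ones from the thread):

```javascript
// Sketch of a single-output "start all": one message per setting, sent
// in order so the queue receives the batches already sequenced. `send`
// stands in for Node-RED's node.send.
const settings = ["ip", "template", "settings", "timers", "rule1",
                  "rule2", "rule3", "rule_commands", "custom"];
function startAll(send) {
    for (const setting of settings) {
        send({ payload: "", topic: "", selectRange: "Ax:Ay", setting });
    }
}
```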

It does queue if the following topics are not the same. But it only does this once, i.e. if there are 10 different topics with 5 messages each, then the first 5 of topic 1 will be sent directly and the other 45 will be queued.
So far so good, but then the flushing causes the remaining 45 messages (from 9 different topics) to be flushed at the same time.

So the flow only accounts for the first and second topic, but not correctly for topics 3-x.

I think the "Topic changed" delay node should somehow be routed back into the "topic unchanged" function node.

I cannot batch the messages because the number of messages per topic is determined later in the flow. So I only send one message per topic with start_all, and that one message will multiply into e.g. 10 messages, depending on what happens later in the flow.
The "cell range" node you see in the gif can vary, so it can be one cell or 1000 cells. And each cell creates one message with the same topic.

Can you post your flow please that exhibits this? Just the queuing group and some injects feeding it to show the problem.

I will try to post something more useful later, but most of the flow needs an input Excel file to perform the tasks.

[{"id":"83858dc9c8781fe4","type":"inject","z":"f888cb06810e6b43","name":"start all","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":180,"y":420,"wires":[["9bbc36d4fa09801f"]]},{"id":"9bbc36d4fa09801f","type":"function","z":"f888cb06810e6b43","name":"start all","func":"msg.payload = \"\";\nmsg.topic = \"\";\nmsg.selectRange = \"Ax:Ay\";\n\nmsg.setting = \"ip\";\nnode.send([msg, null, null, null, null, null, null, null, null, null]);\n\nmsg.setting = \"template\";\nnode.send([null, msg, null, null, null, null, null, null, null]);\n\nmsg.setting = \"settings\";\nnode.send([null, null, msg, null, null, null, null, null]);\n\nmsg.setting = \"timers\";\nnode.send([null, null, null, msg, null, null, null, null]);\n\nmsg.setting = \"rule1\";\nnode.send([null, null, null, null, msg, null, null, null]);\n\nmsg.setting = \"rule2\";\nnode.send([null, null, null, null, null, msg, null, null]);\n\nmsg.setting = \"rule3\";\nnode.send([null, null, null, null, null, null, msg, null]);\n\nmsg.setting = \"rule_commands\";\nnode.send([null, null, null, null, null, null, null, msg]);\n\nmsg.setting = \"custom\";\nnode.send([null, null, null, null, null, null, null, null, msg]);\n\n\n// // Delay and output to output 10\n// setTimeout(() => {\n//     msg.setting = \"custom\";\n//     node.send([null, null, null, null, null, null, null, null, null, msg]);\n// }, 65000); // Delay in milliseconds (90 
seconds)","outputs":10,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":330,"y":420,"wires":[["ed63ee4225312b40"],["ed63ee4225312b40"],["ed63ee4225312b40"],["ed63ee4225312b40"],["ed63ee4225312b40"],["ed63ee4225312b40"],["ed63ee4225312b40"],["ed63ee4225312b40"],["ed63ee4225312b40"],["ed63ee4225312b40"]]},{"id":"ed63ee4225312b40","type":"delay","z":"f888cb06810e6b43","name":"Queue","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"minute","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":540,"y":420,"wires":[["cae558cbf96096da"]]},{"id":"2128a855234c1016","type":"link in","z":"f888cb06810e6b43","name":"link in 1","links":["7c6253e5d34769ac"],"x":505,"y":590,"wires":[["3a9faf0a95b4a9bb"]]},{"id":"3a9faf0a95b4a9bb","type":"function","z":"f888cb06810e6b43","name":"Flush","func":"let lastTopic = context.get(\"lastTopic\") || msg.setting;\nlet lastMessageTime = context.get(\"lastMessageTime\") || Date.now();\n\n// Check if the topic has changed\nif (msg.setting !== lastTopic) {\n    context.set(\"lastTopic\", msg.setting);\n    context.set(\"lastMessageTime\", Date.now());\n    return { flush: 0 }; // Do nothing if the topic has changed\n}\n\n// Check if 5 seconds have passed since the last message\nif (Date.now() - lastMessageTime >= 5000) {\n    context.set(\"lastMessageTime\", Date.now());\n    return { flush: 1 }; // Perform the flush\n}\n\n// Update the lastMessageTime context variable\ncontext.set(\"lastMessageTime\", Date.now());\nreturn { flush: 0 }; // Do nothing if the topic has not changed and 5 seconds have not passed\n","outputs":1,"timeout":"","noerr":0,"initialize":"","finalize":"","libs":[],"x":540,"y":540,"wires":[["ed63ee4225312b40"]]},{"id":"cae558cbf96096da","type":"function","z":"f888cb06810e6b43","name":"Topic unchanged?","func":"let lastTopic = context.get(\"lastTopic\") ?? 
msg.setting\n// topic unchanged => send to output 1\nlet messages = [msg, null]\nif (msg.setting !== lastTopic) {\n    // topic changed => send to output 2\n    messages = [null, msg]\n}\ncontext.set(\"lastTopic\", msg.setting)\nreturn messages;","outputs":2,"timeout":"","noerr":0,"initialize":"","finalize":"","libs":[],"x":720,"y":420,"wires":[["9c79de6a093d5d02","d4d479e614e82a49"],["ed63ee4225312b40"]]}]

Here I tried a slightly different approach but I think maybe my idea of the flush is incorrect.

Rather than route the other topics to a delay node, I am routing them back into the queue.
The Flush function node should then wait until it stops receiving messages from the link-in node and, once no more input arrives, flush the next message through the queue.

Do you know what I mean?

What? In that case the queuing is being applied at the wrong point. You need to apply it after the messages have been multiplied up.

I am not sure how practical this is.
Combine all the output 1 messages into a single message (as an array), ditto for output 2 etc.
Then you can use a split node to get back to individual messages.
The final message of each set can be identified by msg.parts.index + 1 == msg.parts.count.
You could use this, with a Complete node to release the next message array from the queue.
So all "output 1" messages will be completed before "output 2" messages are released from the queue.

A partial flow. I used a function to test if msg.parts.index + 1 == msg.parts.count because I couldn't work out how to do it in a switch node.
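The test itself can be a one-liner in a Function node (sketch; `parts` is the property the Split node sets on each message):

```javascript
// Pass a message through only when it is the last part of a Split
// sequence; earlier parts are dropped.
function isLastPart(parts) {
    return parts !== undefined && parts.index + 1 === parts.count;
}
// In the Function node body this would be:
//   return isLastPart(msg.parts) ? msg : null;
```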



I don't think so. Because the start_all starts the different settings. So whether they are multiplied or not makes no difference for the queue. It only changes the flush behaviour.

The flush is triggered multiple times then. So the location is fine, but I need to account for a varying number of flushes due to multiple link-in messages of the same topic.

If I move the queuing, exactly the same behaviour is observed as before.

To amplify this a little: if the output of the Cell Range node is a stream of messages where the topic varies as has been discussed earlier, then feed that into the Queue node, rather than the output of the Start all node.

As a side question, why have you got multiple outputs from the Start all node? You could just send all the messages on one output.

So, the start_all node sends a different payload for msg.setting on each output.

This msg.setting is my actual filter value (and is used in my Router node later to route to the respective sub-flow).

So depending on the payload of msg.setting a different column of the Excel will be read.

msg.setting = "ip";
node.send([msg, null, null, null, null, null, null, null, null, null]);

msg.setting = "template";
node.send([null, msg, null, null, null, null, null, null, null]);

msg.setting = "settings";
node.send([null, null, msg, null, null, null, null, null]);

...and so on

As you can see above, the flow also contains a node called "Cell range".

Here I can specify how many rows are being read.

So if msg.setting=ip, then the column "ip" will be read and in the above picture, 14 rows will be read. This causes 14 messages with identical msg.setting.

The start_all now also sends msg.setting=template and msg.setting=settings, which again read those 14 rows, causing 28 more messages to be created.

So there are 3 msg.setting values multiplied by 14 rows.
I want to send all rows simultaneously but only one msg.setting at a time.

I am not sure if I can post the entire flow here or if that would exceed the allowed character limit.
So here is an overview of what is happening.

Most of it is irrelevant for us. The problem is really that I need to make sure that only one msg.setting is being routed to the "Cell range" node, and the others are delayed until there are no more messages received from the link-in node (plus a 10 second delay after the last message has been received).

P.S.: The individual triggers below "start_all" are not being used in this problem. They are only there if I actually want to trigger only one of all the msg.setting payloads.

msg.setting = "ip";
node.send([msg, null, null, null, null, null, null, null, null, null]);

msg.setting = "template";
node.send([null, msg, null, null, null, null, null, null, null]);

msg.setting = "settings";
node.send([null, null, msg, null, null, null, null, null]);

Instead you can do

msg.setting = "ip";
node.send(msg);
msg.setting = "template";
node.send(msg);
msg.setting = "settings";
node.send(msg);

Exactly. Feed those into the queue, then the next batch, with msg.setting set to the next value, and so on. The 14 messages will be sent immediately, then the 10 second delay will be imposed because the next message has a different msg.setting.

I will try it out now. But I think it will not make a difference because the problem is in the flushing.

The flushing is not triggered per batch but per message. So if a batch (e.g. msg.setting=ip) creates 10 messages, then the flushing is performed 10 times (once per message). So every waiting batch (msg.setting=template, msg.setting=settings etc.) will be released from the queue due to the 10 messages of batch 1 (ip).

EDIT:
Yep, problem is the flushing per message. I need to ensure that the flushing is performed per batch somehow.
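A sketch of what per-batch flushing could look like, assuming each multiplied message carries `msg.parts` from a Split node (an assumption: the flow would have to produce those):

```javascript
// Sketch (assumes msg.parts is present, e.g. set by a Split node):
// emit a flush command only for the final message of a batch, so the
// queue releases exactly one waiting batch per completed batch.
function flushCommand(parts) {
    if (parts && parts.index + 1 === parts.count) {
        return { flush: 1 };   // last message of the batch
    }
    return null;               // intermediate message: do nothing
}
```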

There will be 10 messages, not just 1, because you will be feeding the 10 ip messages into the queue individually.

Show me the flow you have now. A screenshot will do initially.
Edit: Also post the function node originally called "Topic unchanged?".