Delayed execution / one-time scheduling

I need to execute some tasks with a small delay. E.g., when a specific door opens, I get an MQTT message, and need to take a snapshot, but, say, with a 5-second delay. I don't want to use NR's blocking delays, since other events may come along. Also, I'd like to be able to do things like 'take a snapshot tomorrow morning' (one-time).

One idea is to use CronPlus, creating a new schedule every time. The downside - need to store/address the data somewhere, create/remove a schedule, etc.

I was thinking about InfluxDB's 'checks' since I'm using it anyway (set the target time in ms and check for 'expiry'). Was also looking at AWS EventBridge (but that's basically a glorified cron for my purposes). Both look like an overkill :slight_smile:

Maybe there's a better way?

P.S. Basically what I'm looking for is MQTT-with-programmable-delay. There's a plugin for RabbitMQ but I'm not ready to switch:

P.P.S. Looks like EMQX can do it out of the box. Hm.. Maybe it's time to ditch Mosquitto.

Can you be more specific? Why won't a Trigger or Delay node between the MQTT In and the Snapshot flow work? At least for the short delays.

Well, in these 5 (sometimes more) seconds a lot can happen. Those new events will be waiting in the MQTT queue until the delay is done and the message is sent.

As a general rule, I try to make as little blocking in my flows as possible. E.g., when I get the snapshot, I prepare a message for the user and send it via messenger. I do this by sending an MQTT message to myself so that I can someday move the blocking call to the messenger API to a separate instance.

Same logic here. If I can unblock immediately, that's the way to go.

Ideally, I'd prefer to have each of my flows as a separate 'lambda' on AWS, but that's another story.

There doesn't seem to be a node that lets you delay individual messages for different times.

But Linux has the at command.
You could have a Python script listening for MQTT messages with a specific topic (or called with an Exec node).
Use at to specify a time when the original MQTT topic and payload should be sent.

The Node-Red bit something like this:

I know, not very elegant!

That's clever, thanks :slight_smile:

Why not just a delay node, you can set to 0 milliseconds and override with msg.delay. Any messages you wish delayed just add the delay property in milliseconds.
e.g.

[{"id":"43a98d75.8a242c","type":"inject","z":"b779de97.b1b46","name":"normal","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"normal","payloadType":"str","x":230,"y":4260,"wires":[["b93280c9.0a6d4"]]},{"id":"b93280c9.0a6d4","type":"delay","z":"b779de97.b1b46","name":"","pauseType":"delayv","timeout":"0","timeoutUnits":"milliseconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":420,"y":4260,"wires":[["6ed73c6e.0d1e1c"]]},{"id":"6ed73c6e.0d1e1c","type":"debug","z":"b779de97.b1b46","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":640,"y":4260,"wires":[]},{"id":"b8a49dd7.ae362","type":"inject","z":"b779de97.b1b46","name":"delayed","props":[{"p":"payload"},{"p":"delay","v":"5000","vt":"num"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"delayed","payloadType":"str","x":240,"y":4300,"wires":[["b93280c9.0a6d4"]]}]
1 Like

It's a single threaded environment, so while my main and only thread is waiting for 5 seconds, no new messages from the queue are being processed, which is a problem sometimes.

I didn't think the delay node would allow short delays to "jump the queue" but it does, so perfect!

I don't understand, I thought it was the snapshot you were trying to delay. Why do you need to delay the MQTT message? Put the delay between the MQTT In and the Snapshot. Why would that hold up other MQTT messages?

There are many messages in the MQTT queue. I get them one by one for processing. Some don't require anything, some require a snapshot immediately, some require a delay. But the queue needs to be processed ASAP, so blocking it for seconds is unacceptable.

To avoid delaying potentially critical messages I need a(n external) mechanism of delaying the snapshots for some messages. So, if I'll be using EMQX (haven't tried it yet), I'd be posting the snapshot request messages that go to the cam with a variable delay (zero or the number of seconds to delay).

Something like:

$delayed/3/cam/ID/snapshot/get

@E1cid 's example flow demonstrates that messages that don't have a msg.delay set are processed immediately without blocking the queue. did you try his flow ?

1 Like

Why would I try his flow if it doesn't solve my problem? My problem is that while NR is waiting on the message that requires a delay the queue is not being processed. I need some external entity to schedule messages without blocking the NR flow.

There is no delay in other messages, did you try the example?

Here is a fuller example

[{"id":"d017620e.f00a1","type":"mqtt in","z":"b779de97.b1b46","name":"","topic":"+/cam/ID/snapshot/get","qos":"2","datatype":"auto","broker":"e8ba3ef5.22f4a8","x":230,"y":4220,"wires":[["6a41133b.d145fc"]]},{"id":"6a41133b.d145fc","type":"change","z":"b779de97.b1b46","name":"","rules":[{"t":"set","p":"delay","pt":"msg","to":"$number($split($$.topic, \"/\")[0])","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":470,"y":4220,"wires":[["b93280c9.0a6d4"]]},{"id":"b93280c9.0a6d4","type":"delay","z":"b779de97.b1b46","name":"","pauseType":"delayv","timeout":"0","timeoutUnits":"milliseconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":660,"y":4220,"wires":[["6ed73c6e.0d1e1c"]]},{"id":"6ed73c6e.0d1e1c","type":"debug","z":"b779de97.b1b46","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":610,"y":4320,"wires":[]},{"id":"43a98d75.8a242c","type":"inject","z":"b779de97.b1b46","name":"normal","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"0/cam/ID/snapshot/get","payload":"normal","payloadType":"str","x":220,"y":4340,"wires":[["224ab3.e98ce54e"]]},{"id":"b8a49dd7.ae362","type":"inject","z":"b779de97.b1b46","name":"delayed","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"3000/cam/ID/snapshot/get","payload":"delayed","payloadType":"str","x":200,"y":4380,"wires":[["224ab3.e98ce54e"]]},{"id":"224ab3.e98ce54e","type":"mqtt out","z":"b779de97.b1b46","name":"","topic":"","qos":"","retain":"","broker":"e8ba3ef5.22f4a8","x":380,"y":4360,"wires":[]},{"id":"e8ba3ef5.22f4a8","type":"mqtt-broker","name":"testb","broker":"192.168.1.25","port":"1883","clientid":"node-red","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthRetain":"false","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

I may be mistaken, but although JS is single-threaded, you will not block execution if you use the delay node with the "override delay with msg.delay" option. You could also postpone an action by calling setTimeout() in a function node and so on.

Oh, I see what you mean, 'cooperative multitasking' a-la windows-3. It does seem to work here.

Thank you for the code! After looking at it, I tested a different aspect, my concern wasn't with the variable delay, but about blocking. And the 'delay' node doesn't seem to block the flow.

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.