Can the Delay node rate limit, but pass first message immediately, and always pass on the latest message?

I have a scenario with messages coming in at random intervals, often a long way apart but occasionally close together. These are the goals:

  1. When a message comes in and there has been more than five seconds since the last message then the new one is passed on immediately.

  2. If a second message comes in less than five seconds after the first then it will be queued and released at the end of the five seconds unless ...

  3. If a third (and subsequent) message comes in when there is already one queued then it replaces the one already in the queue and will be released at the end of the period.

So the purpose is to rate limit, but immediately to pass on the first message, and to guarantee that the latest message is always delivered eventually. I can't work out how to do it with the Delay node. It is nearly achievable in All Topics mode, but in that mode it doesn't send the first message immediately.
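To make the three goals concrete, here is a minimal sketch in plain JavaScript that replays a list of timestamped arrivals and reports what should come out and when. It is not Delay node code; `simulateRateLimit` and its parameters are hypothetical names. It also assumes two things the goals leave open: a message arriving exactly at the end of the period counts as "late enough", and releasing a queued message starts a fresh five second period.

```javascript
// Hypothetical helper: simulate the three goals over a list of arrivals.
// Each arrival is { t: milliseconds, payload: any }, sorted by t.
function simulateRateLimit(arrivals, periodMs) {
  const out = [];
  let windowEnd = -Infinity; // time at which the current period expires
  let pending = null;        // newest message waiting for the period to end

  // Release the queued message if its period has expired by `now`.
  function drainUntil(now) {
    if (pending !== null && windowEnd <= now) {
      out.push({ t: windowEnd, payload: pending.payload });
      pending = null;
      windowEnd += periodMs; // assumption: a release starts a new period
    }
  }

  for (const msg of arrivals) {
    drainUntil(msg.t);
    if (msg.t >= windowEnd) {
      // Goal 1: quiet for a full period, so pass immediately.
      out.push({ t: msg.t, payload: msg.payload });
      windowEnd = msg.t + periodMs;
    } else {
      // Goals 2 and 3: hold only the newest message until the period ends.
      pending = msg;
    }
  }
  drainUntil(Infinity); // release anything still queued
  return out;
}

const result = simulateRateLimit(
  [
    { t: 0, payload: "a" },
    { t: 1000, payload: "b" },
    { t: 2000, payload: "c" },
    { t: 20000, payload: "d" },
  ],
  5000
);
console.log(result);
// "a" at 0 ms, "c" at 5000 ms ("b" was replaced), "d" at 20000 ms
```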


You may need to expand on that.

What happens if a message arrives and then nothing else?

That is covered by case 1

(Sorry, I didn't parse it like that.)

Got it now.

Doesn't this do what you want then?

[
    {
        "id": "791af1ddf9a1328d",
        "type": "inject",
        "z": "f7d630bfc1b2f259",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "blah",
        "payloadType": "str",
        "x": 360,
        "y": 2170,
        "wires": [
            [
                "9b79800d4b74a795"
            ]
        ]
    },
    {
        "id": "9b79800d4b74a795",
        "type": "delay",
        "z": "f7d630bfc1b2f259",
        "name": "",
        "pauseType": "rate",
        "timeout": "5",
        "timeoutUnits": "seconds",
        "rate": "1",
        "nbRateUnits": "5",
        "rateUnits": "second",
        "randomFirst": "1",
        "randomLast": "5",
        "randomUnits": "seconds",
        "drop": false,
        "allowrate": false,
        "outputs": 1,
        "x": 540,
        "y": 2170,
        "wires": [
            [
                "585d6add0477b5c6"
            ]
        ]
    },
    {
        "id": "585d6add0477b5c6",
        "type": "debug",
        "z": "f7d630bfc1b2f259",
        "name": "OUT",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "payload",
        "targetType": "msg",
        "statusVal": "",
        "statusType": "auto",
        "x": 700,
        "y": 2170,
        "wires": []
    }
]

Does it work for case 3?

It is a pity that sending {flush: 1,reset: true} to the Delay node does not flush and then reset. It appears to reset first, so there is nothing to flush. Sending them as two messages is not reliable as there is a window between them where a message could arrive and then get lost because of the reset.

You may need to look at other nodes in the library.
https://flows.nodered.org/search?type=node

I have been looking. Any suggestions?

q-gate may work.

It allows you to queue messages and spit out selected ones.

(Sorry)

Could you try with point 3 again.

There are messages in the queue (as per 1 and 2) and a third message arrives.

Which messages are then on-sent?
The older or newer ones?

Ok, not exact, but how about this:

[
    {
        "id": "8ac365e46b76938d",
        "type": "switch",
        "z": "f7d630bfc1b2f259",
        "name": "",
        "property": "count",
        "propertyType": "msg",
        "rules": [
            {
                "t": "eq",
                "v": "2",
                "vt": "num"
            },
            {
                "t": "eq",
                "v": "1",
                "vt": "num"
            },
            {
                "t": "gt",
                "v": "2",
                "vt": "num"
            }
        ],
        "checkall": "true",
        "repair": false,
        "outputs": 3,
        "x": 530,
        "y": 1550,
        "wires": [
            [
                "b0826138af4b91c4"
            ],
            [
                "06721c1d68f7bfc3"
            ],
            [
                "d15070a114653524",
                "06721c1d68f7bfc3"
            ]
        ]
    },
    {
        "id": "5bd3658d5a3a11a7",
        "type": "counter",
        "z": "f7d630bfc1b2f259",
        "name": "",
        "init": "0",
        "step": "1",
        "lower": null,
        "upper": null,
        "mode": "increment",
        "outputs": "1",
        "x": 360,
        "y": 1550,
        "wires": [
            [
                "8ac365e46b76938d"
            ]
        ]
    },
    {
        "id": "b0826138af4b91c4",
        "type": "delay",
        "z": "f7d630bfc1b2f259",
        "name": "",
        "pauseType": "delay",
        "timeout": "5",
        "timeoutUnits": "seconds",
        "rate": "1",
        "nbRateUnits": "1",
        "rateUnits": "second",
        "randomFirst": "1",
        "randomLast": "5",
        "randomUnits": "seconds",
        "drop": false,
        "allowrate": false,
        "outputs": 1,
        "x": 700,
        "y": 1550,
        "wires": [
            [
                "06721c1d68f7bfc3"
            ]
        ]
    },
    {
        "id": "06721c1d68f7bfc3",
        "type": "debug",
        "z": "f7d630bfc1b2f259",
        "name": "",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "true",
        "targetType": "full",
        "statusVal": "",
        "statusType": "auto",
        "x": 680,
        "y": 1670,
        "wires": []
    },
    {
        "id": "d15070a114653524",
        "type": "change",
        "z": "f7d630bfc1b2f259",
        "name": "",
        "rules": [
            {
                "t": "set",
                "p": "reset",
                "pt": "msg",
                "to": "reset",
                "tot": "str"
            },
            {
                "t": "delete",
                "p": "payload",
                "pt": "msg"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 700,
        "y": 1510,
        "wires": [
            [
                "b0826138af4b91c4"
            ]
        ]
    },
    {
        "id": "03284863bd452713",
        "type": "inject",
        "z": "f7d630bfc1b2f259",
        "name": "",
        "props": [
            {
                "p": "payload"
            },
            {
                "p": "topic",
                "vt": "str"
            }
        ],
        "repeat": "",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "payload": "",
        "payloadType": "date",
        "x": 100,
        "y": 1640,
        "wires": [
            [
                "5bd3658d5a3a11a7",
                "5406068f12e1eeac"
            ]
        ]
    },
    {
        "id": "acbdf11c584a1633",
        "type": "change",
        "z": "f7d630bfc1b2f259",
        "name": "",
        "rules": [
            {
                "t": "set",
                "p": "reset",
                "pt": "msg",
                "to": "reset",
                "tot": "str"
            },
            {
                "t": "delete",
                "p": "payload",
                "pt": "msg"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 370,
        "y": 1510,
        "wires": [
            [
                "5bd3658d5a3a11a7"
            ]
        ]
    },
    {
        "id": "5406068f12e1eeac",
        "type": "delay",
        "z": "f7d630bfc1b2f259",
        "name": "",
        "pauseType": "delay",
        "timeout": "5",
        "timeoutUnits": "seconds",
        "rate": "1",
        "nbRateUnits": "1",
        "rateUnits": "second",
        "randomFirst": "1",
        "randomLast": "5",
        "randomUnits": "seconds",
        "drop": false,
        "allowrate": false,
        "outputs": 1,
        "x": 140,
        "y": 1550,
        "wires": [
            [
                "e0da96193f54affc"
            ]
        ]
    },
    {
        "id": "e0da96193f54affc",
        "type": "delay",
        "z": "f7d630bfc1b2f259",
        "name": "",
        "pauseType": "rate",
        "timeout": "5",
        "timeoutUnits": "seconds",
        "rate": "1",
        "nbRateUnits": "5",
        "rateUnits": "second",
        "randomFirst": "1",
        "randomLast": "5",
        "randomUnits": "seconds",
        "drop": true,
        "allowrate": false,
        "outputs": 1,
        "x": 160,
        "y": 1510,
        "wires": [
            [
                "acbdf11c584a1633"
            ]
        ]
    }
]

Foreign (non-core) nodes used:
node-red-contrib-counter

Errr, flush then reset doesn't make sense (to me): once flushed, the node is in the same state as after a reset (i.e. nothing in the queue), so the reset is superfluous.

This may be better.

[{"id":"8ac365e46b76938d","type":"switch","z":"f7d630bfc1b2f259","name":"","property":"count","propertyType":"msg","rules":[{"t":"eq","v":"2","vt":"num"},{"t":"eq","v":"1","vt":"num"},{"t":"gt","v":"2","vt":"num"}],"checkall":"true","repair":false,"outputs":3,"x":370,"y":1600,"wires":[["b0826138af4b91c4"],["06721c1d68f7bfc3"],["d15070a114653524","6cc4a6c19d7efbc8"]]},{"id":"5bd3658d5a3a11a7","type":"counter","z":"f7d630bfc1b2f259","name":"","init":"0","step":"1","lower":null,"upper":null,"mode":"increment","outputs":"1","x":360,"y":1550,"wires":[["8ac365e46b76938d"]]},{"id":"b0826138af4b91c4","type":"delay","z":"f7d630bfc1b2f259","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":700,"y":1550,"wires":[["06721c1d68f7bfc3"]]},{"id":"06721c1d68f7bfc3","type":"debug","z":"f7d630bfc1b2f259","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":720,"y":1670,"wires":[]},{"id":"d15070a114653524","type":"change","z":"f7d630bfc1b2f259","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"reset","tot":"str"},{"t":"delete","p":"payload","pt":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":700,"y":1510,"wires":[["b0826138af4b91c4"]]},{"id":"6cc4a6c19d7efbc8","type":"delay","z":"f7d630bfc1b2f259","name":"","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"5","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":380,"y":1670,"wires":[["ac8840e7f8c36742"]]},{"id":"03284863bd452713","type":"inject","z":"f7d630bfc1b2f259","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":100,
"y":1640,"wires":[["5bd3658d5a3a11a7","d64f36e617cfaa63"]]},{"id":"acbdf11c584a1633","type":"change","z":"f7d630bfc1b2f259","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"reset","tot":"str"},{"t":"delete","p":"payload","pt":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":370,"y":1510,"wires":[["5bd3658d5a3a11a7"]]},{"id":"ac8840e7f8c36742","type":"delay","z":"f7d630bfc1b2f259","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":560,"y":1670,"wires":[["06721c1d68f7bfc3"]]},{"id":"d64f36e617cfaa63","type":"trigger","z":"f7d630bfc1b2f259","name":"","op1":"","op2":"0","op1type":"nul","op2type":"str","duration":"5","extend":true,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":140,"y":1550,"wires":[["e0da96193f54affc"]]},{"id":"e0da96193f54affc","type":"delay","z":"f7d630bfc1b2f259","name":"","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"5","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":true,"allowrate":false,"outputs":1,"x":160,"y":1510,"wires":[["acbdf11c584a1633"]]}]

But still not perfect.

My best attempt:

[{"id":"864772ef56f82916","type":"tab","label":"Flow 2","disabled":false,"info":"","env":[]},{"id":"6c7f6a05311c3f9d","type":"group","z":"864772ef56f82916","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["169b911cfc6e0ee4","508272893fdea567","3c7b3c50819be497","b4b5e64c6a8d1a89"],"x":314,"y":79,"w":452,"h":142},{"id":"b4b5e64c6a8d1a89","type":"junction","z":"864772ef56f82916","g":"6c7f6a05311c3f9d","x":340,"y":160,"wires":[["508272893fdea567","169b911cfc6e0ee4"]]},{"id":"8416ee8d414b6c99","type":"inject","z":"864772ef56f82916","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":"2","topic":"","payload":"Wales","payloadType":"str","x":210,"y":120,"wires":[["b4b5e64c6a8d1a89"]]},{"id":"bc2001d5d56c5276","type":"inject","z":"864772ef56f82916","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":"3","topic":"","payload":"Scotland","payloadType":"str","x":220,"y":160,"wires":[["b4b5e64c6a8d1a89"]]},{"id":"c5d6473c292a3bde","type":"inject","z":"864772ef56f82916","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":"4","topic":"","payload":"England","payloadType":"str","x":220,"y":200,"wires":[["b4b5e64c6a8d1a89"]]},{"id":"d5c6363f1343d657","type":"debug","z":"864772ef56f82916","name":"debug 4","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":880,"y":180,"wires":[]},{"id":"169b911cfc6e0ee4","type":"q-gate","z":"864772ef56f82916","g":"6c7f6a05311c3f9d","name":"","controlTopic":"control","defaultState":"open","openCmd":"open","closeCmd":"close","toggleCmd":"toggle","queueCmd":"queue","defaultCmd":"default","triggerCmd":"trigger","flushCmd":"flush","resetCmd":"reset","peekCmd":"peek","dropCmd":"drop","statusCmd":"status","maxQueueLength":"1","keepNewest":true,"qToggle":false,"persist":false,"storeName":"memory","x":490,"y":180,"wires":[["d5c6363f1343d657"]]},{"id":"508272893fdea567","type":"trigger","z":"864772ef56f82916","g":"6c7f6a05311c3f9d","name":"Set mode Queue","op1":"queue","op2":"open","op1type":"str","op2type":"str","duration":"5","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":450,"y":120,"wires":[["3c7b3c50819be497"]]},{"id":"3c7b3c50819be497","type":"change","z":"864772ef56f82916","g":"6c7f6a05311c3f9d","name":"topic = control","rules":[{"t":"set","p":"topic","pt":"msg","to":"control","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":660,"y":120,"wires":[["169b911cfc6e0ee4"]]}]

The node-red-contrib-queue-gate defaults to Open but a message (after it has passed through) sets it to Queue mode for 5 sec and only the latest 1 message is queued.
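The behaviour described above can be sketched as a tiny state machine in plain JavaScript. This is only a model for reasoning about the flow, not the q-gate source: `LatestGate`, `message` and `control` are hypothetical names, and it assumes that switching from Queue back to Open releases whatever is still queued.

```javascript
// Hypothetical model of a gate with "open" and "queue" states.
class LatestGate {
  constructor(send, maxQueueLength = 1) {
    this.send = send;                  // callback that receives released payloads
    this.maxQueueLength = maxQueueLength;
    this.state = "open";               // default state, as in the flow
    this.queue = [];
  }

  // An ordinary (non-control) message arrives.
  message(payload) {
    if (this.state === "open") {
      this.send(payload);
      return;
    }
    this.queue.push(payload);
    // Keep Newest: when the queue is over its limit, discard the oldest entry.
    while (this.queue.length > this.maxQueueLength) this.queue.shift();
  }

  // A control message arrives ("queue" or "open").
  control(cmd) {
    if (cmd === "queue") {
      this.state = "queue";
    } else if (cmd === "open") {
      this.state = "open";
      // Assumption: opening the gate releases the queued messages.
      for (const p of this.queue.splice(0)) this.send(p);
    }
  }
}

const seen = [];
const gate = new LatestGate((p) => seen.push(p));
gate.message("Wales");    // gate is open, passes straight through
gate.control("queue");    // what the trigger node sends first
gate.message("Scotland"); // queued
gate.message("England");  // replaces "Scotland"
gate.control("open");     // what the trigger node sends after 5 seconds
console.log(seen);        // [ 'Wales', 'England' ]
```

The model also shows where the race mentioned in this post would sit: if a second message reached `message()` before `control("queue")`, it would pass straight through.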

I thought it might need a millisecond or two delay before switching modes but so far I have not seen a problem.

Oh, and I originally set up the grouped bit as a subflow, but since it discards some messages you get link node timeout errors. Edit - not a subflow, link in and link out.

Edit 2:
This works too, maybe better since there is less possibility of a race between msg1 passing the gate and setting the gate to Queue.
But feedback loops ring mental alarm bells. No actual problem here I think.


Not bad.

But isn't there a problem with scenario 3? Message 2 would still get through.

I read it as: If a third message arrives (and there is a queued message) the queued message is dropped and 3 is sent after the timeout time.
(But I am not sure)

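These two settings (a maximum queue length of 1 and Keep Newest enabled, i.e. "maxQueueLength":"1" and "keepNewest":true in the flow above) allow only the most recently queued message to emerge after 5 seconds.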

Sorry...

I use that node myself, and overlooked that part.

(I've got to learn to read) :wink:

I was working on doing just that and made my convoluted mess.
Glad you saw that trick.

Yeah, your way looks better.

But what happens if there are other (more) messages coming through?
Like 4, 5 or 6?

As long as the actual messages are different then actually a simple trigger node and filter may work...

[{"id":"ce4b2f27d2d181d8","type":"inject","z":"a7fc399c358cfac0","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"A","payload":"1","payloadType":"num","x":215,"y":360,"wires":[["8c004dc7912dc3ac"]]},{"id":"8c004dc7912dc3ac","type":"trigger","z":"a7fc399c358cfac0","name":"","op1":"","op2":"","op1type":"pay","op2type":"payl","duration":"5","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":440,"y":405,"wires":[["f2d6fb1495637cd9"]]},{"id":"f2d6fb1495637cd9","type":"rbe","z":"a7fc399c358cfac0","name":"","func":"rbe","gap":"","start":"","inout":"out","septopics":true,"property":"payload","topi":"topic","x":625,"y":405,"wires":[["8317fdb6d1bae677"]]},{"id":"d88cc3cace4cda48","type":"inject","z":"a7fc399c358cfac0","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"A","payload":"2","payloadType":"num","x":215,"y":405,"wires":[["8c004dc7912dc3ac"]]},{"id":"87acfb08b3f28e66","type":"inject","z":"a7fc399c358cfac0","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"A","payload":"3","payloadType":"num","x":215,"y":450,"wires":[["8c004dc7912dc3ac"]]},{"id":"8317fdb6d1bae677","type":"debug","z":"a7fc399c358cfac0","name":"debug 3","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":795,"y":405,"wires":[]}]
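To see why the payloads need to differ, here is a rough model of that trigger and filter pair in plain JavaScript. It is a sketch of my reading of the flow (trigger sends the original payload, then the latest payload after 5 seconds; filter blocks unless the value changes), not the nodes' real code. `makeTriggerWithFilter`, `input` and `timeout` are hypothetical names, and the timer is fired by hand so the result is deterministic.

```javascript
// Hypothetical model of the trigger + filter pair.
function makeTriggerWithFilter(send) {
  let running = false; // true while the 5 second timer is active
  let latest;          // most recent payload seen
  let lastSent;        // the filter's memory of the previous output
  let hasSent = false;

  // Filter stage: block a payload identical to the previous output.
  function filter(payload) {
    if (hasSent && payload === lastSent) return;
    hasSent = true;
    lastSent = payload;
    send(payload);
  }

  return {
    // Called for every incoming message.
    input(payload) {
      latest = payload;
      if (!running) {
        running = true;
        filter(payload); // first message goes out at once
      }
    },
    // Called by hand when the timer would expire.
    timeout() {
      if (!running) return;
      running = false;
      filter(latest);    // second output: the latest payload seen
    },
  };
}

const out = [];
const node = makeTriggerWithFilter((p) => out.push(p));
node.input(1);
node.input(2);
node.input(3);
node.timeout();   // emits 3; message 2 is never seen downstream
node.input(3);    // same value as the previous output, so it is blocked
node.timeout();
node.input(4);
node.timeout();   // the repeat of 4 is blocked by the filter
console.log(out); // [ 1, 3, 4 ]
```

The second `input(3)` is the catch: a new message whose payload equals the last one delivered is silently dropped in this model, which is why this approach only suits messages that are actually different.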


Ok, so can messages be on-going, so you get the first message, then a second, a third and then more?

To what limit?

Are they all to be kept?

Is there an upper limit?

In all of these it would seem message 2 gets dropped and all you see are messages 1, 3 and subsequent messages at 5 second intervals.