I'm afraid I took this on as a bit of a challenge. This flow uses a function
node (labelled counter-timer
) that does what I described in an earlier post.
[{"id":"488ec17a.f1cbc","type":"function","z":"5b837f12.858c98","name":" counter-timer","func":"var topic = msg.topic, limit = 5, time = 10;\nvar counters = context.get('counters') || new Map();\n//reset\nif (typeof msg.reset !== 'undefined') {\n counters = counters.clear();\n} else {\n count = counters.get(topic);\n if (typeof count === 'undefined') {\n count = 1;\n } else {\n count ++;\n if (count >= limit) {\n msg.payload = 'limit reached for topic ' + topic;\n }\n }\n counters.set(topic,count);\n setTimeout(function(){\n var n = counters.get(topic);\n n--;\n counters.set(topic,n);\n if (n === 0) {counters.delete(topic);}\n }, 1000*time);\n}\n// clear topic\nif (typeof msg.delete !== 'undefined') {\n counters.delete(topic);\n}\n// save map\ncontext.set('counters',counters);\n// map size\nmsg.size = 0;\nif (typeof counters !== 'undefined'){\n msg.size = counters.size;\n}\n// map to array\nif (typeof counters !== 'undefined') {\n msg.counters = Array.from(counters);\n}\nreturn msg;","outputs":1,"noerr":0,"x":580,"y":200,"wires":[["20892204.55c716"]]},{"id":"ad036b7b.ae54b","type":"inject","z":"5b837f12.858c98","name":"","topic":"topic0","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":180,"y":160,"wires":[["488ec17a.f1cbc"]]},{"id":"4f1482af.66fe9c","type":"inject","z":"5b837f12.858c98","name":"","topic":"topic1","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":180,"y":200,"wires":[["488ec17a.f1cbc"]]},{"id":"e95552ac.455ba8","type":"inject","z":"5b837f12.858c98","name":"","topic":"topic2","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":180,"y":240,"wires":[["488ec17a.f1cbc"]]},{"id":"6fd0047.f7ce37c","type":"inject","z":"5b837f12.858c98","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":160,"y":120,"wires":[["fdf8cc17.a188e8"]]},{"id":"fdf8cc17.a188e8","type":"change","z":"5b837f12.858c98","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":340,"y":120,"wires":[["488ec17a.f1cbc"]]},{"id":"3227acba.6041bc","type":"change","z":"5b837f12.858c98","name":"","rules":[{"t":"set","p":"delete","pt":"msg","to":"","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":360,"y":280,"wires":[["488ec17a.f1cbc"]]},{"id":"1f1b502d.f61a58","type":"inject","z":"5b837f12.858c98","name":"","topic":"topic1","payload":"delete","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":190,"y":280,"wires":[["3227acba.6041bc"]]},{"id":"20892204.55c716","type":"debug","z":"5b837f12.858c98","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","x":750,"y":200,"wires":[]}]
What it does:
Maintains a list of all message topics seen during the last specified collection interval and the number of messages received with that topic.
How it does it:
When a message is received, it increments the counter associated with the msg.topic
(creating one if necessary) and starts a timer set for the collection interval. When the count for a topic exceeds a specified limit it sends out an alert message. When the timer expires, it decrements the counter associated with that topic, and if the counter reaches zero, it deletes the counter.
The code and output are not pretty and still contain debugging cruft, but you should get the idea. With a nod to @BartButenaers for the basic idea.