Send payload when certain event is received X number of times within specified timeframe


#1

I know that title might be confusing but I couldn't think of another way to say it. I have an application where I will be receiving events from dozens (possibly hundreds) of devices. I don't care about the event if it is one-off but I do care if the same event from the same device has been seen a certain number of times in a specified time frame. As an example, here is the same event coming into Node-RED from two different devices.

image

In this case the event is "Motion Started" but it could be any number of things such as device offline or something else. As an example, I need something that will say "if the Motion Started event comes from the same ID 5 times in 120 seconds, then send X payload". I believe I need to do the following:

  • Have a timer start each time an event comes in for a specified device.
    • I don't know if this means storing the time in a DB since there could potentially be dozens or hundreds or if there is another way to do it.
  • Each time an event comes in for a specified device, it needs to check if there is already a timer for that event/device combination and, if so, increment the count for that event. It will also need to start its own timer in case the time for the previous event expires before the specified time.
  • If there is an existing timer and the count gets incremented to 5 (in the example above), then send X payload.

I'm struggling to figure out how to even start this. I have seen some of the counter nodes but they don't do what I need so I'm assuming I will need to use a Function node and write some JavaScript. I'm by no means a pro at JavaScript but I'm learning. I'm not looking for someone to write something for me (although, if you have something already written I would gladly take it) but more just looking for someone to point me in the right direction. I've done some Googling around but I'm coming up empty (I think that is more because of me not using the proper terminology when searching as opposed to there not being an answer).

Any help/input would be greatly appreciated.


#2

It resembles a sensor data analytics use case, which could be demanding in terms of resources. To find a solution within Node-RED, perhaps you could have a look at this set of nodes:

Honestly speaking, I have never tried them (I had neither the need nor the time to explore), so I am not aware of their capabilities, but they may be worth investigating.


#3

Thanks Andrei. I'll dig through that and see if it helps. From a first glance, it looks like it will at least get me started and cover some of what I need.


#4

This node may also be useful - https://flows.nodered.org/node/node-red-contrib-timed-counter


#5

Using the node indicated by Dave, the solution could turn out to be very simple. I tested it using the flow below. It could be a starting point for developing your solution.

Flow:

[{"id":"2924d0c6.65e49","type":"tab","label":"Flow 1","disabled":false,"info":""},{"id":"a773d1c2.92409","type":"timed-counter","z":"2924d0c6.65e49","name":"","timelimit":"2000","withhold":true,"fixedtimeout":false,"x":640,"y":300,"wires":[["60894e0a.f72b4"]]},{"id":"beb06291.f1a93","type":"inject","z":"2924d0c6.65e49","name":"Camera 1 - Started","topic":"","payload":"{\"Event\":\"Motion Started\",\"ID\":\"AAA\",\"Name\":\"Camera 1\",\"Type\":\"Camera\"}","payloadType":"json","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":250,"y":200,"wires":[["2835811.dc5c07e"]]},{"id":"27c6c1b6.b3128e","type":"inject","z":"2924d0c6.65e49","name":"Camera 2 - Started","topic":"","payload":"{\"Event\":\"Motion Started\",\"ID\":\"BBB\",\"Name\":\"Camera 2\",\"Type\":\"Camera\"}","payloadType":"json","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":250,"y":320,"wires":[["2835811.dc5c07e"]]},{"id":"643930ed.7eabe","type":"inject","z":"2924d0c6.65e49","name":"Camera 1 - Stopped","topic":"","payload":"{\"Event\":\"Motion Stopped\",\"ID\":\"AAA\",\"Name\":\"Camera 1\",\"Type\":\"Camera\"}","payloadType":"json","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":250,"y":240,"wires":[["2835811.dc5c07e"]]},{"id":"69e30cd6.351e94","type":"inject","z":"2924d0c6.65e49","name":"Camera 2 - Stopped","topic":"","payload":"{\"Event\":\"Motion Stopped\",\"ID\":\"BBB\",\"Name\":\"Camera 2\",\"Type\":\"Camera\"}","payloadType":"json","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":250,"y":360,"wires":[["2835811.dc5c07e"]]},{"id":"2835811.dc5c07e","type":"switch","z":"2924d0c6.65e49","name":"","property":"payload.ID","propertyType":"msg","rules":[{"t":"eq","v":"AAA","vt":"str"},{"t":"eq","v":"BBB","vt":"str"}],"checkall":"true","repair":false,"outputs":2,"x":470,"y":280,"wires":[["72c14731.556cf8"],["a773d1c2.92409"]]},{"id":"28a9b04b.d369","type":"debug","z":"2924d0c6.65e49","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"count","x":830,"y":260,"wires":[]},{"id":"72c14731.556cf8","type":"timed-counter","z":"2924d0c6.65e49","name":"","timelimit":"2000","withhold":true,"fixedtimeout":false,"x":640,"y":260,"wires":[["28a9b04b.d369"]]},{"id":"60894e0a.f72b4","type":"debug","z":"2924d0c6.65e49","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"count","x":830,"y":300,"wires":[]}]

#6

@Andrei, the solution you suggest is great in principle, but I can see real problems with scaling it to the "dozens (possibly hundreds) of devices" that @TheSauce says he needs to monitor. It ought to be possible to write a single function node that deploys a counter and timer for each msg.payload.ID that arrives and deletes them when they are no longer needed. The technique that @BartButenaers uses in the msg-resend node, node-red-contrib-msg-resend, ought to work, but I don't have time just now to try it out.


#7

Or maybe enhance that node so it can optionally handle multiple IDs (normally we suggest using msg.topic as the identifier), or engage with the developer of that node, which would then make it better for everyone.
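
For example, assuming the payload shape from the test flow in post #5 (Event, ID, Name, Type), a small Function-node-style helper could derive msg.topic so that each device/event combination gets its own identifier. addTopic is a hypothetical name, and the "ID/Event" key format is only one possible choice:

```javascript
// Hypothetical helper: builds a per-device, per-event identifier from the
// payload shape used in the example flow ({Event, ID, Name, Type}).
function addTopic(msg) {
    // e.g. "AAA/Motion Started"
    msg.topic = msg.payload.ID + "/" + msg.payload.Event;
    return msg;
}
```

Inside a Function node, only the two lines of the body would be needed, since msg is already provided and the node expects it to be returned.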


#8

Hi Mike, I couldn't agree more. The key point for this use case is scalability, and the solution I drafted did not take this into consideration. Indeed, another solution would be doable if the contrib node were able to manage msg.topic, as Dave mentioned.

@TheSauce, I wonder now if it is possible to imagine a different approach for handling the incoming data. What if we partition the stream of data into chunks, consolidating whatever is received in each 120-second period and generating the analysis only for the data received within that timeframe? In such a case, we would have only one timer managing the stream of data. That would be scalable and efficient.
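
A minimal sketch of the chunking idea, in plain JavaScript of the kind that could be adapted for a Function node. The names createChunkCounter and record are made up for illustration, and the sketch compares timestamps instead of running an actual timer, so treat it as one possible reading of the idea and not a finished solution:

```javascript
// Fixed-chunk counter: one shared window for all devices.
// windowMs and limit are illustrative parameters (e.g. 120000 ms and 5 events).
function createChunkCounter(windowMs, limit) {
    let windowStart = null;   // timestamp (ms) at which the current chunk began
    let counts = new Map();   // device ID -> events seen in the current chunk

    return function record(id, now) {
        // A new chunk starts with the first event that arrives
        // after the previous chunk has expired.
        if (windowStart === null || now - windowStart >= windowMs) {
            windowStart = now;
            counts = new Map();
        }
        const n = (counts.get(id) || 0) + 1;
        counts.set(id, n);
        // True exactly once per chunk and device: when the limit is reached.
        return n === limit;
    };
}
```

Each call passes the device ID and the current time in milliseconds (for instance Date.now()). All per-device state is discarded whenever a new chunk begins, which is what keeps the approach cheap.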


#9

This makes really good sense, as long as the "chunking" is actually a sliding window. That way, events from the same device can't fall into adjacent chunks and be under-counted. Also, the counting window would have to be the same for all the devices. I don't see a way to do this with existing core and contributed nodes (I think the sliding window rules out the batch node), but I've been wrong before.
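
One way to get a sliding window without any timers is sketched below, under the assumption that it is acceptable to keep a short list of timestamps per device. This is a different technique from single-timer chunking, and createSlidingCounter and record are hypothetical names:

```javascript
// Sliding-window counter: keeps recent event timestamps per device.
// windowMs and limit are illustrative parameters (e.g. 120000 ms and 5 events).
function createSlidingCounter(windowMs, limit) {
    const seen = new Map(); // device ID -> timestamps (ms) still inside the window

    return function record(id, now) {
        // Drop timestamps that have fallen out of the window, then add this event.
        const times = (seen.get(id) || []).filter(function (t) {
            return now - t < windowMs;
        });
        times.push(now);
        seen.set(id, times);
        // True whenever the device has at least `limit` events inside the window.
        return times.length >= limit;
    };
}
```

Because the window is measured backwards from each new event, five events at 100 s, 110 s, 119 s, 121 s and 122 s are counted together even though they straddle the 120 s mark. Note that this sketch never removes a device that stops sending events, so with hundreds of devices some periodic cleanup of stale IDs would probably be needed.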


#10

Thank you for all of the great ideas and options. I'll dig through these and see if I can make something work.


#11

I'm afraid I took this on as a bit of a challenge. This flow uses a function node (labelled counter-timer) that does what I described in an earlier post.

[{"id":"488ec17a.f1cbc","type":"function","z":"5b837f12.858c98","name":" counter-timer","func":"var topic = msg.topic, limit = 5, time = 10;\nvar counters = context.get('counters') || new Map();\n//reset\nif (typeof msg.reset !== 'undefined') {\n    counters = counters.clear();\n} else {\n    count = counters.get(topic);\n    if (typeof count === 'undefined') {\n        count = 1;\n    } else {\n        count ++;\n        if (count >= limit) {\n            msg.payload = 'limit reached for topic ' + topic;\n        }\n    }\n    counters.set(topic,count);\n    setTimeout(function(){\n        var n = counters.get(topic);\n        n--;\n        counters.set(topic,n);\n        if (n === 0) {counters.delete(topic);}\n    }, 1000*time);\n}\n// clear topic\nif (typeof msg.delete !== 'undefined') {\n    counters.delete(topic);\n}\n// save map\ncontext.set('counters',counters);\n// map size\nmsg.size = 0;\nif (typeof counters !== 'undefined'){\n    msg.size = counters.size;\n}\n// map to array\nif (typeof counters !== 'undefined') {\n    msg.counters = Array.from(counters);\n}\nreturn msg;","outputs":1,"noerr":0,"x":580,"y":200,"wires":[["20892204.55c716"]]},{"id":"ad036b7b.ae54b","type":"inject","z":"5b837f12.858c98","name":"","topic":"topic0","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":180,"y":160,"wires":[["488ec17a.f1cbc"]]},{"id":"4f1482af.66fe9c","type":"inject","z":"5b837f12.858c98","name":"","topic":"topic1","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":180,"y":200,"wires":[["488ec17a.f1cbc"]]},{"id":"e95552ac.455ba8","type":"inject","z":"5b837f12.858c98","name":"","topic":"topic2","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":180,"y":240,"wires":[["488ec17a.f1cbc"]]},{"id":"6fd0047.f7ce37c","type":"inject","z":"5b837f12.858c98","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":160,"y":120,"wires":[["fdf8cc17.a188e8"]]},{"id":"fdf8cc17.a188e8","type":"change","z":"5b837f12.858c98","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":340,"y":120,"wires":[["488ec17a.f1cbc"]]},{"id":"3227acba.6041bc","type":"change","z":"5b837f12.858c98","name":"","rules":[{"t":"set","p":"delete","pt":"msg","to":"","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":360,"y":280,"wires":[["488ec17a.f1cbc"]]},{"id":"1f1b502d.f61a58","type":"inject","z":"5b837f12.858c98","name":"","topic":"topic1","payload":"delete","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":190,"y":280,"wires":[["3227acba.6041bc"]]},{"id":"20892204.55c716","type":"debug","z":"5b837f12.858c98","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","x":750,"y":200,"wires":[]}]

What it does:
Maintains a list of all message topics seen during the last specified collection interval and the number of messages received with that topic.

How it does it:
When a message is received, it increments the counter associated with the msg.topic (creating one if necessary) and starts a timer set for the collection interval. When the count for a topic reaches a specified limit it sends out an alert message. When the timer expires, it decrements the counter associated with that topic, and if the counter reaches zero, it deletes the counter.

The code and output are not pretty and still contain debugging cruft, but you should get the idea. With a nod to @BartButenaers for the basic idea.
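
For anyone who wants the increment-and-expire logic without the debugging output, here is a standalone sketch of the same idea. It is not the code from the flow: createCounterTimer, record, clear, reset and size are names chosen for illustration, and the schedule parameter is an assumption added so the timer can be swapped out when testing (it defaults to setTimeout). The sketch also ignores timers that belong to a topic cleared or reset in the meantime:

```javascript
// Standalone sketch of the counter-timer logic (illustrative names throughout).
// limit: number of events that triggers an alert; timeMs: collection interval.
// schedule: hypothetical hook for testing, defaults to the real setTimeout.
function createCounterTimer(limit, timeMs, schedule = setTimeout) {
    const counters = new Map(); // topic -> { count } for events still in the interval

    return {
        // Count one event for the topic; true when the limit is reached or exceeded.
        record(topic) {
            let entry = counters.get(topic);
            if (!entry) {
                entry = { count: 0 };
                counters.set(topic, entry);
            }
            entry.count += 1;
            schedule(function () {
                // Skip if the topic was cleared or reset after this event arrived.
                if (counters.get(topic) !== entry) { return; }
                entry.count -= 1;
                if (entry.count === 0) { counters.delete(topic); }
            }, timeMs);
            return entry.count >= limit;
        },
        clear(topic) { counters.delete(topic); }, // forget a single topic
        reset() { counters.clear(); },            // forget all topics
        size() { return counters.size; }          // number of topics being tracked
    };
}
```

In a Function node, one instance could presumably be kept in context (with context.get and context.set, as in the flow above) and record(msg.topic) called for each incoming message, changing msg.payload only when it returns true.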


#12

:eye: Automatic copyright protection violation alert message :eye:

:wink:


#13

Busted, nabbed, caught red-handed... :open_mouth:


#14

I have raised an issue requesting that feature and offering the function I wrote as a proof-of-concept. Let's see what happens.


#15

Thanks @drmibell and @BartButenaers. This is one of the many things I love about this forum and community. I'll try and play around with this flow tomorrow and see how it goes.