Count unique id's from a stream of messages

Hi,

I am getting a stream of messages, and every message contains an ID.
Some IDs send messages more often than other IDs.
How do I count the number of unique IDs in a certain time, let's say 10 minutes?

You could use a switch node to filter out the ID's and then use this node to count them:

But will this also count the following stream:

ID1, ID2, ID3, ID1, ID4, ID4, ID5, ID1
= 5 unique ID's?

What does that actually mean? is it the topic? or in the payload? or msg.id?

show me a couple of example payloads (i.e. where is this ID that you want to count)

I get the following:

msg.payload.senderMmsi: 244730216
msg.payload.senderMmsi: 244704000
msg.payload.senderMmsi: 244355000
msg.payload.senderMmsi: 538071599
msg.payload.senderMmsi: 244730216
msg.payload.senderMmsi: 244355000

Some payload.senderMmsi might be the same, and I do not want to count those twice.
If I take the above, I want to end up with the value: 4

So what you want is count of unique numbers in msg.payload.senderMmsi?

I'm not the best in explaining my problem :rofl:
but, yes, that is what I am looking for

how many of these unique numbers would you expect over the course of time?

It can be up to 200

Here you go...

rNHHViKjIb

[{"id":"b127e9dd.ebda88","type":"inject","z":"5e6c8b.7f38b374","name":"1000","props":[{"p":"payload.senderMmsi","v":"1000","vt":"num"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":1200,"y":1088,"wires":[["99bca230.f1e7b"]]},{"id":"aae37861.f01d48","type":"inject","z":"5e6c8b.7f38b374","name":"1001","props":[{"p":"payload.senderMmsi","v":"1001","vt":"num"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":1202,"y":1136,"wires":[["99bca230.f1e7b"]]},{"id":"69f69a4.ba09064","type":"inject","z":"5e6c8b.7f38b374","name":"1002","props":[{"p":"payload.senderMmsi","v":"1002","vt":"num"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":1202,"y":1184,"wires":[["99bca230.f1e7b"]]},{"id":"5e69c1e4.cca27","type":"inject","z":"5e6c8b.7f38b374","name":"1003","props":[{"p":"payload.senderMmsi","v":"1003","vt":"num"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":1202,"y":1232,"wires":[["99bca230.f1e7b"]]},{"id":"99bca230.f1e7b","type":"function","z":"5e6c8b.7f38b374","name":"Unique message counter","func":"var seen = context.get(\"seen\") || {}\nvar count = context.get(\"count\") || 0\nif(!seen[msg.payload.senderMmsi]) {\n    count++;\n    seen[msg.payload.senderMmsi] = true\n}\n\ncontext.set(\"seen\", seen);\ncontext.set(\"count\", count);\nnode.status({text: \"Inique IDs: \" + count})\nmsg.count = count\nreturn 
msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":1452,"y":1088,"wires":[["4a997ca.6faa084","47a5b748.076758"]]},{"id":"4a997ca.6faa084","type":"debug","z":"5e6c8b.7f38b374","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1670,"y":1088,"wires":[]},{"id":"47a5b748.076758","type":"debug","z":"5e6c8b.7f38b374","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"count","targetType":"msg","statusVal":"","statusType":"auto","x":1672,"y":1136,"wires":[]},{"id":"351737b5.60c0a8","type":"comment","z":"5e6c8b.7f38b374","name":"test values","info":"","x":1196,"y":1040,"wires":[]}]
1 Like

Or create a context storage and log id and time, and then filter out times older than ten minutes and count id's.
This example gives an individual count per ID, a total count, and a unique-ID count
e.g.
this is set to a 1min count, the inject creates a random id of 1-4

[{"id":"a15ba8f.8ca98d8","type":"change","z":"c74669a0.6a34f8","name":"","rules":[{"t":"set","p":"idCount","pt":"flow","to":"$append(\t   $flowContext(\"idCount\"),\t   [{\"id\":payload.id,\"time\":$millis()}]\t)[time > $millis() - 60*1000]\t","tot":"jsonata"},{"t":"set","p":"count","pt":"msg","to":"$count($flowContext(\"idCount\")[$.id=$$.payload.id])","tot":"jsonata"},{"t":"set","p":"uniqueIdCount","pt":"msg","to":"$count($distinct($flowContext(\"idCount\").[id]))","tot":"jsonata"},{"t":"set","p":"totalcount","pt":"msg","to":"$count($flowContext(\"idCount\"))","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":330,"y":2740,"wires":[["4a9c9eed.fd3bd8"]]},{"id":"f6f7ca7a.e186d8","type":"inject","z":"c74669a0.6a34f8","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"{    \"id\": $ceil($random()*4)}","payloadType":"jsonata","x":90,"y":2740,"wires":[["a15ba8f.8ca98d8"]]},{"id":"4a9c9eed.fd3bd8","type":"debug","z":"c74669a0.6a34f8","name":"","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"true","targetType":"full","statusVal":"\"id\" & payload.id & \" count = \" & count & \" ALL = \" & totalcount & \" UNI = \" & uniqueIdCount","statusType":"jsonata","x":580,"y":2740,"wires":[]}]
1 Like

Thanks Steve, works like a charm.

I use it in the label next to the home icon.
The green vessels are vessels who are sailing, and their labels show distance in KM's from my antenna.

1 Like

Shouldn't vessels be in the blue bit or am I missing something?

image
:wink:

Ah, they are on blue bits. Long thin blue bits.

1 Like

I have changed the unique message counter a bit as I want to reset the counter.
I have changed it to:


// Node-RED function node body: counts how many distinct
// msg.payload.senderMmsi values have passed through this node.
//
// State is split across two stores:
//   - "seen"  lives in this node's own context (map of ID -> true)
//   - "count" lives in global context
// NOTE: setting global "count" to 0 from elsewhere does not clear "seen",
// so any ID already recorded in "seen" will never increment the count again.

// Load previous state, falling back to an empty map / zero when unset.
var seen = context.get("seen") || {}
var count = global.get("count") || 0

// Only IDs not yet recorded in "seen" bump the counter.
if(!seen[msg.payload.senderMmsi]) {
    count++;
    seen[msg.payload.senderMmsi] = true
}
// Persist both pieces of state for the next message.
context.set("seen", seen);
global.set("count", count);
// Show the running total under the node in the editor.
node.status({text: "Inique IDs: " + count})
// toFixed(0) returns a string, so msg.payload.count is a string, not a number.
msg.payload.count = count.toFixed(0)
return msg;

I then use a change node to set global.count to 0.
However, when I set global.count to 0, it counts up to maybe 2 or 3 and then freezes.
Why does it do that?

[{"id":"99bca230.f1e7b","type":"function","z":"a7b2390e.90d3a8","name":"Unique message counter","func":"\nvar seen = context.get(\"seen\") || {}\nvar count = global.get(\"count\") || 0\n\nif(!seen[msg.payload.senderMmsi]) {\n    count++;\n    seen[msg.payload.senderMmsi] = true\n}\ncontext.set(\"seen\", seen);\nglobal.set(\"count\", count);\nnode.status({text: \"Inique IDs: \" + count})\nmsg.payload.count = count.toFixed(0)\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":1610,"y":120,"wires":[["2db6983.b048068"]]},{"id":"13307c21.232e74","type":"inject","z":"a7b2390e.90d3a8","name":"Reset Counter","props":[{"p":"payload"}],"repeat":"600","crontab":"","once":true,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":1170,"y":200,"wires":[["2bfe4b27.63e8a4"]]},{"id":"2bfe4b27.63e8a4","type":"change","z":"a7b2390e.90d3a8","name":"Reset Counter","rules":[{"t":"set","p":"count","pt":"global","to":"0","tot":"num"}],"action":"","property":"","from":"","to":"","reg":false,"x":1380,"y":200,"wires":[["89d9519d.fcd6"]]}]

But when I delete global.count in a change node, it works okay.

It won't count because `seen` still contains all the previously seen IDs - thus only IDs that have never been seen before are counted

Try this...

[{"id":"b127e9dd.ebda88","type":"inject","z":"6d01e3e4.6003b4","name":"1000","props":[{"p":"payload.senderMmsi","v":"1000","vt":"num"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":650,"y":1180,"wires":[["99bca230.f1e7b"]]},{"id":"aae37861.f01d48","type":"inject","z":"6d01e3e4.6003b4","name":"1001","props":[{"p":"payload.senderMmsi","v":"1001","vt":"num"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":652,"y":1228,"wires":[["99bca230.f1e7b"]]},{"id":"69f69a4.ba09064","type":"inject","z":"6d01e3e4.6003b4","name":"1002","props":[{"p":"payload.senderMmsi","v":"1002","vt":"num"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":652,"y":1276,"wires":[["99bca230.f1e7b"]]},{"id":"5e69c1e4.cca27","type":"inject","z":"6d01e3e4.6003b4","name":"1003","props":[{"p":"payload.senderMmsi","v":"1003","vt":"num"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":652,"y":1324,"wires":[["99bca230.f1e7b"]]},{"id":"99bca230.f1e7b","type":"function","z":"6d01e3e4.6003b4","name":"Unique message counter","func":"var seen = context.get(\"seen\") || {}\nvar count = context.get(\"count\") || 0\n\nif(msg.topic == \"reset\") {\n    seen = {}\n    count = 0;\n} else if(!seen[msg.payload.senderMmsi]) {\n    count++;\n    seen[msg.payload.senderMmsi] = true\n}\n\ncontext.set(\"seen\", seen);\ncontext.set(\"count\", count);\nnode.status({text: \"Inique IDs: \" + count})\nmsg.count = count\nreturn 
msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":902,"y":1180,"wires":[["4a997ca.6faa084","47a5b748.076758"]]},{"id":"4a997ca.6faa084","type":"debug","z":"6d01e3e4.6003b4","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1120,"y":1180,"wires":[]},{"id":"47a5b748.076758","type":"debug","z":"6d01e3e4.6003b4","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"count","targetType":"msg","statusVal":"","statusType":"auto","x":1122,"y":1228,"wires":[]},{"id":"351737b5.60c0a8","type":"comment","z":"6d01e3e4.6003b4","name":"test values","info":"","x":640,"y":1080,"wires":[]},{"id":"dc4f6d6d.2ad38","type":"inject","z":"6d01e3e4.6003b4","name":"","props":[{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"reset","x":650,"y":1120,"wires":[["99bca230.f1e7b"]]}]

↑ avoids the use of global context

This does not reset

i edited the code 2 mins ago - try it again
cvqGJ0sPx6