Time based - sliding window - simple stats

Hi,
I'm super new to Node-Red, so I apologize if this is not the right spot to ask this. I've looked at the Contrib nodes and I see things that are close, but not exact. I'm also far from a JS wizard so I'm a smidge hesitant to try to edit a node.

I've got data flowing in over time, and I'd like a sliding window based on time, to collect values and then calculate simple things: average, median, min, max. And emit the results only when the time period expires.

I quickly found "Generate a time Weighted Average plus Max and Min for user defined period", but I cannot understand why it generates the results it does, and I don't think it's what I want.

The "rate-avg" node does behave the way I'd like my node to work, but it only emits the average over the time window. I'd like to get those additional stats: min, max, median, etc.

Does anyone know of such a node? Or would I probably be best served by just modifying the rate-avg JavaScript to add what I want? I think with ChatGPT's help I could probably make the changes, but I thought I'd ask here first.

thank you

Hi @pconroy328,
Can the node-red-node-smooth node perhaps be of any help?
Bart

Thank you Bart. If I understand the documentation correctly, smooth is quantity based. I'm looking for a sliding window that emits/calcs based on time.

I'd like the calculations to run every 60 seconds, regardless of how many data samples are in the buffer.

Thank you.

I would do it in a Function node rather than creating a new node, if that is what you meant.

I've got data flowing in over time, and I'd like a sliding window based on time, to collect values and then calculate simple things: average, median, min, max. And emit the results only when the time period expires.

Here is a quick example of how to solve it:

The data goes into the store function node and is saved as an array in the node context store (https://nodered.org/docs/user-guide/writing-functions#storing-data).
When msg.topic is 'dump', the function node returns the saved data (here, as an array) and clears the stored array.

The switch node checks whether the payload is an array. If so, the second function node does whatever you want with this array.

You can use the inject-node to emit the dump-msg every x seconds

If you put this in a subflow, you can reuse this logic

[{"id":"1386b228cb3e1a1c","type":"function","z":"1160bb977c192b32","name":"store","func":"if (msg.topic == 'dump') {\n    let data = context.get('data') || []\n    msg.payload = data\n    context.set('data', [])\n\n} else {\n    let payload = msg.payload\n    let data = context.get('data') || []\n    data.push(payload)\n    context.set('data',data)\n}\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":510,"y":640,"wires":[["39b19b80dbca327d"]]},{"id":"64c92e04ec4f3808","type":"inject","z":"1160bb977c192b32","name":"add values","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"1","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"$floor($random() * 10)\t\t","payloadType":"jsonata","x":330,"y":620,"wires":[["1386b228cb3e1a1c"]]},{"id":"05ad16d40813b354","type":"inject","z":"1160bb977c192b32","name":"dump","props":[{"p":"topic","vt":"str"}],"repeat":"10","crontab":"","once":false,"onceDelay":0.1,"topic":"dump","x":340,"y":660,"wires":[["1386b228cb3e1a1c"]]},{"id":"1f262f56255cd37a","type":"debug","z":"1160bb977c192b32","name":"debug 78","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":1040,"y":600,"wires":[]},{"id":"39b19b80dbca327d","type":"switch","z":"1160bb977c192b32","name":"","property":"payload","propertyType":"msg","rules":[{"t":"istype","v":"array","vt":"array"},{"t":"else"}],"checkall":"true","repair":false,"outputs":2,"x":670,"y":640,"wires":[["fde7749eea9562dc"],["3eb39d92eb329e8d"]]},{"id":"fde7749eea9562dc","type":"function","z":"1160bb977c192b32","name":"calculate stuff","func":"let new_payload = {}\nconst payload = msg.payload\n\n\n//https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Math/max\nconst min = Math.min(...payload)\nconst max = Math.max(...payload)\n\n// https://stackoverflow.com/a/10624256\nconst sum = payload.reduce((a, b) => a + b, 0);\nconst avg 
= (sum / payload.length) || 0;\n\nnew_payload[\"min\"] = min\nnew_payload[\"max\"] = max\nnew_payload[\"sum\"] = sum\nnew_payload[\"avg\"] = avg\n\n\nmsg.payload = new_payload\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":880,"y":600,"wires":[["1f262f56255cd37a"]]},{"id":"3eb39d92eb329e8d","type":"debug","z":"1160bb977c192b32","name":"debug 80","active":false,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":940,"y":660,"wires":[]}]
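
The "calculate stuff" function in the flow above returns min, max, sum and avg, but not the median asked for in the first post. Here is a minimal sketch of a median helper that could be pasted into that function node. The name `median` is hypothetical, and it assumes the dumped payload is an array of plain numbers:

```javascript
// Hypothetical helper: median of an array of numbers.
// Returns null for an empty array (e.g. no samples arrived before the dump).
function median(values) {
    if (values.length === 0) return null;
    // Copy before sorting so the caller's array is left untouched;
    // the comparator is needed because the default sort compares as strings.
    const sorted = [...values].sort((a, b) => a - b);
    const mid = Math.floor(sorted.length / 2);
    return sorted.length % 2 === 1
        ? sorted[mid]
        : (sorted[mid - 1] + sorted[mid]) / 2;
}

console.log(median([7, 1, 10, 3])); // 5
```

Inside the function node this would be used as new_payload["median"] = median(payload). Note that if no samples arrived before the dump, Math.min(...payload) and Math.max(...payload) return Infinity and -Infinity, so an empty array may need its own check.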

Does that do time based averaging or sample based? I believe that @pconroy328 requires this to be a sliding time window, where the samples are not coming in at a regular rate.

@pconroy328 correct me if I am wrong.

So if you save the arrival time of each sample, you can integrate the values over time and then calculate the average.
I used this blog entry from TimescaleDB as a blueprint:

https://www.timescale.com/blog/what-time-weighted-averages-are-and-why-you-should-care/

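function timeWeightedAverage(data, timestamps) {
    // Both arrays must line up: timestamps[i] is when data[i] arrived
    if (data.length !== timestamps.length) {
        node.warn("array length is not equal");
        return null;
    }

    let total_duration = 0.0;
    let total_area = 0.0;

    // Integrate with the trapezoidal rule between neighbouring samples
    for (let i = 1; i < data.length; i++) {
        const d_t = timestamps[i] - timestamps[i - 1];
        const v_1 = data[i - 1];
        const v_2 = data[i];
        total_area += (v_1 + v_2) / 2 * d_t;
        total_duration += d_t;
    }

    // With fewer than two samples (or identical timestamps) there is no
    // duration to divide by, so fall back to the only value, if there is one
    if (total_duration === 0) {
        return data.length > 0 ? data[0] : null;
    }
    return total_area / total_duration;
}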
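
To see why the time weighting matters when samples arrive irregularly, here is a small standalone sketch that compares it with a plain per-sample mean. The function names `timeWeightedMean` and `plainMean` and the sample values are made up for illustration; the integration is the same trapezoidal idea as the function above, rewritten without node.warn so it runs outside Node-RED:

```javascript
// Trapezoidal time-weighted average (standalone sketch, hypothetical name).
function timeWeightedMean(values, timesMs) {
    let area = 0;
    let duration = 0;
    for (let i = 1; i < values.length; i++) {
        const dt = timesMs[i] - timesMs[i - 1];
        area += (values[i - 1] + values[i]) / 2 * dt;
        duration += dt;
    }
    // No duration means fewer than two samples, so there is nothing to average
    return duration > 0 ? area / duration : null;
}

// Ordinary mean that gives every sample the same weight.
function plainMean(values) {
    return values.length
        ? values.reduce((a, b) => a + b, 0) / values.length
        : null;
}

// Made-up samples: the value sits at 10 for 9 s, then falls to 0 in the last second.
const values = [10, 10, 0];
const timesMs = [0, 9000, 10000];

console.log(plainMean(values));                 // 6.666...
console.log(timeWeightedMean(values, timesMs)); // 9.5
```

The plain mean treats the final 0 as one third of the data, while the time-weighted version reflects that the signal was at 10 for most of the 10 seconds.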

[{"id":"1386b228cb3e1a1c","type":"function","z":"1160bb977c192b32","name":"store","func":"if (msg.topic == 'dump') {\n    let data = context.get('data') || []\n    let times = context.get('times') || []\n    \n    msg.payload = {}\n    msg.payload[\"data\"] = data\n    msg.payload[\"times\"]= times\n\n    context.set('data', [])\n    context.set('times', [])\n\n} else {\n    let payload = msg.payload\n    let now = Date.now()\n    let data = context.get('data') || []\n    let times = context.get('times') || []\n\n    data.push(payload)\n    times.push(now)\n    context.set('data',data)\n    context.set('times',times)\n}\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":530,"y":480,"wires":[["39b19b80dbca327d"]]},{"id":"64c92e04ec4f3808","type":"inject","z":"1160bb977c192b32","name":"add values","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"$floor($random() * 10)\t\t","payloadType":"jsonata","x":340,"y":460,"wires":[["1386b228cb3e1a1c"]]},{"id":"05ad16d40813b354","type":"inject","z":"1160bb977c192b32","name":"dump","props":[{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"dump","x":350,"y":500,"wires":[["1386b228cb3e1a1c"]]},{"id":"1f262f56255cd37a","type":"debug","z":"1160bb977c192b32","name":"debug 78","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":1180,"y":440,"wires":[]},{"id":"39b19b80dbca327d","type":"switch","z":"1160bb977c192b32","name":"","property":"payload","propertyType":"msg","rules":[{"t":"istype","v":"object","vt":"object"},{"t":"else"}],"checkall":"true","repair":false,"outputs":2,"x":690,"y":480,"wires":[["fde7749eea9562dc","7ffa4bd22e78b1cd"],["3eb39d92eb329e8d"]]},{"id":"fde7749eea9562dc","type":"function","z":"1160bb977c192b32","name":"calculate stuff","func":"function 
timeWeightedAverage(data, timestamps) {\n    if (data.length !== timestamps.length) {\n        node.warn(\"array lenght is not equal\");\n    }\n    \n    let total_weighted_sum = 0.0;\n    let total_duration = 0.0;\n    let total_area = 0.0\n    \n    for (let i = 1; i < data.length; i++) {\n        let d_t = timestamps[i] - timestamps[i - 1];\n        let v_1 = data[i -1]\n        let v_2 = data[i]\n        let area = ((v_1+v_2))/2 * d_t\n        total_area += area\n        const duration = timestamps[i] - timestamps[i - 1];\n        total_duration += duration;\n    }\n    \n    const time_weighted_avg = total_area / total_duration;\n    return time_weighted_avg;\n}\n\n\nlet new_payload = {}\nconst data = msg.payload.data\nconst times = msg.payload.times\n\n\n//https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Math/max\nconst min = Math.min(...data)\nconst max = Math.max(...data)\n\n// https://stackoverflow.com/a/10624256\nconst sum = data.reduce((a, b) => a + b, 0);\n\n\nconst avg_qty = (sum / data.length) || 0;\nlet avg_time = timeWeightedAverage(data, times)\n\n\n\nnew_payload[\"min\"] = min\nnew_payload[\"max\"] = max\nnew_payload[\"sum\"] = sum\nnew_payload[\"avg_qty\"] = avg_qty\nnew_payload[\"avg_time\"] = avg_time\n\nmsg.payload = new_payload\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":940,"y":440,"wires":[["1f262f56255cd37a"]]},{"id":"3eb39d92eb329e8d","type":"debug","z":"1160bb977c192b32","name":"debug 80","active":false,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":940,"y":520,"wires":[]},{"id":"1e1b9cc2bc90798e","type":"inject","z":"1160bb977c192b32","name":"test time 
weight","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"{\"data\":[0,10,0],\"times\":[1,2,3]}","payloadType":"json","x":500,"y":380,"wires":[["39b19b80dbca327d"]]},{"id":"7ffa4bd22e78b1cd","type":"debug","z":"1160bb977c192b32","name":"debug 83","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":960,"y":380,"wires":[]}]

Is that doing a sliding time window? I can't see where it is dropping old samples off as it adds new ones.

When you dump the data, the current data in the function node is dropped. If you dump the data every x seconds, you can use the smooth node to damp the data further.

Then you have the data flow:
Raw data at irregular intervals -> time_weighted_avg -> avg(time_weighted_avg)

[{"id":"1386b228cb3e1a1c","type":"function","z":"8eabc3a82cb1c5d6","name":"store","func":"if (msg.topic == 'dump') {\n    let data = context.get('data') || []\n    let times = context.get('times') || []\n    \n    msg.payload = {}\n    msg.payload[\"data\"] = data\n    msg.payload[\"times\"]= times\n\n    context.set('data', [])\n    context.set('times', [])\n\n} else {\n    let payload = msg.payload\n    let now = Date.now()\n    let data = context.get('data') || []\n    let times = context.get('times') || []\n\n    data.push(payload)\n    times.push(now)\n    context.set('data',data)\n    context.set('times',times)\n}\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":530,"y":340,"wires":[["39b19b80dbca327d"]]},{"id":"64c92e04ec4f3808","type":"inject","z":"8eabc3a82cb1c5d6","name":"add values","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"noise","v":"noise","vt":"flow"}],"repeat":"1","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"$floor($random() * 10)\t\t","payloadType":"jsonata","x":150,"y":340,"wires":[["263120f34b38e12f"]]},{"id":"05ad16d40813b354","type":"inject","z":"8eabc3a82cb1c5d6","name":"dump","props":[{"p":"topic","vt":"str"}],"repeat":"15","crontab":"","once":false,"onceDelay":0.1,"topic":"dump","x":380,"y":400,"wires":[["1386b228cb3e1a1c"]]},{"id":"1f262f56255cd37a","type":"debug","z":"8eabc3a82cb1c5d6","name":"debug 78","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":1180,"y":300,"wires":[]},{"id":"39b19b80dbca327d","type":"switch","z":"8eabc3a82cb1c5d6","name":"","property":"payload","propertyType":"msg","rules":[{"t":"istype","v":"object","vt":"object"},{"t":"else"}],"checkall":"true","repair":false,"outputs":2,"x":690,"y":340,"wires":[["fde7749eea9562dc"],["3eb39d92eb329e8d","6dcac8967fe192b6"]]},{"id":"fde7749eea9562dc","type":"function","z":"8eabc3a82cb1c5d6","name":"calculate 
stuff","func":"function timeWeightedAverage(data, timestamps) {\n    if (data.length !== timestamps.length) {\n        node.warn(\"array lenght is not equal\");\n    }\n    \n    let total_weighted_sum = 0.0;\n    let total_duration = 0.0;\n    let total_area = 0.0\n    \n    for (let i = 1; i < data.length; i++) {\n        let d_t = timestamps[i] - timestamps[i - 1];\n        let v_1 = data[i -1]\n        let v_2 = data[i]\n        let area = ((v_1+v_2))/2 * d_t\n        total_area += area\n        const duration = timestamps[i] - timestamps[i - 1];\n        total_duration += duration;\n    }\n    \n    const time_weighted_avg = total_area / total_duration;\n    return time_weighted_avg;\n}\n\n\nlet new_payload = {}\nconst data = msg.payload.data\nconst times = msg.payload.times\n\n\n//https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Math/max\nconst min = Math.min(...data)\nconst max = Math.max(...data)\n\n// https://stackoverflow.com/a/10624256\nconst sum = data.reduce((a, b) => a + b, 0);\n\n\nconst avg_qty = (sum / data.length) || 0;\nlet avg_time = timeWeightedAverage(data, times)\n\n\n\nnew_payload[\"min\"] = min\nnew_payload[\"max\"] = max\nnew_payload[\"sum\"] = sum\nnew_payload[\"avg_qty\"] = avg_qty\nnew_payload[\"avg_time\"] = avg_time\n\nmsg.payload = new_payload\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":940,"y":300,"wires":[["1f262f56255cd37a","40bc9563f5d8199c"]]},{"id":"3eb39d92eb329e8d","type":"debug","z":"8eabc3a82cb1c5d6","name":"debug 
80","active":false,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":1060,"y":400,"wires":[]},{"id":"6dcac8967fe192b6","type":"change","z":"8eabc3a82cb1c5d6","name":"","rules":[{"t":"set","p":"topic","pt":"msg","to":"raw","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":780,"y":480,"wires":[["3fb31fa63d166b47"]]},{"id":"3fb31fa63d166b47","type":"ui-chart","z":"8eabc3a82cb1c5d6","group":"b96c5dc1f181495b","name":"","label":"chart","order":9007199254740991,"chartType":"line","category":"topic","categoryType":"msg","xAxisProperty":"","xAxisPropertyType":"msg","xAxisType":"time","yAxisProperty":"","ymin":"","ymax":"","action":"append","pointShape":"circle","pointRadius":4,"showLegend":true,"removeOlder":"2","removeOlderUnit":"60","removeOlderPoints":"200","colors":["#1f77b4","#aec7e8","#ff7f0e","#2ca02c","#98df8a","#d62728","#ff9896","#9467bd","#c5b0d5"],"width":6,"height":8,"className":"","x":1790,"y":440,"wires":[[]]},{"id":"40bc9563f5d8199c","type":"change","z":"8eabc3a82cb1c5d6","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"payload.avg_time","tot":"msg"},{"t":"set","p":"topic","pt":"msg","to":"time_weighted_avg","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":1230,"y":220,"wires":[["3fb31fa63d166b47","6b81f1075c570a7b"]]},{"id":"6b81f1075c570a7b","type":"smooth","z":"8eabc3a82cb1c5d6","name":"","property":"payload","action":"mean","count":"10","round":"","mult":"single","reduce":false,"x":1480,"y":200,"wires":[["236ec8d67c810693"]]},{"id":"f27f95968b05d2be","type":"ui-slider","z":"8eabc3a82cb1c5d6","group":"b96c5dc1f181495b","name":"add 
noise","label":"noise","tooltip":"","order":0,"width":0,"height":0,"passthru":false,"outs":"end","topic":"topic","topicType":"msg","thumbLabel":true,"min":"-10","max":10,"step":1,"className":"","x":200,"y":240,"wires":[["ea12657eb7300151"]]},{"id":"ea12657eb7300151","type":"change","z":"8eabc3a82cb1c5d6","name":"","rules":[{"t":"set","p":"noise","pt":"flow","to":"payload","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":380,"y":240,"wires":[[]]},{"id":"263120f34b38e12f","type":"change","z":"8eabc3a82cb1c5d6","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"payload * noise","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":330,"y":340,"wires":[["1386b228cb3e1a1c"]]},{"id":"b96c5dc1f181495b","type":"ui-group","name":"Group Name","page":"d2203bba5961af37","width":"6","height":"1","order":-1,"showTitle":true,"className":"","visible":"true","disabled":"false"},{"id":"d2203bba5961af37","type":"ui-page","name":"Page Name","ui":"2c91ba6fb44ea823","path":"/page1","icon":"home","layout":"grid","theme":"e40d02373fbb58c8","order":-1,"className":"","visible":"true","disabled":"false"},{"id":"2c91ba6fb44ea823","type":"ui-base","name":"UI Name","path":"/dashboard","includeClientData":true,"acceptsClientConfig":["ui-notification","ui-control"],"showPathInSidebar":false},{"id":"e40d02373fbb58c8","type":"ui-theme","name":"Theme Name","colors":{"surface":"#ffffff","primary":"#0094ce","bgPage":"#eeeeee","groupBg":"#ffffff","groupOutline":"#cccccc"},"sizes":{"pagePadding":"12px","groupGap":"12px","groupBorderRadius":"4px","widgetGap":"12px"}}]

That isn't a sliding window. A sliding window of, for example, 10 minutes, would drop any samples older than 10 minutes each time a new sample is added to the buffer, and would calculate the statistics each time for the data currently in the buffer.
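
As a rough sketch of that description (not taken from any existing node), the buffer handling could look like this. The function name `slidingWindowStats` and the { t, v } sample shape are assumptions made for the example:

```javascript
// Hypothetical helper: add a sample, drop anything older than windowMs,
// and return statistics for what is left in the buffer.
// buffer is an array of { t: <ms timestamp>, v: <number> } kept between calls.
function slidingWindowStats(buffer, value, nowMs, windowMs) {
    buffer.push({ t: nowMs, v: value });
    // Samples arrive in time order, so the oldest ones are at the front
    while (buffer.length > 0 && nowMs - buffer[0].t > windowMs) {
        buffer.shift();
    }
    // The sample just added is never older than the window,
    // so vals always holds at least one value here
    const vals = buffer.map(s => s.v);
    const sum = vals.reduce((a, b) => a + b, 0);
    return {
        count: vals.length,
        min: Math.min(...vals),
        max: Math.max(...vals),
        avg: sum / vals.length
    };
}

// Made-up samples with a 10 second window
const buffer = [];
slidingWindowStats(buffer, 4, 0, 10000);
slidingWindowStats(buffer, 8, 6000, 10000);
const stats = slidingWindowStats(buffer, 6, 12000, 10000);
console.log(stats); // { count: 2, min: 6, max: 8, avg: 7 }
```

In a Function node the buffer would be read with context.get and written back with context.set, with Date.now() passed as nowMs. The statistics are recalculated on every incoming sample, so no separate clock tick is needed.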

Thank you @Colin - that's correct. The data comes in at an irregular, unpredictable rate. For a ten second window, there could be 2, 5, 10 or 0 samples.

Thank you @kitori for the excellent explanation and details on what a time-based average calculation does.

I'll read more, but I think @Colin states it well: I call it a sliding window, but I think it's also called a moving average. The number of samples in the time period is unknown, and the arrival rate of the samples will vary. For a "60 second window", samples in the buffer that are older than 60 seconds are discarded.

In my case, the trigger for the processing can be another message/sample. I don't need a clock tick. For example, if there's a 120 second gap between two samples, then when the 120-second-delayed sample arrives, all older samples in the buffer would be discarded.

The rate-avg node does this (I think). It delivers the average of the samples in a sliding window. I just wanted to expand it to include some additional data with the average: have it also return the min/max of the data in the buffer.

thank you all for your help!

Maybe the batch node and a reduced sequence join would do what is required.
e.g.

[{"id":"7414c41373289610","type":"inject","z":"b779de97.b1b46","name":"random","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"$round($random()*10)","payloadType":"jsonata","x":90,"y":7700,"wires":[["172082b9894bf1c0"]]},{"id":"172082b9894bf1c0","type":"batch","z":"b779de97.b1b46","name":"","mode":"interval","count":10,"overlap":0,"interval":10,"allowEmptySequence":false,"topics":[],"x":260,"y":7720,"wires":[["0e84b598dde37e10"]]},{"id":"0e84b598dde37e10","type":"join","z":"b779de97.b1b46","name":"","mode":"reduce","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":true,"timeout":"","count":"","reduceRight":false,"reduceExp":"$append($A,$$.payload)","reduceInit":"[]","reduceInitType":"json","reduceFixup":"(\t    $sorted := $A^($);\t    {\t        \"average\": $sum($sorted) / $count($sorted),\t        \"max\": $sorted[-1],\t        \"min\": $sorted[0]\t    }\t)","x":410,"y":7720,"wires":[["2f74c498199630e5"]]},{"id":"2f74c498199630e5","type":"debug","z":"b779de97.b1b46","name":"debug 2490","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":590,"y":7720,"wires":[]}]

Thank you @E1cid - I'll look at that one.

I took the dog for a walk, on a beautiful warm day in Colorado and started thinking that I could be asking too much for one node. Maybe I'd be better served by one node that simply aggregates the samples over a time window and then passes it on to another node for the calculations. I could use existing stats nodes for the min/max/avg calcs.

The great thing about software is that there's usually a handful of ways to solve a problem. I'll look to see if there's a simple "buffer values over a given time period" node.

thanks

Why do you need a sliding time window? It is unusual for this to be the best solution.

Agreed. I have a number of sensors that send values. Sometimes they will send once a second, other times maybe no values for several minutes. When they do send, I'm interested in their average value over the course of a minute. Sometimes over the course of 15 minutes.

Accumulating the values of "the last N seconds/minutes" is all I'm after. As mentioned, the "older" values should age out and be discarded.

If all the values are discarded, if everything is aged out, I can deal with that. That's ok.

I don't think, in my case, there's a pragmatic difference between a sliding window node and a node that just dumps its buffer every N seconds. The only difference would be when t0 is, which wouldn't really matter in my case.

[I don't know if it helps, but one sensor is GPS based. And something like 'track' only makes sense to me if it's based on a number of recent values.]

Usually a time based low pass filter is better for situations like this.

I'm not smart enough to know about those - I'll do some research! Thank you!

Here is a 15 second time constant low pass RC filter.

// Applies a simple RC low pass filter to incoming payload values
const tc = 15*1000;       // time constant in milliseconds

let lastValue = context.get('lastValue');
if (typeof lastValue == "undefined") lastValue = msg.payload;
const lastTime = context.get('lastTime') || null;
const now = new Date();
const currentValue = msg.payload;
let newValue
if (lastTime === null) {
    // first time through
    newValue = currentValue;
} else {
    const dt = now.getTime() - lastTime.getTime();
    if (dt > 0) {
        let dtotc = dt / tc;
        // clamp to max of 1 to prevent silly results
        dtotc = Math.min(dtotc, 1)
        newValue = lastValue * (1 - dtotc) + currentValue * dtotc;
    } else {
        // no time has elapsed leave output the same as last time
        newValue = lastValue;
    }
}
context.set('lastValue', newValue);
context.set('lastTime', now);

msg.payload = newValue;
return msg;
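
To make the behaviour easier to follow, here is the same update step as a standalone sketch that runs outside Node-RED, fed with a made-up step input. The function name `lowPassStep` and the { value, timeMs } state object are assumptions for the example; the arithmetic mirrors the function above:

```javascript
// Standalone sketch of one filter step (hypothetical name and state shape).
// state is { value, timeMs } from the previous call, or null on the first call.
function lowPassStep(state, input, nowMs, tcMs) {
    if (state === null) {
        // First sample: nothing to smooth against yet
        return { value: input, timeMs: nowMs };
    }
    const dt = nowMs - state.timeMs;
    if (dt <= 0) {
        // No time has elapsed, keep the previous output
        return { value: state.value, timeMs: nowMs };
    }
    // Fraction of the gap between old output and new input to close,
    // clamped to 1 so a long silence simply jumps to the new input
    const alpha = Math.min(dt / tcMs, 1);
    return { value: state.value * (1 - alpha) + input * alpha, timeMs: nowMs };
}

// Made-up input: 0 at t = 0, then 10 once a second, with a 15 s time constant
let state = lowPassStep(null, 0, 0, 15000);
for (let t = 1000; t <= 15000; t += 1000) {
    state = lowPassStep(state, 10, t, 15000);
}
console.log(state.value.toFixed(2));
```

Each new sample pulls the output towards the input by the fraction dt / tc, so samples that arrive after a long gap count for more than samples that arrive in quick succession. With this input the output has covered a little under two thirds of the step after one time constant. The dt / tc weighting is a first-order approximation of the 1 - exp(-dt / tc) response of an ideal RC filter.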

Hi @Colin,
Can you please explain that a bit more in detail, in noob terminology. Thanks!!