Node loop restarting before it is finished?

I have a couple of function nodes in a row, each with loops. The first node seems to be doing its business. The second function looks like this:

for (let i = 0; i < msg.payload[0].timeseries.length; i++) {
    var value_timestamp = msg.payload[0].timeseries[i].time;
    var value = msg.payload[0].timeseries[i].value;
    msg.payload = [
        {
            do things here with value and value_timestamp
        }
    ]
    node.send(msg);
};
return msg;

by the time it gets to the second node, messages are coming in by the thousands and very fast.

I am only getting one datapoint written out (one value / time pair) even though that array that is being looped is huge (10k objects)

msg.payload[0].timeseries[10000]

here is what the data coming into this function looks like:

How come I'm not getting all my datapoints?

Can you do an inject node into the 2nd function with the necessary data format and see it process as you would expect ? (And a debug output would be good)

Also - (not certain) but I do not think you can get the length of an object directly?

Craig

I think there could be two problems: your input rate might be too high and processing the array eventually takes too long. I've tested to process a flow with 100000 items each 10 ms. My machine then takes 300 ms to process one array. Dividing the array into chunks of 100 isn't blocking parallel branches. But the input can only be read each 300 ms not to get an overflow:

[
    {
        "id": "5bf81ce259910494",
        "type": "inject",
        "z": "839d658c3816e9e7",
        "name": "Timestamp start each 10 ms",
        "props": [
            {
                "p": "topic",
                "vt": "str"
            },
            {
                "p": "timeStart",
                "v": "",
                "vt": "date"
            },
            {
                "p": "timeEnd",
                "v": "",
                "vt": "date"
            }
        ],
        "repeat": "0.01",
        "crontab": "",
        "once": true,
        "onceDelay": 0.1,
        "topic": "measure duration",
        "x": 190,
        "y": 1740,
        "wires": [
            [
                "6e9696f91d3fc4c1"
            ]
        ]
    },
    {
        "id": "6e9696f91d3fc4c1",
        "type": "function",
        "z": "839d658c3816e9e7",
        "name": "timestamp 1.1 (array of 100000 items)",
        "func": "msg.topic = \"value 1.1\"\nmsg.timeEnd = new Date().valueOf();\nmsg.duration11 = msg.timeEnd - msg.timeStart;\nmsg.payload = new Array(100000).fill(\"starting value\");\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 510,
        "y": 1740,
        "wires": [
            [
                "85333a3f7fecd05f"
            ]
        ]
    },
    {
        "id": "17d4f4617b63fc68",
        "type": "debug",
        "z": "839d658c3816e9e7",
        "name": "",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "true",
        "targetType": "full",
        "statusVal": "",
        "statusType": "auto",
        "x": 1810,
        "y": 1920,
        "wires": []
    },
    {
        "id": "4bb669bcfe173505",
        "type": "function",
        "z": "839d658c3816e9e7",
        "name": "process the items + timestamp 1.2 (split to chunks of 100)",
        "func": "const arrPart = []\nlet startIdx;\nlet endIdx;\n\nif(msg.trigger === undefined){\n    context.set('array', msg.payload);\n    msg.parts = {};\n    msg.parts.type = \"array\";\n    msg.parts.count = Math.ceil(msg.payload.length/100);\n    msg.parts.len = 100;\n    msg.parts.index = 0;\n    msg.parts.id = msg._msgid;\n    msg.arraylength = msg.payload.length;\n    startIdx = msg.parts.index;\n}else{\n    startIdx = msg.parts.index * 100\n}\n\n//divide into chunks of 100\nendIdx = startIdx + 100;\nif(msg.arraylength < endIdx){\n    endIdx = msg.arraylength;\n    msg.parts.len = endIdx - startIdx;\n}\n\nfor(let i=startIdx; i<endIdx; i++){\n    //************************************************************\n    //WORK TO BE DONE FOR EVERY ITEM!!!!!!\n    if(msg.parts.index==0){\n        msg.duration12Start = new Date().valueOf() - msg.timeStart;\n    }else{\n        msg.duration12End = new Date().valueOf() - msg.timeStart; \n    }\n    arrPart.push(context.get(\"array\")[i] + \" was changed\");\n    //************************************************************\n}\n\nmsg.payload = arrPart;\nmsg._msgid = RED.util.generateId();\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 1290,
        "y": 1780,
        "wires": [
            [
                "375d5a507181218e"
            ]
        ]
    },
    {
        "id": "375d5a507181218e",
        "type": "function",
        "z": "839d658c3816e9e7",
        "name": "process control (while do)",
        "func": "if(msg.parts.index < msg.parts.count-1){\n    let triggerMsg = RED.util.cloneMessage(msg);\n    triggerMsg.trigger = true;\n    triggerMsg.parts.index++;\n    return[msg, triggerMsg];\n}\nreturn [msg, null];\n\n",
        "outputs": 2,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 1230,
        "y": 1700,
        "wires": [
            [
                "abdc75fee10c8291"
            ],
            [
                "4bb669bcfe173505"
            ]
        ]
    },
    {
        "id": "abdc75fee10c8291",
        "type": "join",
        "z": "839d658c3816e9e7",
        "name": "",
        "mode": "auto",
        "build": "object",
        "property": "payload",
        "propertyType": "msg",
        "key": "topic",
        "joiner": "\\n",
        "joinerType": "str",
        "accumulate": "false",
        "timeout": "",
        "count": "",
        "reduceRight": false,
        "x": 1490,
        "y": 1700,
        "wires": [
            [
                "17d4f4617b63fc68",
                "79db583c55da54b7"
            ]
        ]
    },
    {
        "id": "e5666ac6e3592b45",
        "type": "function",
        "z": "839d658c3816e9e7",
        "name": "timestamp 2.1",
        "func": "msg.topic = \"measure 2.1\"\nmsg.timeEnd = new Date().valueOf();\nmsg.duration21 = msg.timeEnd - msg.timeStart;\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 900,
        "y": 1920,
        "wires": [
            [
                "6d7aeefef80e5b92"
            ]
        ]
    },
    {
        "id": "6d7aeefef80e5b92",
        "type": "function",
        "z": "839d658c3816e9e7",
        "name": "timestamp 2.2",
        "func": "msg.topic = \"measure 2.2\"\nmsg.timeEnd = new Date().valueOf();\nmsg.duration22 = msg.timeEnd - msg.timeStart;\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 1140,
        "y": 1920,
        "wires": [
            [
                "6031269dfd3065d3"
            ]
        ]
    },
    {
        "id": "6031269dfd3065d3",
        "type": "function",
        "z": "839d658c3816e9e7",
        "name": "timestamp 2.3",
        "func": "msg.topic = \"measure 2.3\"\nmsg.timeEnd = new Date().valueOf();\nmsg.duration23 = msg.timeEnd - msg.timeStart;\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 1380,
        "y": 1920,
        "wires": [
            [
                "17d4f4617b63fc68"
            ]
        ]
    },
    {
        "id": "7482684a98383440",
        "type": "comment",
        "z": "839d658c3816e9e7",
        "name": "Process the first 100 array items. Then split them of and direct them to the Join Node. Input reverse to the Function Node to process the next 100 items. Split the items of...etc.  ==> No blocking of parallel branches (or the node.js event loop) + faster handling of huge arrays",
        "info": "blockiert parallele Nodes stark",
        "x": 930,
        "y": 1560,
        "wires": []
    },
    {
        "id": "91fb4e0bc0b15941",
        "type": "simple-queue",
        "z": "839d658c3816e9e7",
        "name": "",
        "firstMessageBypass": true,
        "bypassInterval": "0",
        "x": 950,
        "y": 1780,
        "wires": [
            [
                "4bb669bcfe173505"
            ]
        ]
    },
    {
        "id": "79db583c55da54b7",
        "type": "function",
        "z": "839d658c3816e9e7",
        "name": "trigger next array",
        "func": "\nreturn {trigger:true};",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 970,
        "y": 1600,
        "wires": [
            [
                "91fb4e0bc0b15941"
            ]
        ]
    },
    {
        "id": "85333a3f7fecd05f",
        "type": "change",
        "z": "839d658c3816e9e7",
        "name": "set ttl",
        "rules": [
            {
                "t": "set",
                "p": "ttl",
                "pt": "msg",
                "to": "10",
                "tot": "num"
            }
        ],
        "action": "",
        "property": "",
        "from": "",
        "to": "",
        "reg": false,
        "x": 730,
        "y": 1780,
        "wires": [
            [
                "91fb4e0bc0b15941"
            ]
        ]
    },
    {
        "id": "2be3e438fb8f1ec0",
        "type": "inject",
        "z": "839d658c3816e9e7",
        "name": "Timestamp start each 1000 ms",
        "props": [
            {
                "p": "topic",
                "vt": "str"
            },
            {
                "p": "timeStart",
                "v": "",
                "vt": "date"
            },
            {
                "p": "timeEnd",
                "v": "",
                "vt": "date"
            }
        ],
        "repeat": "1",
        "crontab": "",
        "once": true,
        "onceDelay": 0.1,
        "topic": "Laufzeit messen",
        "x": 210,
        "y": 1920,
        "wires": [
            [
                "e5666ac6e3592b45"
            ]
        ]
    },
    {
        "id": "a130d79ac0568a80",
        "type": "comment",
        "z": "839d658c3816e9e7",
        "name": "Throttle the input rate!",
        "info": "",
        "x": 840,
        "y": 1740,
        "wires": []
    }
]

You are returning a msg when the function is finished = 1 msg.

change return msg to return null

from my understanding, return null would just return nothing?

Could you explain your solution further? I don't fully understand the relation between node.send(msg); and return msg;

I have another node with a loop that ends the same way, and it does output all the messages I expect.

node.send(msg) and return msg both pass on a message. If you use send() during the function it will send a message each time it hits send(). If you also have return msg at the end it will send another message. If that is what you want then ok, if you don't want the extra one at the end then either remove the return or make it return null.

ahh, interesting. In that case, I don't need the end return msg; , but that doesn't seem to be the source of my problem, where only one of the 10000 timestamps are being processed. The loop should keep looping. I think because it has 10k timestamps to go through, it just doesn't have time, as the next message comes into the node immediately, and they just keep coming (roughly 1000 of them). Seems like I need to re-think this architecture.

This is a one-time process, I just need to back-fill the last ~3 months of timestamps from the API. After that, I am only interested in the last timestamp, as I'm storing these datapoints in influxdb.

If it’s a one off, then use a delay node in rate limit mode set to say 20 per second and just let it take its time

In that case don't split it into individual messages. The influx nodes allow an array of samples to be added in one transaction. Split it into sets of maybe 1000 samples and transform that array into the form needed by influx, using Array.map() for example, and pass it to influx. You may well achieve a speed increase of 100 times.

1 Like

Colin that's a very smart solution... I hope I'm smart enough to implement it :sweat_smile:

I am starting with a list of available "signals", fetched from the API, plunked into a flow variable

Then I fetch a list of "machines". I use a split node to bust this list up into individual messages. Then I feed that into this function, to build the next API request:

// get array of signals from flow variable
let signals = flow.get('signals') || [];

// get machine ID from incoming payload
var id = msg.payload.id;

// loop through each signal in the flow.signals array one at a time until the array is empty
for (let i = 0; i < signals.signals.length; i++) {
    msg.payload = {
        "bucketsize": 10000,
        "limit": 10000,
        "from": 1651435441000,
        "to": 1660075441000,
        "queries": [
            {
                "signal": signals.signals[i].key,
                "aggregationFunction": "raw",
                "groupBy": {
                    "type": "machine",
                    "id": id
                }
            }
        ]
    }
    node.send(msg);
};

I send the output msg back into the API at a /timeseries endpoint, this fetches the timeseries data, one "signal" at a time, one machine at a time. Each of these requests returns the msg.payload[0].timeseries[10000]
that I shared above.

Now I need to take that and put it into InfluxDB... which is where I came up with this:

var id = msg.info.id;
var name = msg.info.name ;
var serial = msg.info.serial;
var continent = msg.info.continent;
var signal = msg.payload[0].signal;
var i = 0;

// loop through each signal in the flow.signals array one at a time until the array is empty
for (let i = 0; i < msg.payload[0].timeseries.length; i++) {
    var value_timestamp = msg.payload[0].timeseries[i].time;
    var value = msg.payload[0].timeseries[i].value;
    msg.payload = [
        {
            measurement: "data",
            fields: { [signal]: value },
            tags: {
                machine_id: id,
                machine_name: name,
                machine_continent: continent,
                machine_serial: serial
            },
            timestamp: new Date(value_timestamp)
        }
    ]
    node.send(msg);
};
return null;

I appreciate any advice on how to make this all more efficient. Right now, it's sort of working, but for each signal, for each machine, I only get one timestamp written to the db.

Did you look up Array.map() as I suggested? It allows you to easily specify how to convert each each element of an array into an element of a new array without all that tedious looping.

yes, just need to read it 20 more times lol :slight_smile:

Colin, thank you again!

I took the array.map() plunge and came away happy.

You are absolutely correct with the efficiency. It only takes around 2 seconds to write 10k datapoints into influxdb this way... I almost didn't believe it worked!

The API I am interfacing with requires one API call per machine, so my first loop is still necessary to gather the timeseries data. But once I have that big array of 10k objects, I can write it all at once.

1 Like

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.