Array of promise.allSettled, but the NodeRed version

Hello,

Having a bit of trouble wrapping my head around this.

I have a list of API calls that need to be dispatched all at the same time. Some are Discord Messages, some are REST requests. The REST requests can take considerable time to complete, ranging between a few milliseconds, to upwards of 30 seconds to a minute to complete. There could be n amount of calls (a set amount for a certain flow, but there are several flows with different amounts of calls).

The catch is that a new message could come in before some of the current REST calls are completed, muddling up the order of the join node.

How can I setup distinct array of messages, without having to wait for each set of calls to complete?

Would also like to handle error scenarios if a REST call fails, but I'm guessing that can be done between the http request output and the join node.

Here's the image of the flow of what I'm trying to achieve:

Your problem is the join/finalise part.
Handle each request separately and write whatever output you are looking for into a context variable.
Once you have collected all required variables "finalise".

Hard to say how to apply this to your flow as the image tells us little about it.

If you can use a split node and join set to auto, then the incoming messages get a separate msg.parts.id and can be join correctly.

so either

use a split node

or

Give the messages correct msg.parts properties.

e,g,

[{"id":"2ea561030929bd7f","type":"inject","z":"d1395164b4eec73e","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":240,"y":4040,"wires":[["9037ca690a7af20c"]]},{"id":"9037ca690a7af20c","type":"trigger","z":"d1395164b4eec73e","name":"simulate 2 messages","op1":"[{\"topic\":\"one\",\"payload\":1},{\"topic\":\"two\",\"payload\":2},{\"topic\":\"three\",\"payload\":3}]","op2":"[{\"topic\":\"one\",\"payload\":3},{\"topic\":\"two\",\"payload\":4},{\"topic\":\"three\",\"payload\":5}]","op1type":"json","op2type":"json","duration":"1","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":400,"y":4100,"wires":[["cb240c4064c27d8b"]]},{"id":"cb240c4064c27d8b","type":"split","z":"d1395164b4eec73e","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":470,"y":4040,"wires":[["8723b2f7e8358aa3","5ec1677aa77126a8"]]},{"id":"8723b2f7e8358aa3","type":"delay","z":"d1395164b4eec73e","name":"","pauseType":"random","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"10","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":600,"y":4040,"wires":[["bae4bab624601f95"]]},{"id":"5ec1677aa77126a8","type":"debug","z":"d1395164b4eec73e","name":"see msg.parts","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":600,"y":3980,"wires":[]},{"id":"bae4bab624601f95","type":"join","z":"d1395164b4eec73e","name":"","mode":"auto","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":"false","timeout":"","count":"","reduceRight":false,"x":730,"y":4040,"wires":[["71884565f1d58e25"]]},{"id":"71884565f1d58e25","type":"debug","z":"d1395164b4eec73e","name":"debug 2469","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":790,"y":4100,"wires":[]}]

This is how I managed to do it.

The only code is in the Combiner Function node:

const messageQueue = flow.get('messageQueue', 'memoryOnly') ?? {};
const EXPECTED_MESSAGES = 3;

messageQueue[msg._msgid] = {
    ...(messageQueue?.[msg._msgid] ?? {}),
    [msg.topic]: msg.payload
};

if (Object.keys(messageQueue[msg._msgid]).length >= EXPECTED_MESSAGES) {
    msg.messageResult = JSON.parse(JSON.stringify(messageQueue?.[msg._msgid] ?? {}));
    delete messageQueue[msg._msgid];
    flow.set('messageQueue', messageQueue, 'memoryOnly');
    return [msg, null];
}

flow.set('messageQueue', messageQueue, 'memoryOnly');

msg.messageQueue = messageQueue;

return [null, msg];

In each of the Timeout Function nodes I just have:

setTimeout(() => {
  node.send({ ...msg, payload: 'hi2000', topic: 'rest_2000' });
}, 2000);

To simulate a REST request.

The output is as follows:
image

For info purposes of those not great at coding using a function node.

This is all possible using no code and use of msg.parts in a join.

In this example you can send as many injects as you wish and all responses will be joined correctly and grouped as per inject.

[{"id":"2ea561030929bd7f","type":"inject","z":"d1395164b4eec73e","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"{}","payloadType":"json","x":250,"y":4080,"wires":[["a8ae2c70a9972bd4"]]},{"id":"5ec1677aa77126a8","type":"debug","z":"d1395164b4eec73e","name":"see msg.parts","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":620,"y":4360,"wires":[]},{"id":"71884565f1d58e25","type":"debug","z":"d1395164b4eec73e","name":"debug 2469","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":930,"y":4200,"wires":[]},{"id":"e1cfee19c7cc5c01","type":"change","z":"d1395164b4eec73e","name":"","rules":[{"t":"set","p":"parts","pt":"msg","to":"{\"id\":\"\",\"type\":\"object\",\"count\":3,\"key\":\"rest_100\",\"index\":0}","tot":"json"},{"t":"set","p":"parts.id","pt":"msg","to":"id","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":440,"y":4100,"wires":[["8781925b233eebe1"]]},{"id":"a8ae2c70a9972bd4","type":"change","z":"d1395164b4eec73e","name":"","rules":[{"t":"set","p":"id","pt":"msg","to":"","tot":"date"}],"action":"","property":"","from":"","to":"","reg":false,"x":250,"y":4140,"wires":[["e1cfee19c7cc5c01","8ae5c1adb781d973","f4f842f64e15e888"]]},{"id":"8ae5c1adb781d973","type":"change","z":"d1395164b4eec73e","name":"","rules":[{"t":"set","p":"parts","pt":"msg","to":"{\"id\":\"\",\"type\":\"object\",\"count\":3,\"key\":\"rest_200\",\"index\":1}","tot":"json"},{"t":"set","p":"parts.id","pt":"msg","to":"id","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":440,"y":4140,"wires":[["b3f0b99201ea26c1"]]},{"id":"f4f842f64e15e888","type":"change","z":"d1395164b4eec73e","name":"","rules":[{"t":"set","p":"parts","pt":"msg","to":"{\"id\":\"\",\"type\":\"object\",\"count\":3,\"key\":\"rest_300\",\"index\":2}","tot":"json"},{"t":"set","p":"parts.id","pt":"msg","to":"id","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":440,"y":4180,"wires":[["1de7b805ac06c5de"]]},{"id":"886a5198dfb8d291","type":"http in","z":"d1395164b4eec73e","name":"","url":"/testapi","method":"post","upload":false,"swaggerDoc":"","x":310,"y":4300,"wires":[["8723b2f7e8358aa3","5ec1677aa77126a8"]]},{"id":"e0b75eb34c2663c2","type":"http response","z":"d1395164b4eec73e","name":"","statusCode":"","headers":{},"x":830,"y":4300,"wires":[]},{"id":"d2c71c18a324958c","type":"http request","z":"d1395164b4eec73e","name":"","method":"POST","ret":"txt","paytoqs":"body","url":"","tls":"","persist":false,"proxy":"","insecureHTTPParser":false,"authType":"","senderr":false,"headers":[],"x":850,"y":4140,"wires":[["bae4bab624601f95"]]},{"id":"8723b2f7e8358aa3","type":"delay","z":"d1395164b4eec73e","name":"","pauseType":"random","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"10","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":520,"y":4300,"wires":[["117cca4438303b40"]]},{"id":"8781925b233eebe1","type":"change","z":"d1395164b4eec73e","name":"","rules":[{"t":"set","p":"url","pt":"msg","to":"http://192.168.1.10:1880/testapi","tot":"str"},{"t":"set","p":"payload.value","pt":"msg","to":"100","tot":"str"},{"t":"set","p":"payload.id","pt":"msg","to":"id","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":620,"y":4100,"wires":[["d2c71c18a324958c"]]},{"id":"117cca4438303b40","type":"change","z":"d1395164b4eec73e","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"$$.payload.value & \" response\" & $$.payload.id","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":680,"y":4300,"wires":[["e0b75eb34c2663c2"]]},{"id":"b3f0b99201ea26c1","type":"change","z":"d1395164b4eec73e","name":"","rules":[{"t":"set","p":"url","pt":"msg","to":"http://192.168.1.10:1880/testapi","tot":"str"},{"t":"set","p":"payload.value","pt":"msg","to":"200","tot":"str"},{"t":"set","p":"payload.id","pt":"msg","to":"id","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":620,"y":4140,"wires":[["d2c71c18a324958c"]]},{"id":"1de7b805ac06c5de","type":"change","z":"d1395164b4eec73e","name":"","rules":[{"t":"set","p":"url","pt":"msg","to":"http://192.168.1.10:1880/testapi","tot":"str"},{"t":"set","p":"payload.value","pt":"msg","to":"300","tot":"str"},{"t":"set","p":"payload.id","pt":"msg","to":"id","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":620,"y":4180,"wires":[["d2c71c18a324958c"]]},{"id":"bae4bab624601f95","type":"join","z":"d1395164b4eec73e","name":"","mode":"auto","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":true,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1010,"y":4140,"wires":[["71884565f1d58e25"]]}]

Not sure why you did all of that?

It can be done with zero custom functions - 100% "no-code"

chrome_2ZQCsY02Uo

Demo flow

[{"id":"3a47873a9b684ff8","type":"http in","z":"13be7647f0632f2f","name":"","url":"/slow","method":"get","upload":false,"swaggerDoc":"","x":520,"y":180,"wires":[["613abffde0fe0c8c"]]},{"id":"613abffde0fe0c8c","type":"delay","z":"13be7647f0632f2f","name":"","pauseType":"delay","timeout":"3.6","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":700,"y":180,"wires":[["a84f089e0e6bc766"]]},{"id":"b594f6d9afba4851","type":"http response","z":"13be7647f0632f2f","name":"","statusCode":"","headers":{},"x":1070,"y":180,"wires":[]},{"id":"a84f089e0e6bc766","type":"change","z":"13be7647f0632f2f","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"{\t    \"name\": req.url,\t    \"time\": $now()\t}","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":900,"y":180,"wires":[["b594f6d9afba4851"]]},{"id":"1a62ec505b5573f9","type":"http request","z":"13be7647f0632f2f","name":"/slow","method":"GET","ret":"obj","paytoqs":"ignore","url":"http://localhost:1880/slow","tls":"","persist":false,"proxy":"","insecureHTTPParser":false,"authType":"","senderr":false,"headers":[],"x":890,"y":420,"wires":[["10bba1c78e57013f","b2090fc4d45a02f7"]]},{"id":"10bba1c78e57013f","type":"debug","z":"13be7647f0632f2f","name":"debug 168","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1090,"y":420,"wires":[]},{"id":"a8439ed6b3959a95","type":"delay","z":"13be7647f0632f2f","name":"","pauseType":"delay","timeout":"2.4","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":700,"y":240,"wires":[["9b4b710d75d873ed"]]},{"id":"1e51695499e5de37","type":"http in","z":"13be7647f0632f2f","name":"","url":"/medium","method":"get","upload":false,"swaggerDoc":"","x":530,"y":240,"wires":[["a8439ed6b3959a95"]]},{"id":"5f13e96029cf9ad2","type":"http response","z":"13be7647f0632f2f","name":"","statusCode":"","headers":{},"x":1070,"y":240,"wires":[]},{"id":"9b1f14c8ddee80c9","type":"delay","z":"13be7647f0632f2f","name":"","pauseType":"delay","timeout":"1.2","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":700,"y":300,"wires":[["0c10ed97c196c345"]]},{"id":"d0f55284cf13bc2d","type":"http in","z":"13be7647f0632f2f","name":"","url":"/fast","method":"get","upload":false,"swaggerDoc":"","x":520,"y":300,"wires":[["9b1f14c8ddee80c9"]]},{"id":"3f036007dac8e530","type":"http response","z":"13be7647f0632f2f","name":"","statusCode":"","headers":{},"x":1070,"y":300,"wires":[]},{"id":"9b4b710d75d873ed","type":"change","z":"13be7647f0632f2f","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"{\t    \"name\": req.url,\t    \"time\": $now()\t}","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":900,"y":240,"wires":[["5f13e96029cf9ad2"]]},{"id":"0c10ed97c196c345","type":"change","z":"13be7647f0632f2f","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"{\t    \"name\": req.url,\t    \"time\": $now()\t}","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":900,"y":300,"wires":[["3f036007dac8e530"]]},{"id":"a9b663fdf1718981","type":"http request","z":"13be7647f0632f2f","name":"/medium","method":"GET","ret":"obj","paytoqs":"ignore","url":"http://localhost:1880/medium","tls":"","persist":false,"proxy":"","insecureHTTPParser":false,"authType":"","senderr":false,"headers":[],"x":880,"y":480,"wires":[["9e9446b76f549dcf","b2090fc4d45a02f7"]]},{"id":"9e9446b76f549dcf","type":"debug","z":"13be7647f0632f2f","name":"debug 170","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1090,"y":480,"wires":[]},{"id":"05c684b90a943282","type":"http request","z":"13be7647f0632f2f","name":"/fast","method":"GET","ret":"obj","paytoqs":"ignore","url":"http://localhost:1880/fast","tls":"","persist":false,"proxy":"","insecureHTTPParser":false,"authType":"","senderr":false,"headers":[],"x":890,"y":540,"wires":[["1d0efd195cc4cd41","b2090fc4d45a02f7"]]},{"id":"1d0efd195cc4cd41","type":"debug","z":"13be7647f0632f2f","name":"debug 171","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":1090,"y":540,"wires":[]},{"id":"b2090fc4d45a02f7","type":"join","z":"13be7647f0632f2f","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"3","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1050,"y":600,"wires":[["18770a094e347d62"]]},{"id":"f797a9890af4b10a","type":"change","z":"13be7647f0632f2f","name":"topic=slow","rules":[{"t":"set","p":"topic","pt":"msg","to":"slow","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":710,"y":420,"wires":[["1a62ec505b5573f9"]]},{"id":"e40372b2cc0aa83a","type":"change","z":"13be7647f0632f2f","name":"topic=medium","rules":[{"t":"set","p":"topic","pt":"msg","to":"medium","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":720,"y":480,"wires":[["a9b663fdf1718981"]]},{"id":"5e6cce70fd118327","type":"change","z":"13be7647f0632f2f","name":"topic=fast","rules":[{"t":"set","p":"topic","pt":"msg","to":"fast","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":700,"y":540,"wires":[["05c684b90a943282"]]},{"id":"80f2f3a2c1effd85","type":"inject","z":"13be7647f0632f2f","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"true","payloadType":"bool","x":495,"y":420,"wires":[["f797a9890af4b10a"]],"l":false},{"id":"014282ae834f430d","type":"inject","z":"13be7647f0632f2f","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"true","payloadType":"bool","x":495,"y":480,"wires":[["e40372b2cc0aa83a"]],"l":false},{"id":"74acba4f41d80523","type":"inject","z":"13be7647f0632f2f","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"true","payloadType":"bool","x":495,"y":540,"wires":[["5e6cce70fd118327"]],"l":false},{"id":"12c9a54bcea1e7ad","type":"inject","z":"13be7647f0632f2f","name":"All","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"true","payloadType":"bool","x":530,"y":600,"wires":[["f797a9890af4b10a","e40372b2cc0aa83a","5e6cce70fd118327","62db6a55efb726f0"]]},{"id":"18770a094e347d62","type":"debug","z":"13be7647f0632f2f","name":"Promise All","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":1090,"y":660,"wires":[]},{"id":"62db6a55efb726f0","type":"change","z":"13be7647f0632f2f","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":720,"y":600,"wires":[["b2090fc4d45a02f7"]]}]

The issue with this approach is that several fast requests could come in before the first slow request completes. Each outgoing request needs to be tied to the Nodered message that spawned it.

My actual use case here is connecting a Discord server bot up with several LLMs using Ollama and LangChain. When a user types a message (Or speaks a command into a M5Stack Echo that Home Assistant will then type into Discord) Nodered will immediately sends a response in Discord stating that the bot is replying while the LLM processes the request. The LLM may send several requests back and forth to Nodered depending on what it needs to do (ie, gather sensor info from Postgres/Redis). Once the LLM is ready to reply to the user, it'll edit the original message. The LLM can also send requests back to Nodered so that it can communicate with Home Assistant.

I find Nodered by far the easiest way to connect up all these systems.

image
A few seconds later...

You are using it wrong, you should use deferReply then editReply.
When a user types a message, send deferReply and trigger http requests then when all promises are resolved use editReply to respond.

1 Like

deferReply

Oh, that's cool. I didn't know about it! I'll update to use this instead.

It appears that deferReply can only be triggered by commands, not casual messages, since it uses the discordInteraction node: Interaction Examples · Markoudstaal/node-red-contrib-discord-advanced Wiki · GitHub

Oh sorry, I forgot that this is only for interactions.

Can I make up for it by suggesting you send sendTyping to replace the deferReply then respond with send

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.