Robust join with reset

I'm struggling a bit to understand how to use reset in the join node in a robust manner.

Flow looks like this:

  1. TRIGGER: An input, for example cron-plus node (injects a message at a regular time interval, for example every minute)
  2. PREPARE READ: Generate messages for read requests, a simple function node that spits out x amount of messages.
  3. READ: Some node that fetches data from an external resource, for example bacnet
  4. PREPARE WRITE: Handle the response data and prepare write request
  5. WRITE: Some node that writes data to an external resource, for example http

Now my plan was to add parts in the function node (2) and a join node after the read node (3). But if something piles up, the join might still hold an incomplete group when the next one arrives. So I thought about sending a "reset" message first. But how do I do that, and make sure it arrives before the normal ones?

Originally all messages were sent individually over http, but I sometimes got status code 429 (Too Many Requests). So I want to send a single message instead.

As long as the parts object has an id, the join should only combine incoming messages that share that parts.id.
You can also make the function node send msg.reset before it processes the other messages it will send.
You can also use a switch node to check the responses from (3). The switch node can reconstruct the message sequence (msg.parts) for messages or errors from a catch node, which should allow the join to complete in automatic mode.
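
To make the parts idea concrete, here is a minimal sketch of how the function node (2) could tag each message so that a join in automatic mode groups by id. The helper name buildGroup is hypothetical and only there so the logic can be read (and run) outside Node-RED. The fields id, count, index and type are the same ones used in the example flows in this thread.

```javascript
// Hypothetical helper: wraps each item in a message carrying msg.parts,
// so a join node in automatic mode can collect the group by parts.id.
function buildGroup(items, id) {
    const count = items.length;
    return items.map((payload, index) => ({
        payload,
        parts: { id, count, index, type: "array" }
    }));
}

// Inside a Node-RED function node this might be used as (assumption):
//   const id = Date.now();   // any value unique per trigger should do
//   buildGroup(msg.payload, id).forEach(m => node.send(m));
```

Each trigger gets its own id, so messages from two overlapping groups carry different parts.id values.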

I tried with id, but it seems an incomplete group is emitted if a new group/id arrives?

Can you supply an example flow? Each id should not affect another.

Thanks, I was not able to recreate that haha. So now the question is, to avoid incomplete groups piling up, how to guarantee that reset msg arrives before the first message in the new group? Perhaps just declare it first in the array?

Surely you can just make the function send the reset in its first line, and queue access to the function node and join node.

Here is a demonstration of a function node sending parts, and the join using an id. It incorporates an error and a catch. You can click the inject multiple times and the joins all succeed.

[{"id":"6404f5eb0d159af9","type":"switch","z":"667cec54c048503c","name":"","property":"payload","propertyType":"msg","rules":[{"t":"neq","v":"2","vt":"num"}],"checkall":"true","repair":false,"outputs":1,"x":490,"y":1840,"wires":[["cf0d1e99ca2363b0"]]},{"id":"9fa43bc9381e9411","type":"inject","z":"667cec54c048503c","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[1,2,$ceil($random()*10)]","payloadType":"jsonata","x":310,"y":1920,"wires":[["6ed1c1da3d723d36","7d1e5c9416b6bbd3"]]},{"id":"1b3a4fba69963aef","type":"link call","z":"667cec54c048503c","name":"","links":["77e090d12365b86d"],"linkType":"static","timeout":"5","x":620,"y":1940,"wires":[["e4b67e9df4d9b1a7"]]},{"id":"e4b67e9df4d9b1a7","type":"switch","z":"667cec54c048503c","name":"","property":"statusCode","propertyType":"msg","rules":[{"t":"eq","v":"200","vt":"num"}],"checkall":"true","repair":true,"outputs":1,"x":770,"y":1940,"wires":[["e9fd2986a9de861a"]]},{"id":"e9fd2986a9de861a","type":"join","z":"667cec54c048503c","name":"","mode":"auto","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":"false","timeout":"","count":"","reduceRight":false,"x":930,"y":1940,"wires":[["7d1e5c9416b6bbd3"]]},{"id":"77e090d12365b86d","type":"link in","z":"667cec54c048503c","name":"link in 22","links":[],"x":365,"y":1840,"wires":[["6404f5eb0d159af9"]]},{"id":"ee6ea42105774050","type":"change","z":"667cec54c048503c","name":"","rules":[{"t":"set","p":"statusCode","pt":"msg","to":"200","tot":"num"}],"action":"","property":"","from":"","to":"","reg":false,"x":880,"y":1840,"wires":[["4ff016361e13828a"]]},{"id":"4ff016361e13828a","type":"link out","z":"667cec54c048503c","name":"link out 11","mode":"return","links":[],"x":1025,"y":1840,"wires":[]},{"id":"6ed1c1da3d723d36","type":"function","z":"667cec54c048503c","name":"function 159","func":"const id = new Date().getTime();\nconst count = msg.payload.length;\nmsg.payload.forEach((num, index) => {\n    node.send(\n        {\n            payload: num,\n            parts:{\n                id,\n                count,\n                index,\n                type: \"array\"\n            }\n        }\n    )\n});","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":470,"y":1940,"wires":[["1b3a4fba69963aef"]]},{"id":"96001ab6515e0e9b","type":"inject","z":"667cec54c048503c","name":"","props":[{"p":"reset","v":"true","vt":"bool"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":850,"y":2000,"wires":[["e9fd2986a9de861a"]]},{"id":"cf0d1e99ca2363b0","type":"delay","z":"667cec54c048503c","name":"","pauseType":"delay","timeout":"2","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":660,"y":1840,"wires":[["ee6ea42105774050"]]},{"id":"4b2c04cc848fe6bd","type":"catch","z":"667cec54c048503c","name":"satch call errors","scope":["1b3a4fba69963aef"],"uncaught":false,"x":560,"y":1980,"wires":[["e4b67e9df4d9b1a7"]]}]

And here is the same with a reset and a queue.

[{"id":"9fa43bc9381e9411","type":"inject","z":"667cec54c048503c","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"[1,2,$ceil($random()*10)]","payloadType":"jsonata","x":270,"y":1900,"wires":[["93c5aefbf6af3ed0","7d1e5c9416b6bbd3"]]},{"id":"93c5aefbf6af3ed0","type":"delay","z":"667cec54c048503c","name":"","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"20","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":320,"y":1960,"wires":[["6ed1c1da3d723d36"]]},{"id":"7d1e5c9416b6bbd3","type":"debug","z":"667cec54c048503c","name":"debug 2579","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":990,"y":1900,"wires":[]},{"id":"dfe43d409cd9b434","type":"function","z":"667cec54c048503c","name":"function 160","func":"\nreturn {flush: 1}","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":630,"y":2140,"wires":[["93c5aefbf6af3ed0"]]},{"id":"6ed1c1da3d723d36","type":"function","z":"667cec54c048503c","name":"function 159","func":"node.send({reset:true});\nconst id = new Date().getTime();\nconst count = msg.payload.length;\nmsg.payload.forEach((num, index) => {\n    node.send(\n        {\n            payload: num,\n            parts:{\n                id,\n                count,\n                index,\n                type: \"array\"\n            }\n        }\n    )\n});","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":490,"y":1960,"wires":[["1b3a4fba69963aef"]]},{"id":"e9fd2986a9de861a","type":"join","z":"667cec54c048503c","name":"","mode":"auto","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":"false","timeout":"","count":"","reduceRight":false,"x":930,"y":1960,"wires":[["7d1e5c9416b6bbd3","dfe43d409cd9b434"]]},{"id":"1b3a4fba69963aef","type":"link call","z":"667cec54c048503c","name":"","links":["77e090d12365b86d"],"linkType":"static","timeout":"5","x":640,"y":1960,"wires":[["e4b67e9df4d9b1a7"]]},{"id":"e4b67e9df4d9b1a7","type":"switch","z":"667cec54c048503c","name":"","property":"statusCode","propertyType":"msg","rules":[{"t":"eq","v":"200","vt":"num"}],"checkall":"true","repair":true,"outputs":1,"x":790,"y":1960,"wires":[["e9fd2986a9de861a"]]},{"id":"96001ab6515e0e9b","type":"inject","z":"667cec54c048503c","name":"","props":[{"p":"reset","v":"true","vt":"bool"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":790,"y":2020,"wires":[["e9fd2986a9de861a"]]},{"id":"4b2c04cc848fe6bd","type":"catch","z":"667cec54c048503c","name":"satch call errors","scope":["1b3a4fba69963aef"],"uncaught":false,"x":600,"y":2000,"wires":[["e4b67e9df4d9b1a7"]]}]

In your 2nd example there are 2 inject nodes, but no synchronization between them. How can I make sure to send the inject BEFORE the group messages?

Edit: Oh, you send it from the function node. Into the READ node. I can't do that. Perhaps I could send it to a 2nd output, bypassing the whole READ operation. I just wasn't sure if it would always arrive first.
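
For what it's worth, the second-output idea might look roughly like the sketch below. It assumes the function node is configured with two outputs, with output 1 wired to READ and output 2 wired straight to the join. The helper name buildSends is hypothetical. Nothing in this thread confirms that the reset always reaches the join before the first READ response, so treat the ordering as something to test rather than a guarantee.

```javascript
// Hypothetical helper: returns the arguments for two node.send() calls.
// Call 1 puts a reset on output 2 only; call 2 puts the group on output 1 only.
function buildSends(items, id) {
    const count = items.length;
    const group = items.map((payload, index) => ({
        payload,
        parts: { id, count, index, type: "array" }
    }));
    return [
        [null, { reset: true }], // output 2: reset, bypassing READ (assumed wiring)
        [group, null]            // output 1: the group messages, on to READ
    ];
}

// Inside a Node-RED function node with two outputs (assumption):
//   buildSends(msg.payload, Date.now()).forEach(s => node.send(s));
```

Sending the reset in its own node.send call before the group at least fixes the order in which the function node emits them; what happens downstream depends on how long READ takes.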

Thanks!