Waiting for parallel processes to complete in order?

I have run into a bit of a problem where I'm trying to wait on two things to complete before I move to the next step in a sequential process. Ordinarily a join node would work, with both inputs fed in and set the node to trigger once a set number of inputs are received.

My problem is that one of the inputs is an MQTT topic, which is published to multiple times in the same flow. That means that only the first join node in the sequence works as intended.

What's the best way to implement this so that it works as planned? Is there a way to wait for one input first to then poll for the MQTT packet?

Example flow:

[{"id":"809f81b8.4f989","type":"link in","z":"4ffa72da.fb000c","name":"Start trigger","links":[],"x":410,"y":1120,"wires":[["f190634d.26ab7"]],"l":true},{"id":"f190634d.26ab7","type":"change","z":"4ffa72da.fb000c","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"collection_zPos","tot":"flow"}],"action":"","property":"","from":"","to":"","reg":false,"x":600,"y":1120,"wires":[["bc14c79a.7a26e8","5acdad32.a1f1e4"]]},{"id":"bc14c79a.7a26e8","type":"mqtt out","z":"4ffa72da.fb000c","name":"","topic":"MQTT/action","qos":"","retain":"","broker":"4d04db7.06a6024","x":810,"y":1120,"wires":[]},{"id":"7e275467.c9855c","type":"mqtt in","z":"4ffa72da.fb000c","name":"","topic":"MQTT/action/complete","qos":"2","datatype":"auto","broker":"4d04db7.06a6024","x":580,"y":1180,"wires":[["5acdad32.a1f1e4"]]},{"id":"5acdad32.a1f1e4","type":"join","z":"4ffa72da.fb000c","name":"","mode":"custom","build":"merged","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":true,"timeout":"","count":"2","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":790,"y":1180,"wires":[["91f9a5c.e1c3358","df79de77.90f6e"]]},{"id":"91f9a5c.e1c3358","type":"delay","z":"4ffa72da.fb000c","name":"","pauseType":"delay","timeout":"0.5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":950,"y":1240,"wires":[["e093f84d.6b3008"]]},{"id":"e093f84d.6b3008","type":"change","z":"4ffa72da.fb000c","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"collection_xPos","tot":"flow"}],"action":"","property":"","from":"","to":"","reg":false,"x":1130,"y":1240,"wires":[["eb2d1c08.b90a5","4a52cb3.b8f3f34"]]},{"id":"eb2d1c08.b90a5","type":"join","z":"4ffa72da.fb000c","name":"","mode":"custom","build":"string","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"2","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"num","reduceFixup":"","x":1330,"y":1300,"wires":[["40ed58ea.3213b8"]]},{"id":"df79de77.90f6e","type":"link out","z":"4ffa72da.fb000c","name":"Internal action","links":[],"x":960,"y":1180,"wires":[],"l":true},{"id":"40ed58ea.3213b8","type":"link out","z":"4ffa72da.fb000c","name":"process complete","links":[],"x":1510,"y":1300,"wires":[],"l":true},{"id":"4a52cb3.b8f3f34","type":"mqtt out","z":"4ffa72da.fb000c","name":"","topic":"MQTT/action","qos":"","retain":"","broker":"4d04db7.06a6024","x":1350,"y":1240,"wires":[]},{"id":"eeb11b10.896968","type":"mqtt in","z":"4ffa72da.fb000c","name":"","topic":"MQTT/action/complete","qos":"2","datatype":"auto","broker":"4d04db7.06a6024","x":1120,"y":1300,"wires":[["eb2d1c08.b90a5"]]},{"id":"4d04db7.06a6024","type":"mqtt-broker","name":"Mosquitto localhost","broker":"localhost","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

Have you tried using the complete node?

I'd not noticed this node before, but I realise it presents a similar problem. Unless you mean to use it in a different way...

Right now the complete nodes further on in the process will be half complete long before the action they have to wait on has been started...

I don't understand why you are having the same message arrive via two MQTT nodes?
Why not have only one MQTT node - if the data is going to change and you need the original data, use a change node to copy msg.payload to msg.original_payload. Then at the point of the second join, you will have both items of data and you can merge them before sending them on.

You could use it like this.

[{"id":"7e3a17af.07e5","type":"inject","z":"5a245aa1.510164","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":140,"y":1680,"wires":[["3c9b9fd3.48334","7b6e94e5.ad70bc"]]},{"id":"3c9b9fd3.48334","type":"delay","z":"5a245aa1.510164","name":"","pauseType":"delay","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"x":410,"y":1680,"wires":[["4e0018ca.fe4be"]]},{"id":"7b6e94e5.ad70bc","type":"change","z":"5a245aa1.510164","name":"function  2","rules":[{"t":"set","p":"payload","pt":"msg","to":"function complete","tot":"str"},{"t":"set","p":"topic","pt":"msg","to":"function2","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":320,"y":1740,"wires":[["566148e.f07e238"]]},{"id":"4e0018ca.fe4be","type":"change","z":"5a245aa1.510164","name":"function 1","rules":[{"t":"set","p":"payload","pt":"msg","to":"function complete","tot":"str"},{"t":"set","p":"topic","pt":"msg","to":"function1","tot":"str"},{"t":"set","p":"additional","pt":"msg","to":"more info from other msg properties","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":590,"y":1680,"wires":[[]]},{"id":"566148e.f07e238","type":"join","z":"5a245aa1.510164","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":true,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":530,"y":1740,"wires":[["3f35d274.99189e"]]},{"id":"3f35d274.99189e","type":"debug","z":"5a245aa1.510164","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":610,"y":1800,"wires":[]},{"id":"afa19114.546f08","type":"change","z":"5a245aa1.510164","name":"","rules":[{"t":"set","p":"complete","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":330,"y":1840,"wires":[["566148e.f07e238"]]},{"id":"a7cd3bfe.6ccbd","type":"complete","z":"5a245aa1.510164","name":"","scope":["4e0018ca.fe4be"],"uncaught":false,"x":120,"y":1840,"wires":[["afa19114.546f08"]]}]

Or you could try to use msg.complete in your flow rather than msg count.

I'm running a python process which operates a stepper driver in the background. Commands are sent to that process to move the stepper, and once the move has complete then "stepper/moveComplete" MQTT topic is published.

So after the first move, the topic is published to all listening nodes. I'm trying to find a way that the MQTT nodes only become relevant once the move that is important to them has completed.

In a standard programming language this would be trivial, so this is what I'm trying to emulate;

newPosition = 1024;

// issue a move command
moveMotor(newPosition);

// poll for a completed move
while(!moveCompleteFlag)
{
    // blocks until the move is done
}
// clear the flag (can this be done in Node Red?)
moveCompleteFlag = false;

// repeat as many times as you like
newPositon = 100;

Basically I need the output of a particular stepper/moveComplete subscription to only become relevant once a specific node before it has been completed, i.e. the move command.

You could use a node-red-contrib-simple-gate and feed the two streams into it. Configure it with the control topic MQTT/action/complete and use that to open/close the gate. Then it will pass messages when the action is complete and stop passing them when it becomes not complete again.

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.