Making flows asynchronous by default

In Node-RED 1.0 we're changing the message passing between nodes to be fully asynchronous.

To find out what that means and how it might affect your flows, read this new blog post:

We'll have a new beta of 1.0 next week which has this change in.

10 Likes

That is a brilliantly written post if I may say so, making a complex subject reasonably easy to understand, with the help of simple but clear animations.

6 Likes

I'll be looking forward to trying it. Initially I incorrectly assumed node-red was asynchronous by default due to the underlying use of nodejs.

Here is a simple flow used on a PiZeroW for a color changing display with RGB LEDs. I suspect asynchronous will make it "smoother".

[{"id":"eeaedf65.c07cc","type":"inject","z":"b3610d6f.9372","name":"Tick","topic":"Tick","payload":"1","payloadType":"num","repeat":"0.5","crontab":"","once":false,"onceDelay":0.1,"x":110,"y":240,"wires":[["250f496b.672aa6"]]},{"id":"250f496b.672aa6","type":"function","z":"b3610d6f.9372","name":"Red Ramp","func":"var redCount = context.get('redCount')||0;\nvar countDown = context.get('countDown')||0;\nvar armed = context.get('armed')||0;\nvar garageDoor = context.get('garageDoor')||0;\nvar tick = context.get('tick')||0;\nvar flash = context.get('flash')||0;\nvar newMsg={ payload : \"\", topic : \"\", };\n\nif(msg.topic === 'Tick'){\n    tick = tick+1;\n    context.set('tick',tick);\n    if (tick%2 === 0){\n        if(flash === 0)\n            context.set('flash',1);\n        else\n            context.set('flash',0);\n    }\n\n    if(armed === 0 && garageDoor === 0){\n        newMsg.payload=0;  // green will be flashing\n        return [ msg, newMsg ];\n    }\n    if(garageDoor === 1){\n        if(flash)\n            newMsg.payload=100;\n        else\n            newMsg.payload=10;\n        return [ msg, newMsg ];\n    }else if(tick%7 === 0){\n        if (countDown === 0){\n            context.set('redCount', redCount+1);\n            redCount=context.get('redCount');\n            if(redCount>=50){\n                context.set('countDown',1);\n            }\n        }else{\n            context.set('redCount', redCount-1);\n            redCount=context.get('redCount');\n            if(redCount<=0){\n                context.set('countDown',0);\n            }\n        }\n        newMsg.payload=redCount;\n        return [ msg, newMsg ];\n    }\n    //return  [msg, null ]\n}\n\nif(msg.topic === 'Alarm/LOREX_MODE'){\n    if(msg.payload.indexOf(\"Idle\") === 0){\n        context.set('armed',0);    \n    }else{\n        context.set('armed',1);\n    }\n    return [ msg, null ];\n}\nif(msg.topic === 'Alarm/GarageDoor'){\n    if(msg.payload.indexOf(\"Open\") === 0){\n        context.set('garageDoor',1);    \n    }else{\n        context.set('garageDoor',0);\n    }\n    return [ msg, null ];\n}\nreturn [ msg, null ];","outputs":2,"noerr":0,"x":310,"y":240,"wires":[["872b75c3.4e2718"],["d78627f.e3224d8"]]},{"id":"872b75c3.4e2718","type":"function","z":"b3610d6f.9372","name":"Green Ramp","func":"var greenCount = context.get('greenCount')||0;\nvar gcountDown = context.get('gcountDown')||0;\nvar armed = context.get('armed')||0;\nvar garageDoor = context.get('garageDoor')||0;\nvar tick = context.get('tick')||0;\nvar flash = context.get('flash')||0;\nvar newMsg={ payload : \"\", topic : \"\", };\n\nif(msg.topic === 'Tick'){\n    tick = tick+1;\n    context.set('tick',tick);\n    if (tick%2 === 0){\n        if(flash === 0)\n            context.set('flash',1);\n        else\n            context.set('flash',0);\n    }\n    if(garageDoor === 1){\n        newMsg.payload=0;   // red will be flashing\n        return [ msg, newMsg ];\n    }\n    if(armed === 0){\n        if(flash)\n            newMsg.payload=100;\n        else\n            newMsg.payload=10;\n        return [ msg, newMsg ];\n    }else{\n        if (tick%13 === 0){\n            if (gcountDown === 0){\n                context.set('greenCount', greenCount+1);\n                greenCount=context.get('greenCount');\n                if(greenCount>=50){\n                    context.set('gcountDown',1);\n                }\n            }else{\n                context.set('greenCount', greenCount-1);\n                greenCount=context.get('greenCount');\n                if(greenCount<=0){\n                    context.set('gcountDown',0);\n                }\n            }\n            newMsg.payload=greenCount;\n            return [ msg, newMsg ];\n        }\n    }\n}\n\nif(msg.topic === 'Alarm/LOREX_MODE'){\n    if(msg.payload.indexOf(\"Idle\") === 0){\n        context.set('armed',0);    \n    }else{\n        context.set('armed',1);\n    }\n    return [ msg, null ];\n}\nif(msg.topic === 'Alarm/GarageDoor'){\n    if(msg.payload.indexOf(\"Open\") === 0){\n        context.set('garageDoor',1);    \n    }else{\n        context.set('garageDoor',0);\n    }\n    return [ msg, null ];\n}\nreturn [ msg, null ];\n","outputs":2,"noerr":0,"x":490,"y":200,"wires":[["17902703.3c2779"],["711bf7d6.884dd8"]]},{"id":"d78627f.e3224d8","type":"rpi-gpio out","z":"b3610d6f.9372","name":"Red LED 40","pin":"40","set":"","level":"0","freq":"100","out":"pwm","x":490,"y":260,"wires":[]},{"id":"17902703.3c2779","type":"function","z":"b3610d6f.9372","name":"Blue Ramp","func":"var blueCount = context.get('blueCount')||0;\nvar bcountDown = context.get('bcountDown')||0;\nvar armed = context.get('armed')||0;\nvar garageDoor = context.get('garageDoor')||0;\nvar tick = context.get('tick')||0;\nvar flash = context.get('flash')||0;\nvar newMsg={ payload : \"\", topic : \"\", };\n\nif(msg.topic === 'Tick'){\n    tick = tick+1;\n    context.set('tick',tick);\n    if(armed === 0 || garageDoor === 1){\n        newMsg.payload=0;   // either red or green will be flashin\n        return [ msg, newMsg ];\n    }\n    if (tick%19 === 0){\n            if (bcountDown === 0){\n                context.set('blueCount', blueCount+1);\n                blueCount=context.get('blueCount');\n                if(blueCount>=100){\n                    context.set('bcountDown',1);\n                }\n            }else{\n                context.set('blueCount', blueCount-1);\n                blueCount=context.get('blueCount');\n                if(blueCount<=50){\n                    context.set('bcountDown',0);\n                }\n            }\n            newMsg.payload=blueCount;\n            return [ msg, newMsg ];\n    }\n}\n\nif(msg.topic === 'Alarm/LOREX_MODE'){\n    if(msg.payload.indexOf(\"Idle\") === 0){\n        context.set('armed',0);    \n    }else{\n        context.set('armed',1);\n    }\n    return [ msg, null ];\n}\n\nif(msg.topic === 'Alarm/GarageDoor'){\n    if(msg.payload.indexOf(\"Open\") === 0){\n        context.set('garageDoor',1);    \n    }else{\n        context.set('garageDoor',0);\n    }\n    return [ msg, null ];\n}\nreturn [ msg, null ];","outputs":2,"noerr":0,"x":670,"y":160,"wires":[[],["42c62c42.d66bc4"]]},{"id":"711bf7d6.884dd8","type":"rpi-gpio out","z":"b3610d6f.9372","name":"Green LED 38","pin":"38","set":"","level":"0","freq":"100","out":"pwm","x":680,"y":220,"wires":[]},{"id":"42c62c42.d66bc4","type":"rpi-gpio out","z":"b3610d6f.9372","name":"Blue LED 36","pin":"36","set":"","level":"0","freq":"100","out":"pwm","x":850,"y":180,"wires":[]}]

I look forward to trying!

Looking forward to this as well. I may have to make some adjustments to my current project but making things more responsive by going async (not blocking on a chain of events) will be great.

Interesting. And well described.

I'm just curiously wondering, why now and not from start? Was there a change in node.js that suddenly enables this? Or was it something that was not thought upon in the original NR architecture design?

Anyway, makes sense I think,

Now thinking if my flows will be affected or not. Have I "by mistake & not knowing" some parts of my flows that benefits from and works just because it is synchronous? And if so, how do I find & fix those parts? The runtimeSyncDelivery "lifeline" will help me to keep the thing running but if I really would like to update the flows correctly, how? Would it be possible to run a script of some kind, checking a flow and identifying where things would break? A separate inconsistency tool...

great to hear this is changed.. thanks for sharing the information.
wonder now that given there will be no guarantee a previous connected node has finished processes , what will be a mechanism to ensure msg data is complete prior to processing in the next node ? or would we need to include some checks before / wrap everything differently and we receive a msg "promise" of some sort ?
Thanks

Probably I am the only one who find async flows to bring more complications as compared to benefit it brings.
I understand sometimes programmers want things to go parallely but most of the time with async flows (e.g. http) my next action is based on response from previous node. Sync flows make it super easy for noobs like me to understand and debug flows easily. now i would be fighting to make the flows sync again (callback within callbacks) instead of focusing on flows.
Please consider providing option to keep flow sync with 1.0. I see there is a flag to get sync behavior but comments ask to consider that temporary/deprecated.

Can you share an example of a flow that you will be fighting to make sync again?

If you have a single flow of nodes, one after the other, then the async change won't change anything about the behaviour of the flow. An http request node still passes its result to the next node in the flow.

The main place where the async change can be observed is where a flow branches - and once a flow branches, they are entirely independent branches and there is no link between how messages pass down them.

Oh.. then maybe I misunderstood. I thought all IO's will be async (like HTTP etc) just like in regular javascript.
As long as a node will send message to next one synchronously even for IO nodes, then it would work for me.
Mea culpa. Thanks for clarification.

First of all: I am a supporter of asynchronous flows. But today I found a problem with one of my flows.

Maybe this will help others too.

I have a scenario where I have several nodes behind each other. In a first node, an object loaded from a context and then it is enriched with data from an incoming message and forwarded to the next nodes. The last node of the flow then stores the object back to the context.

With syncronous flows you have in the enriched object the information of all messages:
https://nodered.org/blog/content/images/2019/08/flow1.svg

In asyncronic flows this is not the case. Then in the enriched Object the information of some messages will get lost when they arrive shortly one after another, because several enriched objects are then created at the same time where only the last is stored in the context:
https://nodered.org/blog/content/images/2019/08/flow1-async.svg

I solved the problem for me by implementing everything in a function node. Maybe I can think of a more elegant solution.

If possible removing the use of flow/global context would remove the problem.

1 Like

Well that sounds an awfully lot like a race condition... Would this cause issues in a SINGLE node, like a function that loads, manipulates, and then saves back a global context variable? I think the answer is no unless you were doing additional async work WITHIN that node somehow (can you await inside a function node?), but I figured I should ask since a good portion of my state management is like that.

1 Like

Valid for a SINGLE node:
If your function node's code is strictly synchronous, then you are safe.
If you are doing async operations (callbacks, async/await), NodeJS' eventloop will process other code in the meantime, so these race conditions can happen.

1 Like

I get this is the new MO... just having trouble even conceiving all the places this will be an issue. This basically takes you from assurances that a single FLOW will run to completion on the thread first to having to think about concurrency BETWEEN flows for individual nodes then? I tend to do most of my stuff in function nodes so I think MOST of my stuff will be ok, but for people who are using a lot of chaining through various nodes for transformations, I'd expect flows like @Hypnos to be relatively common...

Explaining to people why their flows do weird things with hidden race conditions might become... interesting. I get some of the reasons why you want to do this, but without the benefits of locks to control flow execution entry, I'm kinda concerned that race conditions are gonna be pretty common issues to deal with for people now...

The sort of flow in question is where per-message state is being held in context rather than in the message where it should be. In my experience, they are not so comment.

Putting per-message state in context is an anti-pattern we have consistently warned against whenever it gets suggested.

It would only work today if every node in the flow is entire synchronous in its behaviour. And if that's the case, there really is no good reason to be putting per-message state in context rather than on the message itself. In that scenario, if context must be used, then it can use msg._msgid as a unique key into context.

This type of discussion is exactly why we're talking about it now and not after we release 1.0. If there are other features we need to look at to help with these scenarios then we should get them into the roadmap.

1 Like

Agreed, but only up to a point. The real world is synchronous -- events actually occur in a particular order. If I push buttons on a dashboard in a specific order, I would like to be certain that the events they trigger occur in that order. If I understand correctly, the only way I will be able to assure this is if I process each message in a single function node. Much of the advantage of wiring together a sequence of core nodes, each "doing a single thing well", could be lost.

Avoiding global variables is part of my religion, but sometimes they work well to maintain a record of the system state. I'm afraid that @Hypnos' example suggests that they may not be reliable in the future.

I agree with Nick here. I try to keep everything that is related to the current flow execution on the message that triggered it.

As soon as you have to keep a state across multiple executions of a flow and use the context, you have to know what you are doing, and be aware of the consequences.

Even with the current version <1.0.0 you can run into these cases as soon as your flow branches, or performing some async stuff in your functions by using callbacks and node.send().

So even today, I never make any assumption of the order in which the nodes are executed in a flow.

When have to access and modify something that is stored in the context, I always use the function node (with purely synchronous code) so this operation is guaranteed to be atomic.

2 Likes

Exactly right, but if you avoid these things, you can be sure that a message will be processed through a flow to completion before the next message is handled. In 1.0.0 that won't be the case.

You cannot be sure that every node in your flow is synchronous in its internal implementation.

For example, the Change and Switch nodes became async nodes if they use JSONata a few releases ago.

But ultimately, this is why we're making this change in a major version bump and why we're highlighting it.