MQTT as replacement for Project Call nodes

In our Flowfuse environment, we are using one NR instance for flows that can be called from multiple other NR instances. As these instances are not in the same team, it is not possible to use Project nodes to send\return information to that central instance. We've tried HTTP request (auth issues) and TCP request (limited message size) nodes as a substitute, but on recommendation of Flowfuse we are now using MQTT.

This works well for the most part, but as our environment is scaling up issues arise.
The problem we are now running into: we can send information to an MQTT topic which is then received and processed by the central instance. After processing in the central instance we are using another MQTT topic to return the processed information to the "requesting" flow.
As 2 different instances can now call the same flow (using the same topic) and the returns are also using the same topic, it is possible that an instance receives return information that was sent by another flow.

With Project, TCP and HTTP request nodes, information was generated by these nodes that would send a return message to the correct requesting flow. However, with MQTT there is no MQTT request node. So when using MQTT there appears to be no way to maintain one flow as is possible with any of the request nodes.

Is there a way with MQTT to have a return message only be sent to the requesting instance? If so, please let me know what the best way to do this would be.

Our current workaround is using multiple MQTT topics for the same central flow, and set a flag so that the return message can be returned to the correct return topic. But I hope there is a better way to do this as we now have a couple of central flows that each respond to 10+ topics, and send returns to 10+ topics.

Thanks! DenW

Either you play on the topics or you play on the payload. You can attach a property that contains the identifier of the source instance that you join to the message that will be sent in response. This property can be used as a filter.

This is possibly a good use-case for using MQTT v5 properties.

When using v5 of MQTT you can use the user properties and/or response topic for example.

I've not used these myself in anger but I believe they might be useful here.

There are plenty of examples on the web: mqtt v5 user properties at DuckDuckGo

Hi Den. Have a look at the demo flow I sent you a few month back (~June 2024).

It is exactly this (using MQTT + response topic to simulate the Project-Call nodes)

Simple demo

More realistic version

1 Like

Hi @Steve-Mcl ,

Hope you're doing ok?!

Thanks for your reply. I've searched my email but cannot find anything in your emails about MQTT. Not sure what happened, but so sorry I missed this.
Could you please share the flow JSON (either here or by email)?
With your realistic example, I'm not sure why a second message sent at the same time would go to the correct topic\return. But there's probably something in the function nodes for this.

@TotallyInformation : Yes, thank you. Very curious now where your anger comes from, but V5 options appear to be helpful with what we're trying to do. We're setting up a test using V5 options, using Response Topic probably.

But still: the flow remains interrupted (which would not be the case with a request-type node).

I'll post back on how we solved the issue.
Thanks! Den

Sorry, doubtless a translation error. "used in anger" is an English phrase indicating that a process has been used for something useful or important. :grin:

MQTT is a message queue service and so is asynchronous. You wouldn't expect the response to be "real-time" but rather "near real-time". However, this is rarely an issue in most cases and has the advantage that the two ends are "loosely coupled" which is normally a good design feature.

You have not stated if you are self hosting etc and what the hardware is that you are throwing at the problem.

Obviously with NR (and therefore Presumably FlowFuse also) being single threaded - the scalability is either sideways (with more nodes) or by using a faster processor. Your other option is to run multiple instances on different ports which should then use different processor cores and help you scale better (or run multile virtual machines on the same system and assign different cores to each machine to better utilise existing hardware)

Have you also looked at hardcoding machine IP addresses rather than having the overhead of DNS for each request - again not sure how much load profiling you have done at this point in time

Craig

"used in anger" is an English phrase indicating that a process has been used for something useful or important.

Ah, I see. Yes.. I've had quite a few of these useful or important processes myself. :slight_smile:

1 Like

@craigcurtin

Thanks for your input, but we're not having performance\scalability issues,

Our use case for NR\Flowfuse differs from most, in that we are using multiple dashboard instances (each accessed by different people) that trigger similar actions. Those actions are defined in one "central" instance (to prevent code duplication among other reasons). Hence, those central actions need to be able to send a reply to the instance\flow that is calling the action (and only that flow).

While we can think of several improvements to NR\Dashboard\Flowfuse, performance\scalability is not an issue.

I would do this by:

a) define a global unique identifier (GUID) for each flow that is sending/receiving MQTT messages. This GUID would consist of some unique Id such Mac address or CPU serial number. Important is that its unique but reproducible meaning the same instance would always generate the same GUID. This GUID would be generated on flow start and stored in the global context. This implies that GUID would be generated by instances and not by a global instance and then distributed to instances. This allows instances to be independent of the central instance.

b) each MQTT message would include this GUID, a payload would be something like:

{
  guid: global.get('guid'),
  msgtype: 'request',
  command: <some command specifier>,
  parameters: ...
}

a response would include (echo back) the GUID and the instance which made the request in the first place would filter out using the GUID:

{
  guid: <original guid from request>,
  msgtype: 'response',
  data: ...
}

This approach would allow you to scale up instances without modification of the central instance since GUIDs aren't managed centrally. Reproducibility that if an instance goes down and then comes up again, it's still using the same GUID and therefore any data it has to pass on will be associated with the correct instance.

@gregorius
Thanks! That's an interesting approach but for our needs I think it's a bit too elaborate.
And we could be calling the same central flow from multiple flows in the same Dashboard instance, so we'd need to have a flow identifier as well..

@GogoVega
Sorry for the late reply. Yes, that's what we've been doing (adding a key to the received message and send to a return topic based on that message. But we didn't do it in a smart way: we had multiple MQTT In nodes for the same flow, each with its own topic, and a change node after that to define a key that could be used in a switch node right before multiple MQTT Out noes, again each with its own topic..

But @TotallyInformation was right, it's far easier than that, at least with MQTT V5.
Specifying a return topic in the MQTT Out node will add msg.responseTopic to the message. As long as you don't touch that in the flow, the MQTT Out node (configured without specifying anything except the MQTT server) will send the message to the responseTopic.

This means there only needs to be one Request topic which the central flow will get its requests from, and without the need to define multiple MQTT Out topics in the node, a return message can be sent to any topic.
You will get multiple MQTT return topics in your broker of course, but the central NR flow will be far less cluttered compared to what we were doing. And no need to alter the central flow if a new Dashoard flow is used.

Thanks for all your input and getting us on the right track!

I sent the flows as an attachment in our chats on HobSpot - perhaps you can pick it up again from there?

Aaah sorry - came at it from the wrong end

What about using a unique MQTT client ID to help identify them all ?

Craig