Long running process, waiting for a response via a message broker

I'm hoping to use node-red to set up some audio processing for a project I am working on. I have a service which takes some audio and performs some processing on it, this can take up to 30 minutes. I was planning to write a simple service and use message broking to emit an update when the job is finished. This is all good but I want my node to "wait" for the message from the message broker telling me the processing is complete before it moves onto the next node.

This is greenfield and I am totally open to ideas, I planned to use either rabbitmq or kafka for the message brokering. I just have no idea on how to write a node to "wait" for a complete message from a broker.

Does anyone have any suggestions?

Welcome to the forum @lebowen

MQTT is the most widely used message broker with node-red. If MQTT will do the job for you then I suggest you go that way. In node-red an MQTT In node will automatically wait for data and when it arrives it passes that on to the next node. If you are not familiar with MQTT then then have a look at this tutorial MQTT Essentials - All Core Concepts explained which will tell you all you need to know.

Mosquitto is a broker commonly used for MQTT, it is free, open source, and easy to install.

Hi Colin,

I am not sure I am clear on how that works. I've used MQTT in other projects.

Can I do an HTTP POST to a service and then have a node that follows it that is an MQTT in node?

I am considering having a simple web service that performs some processing that returns a Job ID. I then want to wait (within the same flow) for a response from MQTT. I don't want the chain of nodes in the flow to be broken as this will complicate my workflow.

With MQTT, you either use an MQTT library on a server (Node.js, Python, etc) or on an embedded microprocessor or you can (if you configure your MQTT broker) use a MQTT over websocket library.

MQTT is a publish and subscribe model so the MQTT-in node simply subscribes to a "topic". Your process will publish to that topic. Similar to an HTTP PUT but using a different protocol.

What you are trying to do is pretty ideal for Node-RED and MQTT. I believe you might even be able send the audio stream over MQTT but I don't really know about that. I would probably do as you are suggesting and simply use MQTT as the broker to tell Node-RED when the process has finished and either tell it the file name, full file URL or just some id. Then your Node-RED flow can listen for the notification and process the output accordingly.

Thanks for the responses. I am still not really clear on how I can dynamically create MQTT in nodes that can listen for the emitted message.

I basically need to form a chain of audio processing nodes that each take the output of the previous node. Each of these nodes could take a long time.

I don't know how I can achieve.....

http in -> Process Audio -> Wait for MQTT msg -> Process Audio (2) -> Wait for MQTT -> finish

Do you really need dynamic mqtt-in? Can't you use a standard topic? Or if you need multiple topics, group them under a top-level. For example mystuff/process1, mystuff/process2 - you can listen for both of these topics using a subscription of mystuff/#.

That is TWO flows not one.

1) http-in -> process audio -> mqtt-out
           |
           +-> http-out (you need this to tell the caller that their request was successful)

2) mqtt-in -> do something else

I think you're right and perhaps I am over complicating it in my head.

I had envisaged an unbroken chain of nodes however I don't think that is feasible, and even if it is, probably way too complicated.

I am going to go down the route that you have suggested.

Thank you for clarifying and for your suggestions.

Yup! :grinning:

KISS principal applies here. Multiple flows are the Node-RED equivalent of modular code. I suppose we should call them flow-fragments - the word "flow" in Node-RED actually applies to several things which can get a bit confusing.

I just want to update this thread for others that may have the same issue in the future.

It turns out the contrib MQTT package DOES support dynamic subscriptions, which will allow me to set up a subscription to wait for an event from MQTT and carry on with the flow.

Here is some more information on dynamic MQTT subscriptions

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.