Is there a way to force MQTT nodes to re-subscribe from the flow?

Let's consider the following:

[{"id":"f484ccd9.7480e","type":"mqtt in","z":"ced90757.b4e1e8","name":"","topic":"vDevice/State","qos":"2","datatype":"auto","broker":"d0a0bfc5.b4176","x":110,"y":220,"wires":[["7d4cd0fc.00e96"]]},{"id":"c14e8beb.ee34e8","type":"mqtt in","z":"ced90757.b4e1e8","name":"","topic":"vDevice/Telemetrics","qos":"2","datatype":"auto","broker":"d0a0bfc5.b4176","x":130,"y":280,"wires":[["3c727614.33e56a"]]},{"id":"3c727614.33e56a","type":"switch","z":"ced90757.b4e1e8","name":"","property":"retain","propertyType":"msg","rules":[{"t":"false"},{"t":"true"}],"checkall":"true","repair":false,"outputs":2,"x":330,"y":280,"wires":[["b19f805b.83a39"],["3c81bd84.3dfc12"]]},{"id":"3c81bd84.3dfc12","type":"change","z":"ced90757.b4e1e8","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"\"(\" & msg.payload & \")\"","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":520,"y":300,"wires":[["b19f805b.83a39"]]},{"id":"7d4cd0fc.00e96","type":"switch","z":"ced90757.b4e1e8","name":"","property":"retain","propertyType":"msg","rules":[{"t":"false"},{"t":"true"}],"checkall":"true","repair":false,"outputs":2,"x":330,"y":220,"wires":[["753bff47.3299f"],["526cdae4.842964"]]},{"id":"526cdae4.842964","type":"change","z":"ced90757.b4e1e8","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"\"(\" & msg.payload & \")\"","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":520,"y":240,"wires":[["753bff47.3299f"]]},{"id":"b24f40e8.e7898","type":"mqtt in","z":"ced90757.b4e1e8","name":"","topic":"vDevice/NetworkState","qos":"2","datatype":"auto","broker":"d0a0bfc5.b4176","x":140,"y":120,"wires":[["92b368e5.8e5238"]]},{"id":"92b368e5.8e5238","type":"switch","z":"ced90757.b4e1e8","name":"","property":"retain","propertyType":"msg","rules":[{"t":"false"},{"t":"true"}],"checkall":"true","repair":false,"outputs":2,"x":330,"y":120,"wires":[["e93e9089.cd42c"],["3f56ee40.1655e2"]]},{"id":"3f56ee40.1655e2","type":"change","z":"ced90757.b4e1e8","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"\"(\" & msg.payload & \")\"","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":520,"y":140,"wires":[["e93e9089.cd42c"]]},{"id":"e93e9089.cd42c","type":"debug","z":"ced90757.b4e1e8","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","x":730,"y":120,"wires":[]},{"id":"753bff47.3299f","type":"debug","z":"ced90757.b4e1e8","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","x":730,"y":220,"wires":[]},{"id":"b19f805b.83a39","type":"debug","z":"ced90757.b4e1e8","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","x":730,"y":280,"wires":[]},{"id":"d0a0bfc5.b4176","type":"mqtt-broker","z":"","name":"localhost MQTT","broker":"localhosta","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

Each flow reads a certain value from MQTT and passes it forward either:

  • intact,
  • or if the message had msg.retain set, it marks the payload as last-known-state (in the example: by enclosing it in parentheses).
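For readers who prefer code to the exported flow, the rule in the list above can be sketched as a single JavaScript helper of the kind one might call from a Function node. This is only an illustration of the same rule; the name `markIfRetained` is made up here and does not appear in the flow.

```javascript
// Hypothetical helper, equivalent in intent to the Switch + Change pair:
// retained messages get their payload wrapped in parentheses.
function markIfRetained(msg) {
    if (msg.retain === true) {
        // Copy the message instead of mutating the input object.
        return { ...msg, payload: "(" + msg.payload + ")" };
    }
    return msg; // live value, passed on intact
}

// Inside a Function node the body would simply be:
//   return markIfRetained(msg);
const sample = { topic: "vDevice/State", payload: "on", retain: true };
console.log(markIfRetained(sample).payload); // prints "(on)"
```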

Now I would like to add the following logic: if the device goes offline (the value of vDevice/NetworkState changes to offline), the remaining values are "degraded" to last-known-state.

If I could force the MQTT-in nodes to re-subscribe, then they would get the values from MQTT with msg.retain flag set (automatically, by design) and pass the message according to existing logic.

Is it doable?


Or should I store the value of vDevice/NetworkState in context and modify each flow to consider messages matching (retained: true or flow.networkstate: offline) as last-known-state?
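A rough sketch of that second option, assuming the latest vDevice/NetworkState payload has been stored in flow context under the key `networkstate` (the key name and the helper `degradeIfStale` are placeholders for illustration):

```javascript
// Hypothetical helper: treat a value as last-known-state when it was
// retained OR when the stored network state says the device is offline.
function degradeIfStale(msg, networkState) {
    const lastKnown = msg.retain === true || networkState === "offline";
    if (!lastKnown) {
        return msg; // fresh value from an online device
    }
    return { ...msg, payload: "(" + msg.payload + ")" };
}

// Inside a Function node this could be used as:
//   return degradeIfStale(msg, flow.get("networkstate"));
const live = { topic: "vDevice/State", payload: "idle", retain: false };
console.log(degradeIfStale(live, "offline").payload); // prints "(idle)"
console.log(degradeIfStale(live, "online").payload);  // prints "idle"
```

Note that this only runs when a value message arrives, so a change of the network state by itself does not re-emit the other values.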


I’d say your last option is the best here, for exactly that reason. On top of that, it’s clear and easy to maintain, rather than somewhat obscure, where you can’t be sure whether a subscription is active at any given point. I’d say keep the cat in the box where it is, but Schrödinger’s MQTT wouldn’t be my choice ;)
Though maybe others think otherwise :)


When all is operating normally, do the State and Telemetrics values come in from MQTT on a regular basis? If so, you can use a Trigger node to achieve what you want by having it send an appropriate message if no new data is received for a certain time.

Otherwise I would solve this by using a Join node, in the mode described in the cookbook example on joining streams, to combine NetworkState with State and Telemetrics. Then you can access all the data you need in a Function node (or Change node) after the Join and send on the appropriate message each time a new message comes in.
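As a sketch of what the Function node after the Join might do, assume the Join emits `msg.payload` as an object keyed by topic. The helper name `fromJoined` and the decision to wrap every value while the device is offline are assumptions made for illustration:

```javascript
// Assumed shape of the joined payload (keys are the MQTT topics):
//   { "vDevice/NetworkState": "offline", "vDevice/State": "idle", ... }
const VALUE_TOPICS = ["vDevice/State", "vDevice/Telemetrics"];

// Hypothetical helper: returns the value topics, wrapped in parentheses
// when the joined network state says the device is offline.
function fromJoined(joined) {
    const offline = joined["vDevice/NetworkState"] === "offline";
    const out = {};
    for (const topic of VALUE_TOPICS) {
        if (topic in joined) {
            out[topic] = offline ? "(" + joined[topic] + ")" : joined[topic];
        }
    }
    return out;
}

// Inside a Function node:
//   msg.payload = fromJoined(msg.payload);
//   return msg;
console.log(fromJoined({
    "vDevice/NetworkState": "offline",
    "vDevice/State": "idle",
}));
```

Because the Join output carries every input stream, a change of NetworkState alone is enough to re-emit the other values.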


But then I would need to add a second message emitter to each flow, because a change in the context value (to offline) wouldn't cause the bottom flows to re-evaluate.

Causing MQTT-in nodes to re-subscribe already deals with that.

On the other hand, it would be a cookie-cutter change to each flow, so not that bad.

Using a Join node instead of context solves that problem.

Yes, but with different periods for each flow. Worse yet, some values would come in with different periods over time. For example, an active device sends values more often.

And unfortunately, the Trigger node's "wait for" value is set in its properties; there's no way to change it dynamically.


How?

The Join node needs to collect all values before passing them on; it doesn't remember the last state:

[{"id":"f7f35dce.1bb7f","type":"tab","label":"Flow 8","disabled":false,"info":""},{"id":"92864639.ab94f8","type":"join","z":"f7f35dce.1bb7f","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":570,"y":240,"wires":[["8154828d.37646"]]},{"id":"a1914348.8f37a","type":"inject","z":"f7f35dce.1bb7f","name":"","topic":"value","payload":"1","payloadType":"num","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":150,"y":180,"wires":[["92864639.ab94f8"]]},{"id":"b98a1be9.d3d8f8","type":"inject","z":"f7f35dce.1bb7f","name":"","topic":"state","payload":"offline","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":260,"wires":[["faaa41c0.e9de9"]]},{"id":"faaa41c0.e9de9","type":"change","z":"f7f35dce.1bb7f","name":"","rules":[{"t":"set","p":"complete","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":390,"y":280,"wires":[["92864639.ab94f8"]]},{"id":"8154828d.37646","type":"debug","z":"f7f35dce.1bb7f","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","x":750,"y":240,"wires":[]},{"id":"58516c1f.1599b4","type":"inject","z":"f7f35dce.1bb7f","name":"","topic":"state","payload":"online","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":0.1,"x":170,"y":300,"wires":[["faaa41c0.e9de9"]]}]

Assuming the online/offline message sets msg.complete:

  1. the device goes online (inject online)
  2. the device sends a value (inject 1) → a joined message is emitted
  3. the device goes offline → a message is emitted, but the last-known-state value is lost

The Join node serves no purpose in this scenario.

I haven't got time to look at your flow at the moment. Are you using the Join node in the mode described in the link? If so, then every time it receives a message it sends on a message with the new information and the previous values of all the other inputs (as key/value pairs). So it absolutely does remember the previous values of all input streams.

Edit: Just send the messages to it directly from MQTT and look at what you get out.


Thank you for insisting on your idea.

It does indeed do that with the "and every subsequent message" option checked (which is not in the example on the linked page).

This functionality simplifies not only this, but a whole lot of other flows. Thank you!
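To illustrate the behaviour, here is a deliberately simplified JavaScript model of a key/value Join that keeps emitting after it has seen a given number of keys. It is not the Join node's actual implementation; `makeAccumulatingJoin` and its `count` parameter are invented for this sketch.

```javascript
// Simplified model: remember the latest payload per topic and, once
// `count` distinct topics have been seen, emit a snapshot on every message.
function makeAccumulatingJoin(count) {
    const latest = {};
    return function onMessage(msg) {
        latest[msg.topic] = msg.payload;
        if (Object.keys(latest).length < count) {
            return null; // still waiting for the remaining topics
        }
        return { topic: msg.topic, payload: { ...latest } };
    };
}

// Replay the three steps from the earlier post.
const join = makeAccumulatingJoin(2);
join({ topic: "state", payload: "online" });           // nothing emitted yet
join({ topic: "value", payload: 1 });                  // emits { state: "online", value: 1 }
const out = join({ topic: "state", payload: "offline" });
console.log(out.payload);                              // state is "offline", value is still 1
```

In this model the last value survives the offline message, which is the property the flow needs.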

