Problem with dynamic MQTT subscription: receiving messages from another topic!

image

[{"id":"c60b2dd0aed70159","type":"mqtt out","z":"fec3a58f1cc8836e","name":"","topic":"RGL/221600001/up","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"846ddbe9b23fde48","x":670,"y":360,"wires":[]},{"id":"8c191eacea8dd2f2","type":"mqtt in","z":"fec3a58f1cc8836e","name":"dynamic subscription MQTT","topic":"","qos":"1","datatype":"auto-detect","broker":"846ddbe9b23fde48","nl":false,"rap":true,"rh":0,"inputs":1,"x":580,"y":140,"wires":[["03b125f38db1c6f4"]]},{"id":"03b125f38db1c6f4","type":"debug","z":"fec3a58f1cc8836e","name":"dynamic topic","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"topic","targetType":"msg","statusVal":"topic","statusType":"auto","x":840,"y":140,"wires":[]},{"id":"d1a42c96ed235cb1","type":"function","z":"fec3a58f1cc8836e","name":"action & topic","func":"msg.action =\"subscribe\"\nmsg.topic = \"RGL/\" + msg.payload + \"/up\"\nnode.status({\"text\":msg.topic})\n return msg","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":340,"y":140,"wires":[["8c191eacea8dd2f2"]]},{"id":"ba6c3d52cef8d2f6","type":"inject","z":"fec3a58f1cc8836e","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":350,"y":430,"wires":[["da099b394c632927"]]},{"id":"0d927d795af07a55","type":"inject","z":"fec3a58f1cc8836e","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":350,"y":360,"wires":[["c60b2dd0aed70159"]]},{"id":"398aca50be6d61c6","type":"inject","z":"fec3a58f1cc8836e","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":"0.5","topic":"","payload":"221600001","payloadType":"num","x":100,"y":120,"wires":[["d1a42c96ed235cb1"]]},{"id":"da099b394c632927","type":"mqtt out","z":"fec3a58f1cc8836e","name":"","topic":"RGL/221600002/up","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"846ddbe9b23fde48","x":670,"y":430,"wires":[]},{"id":"819f147d2179740f","type":"inject","z":"fec3a58f1cc8836e","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":"0.5","topic":"","payload":"221600002","payloadType":"num","x":100,"y":210,"wires":[["d1a42c96ed235cb1"]]},{"id":"90fb576b4ef7da9a","type":"comment","z":"fec3a58f1cc8836e","name":"1st device talk","info":"","x":410,"y":390,"wires":[]},{"id":"f45b2e4a6a2d8204","type":"comment","z":"fec3a58f1cc8836e","name":"2nd device talk","info":"","x":420,"y":460,"wires":[]},{"id":"cd0e9e9d4960b35c","type":"comment","z":"fec3a58f1cc8836e","name":"listen 1st device","info":"","x":120,"y":150,"wires":[]},{"id":"c1394791b6bcdd8a","type":"comment","z":"fec3a58f1cc8836e","name":"listen 2nd device","info":"","x":120,"y":240,"wires":[]},{"id":"5daf788c232f5492","type":"mqtt in","z":"fec3a58f1cc8836e","name":"","topic":"RGL/221600001/up","qos":"1","datatype":"auto-detect","broker":"846ddbe9b23fde48","nl":false,"rap":true,"rh":0,"inputs":0,"x":550,"y":230,"wires":[["80dad9e0d6a9878b"]]},{"id":"80dad9e0d6a9878b","type":"debug","z":"fec3a58f1cc8836e","name":"","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"topic","targetType":"msg","statusVal":"topic","statusType":"auto","x":820,"y":230,"wires":[]},{"id":"846ddbe9b23fde48","type":"mqtt-broker","name":"","broker":"192.168.0.58","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]

hello all,
I am trying to read messages from 2 different devices. Each has its topic to differentiate the data.
On the MQTT receiver, I dynamically change the topic to only receive messages from one device BUT if the 2nd device "speaks" I also receive its data?
When I "subscribe to single topic" in the receiver, the 2nd device is not received. It is indeed a problem of the topic dynamic.
Did I do it wrong or is there a bug in the MQTT node?
(NR v3.0.2)

You have subscribed both to the dynamic subscription. You need to unsubscribe to remove a subscription.

I don't understand:
when I inject the topic of the 1st device to listen to the 1st device and then I inject the topic of the 2nd device, does it not overwrite the already saved topic of the MQTT node?

Edited your function to unsubscribe from last subscription, before subscibing to a new one.

[{"id":"1fb4ba40eeafebfe","type":"inject","z":"da8a6ef0b3c9a5c8","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":"0.5","topic":"","payload":"221600001","payloadType":"num","x":390.00000762939453,"y":2936.6667623519897,"wires":[["0450685ce97bd101"]]},{"id":"0450685ce97bd101","type":"function","z":"da8a6ef0b3c9a5c8","name":"action & topic","func":"msg.topic = context.get(\"subscribed\") ?? \"default/topic\";\nmsg.action = \"unsubscribe\";\n\nnode.send(msg);\n\nmsg.action =\"subscribe\";\nmsg.topic = \"RGL/\" + msg.payload + \"/up\";\nnode.status({\"text\":msg.topic});\ncontext.set(\"subscribed\", msg.topic);\n return msg","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":630.0000076293945,"y":2956.6667623519897,"wires":[["35b1220411e8d077"]]},{"id":"4386d59bb91c13ab","type":"inject","z":"da8a6ef0b3c9a5c8","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":"0.5","topic":"","payload":"221600002","payloadType":"num","x":390.00000762939453,"y":3026.6667623519897,"wires":[["0450685ce97bd101"]]},{"id":"35b1220411e8d077","type":"mqtt in","z":"da8a6ef0b3c9a5c8","name":"dynamic subscription MQTT","topic":"","qos":"1","datatype":"auto-detect","broker":"e8ba3ef5.22f4a8","nl":false,"rap":true,"rh":0,"inputs":1,"x":880.0000076293945,"y":2956.6667623519897,"wires":[["f97f2165a4d11088"]]},{"id":"f97f2165a4d11088","type":"debug","z":"da8a6ef0b3c9a5c8","name":"dynamic topic","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"topic","targetType":"msg","statusVal":"topic","statusType":"auto","x":1130.0000076293945,"y":2956.6667623519897,"wires":[]},{"id":"5daf788c232f5492","type":"mqtt in","z":"da8a6ef0b3c9a5c8","name":"","topic":"RGL/221600001/up","qos":"1","datatype":"auto-detect","broker":"e8ba3ef5.22f4a8","nl":false,"rap":true,"rh":0,"inputs":0,"x":720,"y":3030,"wires":[["80dad9e0d6a9878b"]]},{"id":"0d927d795af07a55","type":"inject","z":"da8a6ef0b3c9a5c8","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":520,"y":3160,"wires":[["c60b2dd0aed70159"]]},{"id":"80dad9e0d6a9878b","type":"debug","z":"da8a6ef0b3c9a5c8","name":"","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"topic","targetType":"msg","statusVal":"topic","statusType":"auto","x":990,"y":3030,"wires":[]},{"id":"c60b2dd0aed70159","type":"mqtt out","z":"da8a6ef0b3c9a5c8","name":"","topic":"RGL/221600001/up","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"e8ba3ef5.22f4a8","x":840,"y":3160,"wires":[]},{"id":"da099b394c632927","type":"mqtt out","z":"da8a6ef0b3c9a5c8","name":"","topic":"RGL/221600002/up","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"e8ba3ef5.22f4a8","x":840,"y":3230,"wires":[]},{"id":"ba6c3d52cef8d2f6","type":"inject","z":"da8a6ef0b3c9a5c8","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":520,"y":3230,"wires":[["da099b394c632927"]]},{"id":"e8ba3ef5.22f4a8","type":"mqtt-broker","name":"testb","broker":"192.168.1.25","port":"1883","clientid":"node-red-test","autoConnect":true,"usetls":false,"compatmode":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthRetain":"false","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]

you can subscribe to multiple topics with dynamic mode. When you add a new one the old one stays.

With dynamic subscription, if you subscribe to a topic that does not unsubscribe existing topics. If you want to unsubscribe from the first topic you must do that explicitly. See the help text for the node, the section on Dynamic Subscription

@colin @E1cid Ok great guys, I missed that part! Thank you

[{"id":"eaabcdc141e65db5","type":"inject","z":"fec3a58f1cc8836e","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":"0.5","topic":"","payload":"221600001","payloadType":"num","x":130,"y":20,"wires":[["f6524aa0cd9929cb"]]},{"id":"f6524aa0cd9929cb","type":"function","z":"fec3a58f1cc8836e","name":"action & topic","func":"msg.topic = context.get(\"subscribed\") ?? \"default/topic\";\nmsg.action = \"unsubscribe\";\n\nnode.send(msg);\n\nmsg.action =\"subscribe\";\nmsg.topic = \"RGL/\" + msg.payload + \"/up\";\nnode.status({\"text\":msg.topic});\ncontext.set(\"subscribed\", msg.topic);\n return msg\n\n\n /*\n ORIGINAL\n msg.action =\"subscribe\"\nmsg.topic = \"RGL/\" + msg.payload + \"/up\"\nnode.status({\"text\":msg.topic})\n return msg\n */","outputs":1,"noerr":0,"initialize":"","finalize":"","libs":[],"x":370,"y":40,"wires":[["db18132228452dab","2cd30fe03e4d32ee"]]},{"id":"bc0c1f18e3ed30dc","type":"inject","z":"fec3a58f1cc8836e","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":"0.5","topic":"","payload":"221600002","payloadType":"num","x":130,"y":110,"wires":[["f6524aa0cd9929cb"]]},{"id":"db18132228452dab","type":"mqtt in","z":"fec3a58f1cc8836e","name":"dynamic subscription MQTT","topic":"","qos":"1","datatype":"auto-detect","broker":"846ddbe9b23fde48","nl":false,"rap":true,"rh":0,"inputs":1,"x":620,"y":40,"wires":[["4603a08acd226b6c"]]},{"id":"4603a08acd226b6c","type":"debug","z":"fec3a58f1cc8836e","name":"dynamic topic","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"topic","targetType":"msg","statusVal":"topic","statusType":"auto","x":870,"y":40,"wires":[]},{"id":"376ea748c67f94b8","type":"mqtt in","z":"fec3a58f1cc8836e","name":"","topic":"RGL/221600001/up","qos":"1","datatype":"auto-detect","broker":"846ddbe9b23fde48","nl":false,"rap":true,"rh":0,"inputs":0,"x":650,"y":110,"wires":[["633931f9dd88bb65"]]},{"id":"99930e4f658f2702","type":"inject","z":"fec3a58f1cc8836e","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":259.99999237060547,"y":243.33323764801025,"wires":[["18ab148a0ab5e1e7"]]},{"id":"633931f9dd88bb65","type":"debug","z":"fec3a58f1cc8836e","name":"","active":false,"tosidebar":true,"console":false,"tostatus":true,"complete":"topic","targetType":"msg","statusVal":"topic","statusType":"auto","x":850,"y":110,"wires":[]},{"id":"18ab148a0ab5e1e7","type":"mqtt out","z":"fec3a58f1cc8836e","name":"","topic":"RGL/221600001/up","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"846ddbe9b23fde48","x":579.9999923706055,"y":243.33323764801025,"wires":[]},{"id":"2a8fe612fbc27146","type":"mqtt out","z":"fec3a58f1cc8836e","name":"","topic":"RGL/221600002/up","qos":"1","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"846ddbe9b23fde48","x":579.9999923706055,"y":313.33323764801025,"wires":[]},{"id":"0538950d4ceace96","type":"inject","z":"fec3a58f1cc8836e","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":259.99999237060547,"y":313.33323764801025,"wires":[["2a8fe612fbc27146"]]},{"id":"cd0e9e9d4960b35c","type":"comment","z":"fec3a58f1cc8836e","name":"listen 1st device","info":"","x":100,"y":50,"wires":[]},{"id":"c1394791b6bcdd8a","type":"comment","z":"fec3a58f1cc8836e","name":"listen 2nd device","info":"","x":100,"y":140,"wires":[]},{"id":"90fb576b4ef7da9a","type":"comment","z":"fec3a58f1cc8836e","name":"1st device talk","info":"","x":300,"y":270,"wires":[]},{"id":"f45b2e4a6a2d8204","type":"comment","z":"fec3a58f1cc8836e","name":"2nd device talk","info":"","x":310,"y":340,"wires":[]},{"id":"80017dec4e393341","type":"comment","z":"fec3a58f1cc8836e","name":"subscribe to single topic","info":"","x":660,"y":160,"wires":[]},{"id":"2cd30fe03e4d32ee","type":"debug","z":"fec3a58f1cc8836e","name":"all","active":true,"tosidebar":true,"console":false,"tostatus":true,"complete":"true","targetType":"full","statusVal":"topic","statusType":"auto","x":420,"y":130,"wires":[]},{"id":"846ddbe9b23fde48","type":"mqtt-broker","name":"","broker":"192.168.0.58","port":"1883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]

Sorry to bring this topic up but it still doesn't work even in "unsubscribe":

1/ I listen to the 1st device: unsubscribe then subscribe
2/ the 1st device speaks => ok I receive it well
3/ the 2nd speaks => I also receive the 2nd device !!??

We can clearly see under the Function node that we have selected the "1" as the topic, and the result on the Debug node that the "2" is passed!?

That works correctly for me. I don't see the erroneous messages that you are seeing.
You said you are using node red 3.0.2, what version of nodejs are you using? Use node -v to find out.
What broker and version are you using?

Looks like you had already subscribed to RGL/221600002/up before injecting 221600001, but not via [that version of] "action & topic" so it was not in context.subscribed and did not unsubscribe.

1 Like

I could only see your error if i hit the topic 2 subscribe inject two or more times.

I think you will have to put a check in so only one press is passed

const topic = "RGL/" + msg.payload + "/up";
msg.topic = context.get("subscribed") ?? "default/topic";
if(topic != msg.topic){
    msg.action = "unsubscribe";
    node.send(msg);

    msg.action ="subscribe";
    msg.topic = topic;
    node.status({"text":msg.topic});
    context.set("subscribed", msg.topic);
}else{
    msg = null;
}
 return msg

I am not seeing the problem in that case, and it should not matter as each time you press it, it should unsubscribe then resubscribe.

I did see it. It is not consistent, i believe it is at fresh start up. Two of us have seen it, so it is occurring. With the new function (below) I have not seen it again.

const topic = "RGL/" + msg.payload + "/up";
msg.topic = context.get("subscribed") ?? "start";
if(topic != msg.topic){
    if(msg.topic != "start"){
        msg.action = "unsubscribe";
        node.send(msg);
    }
    
    msg.action ="subscribe";
    msg.topic = topic;
    node.status({"text":msg.topic});
    context.set("subscribed", msg.topic);
}else{
    msg = null;
}
 return msg

In that case it suggests there is a bug in the node that unsubscribing and resubscribing the same topic does not always work, or something similar. In which case it would be good to track it down. Could you revert to the original and attempt to find a consistent failure mode?

@SuperNinja are you seeing a consistent failure or one that is intermittent?

Digging further it occurs if subscribed and the context gets deleted, @SuperNinja is your context persistent?
https://nodered.org/docs/user-guide/context#context-stores

Clearing the context while currently subscribed would certainly trigger it, as it would not unsubscribe the current one. However, I don't think that non-persistent context would cause that as it would only get deleted on a node-red restart, in which case nothing would be subscribed at the start.

I think @SuperNinja may of restart node-red, or delete context manually, or replaced the function after subscribing,

Yes, agreed. That is why we need to know whether it recovers when he presses the buttons again, or whether it is just not working for him at all.

Hey guys, @Colin @E1cid @jbudd that's it I'm back!

Digging further it occurs if subscribed and the context gets deleted, @SuperNinja is your context persistent?
Working with context : Node-RED

I would find it difficult to "dig" into the NodeRed files because it runs on a synology server and the network admin is not easily reachable ... I would still try to have the content of "contextStorage settings. js"

However ,
1 / this morning when I arrived, I did not have a ghost frame.
2/ unfortunately I don't have control over the server but it seems to me that it restarted last night.

By dynamically changing the topic, I only have the response from the correct device. I put the @E1cid code that works:

msg.topic = context.get("subscribed") ?? "default/topic";
msg.action = "unsubscribe";

node.send(msg);

msg.action ="subscribe";
msg.topic = "RGL/" + msg.payload + "/up";
node.status({"text":msg.topic});
context.set("subscribed", msg.topic);
 return msg

It works without modifying the settings.js file, all stock v3.0.2 . To conclude: as written in the instructions, you must unsubscribe before subscribing to a new topic and above all, Once again: do not forget to restart Node red!!
Thanks all :wink:

That is not a general requirement. The suggestion is that you had previously subscribed and, since that was not recorded in your context var, your code did not unsubscribe it. Also, it is not necessary to restart node-red, a Restart Flows from the Deploy button dropdown would also have done it (if the assumption about how it happened is correct). However, if something strange is happening that you don't understand it is often a good idea to restart node-red just in case...

1 Like

What @colin says is exactly what will be happening.

The solution @E1cid will work from a fresh start however in your case, there is no real need to maintain the subscription - you simply unsubscribe everything BEFORE subscribe.

// 1. unsubscribe EVERYTHING
node.send({ topic: true, action: "unsubscribe" });

// 2. subscribe to RGL/<payload>/up
msg.action = "subscribe";
msg.topic = "RGL/" + msg.payload + "/up";
node.status({ "text": msg.topic });
return msg
1 Like