Help with sending MQTT with a short-lived JWT

#1

Hi,
I'm trying to write a flow that sends MQTT data using a short-lived JWT.

The problem I'm running into is that the standard MQTT connection node has a static user and password, and I need to change it every time the JWT expires.

So the path I have set out on is to write the broker connection in a function node. I've added require("mqtt") to settings.js:

// Excerpt from the Node-RED settings file: entries listed here are what the
// function nodes later read back with global.get(...), e.g. global.get('mqtt').
functionGlobalContext: {
    // os:require('os'),
    // jfive:require("johnny-five"),
    // j5board:require("johnny-five").Board({repl:false})
    // Exposes the "mqtt" module under the key 'mqtt'.
    mqtt:require("mqtt")
},

I also have a function node with the connection setup, which adds the client object to flow context so it can be used by the publish and subscribe functions, like:

// Function-node body: opens an MQTT connection using the access token held in
// flow context, stores the client object in flow context, and mirrors the
// connection state in the node status.
// Show "disconnected" until a connection event says otherwise.
node.status({fill:"red",shape:"ring",text:"disconnected"});
// The mqtt module is read from global context (key 'mqtt').
var mqtt = global.get('mqtt');

var options = {
    host: 'mqtt.example.com',
    port: 8883,
    protocol: 'mqtts',
    // NOTE(review): MQTT keepalive is normally expressed in seconds — confirm 10000 is intended.
    keepalive: 10000,
    clientId: 'Client001',
    // The access token from flow context is sent as the username; 0 is used when none is stored.
    username: (flow.get("access_token") || 0),
    password:'NotUsed'
}

var client = mqtt.connect(options);
// Share the client so other function nodes can fetch it with flow.get("mqtt_client").
flow.set("mqtt_client", client);

client.on('connect', function () {
   node.status({fill:"green",shape:"dot",text:"connected"}); 
   msg.client = client;
   // NOTE(review): this return only leaves the event callback; it does not emit
   // msg from the function node. Emitting from a callback requires node.send(msg).
   return msg;
})

client.on('reconnect', function () {
   node.status({fill:"yellow",shape:"ring",text:"connecting"}); 
   msg.client = client;
   // NOTE(review): same as above — the returned msg is discarded by the event emitter.
   return msg;
})

client.on('close', function () {
   node.status({fill:"red",shape:"ring",text:"disconnected"}); 
   // Ends the client once the connection closes.
   // NOTE(review): presumably to stop automatic reconnects with an expired token — confirm.
   client.end();
   msg.client = client;
   // NOTE(review): same as above — the returned msg is discarded by the event emitter.
   return msg;
})

// The only return that belongs to the function node itself: passes the incoming
// msg on as soon as the handlers above are registered.
return msg;

This seems to work: the status updates on each change of state, but no messages are returned.

The same goes for the subscribe function:

// Function-node body: subscribes to a test topic on the MQTT client held in
// flow context and reports subscription state and incoming messages in the
// node status.
// Fetch the stored client; falls back to 0 when nothing is stored.
// NOTE(review): assigned without var/let/const, so this creates an implicit global.
client = flow.get("mqtt_client") || 0;

if (client.connected) {
    node.status({fill:"green",shape:"ring",text:"connected"});
    client.subscribe('test/function/client', function (err) {
        // Reflect the subscribe result in the node status.
        if (err) {
            node.status({fill:"yellow",shape:"ring",text:"error"});
        } else {
            node.status({fill:"green",shape:"dot",text:"subscribed"});
        }
    });
} else {
    node.status({fill:"red",shape:"ring",text:"disconnected"});
    // Writes the value read above (the client, or 0) back to flow context.
    flow.set("mqtt_client", client);
}

// NOTE(review): when no client is stored, client is 0 and client.on(...) throws a TypeError.
// NOTE(review): this runs unconditionally, so a new 'message' listener is added every time the code executes.
client.on('message', function (topic, message) {
    node.status({fill:"green",shape:"dot",text:"message: " + message.toString()});
    msg.topic = topic.toString();
    // NOTE(review): "playload" looks like a typo for "payload" — confirm.
    msg.playload = message.toString();
    // NOTE(review): this return only leaves the event callback; it does not emit
    // msg from the function node. Emitting from a callback requires node.send(msg).
    return msg;
})

And, for completeness, the publish function:

// Function-node body: publishes a fixed test message on the MQTT client held
// in flow context and reports the outcome in the node status.
// Declared with const so the client does not leak out as an implicit global.
const client = flow.get("mqtt_client");

if (client && client.connected) {
    node.status({fill:"green",shape:"ring",text:"connected"});
    client.publish('test/function/client', 'Hello mqtt', function (err) {
        // Reflect the publish result in the node status.
        if (err) {
            node.status({fill:"yellow",shape:"ring",text:"error"});
        } else {
            node.status({fill:"green",shape:"dot",text:"published"});
        }
    });
} else {
    // No client stored, or it is not connected: there is nothing to publish on.
    // Flow context is left untouched so a missing client is never replaced by
    // a placeholder value.
    node.status({fill:"red",shape:"ring",text:"disconnected"});
}
0 Likes

#2

You are returning messages from inside callback functions. That is not the same as returning a message from the function node itself.

The docs tell you how to send messages asynchronously. https://nodered.org/docs/writing-functions#sending-messages-asynchronously

0 Likes

#3

Thanks,
Don't know how I missed that one, thanks again.

Regards

0 Likes