Export MQTT message to InfluxDB

Via a sensor connected to TTN LoRa I receive an MQTT message with the measured temperature and humidity:

payload.uplink_message.decoded_payload.TempC_SHT
payload.uplink_message.decoded_payload.Hum_SHT

My intention is to store the measurements in an Influx Database and therefore I first have to convert the above info to something like:

temp: 23.0
humi: 43.9

I have tried to do that with a Function Node but I did not succeed, probably because of my lack of knowledge. Can someone tell me the correct code to enter in that node?

[{"id":"7975535b.a29c1c","type":"tab","label":"Test LHT65","disabled":false,"info":""},{"id":"ab44a611.e93c38","type":"mqtt in","z":"7975535b.a29c1c","name":"MQTT #","topic":"#","qos":"2","datatype":"json","broker":"c1daf86e.852c98","x":240,"y":380,"wires":[["8f9758ff.9fad18","c499e849.b56128"]]},{"id":"8f9758ff.9fad18","type":"debug","z":"7975535b.a29c1c","name":"Temp","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload.uplink_message.decoded_payload.TempC_SHT","targetType":"msg","x":510,"y":260,"wires":[]},{"id":"c1840ce.1ae62f","type":"influxdb out","z":"7975535b.a29c1c","influxdb":"30c360.5b7f3ca","name":"DB Lucht","measurement":"","precision":"","retentionPolicy":"","x":700,"y":420,"wires":[]},{"id":"fa76fbf5.821468","type":"function","z":"7975535b.a29c1c","name":"","func":"\nreturn msg;","outputs":1,"noerr":0,"x":510,"y":420,"wires":[["c1840ce.1ae62f"]]},{"id":"c499e849.b56128","type":"debug","z":"7975535b.a29c1c","name":"Humi","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload.uplink_message.decoded_payload.Hum_SHT","targetType":"msg","x":510,"y":320,"wires":[]},{"id":"c1daf86e.852c98","type":"mqtt-broker","z":"","name":"Kanweg","broker":"eu1.cloud.thethings.network","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""},{"id":"30c360.5b7f3ca","type":"influxdb","z":"","hostname":"192.168.2.185","port":"8086","protocol":"http","database":"lucht","name":"DB Lucht","usetls":false,"tls":""}]

You can do this in a change node:

[{"id":"bf52dfea.c0cf5","type":"change","z":"b8546237.5f9b8","name":"","rules":[{"t":"set","p":"payload","pt":"msg","to":"{\t\"temp\": payload.uplink_message.decoded_payload.TempC_SHT,\t\"humi\":payload.uplink_message.decoded_payload.Hum_SHT\t}","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":270,"y":380,"wires":[[]]}]

Very nice to help me, it works perfectly. When I look at the code like this it seems simple, but for me it was nevertheless a difficult puzzle.

Thanks!

2 Likes

In my own flows, I have a standard output to InfluxDB flow. This has a link-in node and I send to it from anywhere I need to save data. It (re)formats incoming data to the correct output format. and sends it to InfluxDB.

2 Likes

Thanks, appreciated!

Can you possibly share the flows behind these screenshots? I think its a great example.
thanks!

Obviously this is rather specific to the way I have things set up and how I configure InfluxDB. I've shared the configuration of my environment db previously.

Validate Input fn:

/**
 * Validate input suitable for the influxdb-out node.
 * We will ALWAYS use the "array containing two objects" payload
 * OR the singe Object payload (if no tags being used).
 * See the Description tab for more details.
 */

// check measurement field is set - if not exit with error
if ( ! msg.measurement ) {
    node.error('msg.measurement is missing')
    node.send([null,msg])
    return
}

let fields,tags

// if payload is an object, assume that it contains fieldName:fieldValue pairs
if ( msg.payload!== null && msg.payload.constructor.name === 'Object' ) {
    fields = msg.payload
} else if ( msg.payload!== null && msg.payload.constructor.name === 'Array' ) {
    node.error('msg.payload cannot be an array. It must be an object containing fieldName:fieldValue pairs or a single value (which would be written to the `value` field name).')
    node.send([null,msg])
    return
} else {
    // Otherwise, we always use 'value' as the default field name
    fields = {'value': msg.payload}
}

const lstFields = Object.keys(fields)

// check to make sure that there is a value field name - if not, continue but with a warning
if ( ! lstFields.includes('value') ) {
    // Lets us turn off the warning if we know what we are doing :-)
    if ( msg.noValueField !== true )
        node.warn('Default field name "vaue" not present, was that deliberate? Set msg.noValueField=true or use the `value` field name to avoid this msg')
}

// check to make sure that all field values are numeric - if not, exit with a warning
let allNumeric = true
lstFields.forEach( key => {
    // I use On/Off for simple switch values in MQTT but these are not useful
    // in InfluxDB, so translate them to ON=1 and OFF=0 (ignoring case).
    try {
        if ( fields[key].toLowerCase() === 'on' ) fields[key] = 1
        if ( fields[key].toLowerCase() === 'off' ) fields[key] = 0
    } catch (e) {}

    // then check to make sure the field is actually a number
    if ( parseFloat(fields[key]) !== fields[key] ) {
        node.error(`Field msg.payload.${key} is not numeric. Only use numbers for field values, text should go in tags.`)
        allNumeric = false
        node.send([null,msg])
        return
    }
})
if ( allNumeric === false ) {
    return
}

// check to make sure that if msg.tags is present, it is an object - if not, exit with a warning
if ( msg.tags ) {
    if ( !(msg.tags!== null && msg.tags.constructor.name === 'Object') ) {
        node.error('msg.tags is not an object - it must contain tagName:tagValue pairs')
        node.send([null,msg])
        return
    }
    tags = msg.tags
}

// Format the output to go to the InfluxDB out node
if ( msg.tags ) {
    msg.payload = [
        fields,
        tags,
    ]
} else {
    msg.payload = fields
}

return msg;

Second fn:

const splitTopic = msg.topic.split('/')

let location = splitTopic[2]

let type = splitTopic[1]
if ( splitTopic[3] === 'switch' ) type = splitTopic[3]
if ( splitTopic[3] === 'temperature_measured' ) type = 'temperature_trv'

if ( splitTopic[2] )

try {
    msg.payload = JSON.parse(msg.payload)
} catch (e) {
    // node.warn(`Failed to parse incoming MQTT payload. "${msg.topic}", ${e.message}`)
    // return
}

msg.measurement = 'environment'

msg.tags = {
    'location': location,
    'type': type,
}

return msg;

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.