Msg.payload buffer

Dear node-red Experts,

I have a node that sends an output message every 5 ms.
After that node is the InfluxDB output node, which writes the data into InfluxDB.
But here's the issue: InfluxDB does not seem able to keep up with writes arriving every 5 ms.
From searching on Google, the advice is that it is better to write data in batches (multiple points at a time).

So my question is: is there any node or function that can buffer the msg.payload output?
Let's say that every time I receive a payload, nothing is passed to the next node until a condition is met.

For example, once I have received 1000 payloads, that is when the flow continues to the next node. Note that the payloads should be buffered, in the form of an array or anything similar. I just need to buffer up to a certain count or time and then output to my InfluxDB node.

I am hoping that someone has tried this in their project and can help me.
Thank you everyone!

With kind regards,
Henjoe

Can you explain what you mean by that? I think there may be a couple of typos there.
For influxdb you could use the influx batch node which takes a block of a number of samples to write to the database. For accumulating the messages I would use a Join node, either to build an array, or possibly in Reduce mode, but that is subject to understanding the comment above.
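
If you would rather see the idea in code than use the Join node, here is a rough sketch of count-based buffering in plain JavaScript. The helper name createBatcher and the batch size of 3 are made up for illustration; in a Function node the buffer would typically live in node context (context.get / context.set) rather than in a closure variable, and returning null is what stops a message being passed on.

```javascript
// Hypothetical helper: collects payloads and releases them in fixed-size batches.
function createBatcher(batchSize) {
    let buffer = [];
    return function add(payload) {
        buffer.push(payload);
        if (buffer.length < batchSize) {
            // Not enough payloads yet; nothing should be sent on.
            return null;
        }
        // Batch is full: hand it over and start a fresh buffer.
        const batch = buffer;
        buffer = [];
        return batch;
    };
}

// Usage: feed four payloads through a batcher of size 3.
const add = createBatcher(3);
const results = [1, 2, 3, 4].map(function (n) {
    return add({ value: n });
});
// results[2] holds the first full batch; the other entries are null.
```

The Join node does the same job without any code, so this is only worth it if you need custom release conditions.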

Thanks for replying Colin,
Currently I am using the influx out node (not the batch one), and I do want to change it to the influx batch out node.

I think you understand what I want to do. As we know, there is a required format for the batch node, right? I have already used the join node in other applications. Can you suggest a method to format the output of the join node into the format that the influx batch node accepts?

Again, thank you very much.

With kind regards,

What does the incoming data look like? Feed it into a debug node and do a screenshot if possible.

Thanks Colin,
I will get back to you with the screenshot of the debug node (I am not in my testing environment right now). But to give you an idea, the data is something like this:

msg.payload = {
    value: value,
    timestamp: 12839801982093,
    quality: "Good"
}

it is something like that.

Thanks again.

With kind regards,

Are all the records for the same measurement and field (ie 'value')?
What about the quality, is that a field or a tag?

Yes all measurements have the same fields.
And they are all fields, I don't have Tags on my measurements.

Thanks Colin! I appreciate your effort here.

Ok, in that case all you need to do is to get the data into an array in msg.payload, something like

[
{measurement: "whatever the measurement name is", fields: {value: 73, quality: "good"}, timestamp: 1283....},
{measurement: "whatever the measurement name is", fields: {value: -1, quality: "bad"}, timestamp: 1283....},
etc
]

If the timestamps are javascript milliseconds then in the batch node show the Advanced options and set the time precision accordingly. If you are using retention policies then that is there also.

If you want to be able to query using, for example, where quality="good" (or whatever the exact syntax is) then you should have the quality as a tag so that it is indexed. In that case the elements would be

{measurement: "whatever the measurement name is", fields: {value: 73}, tags: {quality: "good"}, timestamp: 1283....},

Thanks Colin! Yes, I should make quality a tag, thanks for giving me this idea :slight_smile:
You're such an awesome guy!

Anyway, the formatting will be done through a function node, right? Because the join node doesn't format the data like that? Am I right?

You could do it in a function node, or a bit of clever JSONata in a Change node I expect, but I don't get on with JSONata.
In a Function node you can do it using msg.payload.map().
map() is a function that you call on an array; it applies a function to each member of the array and gives you back the new array. Have a look at Array.map() to see how to do it. I expect you can find an example which should get you going. It should only be a couple of lines of code doing it that way.
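
To give a feel for it, here is a tiny standalone example of map(), not the actual conversion you need. The readings array and its contents are invented for illustration.

```javascript
// Made-up sample data, shaped loosely like the records discussed above.
const readings = [
    { value: 73, quality: "good" },
    { value: -1, quality: "bad" }
];

// map() calls the function once per element and collects the return values
// into a new array; the original array is left unchanged.
const values = readings.map(function (reading) {
    return reading.value;
});
// values is [73, -1]
```

For the influx case the function would return a whole object per element instead of a single number.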

[Edit] Make sure to feed the result into a debug node and check it looks right before connecting it up to the influx node.

Hello Colin,

Thanks for this idea. I will try it, and I hope I get a good result with my InfluxDB.

I'll report the results back in this forum so that anyone who encounters the same issue has a reference too.

Again, thank you very much.

With kind regards,

Hello Colin, it seems like everything is working as I need.

var valArr = msg.payload;

var newArry = valArr.map(function(value) {
    var obj = {
        measurement: value.measurement,
        fields: {
            value: value.value
        },
        tags: {
            quality: value.quality,
            description: "description"
        },
        timestamp: value.time
    };
    return obj;
});

msg.payload = newArry;

return msg;

After the Join node I put a function node containing the code above. The code converts each payload into the format that the InfluxDB batch out node accepts.
So instead of writing to my InfluxDB instance every 5 ms, I now write multiple data points every second, which makes the system more efficient.

Again, thank you for your help!

Regards,

You can make your code a little simpler, and marginally more efficient, like this.

msg.payload = msg.payload.map(function(value) {
    return {
        measurement: value.measurement,
        fields: {
            value:value.value,
        },
        tags:{
            quality:value.quality,
            description:"description"
        },
        timestamp: value.time
    };
});
return msg;