Trouble sending multiple records to InfluxDB via JSON

I have been working with Node Red and InfluxDB for only a couple of weeks and have run into some trouble.

My PLC sends data via MQTT in CSV format, and I have successfully parsed and injected a single row of data into InfluxDB. I have attempted to scale up to store multiple records simultaneously, though when I do so, Influx only seems to store the last record in the JSON array.

I have imported the demo flow on storing multiple records via an array of arrays and it seems to work fine. My parsing differs since I don't know how many records are being sent by any given system, so I have to dynamically create the JSON. I have verified the string's format using the JSON Parser node, and other than the data in the record, the format looks the same.

Influx only stores the last record in the array, record "Name2" in the current test data. If I add or remove a record it is always the last one that gets stored.

I've been fighting with this all day and am not sure where I've gone wrong.

Here is a copy of my flow:

[{"id":"b5a08af2.2ed918","type":"tab","label":"Flow 1","disabled":false,"info":""},{"id":"2a0a3ef2.1901b2","type":"function","z":"b5a08af2.2ed918","name":"Parse 1 to 10 Bins of Data","func":"var bins = msg.payload.split(\";;\");\nvar jsonArray = [];\n\nbins.forEach(BinToJson);\nmsg.payload = jsonArray\nreturn msg;\n\n// function to parse data of individual bins\nfunction BinToJson(item, index){\n    var tokens = item.split(\";\")\n    var binData = [{\n        BinID:      tokens[0],\n        OsaTemp:    parseFloat(tokens[1]),\n        PlenumTemp: parseFloat(tokens[2]),\n        ReturnTemp: parseFloat(tokens[3]),\n        Pile1:      parseFloat(tokens[4]),\n        Pile2:      parseFloat(tokens[5]),\n        Pile3:      parseFloat(tokens[6]),\n        Pile4:      parseFloat(tokens[7]),\n        Pile5:      parseFloat(tokens[8]),\n        Pile6:      parseFloat(tokens[9]),\n        OsaRh:      parseFloat(tokens[10]),\n        PlenumRh:   parseFloat(tokens[11]),\n        ReturnRh:   parseFloat(tokens[12]),\n        Co2:        parseFloat(tokens[13]),\n        time:       new Date()\n    }]\n    \n    jsonArray.push(binData);\n}","outputs":1,"noerr":0,"x":310,"y":100,"wires":[["c1875cd7.553a1","add867.8117c798","1c4f0221.f514ae"]]},{"id":"c1875cd7.553a1","type":"debug","z":"b5a08af2.2ed918","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","x":710,"y":100,"wires":[]},{"id":"fde2780c.51fd98","type":"inject","z":"b5a08af2.2ed918","name":"Bin Test Data","topic":"","payload":"Name0;1;2;3;4;5;6;7;8;9;10;11;12;13;;Name1;14;15;16;17;18;19;20;21;22;23;24;25;26;;Name2;27;28;29;30;31;32;33;34;35;36;37;38;39","payloadType":"str","repeat":"","crontab":"","once":false,"onceDelay":"2","x":90,"y":100,"wires":[["2a0a3ef2.1901b2"]]},{"id":"e723c399.627c5","type":"function","z":"b5a08af2.2ed918","name":"Node-Red Multi-Row Demo","func":"msg.payload = [\n    [{\n        numValue: 10,\n        randomValue: Math.random()*10,\n        strValue: \"message1\",\n        time: new Date(\"2015-12-28T19:41:13Z\").getTime()\n    }],\n    [{\n        numValue: 20,\n        randomValue: Math.random()*10,\n        strValue: \"message2\",\n        time: new Date(\"2015-12-28T19:41:14Z\").getTime()\n    }]\n];\nreturn msg;","outputs":1,"noerr":0,"x":320,"y":260,"wires":[["c46ddb8e.83dff8","1ded6713.815f79"]]},{"id":"e46a3f60.54e61","type":"inject","z":"b5a08af2.2ed918","name":"","topic":"","payload":"","payloadType":"date","repeat":"","crontab":"","once":false,"onceDelay":"","x":96,"y":260,"wires":[["e723c399.627c5"]]},{"id":"c46ddb8e.83dff8","type":"influxdb out","z":"b5a08af2.2ed918","influxdb":"e9084128.38462","name":"","measurement":"test","precision":"","retentionPolicy":"","x":710,"y":260,"wires":[]},{"id":"add867.8117c798","type":"influxdb 
out","z":"b5a08af2.2ed918","influxdb":"e9084128.38462","name":"","measurement":"testing","precision":"","retentionPolicy":"","x":720,"y":60,"wires":[]},{"id":"1c4f0221.f514ae","type":"json","z":"b5a08af2.2ed918","name":"","property":"payload","action":"","pretty":false,"x":530,"y":140,"wires":[["c1875cd7.553a1"]]},{"id":"1ded6713.815f79","type":"json","z":"b5a08af2.2ed918","name":"","property":"payload","action":"","pretty":false,"x":530,"y":300,"wires":[["5995febf.456f9"]]},{"id":"5995febf.456f9","type":"debug","z":"b5a08af2.2ed918","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","x":710,"y":300,"wires":[]},{"id":"e9084128.38462","type":"influxdb","z":"","hostname":"192.168.0.103","port":"8086","protocol":"http","database":"SensorData","name":"rPi DB","usetls":false,"tls":""}]

Thanks in advance.

If you are just trying to write a set of values as fields for the current time, then the format you want is described in the info tab for the node: "If msg.payload is an object containing multiple properties, the fields will be written to the measurement."
That means it is much simpler than what you are trying to do. All you need to do is construct an object in msg.payload, something like:

msg.payload = {field1: value1, field2: value2, ...}
return msg

@Colin That is exactly what I did for a system sending a single record of data, and it works great. If you can suggest how to do that for a dynamic number of records, please let me know.

I have multiple systems, and they all have a different number of bins they need to log data for, all of which I need to be stored in independent records. Thus the parsing needs to be dynamic. In the Info tab it also says, "if msg.payload is an array of arrays, it will be written as a series of points containing fields and tags," so what I need is more like the following, where N could be 1 to 10:

msg.payload = [[{record1field1: record1value1, ...}],
               [{record2field1: record2value1, ...}],
               ...
               [{recordNfield1: recordNvalue1, ...}]]

This seems to be exactly what I am generating, but it isn't working the way the demo flow does, even though the format is the same.

So it appears my problem is the timestamp: only the last record is stored because each record overwrites the previous ones, which all carry the same time. It looks like I'll have to store the data in separate measurements or make sure each timestamp is unique.
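For example, one quick workaround (just a sketch of my parse function with that change, field list shortened to match the flow above) would be to offset each record's timestamp by its index so no two points collide:

// Sketch: offset the shared capture time by the record's index
// so every point in the batch gets a unique timestamp.
var bins = msg.payload.split(";;");
var now = new Date().getTime();   // grab the time once per cycle

msg.payload = bins.map(function (item, index) {
    var tokens = item.split(";");
    return [{
        BinID:   tokens[0],
        OsaTemp: parseFloat(tokens[1]),
        // ... remaining fields parsed as in the flow above ...
        time:    now + index      // +1 ms per record keeps points distinct
    }];
});
return msg;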

Thanks for your input, @Colin.

Is the timestamp always the current time?

Do you mean that you need to specify both fields and tags?

Yes, but since all of the data is collected at the same time, I've only been grabbing the timestamp once per cycle. There is a bit more overhead in polling the RTC for the time than in storing it once and reusing it through the cycle. Also, there would be no guarantee that different systems wouldn't log the same time; unlikely, but not impossible.

Sorry, I shouldn't have asked two questions one after the other. Is the 'yes' referring to the fields and tags question?
[Edit] Will be away for some hours.

I had not seen the field/tag question. I'm only using fields at the moment.

I did some reading up on tags. It appears that if I use a distinguishing tag (e.g. BinID), I shouldn't overwrite data with the same timestamp, since a single bin should never have two records with the same time.
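If I'm reading the node's info tab right, each element of the array of arrays can itself be a [fields, tags] pair, so something like this sketch should let me keep the shared timestamp while BinID distinguishes the series:

var bins = msg.payload.split(";;");
var now = new Date();   // one shared timestamp per cycle is fine now

msg.payload = bins.map(function (item) {
    var tokens = item.split(";");
    var fields = {
        OsaTemp:    parseFloat(tokens[1]),
        PlenumTemp: parseFloat(tokens[2]),
        // ... remaining numeric fields as in the flow above ...
        time:       now
    };
    var tags = { BinID: tokens[0] };   // unique tag per bin prevents overwrites
    return [fields, tags];             // [fields, tags] pair for each point
});
return msg;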

If you don't use tags, then using the same timestamp will definitely result in only one data point.

The first thing you should do is make yourself familiar with the basic concepts of Influx; it's really important to understand what a series is and how it is composed. The official documentation is a great place to start.

Then analyze your data and how it can fit into those concepts, i.e. find a schema for your data.
Keep in mind: what do you want to do with the data? What queries will you run?

After that, the rest is easy. But if you are going to insert multiple points at once, I would consider using the Influx batch insert node. It expects an array of point objects.

Here's an example of a point.

{
    "timestamp": 1581489124000,
    "measurement": "environment",
    "tags": {
        "sensorType": "bme280",
        "sensorId": "5",
        "location": "bath"
    },
    "fields": {
        "temperature": 19.7,
        "humidity": 48.68,
        "pressure": 976
    }
}

I think using the Influx batch insert also makes it a little clearer how to find a schema for your data, because it follows the base concepts of a point.
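As a rough sketch (reusing the field names from your flow; the measurement name is just an example), a function node could map the parsed bins onto that structure like this:

var bins = msg.payload.split(";;");
var now = Date.now();

// Build an array of point objects for the batch node
msg.payload = bins.map(function (item) {
    var tokens = item.split(";");
    return {
        measurement: "binData",        // example name, pick your own
        timestamp: now,
        tags: { BinID: tokens[0] },    // the series identity lives in the tags
        fields: {
            OsaTemp:    parseFloat(tokens[1]),
            PlenumTemp: parseFloat(tokens[2]),
            // ... remaining numeric fields ...
            Co2:        parseFloat(tokens[13])
        }
    };
});
return msg;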

I think I see now what you are trying to do, which is to write records and use one of the fields in the record to identify what the other fields mean. While this would work in a conventional database, it will not work in Influx. To do that you must use tags as the identifier. So, for a simple example, you might have fields temperature and humidity representing those values at various locations. To identify the location you would use a tag: if the tag is "bathroom" then the temperature and humidity field values are for the bathroom; if the tag is "bedroom" then they are for that room. Then to send that to Influx you would use payloads like

msg.payload = [{ temperature: 21, humidity: 45 }, { room: "bedroom" }]
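And if you need several of those in one message, the array-of-arrays form takes one [fields, tags] pair per point, something like:

msg.payload = [
    [{ temperature: 21, humidity: 45 }, { room: "bedroom" }],
    [{ temperature: 23, humidity: 50 }, { room: "bathroom" }]
];
return msg;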

@Colin and @kuema: Thank you for your feedback!

It appears I need to do some reading up on InfluxDB. I have not worked with time series DBs before and made some assumptions in my rush to get things working.

Yes, the concept is a bit different compared to relational DBs; it is highly optimized for its use case. But once you get the hang of it, you can achieve a lot of cool stuff with Influx. :nerd_face:

The good thing is, it is well documented. And there are lots of examples.