Four InfluxDB nodes (data is very confusing and not unique)


I use four influxDB out nodes to write four measurements, one for each of the four channels of an ADC. I added tags and timestamps, but the visualized data is very confusing.
As shown below (the normal waveform is a square wave):

Why are you not writing the four values to one measurement? That would be a lot more efficient.
Or are you sending them all to the same measurement but not all in one message, so that each record in the db has data from only one channel and the others are missing?
Show us what you are sending to the db nodes and how you have configured them.

Also, if this is current data then don't add a timestamp; influx will add the current time when you write it.
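
A minimal sketch of the single-measurement idea, assuming the four channel values are already available as numbers and that the influxdb out node from node-red-contrib-influxdb is used (as I understand its help text, an object payload is written as a set of named fields). The helper name `toSinglePayload` and the field names `ch1` to `ch4` are illustrative only.

```javascript
// Hypothetical helper: pack four channel readings into one payload so that
// a single write stores all four fields in the same record.
function toSinglePayload(values) {
    if (!Array.isArray(values) || values.length !== 4) {
        throw new Error("expected exactly four channel values");
    }
    const fields = {};
    values.forEach((v, i) => {
        fields["ch" + (i + 1)] = v;   // ch1 .. ch4 (assumed field names)
    });
    return fields;
}

// In a function node this might be used as:
//   msg.payload = toSinglePayload([v1, v2, v3, v4]);
//   return msg;
// No timestamp is set, so InfluxDB assigns the write time.
```

With one message per sample, each record holds all four channels instead of one channel with the others missing.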

The serial port input is comma-separated, and each group of four strings forms one sample of four channels of ECG signal data. The Fecg_raw function extracts a different substring for each channel and sends them out through four separate outputs. After some processing, each value is sent to an influxDB out node. Do I need an influxDB batch node here? I have four outputs, and I don't know how to add them to one measurement.

[
    {
        "id": "199c6067e4648a8f",
        "type": "tab",
        "label": "流程 10",
        "disabled": false,
        "info": "",
        "env": []
    },
    {
        "id": "f847df7b369debf6",
        "type": "batch",
        "z": "199c6067e4648a8f",
        "name": "",
        "mode": "count",
        "count": "4",
        "overlap": 0,
        "interval": 10,
        "allowEmptySequence": false,
        "topics": [],
        "x": 290,
        "y": 140,
        "wires": [
            [
                "ba7c9bf7fb764023"
            ]
        ]
    },
    {
        "id": "e02481556eb6cfc7",
        "type": "serial in",
        "z": "199c6067e4648a8f",
        "name": "",
        "serial": "fe7056354f94eec3",
        "x": 130,
        "y": 140,
        "wires": [
            [
                "f847df7b369debf6"
            ]
        ]
    },
    {
        "id": "ba7c9bf7fb764023",
        "type": "join",
        "z": "199c6067e4648a8f",
        "name": "",
        "mode": "auto",
        "build": "object",
        "property": "payload",
        "propertyType": "msg",
        "key": "topic",
        "joiner": "\\n",
        "joinerType": "str",
        "accumulate": true,
        "timeout": "",
        "count": "",
        "reduceRight": false,
        "reduceExp": "",
        "reduceInit": "",
        "reduceInitType": "",
        "reduceFixup": "",
        "x": 150,
        "y": 240,
        "wires": [
            [
                "7fd5129a3d1bb6ae"
            ]
        ]
    },
    {
        "id": "af6fdeee62c69904",
        "type": "function",
        "z": "199c6067e4648a8f",
        "name": "Fecg_raw",
        "func": "var msg1,msg2,msg3,msg4,l;\nif(msg.payload.length == 41)\n {\n msg1 = {payload:msg.payload.toString().substring(1,7)};\n msg2 = {payload:msg.payload.toString().substring(11,17)};\n msg3 = {payload:msg.payload.toString().substring(21,27)};\n msg4 = {payload:msg.payload.toString().substring(31,37)};\n }\n return [[msg1], [msg2], [msg3],[msg4]];",
        "outputs": 4,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 460,
        "y": 240,
        "wires": [
            [
                "350ce3c92346556d"
            ],
            [
                "ebd1e1a9a1486eed"
            ],
            [
                "b8a19e88566c62d4"
            ],
            [
                "5be163749b85fd6f"
            ]
        ]
    },
    {
        "id": "7fd5129a3d1bb6ae",
        "type": "csv",
        "z": "199c6067e4648a8f",
        "name": "",
        "sep": ",",
        "hdrin": "",
        "hdrout": "none",
        "multi": "mult",
        "ret": "\\r\\n",
        "temp": "",
        "skip": "0",
        "strings": true,
        "include_empty_strings": true,
        "include_null_values": true,
        "x": 290,
        "y": 240,
        "wires": [
            [
                "af6fdeee62c69904"
            ]
        ]
    },
    {
        "id": "350ce3c92346556d",
        "type": "function",
        "z": "199c6067e4648a8f",
        "name": "Fecg_CH1",
        "func": "var tmp=msg.payload;\nvar y;\nvar m,n;\nvar x=parseInt(tmp,16);\nmsg.topic = \"ch1\";\nif(x > 8388608)\n{\n    x = (x-16777216)*2.235/100000;\n}\nelse if(0<=x<8388608)\n{\n    x = x*2.235/100000;\n}\ny=parseFloat(x);\nm = y.toFixed(2);\nmsg.payload = parseFloat(m);\n\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 670,
        "y": 140,
        "wires": [
            []
        ]
    },
    {
        "id": "ebd1e1a9a1486eed",
        "type": "function",
        "z": "199c6067e4648a8f",
        "name": "Fecg_CH2",
        "func": "var tmp=msg.payload;\nvar y;\nvar m;\nvar x=parseInt(tmp,16);\nmsg.topic = \"ch2\";\nif(x > 8388608)\n{\n    x = (x-16777216)*2.235/100000;\n}\nelse if(0<=x<8388608)\n{\n    x = x*2.235/100000;\n}\ny=parseFloat(x);\nm = y.toFixed(2);\nmsg.payload = parseFloat(m);\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 670,
        "y": 200,
        "wires": [
            []
        ]
    },
    {
        "id": "b8a19e88566c62d4",
        "type": "function",
        "z": "199c6067e4648a8f",
        "name": "Fecg_CH3",
        "func": "var tmp=msg.payload;\nvar y;\nvar m;\nvar x=parseInt(tmp,16);\nmsg.topic = \"ch3\";\nif(x > 8388608)\n{\n    x = (x-16777216)*2.235/100000;\n}\nelse if(0<=x<8388608)\n{\n    x = x*2.235/100000;\n}\ny=parseFloat(x);\nm = y.toFixed(2);\nmsg.payload  = parseFloat(m);\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 670,
        "y": 260,
        "wires": [
            []
        ]
    },
    {
        "id": "5be163749b85fd6f",
        "type": "function",
        "z": "199c6067e4648a8f",
        "name": "Fecg_CH4",
        "func": "var tmp=msg.payload;\nvar y;\nvar m;\nvar x=parseInt(tmp,16);\nmsg.topic = \"ch4\";\nif(x > 8388608)\n{\n    x = (x-16777216)*2.235/100000;\n}\nelse if(0<=x<8388608)\n{\n    x = x*2.235/100000;\n}\ny=parseFloat(x);\nm = y.toFixed(2);\nmsg.payload = parseFloat(m);\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 670,
        "y": 320,
        "wires": [
            []
        ]
    },
    {
        "id": "2364abaca35273a9",
        "type": "influxdb batch",
        "z": "199c6067e4648a8f",
        "influxdb": "56723ee92658165f",
        "precision": "",
        "retentionPolicy": "",
        "name": "",
        "database": "database",
        "precisionV18FluxV20": "ms",
        "retentionPolicyV18Flux": "",
        "org": "fecg",
        "bucket": "test",
        "x": 990,
        "y": 240,
        "wires": []
    },
    {
        "id": "bb206c915cd9b8c4",
        "type": "function",
        "z": "199c6067e4648a8f",
        "name": "Fecg_CH1with tag",
        "func": "var tmp=msg.payload;\nvar y;\nvar m,n;\nvar x=parseInt(tmp,16);\n// msg.topic = 'CH4';\nif(x > 8388608)\n{\n    x = (x-16777216)*2.235/100000;\n}\nelse if(0<=x<8388608)\n{\n    x = x*2.235/100000;\n}\ny=parseFloat(x);\nm = y.toFixed(2);\nn = parseFloat(m);\nmsg.payload = [\n    {\n     \n        temp: n,\n        tags:{\n            location:\"ch4\"\n        },\n        timestamp: new Date()\n    }\n];\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 270,
        "y": 460,
        "wires": [
            []
        ]
    },
    {
        "id": "62b5d207241ddb96",
        "type": "ui_spacer",
        "z": "199c6067e4648a8f",
        "name": "spacer",
        "group": "f9c34dde.1e2f4",
        "order": 12,
        "width": 8,
        "height": 1
    },
    {
        "id": "ce80b1b26551e9d6",
        "type": "ui_spacer",
        "z": "199c6067e4648a8f",
        "name": "spacer",
        "group": "f9c34dde.1e2f4",
        "order": 18,
        "width": 8,
        "height": 1
    },
    {
        "id": "fe7056354f94eec3",
        "type": "serial-port",
        "serialport": "COM3",
        "serialbaud": "230400",
        "databits": "8",
        "parity": "none",
        "stopbits": "1",
        "waitfor": "",
        "dtr": "none",
        "rts": "none",
        "cts": "none",
        "dsr": "none",
        "newline": ",",
        "bin": "false",
        "out": "char",
        "addchar": "",
        "responsetimeout": "10000"
    },
    {
        "id": "56723ee92658165f",
        "type": "influxdb",
        "hostname": "127.0.0.1",
        "port": "8086",
        "protocol": "http",
        "database": "database",
        "name": "",
        "usetls": false,
        "tls": "d50d0c9f.31e858",
        "influxdbVersion": "2.0",
        "url": "http://localhost:8086",
        "rejectUnauthorized": true
    },
    {
        "id": "f9c34dde.1e2f4",
        "type": "ui_group",
        "name": "ICD",
        "tab": "17d09b07.741a55",
        "order": 1,
        "disp": true,
        "width": 26,
        "collapse": false,
        "className": ""
    },
    {
        "id": "d50d0c9f.31e858",
        "type": "tls-config",
        "name": "",
        "cert": "",
        "key": "",
        "ca": "",
        "certname": "",
        "keyname": "",
        "caname": "",
        "servername": "",
        "verifyservercert": false
    },
    {
        "id": "17d09b07.741a55",
        "type": "ui_tab",
        "name": "Home",
        "icon": "dashboard",
        "disabled": false,
        "hidden": false
    }
]
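
A side note on the channel functions in the flow above: in JavaScript, `0<=x<8388608` is evaluated as `(0<=x)<8388608`, which is always true, and the value 8388608 (0x800000) itself is treated as positive because the first test uses `>`. A minimal sketch of a 24-bit two's-complement conversion, assuming each channel really is a 6-digit hex string and keeping the scale factor 2.235/100000 from the flow. The name `hexToScaled` is hypothetical.

```javascript
// Convert a 6-digit hex string holding a 24-bit two's-complement sample
// into a scaled value. The scale factor 2.235/100000 is taken from the flow.
function hexToScaled(hex) {
    const raw = parseInt(hex, 16);
    if (Number.isNaN(raw) || raw < 0 || raw > 0xFFFFFF) {
        return null;                       // not a valid 24-bit sample
    }
    // 0x800000 and above represent negative numbers in two's complement.
    const signed = raw >= 0x800000 ? raw - 0x1000000 : raw;
    // Round to two decimals, as the original functions do.
    return parseFloat((signed * 2.235 / 100000).toFixed(2));
}
```

Returning `null` for an unparsable value lets the caller drop the sample instead of writing NaN to the database.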

No, that is for when you want to send a complete batch of data to influx in one go. You are adding them one at a time so that is not a problem.

Let us start at the beginning. You are reading four input messages at a time into one set. If the input stream is just a sequence of comma separated values how do you know which is the first one of the four? Can you tell us more about the format of the input data? Is there an end of line or something after each four?

Hi Colin,

I have a similar problem: combining 2 MQTT topics into a single measurement.
There is only one InfluxDB node and only one bucket.

Please check my question here.

Thank you, I have solved this problem by rewriting a function. (PS: I really didn't know that only one of the four influxdb out nodes passed the measurement.) Now I have a new problem. My data is collected at 1 kSPS (1 kHz), so the sampling interval should be 1 ms, but in InfluxDB the actual interval between points is a mixture of 20~30 us and 5 ms. There are still 1000 data points per second.

That means that sometimes the system is not able to process the data fast enough. At that rate of input I suggest that the first thing you do is to put all four values into one measurement as I have suggested, so there is only one db write for the four values. Perhaps you have done that already. That should give you most of a fourfold efficiency improvement. That may well not be enough though. You will probably have to buffer up a number of samples and send them to the db using the influxdb batch node; that will make a dramatic difference to the throughput. I suggest buffering 1000 samples at a time.

It may be, however, that node-red is just not the right tool for this job. You might be better off using something like Telegraf and a Python script, for example, to send it to influx.

Thank you for helping me solve my doubts.

Is the influxDB batch node needed to send 1000 samples? If Node-RED sends an array of 1000 samples, how does InfluxDB parse this array? Is there an example?
Thanks again.

Have you looked at the help text for the node?

I read the help text. I think the main difference between the influxDB out node and the influxDB batch node is the number of measurements. What I mean is: when Node-RED outputs an array, how does the InfluxDB software handle it?

I don't understand what you mean. Using the batch node you supply an array of samples (or points as the help calls them), each one of which contains the fields, tags, measurement and timestamp for that sample. That is sent off to the influx service in one transaction and influx writes them to the database.
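
A minimal sketch of one such point, assuming four channel values per sample. The measurement name, tag, and field names are placeholders, and `makePoint` is a hypothetical helper, not part of the node.

```javascript
// Build one point for the influxdb batch node from four channel values.
function makePoint(values, when) {
    return {
        measurement: "fecg_data",          // placeholder measurement name
        fields: {
            ch1: values[0],
            ch2: values[1],
            ch3: values[2],
            ch4: values[3]
        },
        tags: { location: "ffY" },         // placeholder tag
        timestamp: when                    // a Date for this sample
    };
}

// The batch node expects msg.payload to be an ARRAY of such points:
//   msg.payload = [makePoint(a, t0), makePoint(b, t1) /* , ... */];
```

A single point object on its own is not enough for the batch node; it has to be wrapped in an array even if there is only one.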

May I ask how to add such a buffer and accumulate 1000 samples? Is there an example for reference?
Thanks again.

[
    {
        "id": "e02481556eb6cfc7",
        "type": "serial in",
        "z": "199c6067e4648a8f",
        "name": "",
        "serial": "fe7056354f94eec3",
        "x": 130,
        "y": 320,
        "wires": [
            [
                "11d174f3cbafc1c2"
            ]
        ]
    },
    {
        "id": "13a6b43aa0138ae8",
        "type": "function",
        "z": "199c6067e4648a8f",
        "name": "Fecg_influxDB_batch",
        "func": "var myArray = [];\nvar values  = [];\nvar x,y,m;\nif(msg.payload.length == 41)\n {\n myArray[0] = {payload:msg.payload.toString().substring(1,7)};\n myArray[1] = {payload:msg.payload.toString().substring(11,17)};\n myArray[2] = {payload:msg.payload.toString().substring(21,27)};\n myArray[3] = {payload:msg.payload.toString().substring(31,37)};\n }\n \nfor (var i = 0; i < 4; ++i)\n{\n x=parseInt(myArray[i].payload,16);\nif(x > 8388608)\n{\n  x = (x-16777216)*2.235*24/100000;\n}\nelse if(0<=x<8388608)\n{\n  x = x*2.235*24/100000;\n}\ny=parseFloat(x);\nm = y.toFixed(2);\nvalues[i] = parseFloat(m);\n}\nmsg.payload =  {\n    measurement: \"fecg_data\",\n    fields: {\n         ch1: values[0],\n         ch2: values[1],\n         ch3: values[2],\n         ch4: values[3]\n    },\n    tags:{\n    location:\"ffY\"\n    },\n    timestamp: new Date()\n   };\nreturn msg;",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 200,
        "y": 440,
        "wires": [
            [
                "cf1d149013b88e80"
            ]
        ]
    },
    {
        "id": "11d174f3cbafc1c2",
        "type": "batch",
        "z": "199c6067e4648a8f",
        "name": "",
        "mode": "count",
        "count": "4",
        "overlap": 0,
        "interval": 10,
        "allowEmptySequence": false,
        "topics": [],
        "x": 290,
        "y": 320,
        "wires": [
            [
                "d3da8d96b4b827f9"
            ]
        ]
    },
    {
        "id": "d3da8d96b4b827f9",
        "type": "join",
        "z": "199c6067e4648a8f",
        "name": "",
        "mode": "auto",
        "build": "object",
        "property": "payload",
        "propertyType": "msg",
        "key": "topic",
        "joiner": "\\n",
        "joinerType": "str",
        "accumulate": true,
        "timeout": "",
        "count": "",
        "reduceRight": false,
        "reduceExp": "",
        "reduceInit": "",
        "reduceInitType": "",
        "reduceFixup": "",
        "x": 470,
        "y": 320,
        "wires": [
            [
                "ebc5e43302fbb9fb"
            ]
        ]
    },
    {
        "id": "ebc5e43302fbb9fb",
        "type": "csv",
        "z": "199c6067e4648a8f",
        "name": "",
        "sep": ",",
        "hdrin": "",
        "hdrout": "none",
        "multi": "mult",
        "ret": "\\r\\n",
        "temp": "",
        "skip": "0",
        "strings": true,
        "include_empty_strings": true,
        "include_null_values": true,
        "x": 650,
        "y": 320,
        "wires": [
            [
                "13a6b43aa0138ae8"
            ]
        ]
    },
    {
        "id": "bf33d5f394317961",
        "type": "influxdb batch",
        "z": "199c6067e4648a8f",
        "influxdb": "56723ee92658165f",
        "precision": "",
        "retentionPolicy": "",
        "name": "",
        "database": "database",
        "precisionV18FluxV20": "us",
        "retentionPolicyV18Flux": "",
        "org": "fecg",
        "bucket": "test",
        "x": 750,
        "y": 440,
        "wires": []
    },
    {
        "id": "537c7a24.a841a4",
        "type": "debug",
        "z": "199c6067e4648a8f",
        "name": "",
        "active": true,
        "tosidebar": true,
        "console": false,
        "tostatus": false,
        "complete": "payload",
        "x": 770,
        "y": 400,
        "wires": []
    },
    {
        "id": "cf1d149013b88e80",
        "type": "batch",
        "z": "199c6067e4648a8f",
        "name": "",
        "mode": "count",
        "count": "1000",
        "overlap": 0,
        "interval": 10,
        "allowEmptySequence": false,
        "topics": [],
        "x": 390,
        "y": 440,
        "wires": [
            [
                "cc4ccf4db3b5db27"
            ]
        ]
    },
    {
        "id": "cc4ccf4db3b5db27",
        "type": "join",
        "z": "199c6067e4648a8f",
        "name": "",
        "mode": "auto",
        "build": "object",
        "property": "payload",
        "propertyType": "msg",
        "key": "topic",
        "joiner": "\\n",
        "joinerType": "str",
        "accumulate": true,
        "timeout": "",
        "count": "",
        "reduceRight": false,
        "reduceExp": "",
        "reduceInit": "",
        "reduceInitType": "",
        "reduceFixup": "",
        "x": 530,
        "y": 440,
        "wires": [
            [
                "bf33d5f394317961",
                "537c7a24.a841a4"
            ]
        ]
    },
    {
        "id": "fe7056354f94eec3",
        "type": "serial-port",
        "serialport": "COM3",
        "serialbaud": "230400",
        "databits": "8",
        "parity": "none",
        "stopbits": "1",
        "waitfor": "",
        "dtr": "none",
        "rts": "none",
        "cts": "none",
        "dsr": "none",
        "newline": ",",
        "bin": "false",
        "out": "char",
        "addchar": "",
        "responsetimeout": "10000"
    },
    {
        "id": "56723ee92658165f",
        "type": "influxdb",
        "hostname": "127.0.0.1",
        "port": "8086",
        "protocol": "http",
        "database": "database",
        "name": "",
        "usetls": false,
        "tls": "d50d0c9f.31e858",
        "influxdbVersion": "2.0",
        "url": "http://localhost:8086",
        "rejectUnauthorized": true
    },
    {
        "id": "d50d0c9f.31e858",
        "type": "tls-config",
        "name": "",
        "cert": "",
        "key": "",
        "ca": "",
        "certname": "",
        "keyname": "",
        "caname": "",
        "servername": "",
        "verifyservercert": false
    }
]

Is it working now?


Three questions:

  1. msg.payload does not output array[1000].
  2. As shown in the second picture, a node is outputting values (the node marked with a circle), but it isn't the debug node. I don't know where this node is.
  3. I downloaded a CSV file from InfluxDB and found it holds just 100 samples (I need 1000). The interval between two samples is 1 ms (min), 30 ms, 50 ms, ...

In summary, with the influxDB out node (intervals of 30 us, 50 us, 5 ms, ...) I get 1000 samples per second, but with the influxDB batch node (intervals of 1 ms (min), 30 ms, 50 ms, ...) I get only 100 samples.

Does this mean that node red is not the ideal tool for this problem?
About other methods, can you give me a link so that I can learn about it?
Thanks again. :grinning:

In your original solution you were producing a stream of individual records to go to influx. Have you looked at the influx batch node and worked out what you need to generate? It needs an array of point objects, as defined in the help text. So first adjust the data you have to be correct for a point object (you need to move the existing data to msg.payload.fields and then add a measurement field and a timestamp field). Then you can feed that into a Join node to combine them in blocks of 1000 in an array.
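
The Join node can do this blocking without any code. For comparison, here is a rough sketch of the same idea in a function node. `createBatcher` is a hypothetical helper, and the commented usage assumes the default in-memory context store, which can hold a function reference.

```javascript
// Collects points and hands back a full array once `size` points have arrived.
function createBatcher(size) {
    let buffer = [];
    return function push(point) {
        buffer.push(point);
        if (buffer.length < size) {
            return null;          // not full yet: nothing to send
        }
        const full = buffer;
        buffer = [];              // start the next block
        return full;
    };
}

// Sketch of use inside a function node:
//   const push = context.get("push") || createBatcher(1000);
//   context.set("push", push);
//   const batch = push(msg.payload);   // msg.payload is one point object
//   if (batch === null) return null;   // returning null sends no message
//   msg.payload = batch;               // array of 1000 points
//   return msg;
```

Either way, the batch node should receive one message per 1000 samples, with msg.payload being the array itself.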

Note though that even with this method the timestamps may not be accurate. Linux is not a real-time OS and at times node red may get locked out for a short while. Also at times node red has housekeeping tasks to perform which can prevent your nodes getting the processor exactly when needed. You may be better off using something like an ESP to collect and timestamp the data into chunks and pass that to node red to put in the db.

Sorry, Colin. I have thought more about my problem. I collect the data over Bluetooth and then pass it to Node-RED through the serial port, so the intervals I see are 20 us or 30 us (the time Node-RED takes to process the data and send it to InfluxDB) and 5 ms (the Bluetooth interval). So the timing is actually right. The key to my question is that the InfluxDB x-axis should be in units of data points, not time intervals.
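
One possible way to get an evenly spaced axis is to derive each timestamp from the sample index rather than from the arrival time. This is only a sketch: it assumes the ADC really samples at a fixed 1 kHz and that no samples are lost over Bluetooth. `stampBlock` and its parameters are hypothetical names.

```javascript
// Assign evenly spaced timestamps to a block of points, assuming a fixed
// sample interval, instead of using the arrival time in Node-RED.
function stampBlock(points, startMs, sampleIntervalMs) {
    return points.map((p, i) => ({
        ...p,                                                // keep fields, tags, measurement
        timestamp: new Date(startMs + i * sampleIntervalMs)  // index-based time
    }));
}

// Example: stamp a block of points at 1 ms spacing, starting now.
//   const stamped = stampBlock(block, Date.now(), 1);
```

For consecutive blocks, the start time of each block would need to continue from the end of the previous one, otherwise arrival jitter reappears at the block boundaries.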
Thank you again, you taught me a lot about this.