hello. So I switched to node-red-contrib-postgrestor-next and It's actually working but for some reason I have a problem in json format. I have a posgres column that is jsonb and when I tried to deploy I encounter an error it says "error: invalid input syntax for type json"
my json data is a valid json I already checked that:
[{"key":"pumpingstation","value":"0.00"},{"key":"centrifuge","value":"2.2"},{"key":"Dafpump1","value":"5.01"},{"key":"Dafpump2","value":"22.69"}]
and in my postgres node query I have:
INSERT INTO public.mqtt_table (data, created_at, created_time, created_date)
VALUES ('{{msg.payload}}', '{{msg.timestamp}}', '{{msg.time}}', '{{msg.date}}');
The error exist in msg.payload. I have a function before my postgres node:
var input = msg.payload
// Split the input string into key-value pairs
var pairs = input.split(",");
// Initialize an array to store the JSON objects
var output = [];
// Loop through each key-value pair
for (var i = 0; i < pairs.length; i++) {
// Split the pair into key and value
var parts = pairs[i].split(" ");
// Create a JSON object with the key and value
var obj = {
"key": parts[0],
"value": parts[1]
};
// Add the object to the output array
output.push(obj);
}
var currentDate = new Date();
var date = currentDate.getFullYear() + "-" + (currentDate.getMonth() + 1) + "-" + currentDate.getDate();
var time = currentDate.getHours() + ":" + currentDate.getMinutes() + ":" + currentDate.getSeconds() + "." + currentDate.getMilliseconds();
msg.date = date;
msg.time = time;
msg.payload = JSON.stringify(output);
msg.timestamp = new Date().toISOString();
return msg;
for my flow I included it below.
[
{
"id": "14ee6d7566a42e0e",
"type": "tab",
"label": "Flow 1",
"disabled": false,
"info": "",
"env": []
},
{
"id": "d7befe8c4eedf592",
"type": "tab",
"label": "Flow 2",
"disabled": false,
"info": "",
"env": []
},
{
"id": "460f2b8f.a23dbc",
"type": "tab",
"label": "Postgres example flows",
"disabled": false,
"info": "# Postgres example flows\n\nThese flows demonstrate the use of the\n`node-red-contrib-postgres-multi` node.\n\n## Setup\n\nFor the flows in this tab,\nyou'll need a PostgreSQL table like so:\n\n begin;\n create table mytable\n (\n id integer not null,\n message character varying(20)\n );\n create unique index on mytable (id);\n commit;\n\nThen you'll need to configure the postgres\nblocks to have access to this database and table.\n"
},
{
"id": "48acf0a74b57fcc9",
"type": "tab",
"label": "Flow 5",
"disabled": false,
"info": ""
},
{
"id": "39b7a323071521ea",
"type": "tab",
"label": "Flow 5",
"disabled": false,
"info": ""
},
{
"id": "bc8faf152aa409d6",
"type": "mqtt-broker",
"name": "broker",
"broker": "localhost",
"port": "1884",
"clientid": "",
"autoConnect": true,
"usetls": false,
"protocolVersion": "4",
"keepalive": "60",
"cleansession": true,
"birthTopic": "",
"birthQos": "0",
"birthPayload": "",
"birthMsg": {},
"closeTopic": "",
"closeQos": "0",
"closePayload": "",
"closeMsg": {},
"willTopic": "",
"willQos": "0",
"willPayload": "",
"willMsg": {},
"userProps": "",
"sessionExpiry": ""
},
{
"id": "e0ddc1469372b4eb",
"type": "postgresDB",
"name": "postgres@127.0.0.1:5432/users",
"host": "127.0.0.1",
"hostFieldType": "str",
"port": "5432",
"portFieldType": "num",
"database": "users",
"databaseFieldType": "str",
"ssl": "false",
"sslFieldType": "bool",
"max": "10",
"maxFieldType": "num",
"min": "1",
"minFieldType": "num",
"idle": "1000",
"idleFieldType": "num",
"connectionTimeout": "10000",
"connectionTimeoutFieldType": "num",
"user": "postgres",
"userFieldType": "str",
"password": "postgres",
"passwordFieldType": "str"
},
{
"id": "3095d062c0825ec3",
"type": "mqtt in",
"z": "14ee6d7566a42e0e",
"name": "",
"topic": "data",
"qos": "0",
"datatype": "utf8",
"broker": "bc8faf152aa409d6",
"nl": false,
"rap": true,
"rh": 0,
"inputs": 0,
"x": 190,
"y": 220,
"wires": [
[
"e3844ce7e1b2ac9a"
]
]
},
{
"id": "e151017ec914297c",
"type": "postgrestor",
"z": "14ee6d7566a42e0e",
"name": "",
"query": "INSERT INTO public.mqtt_table (data, created_at, created_time, created_date) \nVALUES ('\"{{msg.payload}}\"', '{{msg.timestamp}}', '{{msg.time}}', '{{msg.date}}');",
"postgresDB": "e0ddc1469372b4eb",
"output": true,
"outputs": 1,
"x": 870,
"y": 220,
"wires": [
[
"32e561ec20b60389"
]
]
},
{
"id": "1f82b28cc4ca2366",
"type": "mqtt out",
"z": "14ee6d7566a42e0e",
"name": "",
"topic": "data",
"qos": "1",
"retain": "true",
"respTopic": "",
"contentType": "",
"userProps": "",
"correl": "",
"expiry": "",
"broker": "bc8faf152aa409d6",
"x": 1110,
"y": 460,
"wires": []
},
{
"id": "e3844ce7e1b2ac9a",
"type": "rbe",
"z": "14ee6d7566a42e0e",
"name": "",
"func": "rbe",
"gap": "",
"start": "",
"inout": "out",
"septopics": true,
"property": "payload",
"topi": "topic",
"x": 350,
"y": 220,
"wires": [
[
"0111a731629934cf"
]
]
},
{
"id": "0111a731629934cf",
"type": "function",
"z": "14ee6d7566a42e0e",
"name": "function 7",
"func": "if (msg.payload.search('pumping station') != -1) msg.payload = msg.payload.replace('pumping station', 'pumpingstation')\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 520,
"y": 220,
"wires": [
[
"871d89e7315bfc39"
]
]
},
{
"id": "871d89e7315bfc39",
"type": "function",
"z": "14ee6d7566a42e0e",
"name": "function 8",
"func": "var input = msg.payload\n\n// Split the input string into key-value pairs\nvar pairs = input.split(\",\");\n\n// Initialize an array to store the JSON objects\nvar output = [];\n\n// Loop through each key-value pair\nfor (var i = 0; i < pairs.length; i++) {\n // Split the pair into key and value\n var parts = pairs[i].split(\" \");\n\n // Create a JSON object with the key and value\n var obj = {\n \"key\": parts[0],\n \"value\": parts[1]\n };\n\n // Add the object to the output array\n output.push(obj);\n}\n\nvar currentDate = new Date();\nvar date = currentDate.getFullYear() + \"-\" + (currentDate.getMonth() + 1) + \"-\" + currentDate.getDate();\nvar time = currentDate.getHours() + \":\" + currentDate.getMinutes() + \":\" + currentDate.getSeconds() + \".\" + currentDate.getMilliseconds();\n\nmsg.date = date;\nmsg.time = time;\nmsg.payload = JSON.stringify(output);\nmsg.timestamp = new Date().toISOString();\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 700,
"y": 220,
"wires": [
[
"e151017ec914297c",
"32e561ec20b60389"
]
]
},
{
"id": "32e561ec20b60389",
"type": "debug",
"z": "14ee6d7566a42e0e",
"name": "debug 6",
"active": true,
"tosidebar": true,
"console": true,
"tostatus": false,
"complete": "payload",
"targetType": "msg",
"statusVal": "",
"statusType": "auto",
"x": 1050,
"y": 120,
"wires": []
},
{
"id": "e9a9a2e8.4f4ca",
"type": "inject",
"z": "d7befe8c4eedf592",
"name": "",
"props": [
{
"p": "payload"
},
{
"p": "topic",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "data",
"payload": "[{\"key\":\"pumpingstation\",\"value\":\"0.00\"},{\"key\":\"centrifuge\",\"value\":\"21.08\"},{\"key\":\"Dafpump1\",\"value\":\"5.20\"},{\"key\":\"Dafpump2\",\"value\":\"23.10\"}]",
"payloadType": "json",
"x": 350,
"y": 340,
"wires": [
[
"bf8fa5f5.5b7e5"
]
]
},
{
"id": "bf8fa5f5.5b7e5",
"type": "function",
"z": "d7befe8c4eedf592",
"name": "Insert into postgresql",
"func": "var topic1 = msg.topic;\nvar payload1 = JSON.stringify(msg.payload);\n\nmsg.payload = [{\n query: \"insert into mqtt_table (topic) values ('data')\",\n params: [topic1]\n}];\n// Return the modified message\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 630,
"y": 340,
"wires": [
[]
]
},
{
"id": "fdab0152a508eee2",
"type": "debug",
"z": "d7befe8c4eedf592",
"name": "debug 2",
"active": true,
"tosidebar": true,
"console": true,
"tostatus": false,
"complete": "payload",
"targetType": "msg",
"statusVal": "",
"statusType": "auto",
"x": 930,
"y": 220,
"wires": []
},
{
"id": "5498d534.e9cc54",
"type": "comment",
"z": "460f2b8f.a23dbc",
"name": "Reset",
"info": "This flow clears any contents from\nthe `mytable` table and inserts a single\nrecord:\n\n id message\n 1 'hello world'\n",
"x": 90,
"y": 40,
"wires": []
},
{
"id": "2ef9ce55.1ee282",
"type": "inject",
"z": "460f2b8f.a23dbc",
"name": "click me",
"repeat": "",
"crontab": "",
"once": false,
"topic": "",
"payload": "",
"payloadType": "date",
"x": 120,
"y": 100,
"wires": [
[
"98595979.a97fe"
]
]
},
{
"id": "98595979.a97fe",
"type": "function",
"z": "460f2b8f.a23dbc",
"name": "prepare",
"func": "\nmsg.payload = [\n {\n query: 'begin',\n },\n {\n query: 'delete from mytable',\n },\n {\n query: 'insert into mytable (id, message) values ($1, $2)',\n params: [1, 'hello world'],\n },\n {\n query: 'select * from mytable',\n output: true,\n },\n {\n query: 'commit',\n },\n];\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 300,
"y": 100,
"wires": [
[]
]
},
{
"id": "87793132.0d9d88",
"type": "debug",
"z": "460f2b8f.a23dbc",
"name": "",
"active": true,
"tosidebar": true,
"console": true,
"tostatus": false,
"complete": "payload",
"targetType": "msg",
"statusVal": "",
"statusType": "auto",
"x": 689.5,
"y": 100,
"wires": []
},
{
"id": "27e47fba.f5896",
"type": "comment",
"z": "460f2b8f.a23dbc",
"name": "Demo",
"info": "This flow demonstrates:\n\n* transactions and auto-commit queries\n* positional arguments (i.e. $argName)\n* query output from multiple queries\n",
"x": 90,
"y": 180,
"wires": []
},
{
"id": "2de05c89.ff8454",
"type": "inject",
"z": "460f2b8f.a23dbc",
"name": "click me",
"repeat": "",
"crontab": "",
"once": false,
"topic": "",
"payload": "",
"payloadType": "date",
"x": 120,
"y": 240,
"wires": [
[
"b5127483.4ae8d8"
]
]
},
{
"id": "b5127483.4ae8d8",
"type": "function",
"z": "460f2b8f.a23dbc",
"name": "prepare",
"func": "\nmsg.payload = [\n {\n query: 'begin',\n },\n {\n query: 'delete from mytable',\n },\n {\n query:\n 'insert into mytable (id, message) ' +\n 'values ($foo, $bar), ($baz, $boop)',\n params: {\n foo: 10,\n bar: 'hello',\n baz: 20,\n boop: 'world',\n },\n },\n {\n query: 'commit',\n },\n {\n query: 'select * from mytable',\n output: true,\n },\n {\n query: 'insert into mytable (id, message) values (30, \\'xtra\\')',\n },\n {\n query: 'select message from mytable order by id',\n output: true,\n },\n];\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 300,
"y": 240,
"wires": [
[]
]
},
{
"id": "68797b5f.c0a3ec",
"type": "debug",
"z": "460f2b8f.a23dbc",
"name": "",
"active": true,
"console": "false",
"complete": "false",
"x": 679.5,
"y": 240,
"wires": []
},
{
"id": "35f6d8a5.a36018",
"type": "comment",
"z": "460f2b8f.a23dbc",
"name": "Rollback",
"info": "If you start a transaction and don't commit it,\nyour changes will not be saved.",
"x": 100,
"y": 320,
"wires": []
},
{
"id": "ba356a1b.b7c2e8",
"type": "inject",
"z": "460f2b8f.a23dbc",
"name": "click me",
"repeat": "",
"crontab": "",
"once": false,
"topic": "",
"payload": "",
"payloadType": "date",
"x": 120,
"y": 380,
"wires": [
[
"73628965.7539d8"
]
]
},
{
"id": "73628965.7539d8",
"type": "function",
"z": "460f2b8f.a23dbc",
"name": "prepare",
"func": "\nmsg.payload = [\n {\n query: 'begin',\n },\n {\n query: 'delete from mytable',\n },\n {\n query:\n 'insert into mytable (id, message) ' +\n 'values ($foo, $bar), ($baz, $boop), ($bing, $bang)',\n params: {\n foo: 10,\n bar: 'one does not simply',\n baz: 20,\n boop: 'begin',\n bing: 30,\n bang: 'but not commit',\n },\n },\n {\n query: 'select message from mytable order by id',\n output: true,\n },\n];\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 300,
"y": 380,
"wires": [
[]
]
},
{
"id": "77248a23.6a48b4",
"type": "debug",
"z": "460f2b8f.a23dbc",
"name": "",
"active": true,
"console": "false",
"complete": "false",
"x": 679.5,
"y": 380,
"wires": []
},
{
"id": "b5a1d7d5.4301a8",
"type": "function",
"z": "460f2b8f.a23dbc",
"name": "check",
"func": "\nmsg.payload = [\n {\n query: 'select message from mytable order by id',\n output: true,\n },\n];\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 290,
"y": 460,
"wires": [
[]
]
},
{
"id": "b0f96526.7a4648",
"type": "debug",
"z": "460f2b8f.a23dbc",
"name": "",
"active": true,
"console": "false",
"complete": "false",
"x": 679.5,
"y": 460,
"wires": []
},
{
"id": "d529a78a2e573d90",
"type": "inject",
"z": "48acf0a74b57fcc9",
"name": "",
"props": [
{
"p": "payload"
},
{
"p": "topic",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "",
"payloadType": "date",
"x": 140,
"y": 120,
"wires": [
[
"e07778d9b2c8fb02"
]
]
},
{
"id": "20cc6b5c533c30e6",
"type": "function",
"z": "48acf0a74b57fcc9",
"name": "Write to PostgreSQL",
"func": "var a = global.get('value1');\nvar b = global.get('value2');\nvar c = global.get('value3');\nvar now = new Date().toLocaleString(\"DE\");\n\nmsg.payload = \"INSERT INTO mqtt_table (topic) VALUES (\"+a+\");\"\nreturn msg;\n\n",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 580,
"y": 320,
"wires": [
[
"f5e2c8b9af25420c"
]
]
},
{
"id": "f5e2c8b9af25420c",
"type": "debug",
"z": "48acf0a74b57fcc9",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "false",
"statusVal": "",
"statusType": "auto",
"x": 750,
"y": 60,
"wires": []
},
{
"id": "e07778d9b2c8fb02",
"type": "delay",
"z": "48acf0a74b57fcc9",
"name": "",
"pauseType": "delay",
"timeout": "1",
"timeoutUnits": "seconds",
"rate": "1",
"nbRateUnits": "1",
"rateUnits": "second",
"randomFirst": "1",
"randomLast": "5",
"randomUnits": "seconds",
"drop": false,
"allowrate": false,
"outputs": 1,
"x": 370,
"y": 320,
"wires": [
[
"20cc6b5c533c30e6"
]
]
},
{
"id": "7c7d587c0fb0613f",
"type": "inject",
"z": "39b7a323071521ea",
"name": "",
"props": [
{
"p": "payload"
},
{
"p": "topic",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "",
"payloadType": "date",
"x": 140,
"y": 120,
"wires": [
[
"280df0e833b808c0"
]
]
},
{
"id": "04d4d7e472ea72e8",
"type": "comment",
"z": "39b7a323071521ea",
"name": "Read from PLC",
"info": "",
"x": 140,
"y": 40,
"wires": []
},
{
"id": "313ea4faad2b4e26",
"type": "function",
"z": "39b7a323071521ea",
"name": "Write to PostgreSQL",
"func": "var a = global.get('value1');\nvar b = global.get('value2');\nvar c = global.get('value3');\nvar now = new Date().toLocaleString(\"DE\");\n\nmsg.payload = \"INSERT INTO temperature (topic) VALUES (\"+a+\");\"\nreturn msg;\n\n",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 580,
"y": 320,
"wires": [
[
"00165a9a8befdf68"
]
]
},
{
"id": "00165a9a8befdf68",
"type": "debug",
"z": "39b7a323071521ea",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "false",
"statusVal": "",
"statusType": "auto",
"x": 750,
"y": 60,
"wires": []
},
{
"id": "280df0e833b808c0",
"type": "delay",
"z": "39b7a323071521ea",
"name": "",
"pauseType": "delay",
"timeout": "1",
"timeoutUnits": "seconds",
"rate": "1",
"nbRateUnits": "1",
"rateUnits": "second",
"randomFirst": "1",
"randomLast": "5",
"randomUnits": "seconds",
"drop": false,
"allowrate": false,
"outputs": 1,
"x": 370,
"y": 320,
"wires": [
[
"313ea4faad2b4e26"
]
]
},
{
"id": "c18f0b430ab15f63",
"type": "debug",
"z": "39b7a323071521ea",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "payload",
"targetType": "msg",
"statusVal": "",
"statusType": "auto",
"x": 1010,
"y": 320,
"wires": []
},
{
"id": "8764ea0ccc24c421",
"type": "inject",
"z": "39b7a323071521ea",
"name": "",
"props": [
{
"p": "payload"
},
{
"p": "topic",
"vt": "str"
},
{
"p": "speed1",
"v": "50",
"vt": "str"
},
{
"p": "speed2",
"v": "60",
"vt": "str"
},
{
"p": "speed3",
"v": "70",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "",
"payloadType": "date",
"x": 370,
"y": 440,
"wires": [
[]
]
},
{
"id": "260cca63ad6725d4",
"type": "debug",
"z": "39b7a323071521ea",
"name": "",
"active": true,
"tosidebar": true,
"console": true,
"tostatus": true,
"complete": "true",
"targetType": "full",
"statusVal": "payload",
"statusType": "auto",
"x": 710,
"y": 440,
"wires": []
}
]
flows (1).json (5.0 KB)