I am using the "Guaranteed delivery of data" subflow (https://flows.nodered.org/flow/05e6d61f14ef6af763ec4cfd1049ab61) in my flow on Node-RED v3.1.0. It seemed to be working OK, buffering data destined for the SQL server. However, I let a test run over the weekend, and the buffered data was not sent to the SQL server after the connection was re-established. Could someone sanity-check the function node that sends the control value to the Guaranteed delivery node? I have the max queue set to 6500 and am using the default context storage. Here is part of my flow:
[
{
"id": "149380c1.63e107",
"type": "subflow",
"name": "Delivery subflow",
"info": "",
"category": "",
"in": [
{
"x": 60,
"y": 80,
"wires": [
{
"id": "6a3f78ab.f6b8e"
}
]
}
],
"out": [
{
"x": 420,
"y": 80,
"wires": [
{
"id": "6a3f78ab.f6b8e",
"port": 0
}
]
}
],
"env": [
{
"name": "controlProperty",
"type": "str",
"value": "control",
"ui": {
"label": {
"en-US": "Property for OK or FAIL"
},
"type": "input",
"opts": {
"types": [
"str",
"env"
]
}
}
},
{
"name": "OKValue",
"type": "str",
"value": "OK",
"ui": {
"label": {
"en-US": "Value of success"
},
"type": "input",
"opts": {
"types": [
"str",
"num",
"bool",
"env"
]
}
}
},
{
"name": "FAILValue",
"type": "str",
"value": "FAIL",
"ui": {
"label": {
"en-US": "Value for failure"
},
"type": "input",
"opts": {
"types": [
"str",
"num",
"bool",
"env"
]
}
}
},
{
"name": "retrySecs",
"type": "num",
"value": "60",
"ui": {
"label": {
"en-US": "Retry period (secs)"
},
"type": "input",
"opts": {
"types": [
"num",
"env"
]
}
}
},
{
"name": "maxQueue",
"type": "num",
"value": "100",
"ui": {
"label": {
"en-US": "Max messages in queue"
},
"type": "input",
"opts": {
"types": [
"str",
"num",
"env"
]
}
}
},
{
"name": "contextStore",
"type": "str",
"value": "default",
"ui": {
"label": {
"en-US": "Context Store to use"
},
"type": "input",
"opts": {
"types": [
"str",
"env"
]
}
}
}
],
"color": "#DDAA99",
"status": {
"x": 420,
"y": 160,
"wires": [
{
"id": "ed779289.25b5d8",
"port": 0
}
]
}
},
{
"id": "6a3f78ab.f6b8e",
"type": "function",
"z": "149380c1.63e107",
"name": "State machine",
"func": "let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n handleFAIL(stat)\n} else if (control === \"__trigger\") {\n handleTrigger(stat)\n} else {\n // no valid control value so must be incoming message\n handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n case \"initial\":\n case \"waitingForMsg\":\n sendMsg(stat) // send next message if any\n break;\n \n case \"waitingForTrigger\":\n case \"waitingForOKFail\":\n // do nothing\n break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n //node.warn(\"handleMessage\")\n // push a clone onto the queue\n stat.queue.push(RED.util.cloneMessage(msg))\n // limit number in queue\n const max = Number(env.get(\"maxQueue\"))\n if (!isNaN(max) && max > 0) {\n // max length hit, remove oldest\n if (stat.queue.length > max) stat.queue.shift()\n }\n // Simulate a trigger event to handle any state change needed\n handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n //node.warn(\"sendMsg\")\n let thisMsg = stat.queue[0]\n if (thisMsg) {\n // send a clone\n //node.warn(\"sending\")\n node.send(RED.util.cloneMessage(thisMsg))\n stat.state = \"waitingForOKFail\"\n } else {\n // nothing in queue\n stat.state = \"waitingForMsg\"\n }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n //node.warn(\"handleOK\")\n // ignore if in wrong state\n if (stat.state === 
\"waitingForOKFail\") {\n // OK received so drop the top message \n stat.queue.shift()\n // set the state to waiting for message, which will allow the next one to be sent\n stat.state = \"waitingForMsg\"\n } else {\n node.warn(\"Ignoring unnexpected OK\")\n }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n //node.warn(\"handleFAIL\")\n // ignore if in wrong state\n if (stat.state === \"waitingForOKFail\") {\n // FAIL received so go to waiting for trigger state\n stat.state = \"waitingForTrigger\"\n } else {\n node.warn(\"Ignoring unnexpected FAIL\")\n }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n //node.warn(\"handleTrigger\")\n if (stat.state === \"waitingForTrigger\") {\n //node.warn(\"state to waiting\")\n // set it to watitingForMsg in order to trigger send \n stat.state = \"waitingForMsg\"\n }\n // ignore for other states\n}",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"x": 280,
"y": 80,
"wires": [
[]
]
},
{
"id": "602725f6.15eee4",
"type": "inject",
"z": "149380c1.63e107",
"name": "Retry ",
"props": [
{
"p": "${controlProperty}",
"v": "__trigger",
"vt": "str"
}
],
"repeat": "${retrySecs}",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"x": 110,
"y": 180,
"wires": [
[
"6a3f78ab.f6b8e"
]
]
},
{
"id": "ed779289.25b5d8",
"type": "status",
"z": "149380c1.63e107",
"name": "",
"scope": [
"6a3f78ab.f6b8e"
],
"x": 300,
"y": 160,
"wires": [
[]
]
},
{
"id": "4e6e6f4553eb185d",
"type": "subflow:149380c1.63e107",
"z": "b28f7a389dc2457a",
"name": "Guaranteed delivery",
"env": [
{
"name": "maxQueue",
"value": "6500",
"type": "num"
}
],
"x": 1020,
"y": 500,
"wires": [
[
"1c1b9256709676f6"
]
]
},
{
"id": "1c1b9256709676f6",
"type": "MSSQL",
"z": "b28f7a389dc2457a",
"mssqlCN": "64a27a0b.9495fc",
"name": "MSSQL",
"outField": "payload",
"returnType": "0",
"throwErrors": "0",
"query": "",
"modeOpt": "",
"modeOptType": "query",
"queryOpt": "payload",
"queryOptType": "msg",
"paramsOpt": "",
"paramsOptType": "none",
"rows": "",
"rowsType": "msg",
"parseMustache": false,
"params": [],
"x": 1220,
"y": 500,
"wires": [
[
"0f505779c4510777"
]
]
},
{
"id": "0f505779c4510777",
"type": "function",
"z": "b28f7a389dc2457a",
"name": "MSSQL Status ",
"func": "// Prepare Control for Delivery\nif (msg.status && msg.status.text) {\n const statusText = msg.status.text;\n let control;\n\n if (statusText === \"requesting\") {\n control = \"OK\"; // Connected\n } else if (statusText === \"Error\" && msg.status.fill === \"red\") {\n control = \"FAIL\"; // Disconnected, only when fill is red\n } else {\n control = \"__trigger\"; // Retry trigger for other cases\n }\n\n // Check if status object has a 'fill' property\n if (msg.status.fill === \"green\" || msg.status.fill === \"blue\") {\n control = \"OK\"; // Set control to 'OK' for green or blue status\n }\n\n msg.control = control;\n} else {\n msg.control = \"__trigger\"; // Default to retry trigger if status is missing\n}\n\nreturn msg;\n",
"outputs": 1,
"timeout": "",
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 1180,
"y": 580,
"wires": [
[
"4e6e6f4553eb185d"
]
]
},
{
"id": "ba1b071dde105f6d",
"type": "status",
"z": "b28f7a389dc2457a",
"name": "",
"scope": [
"1c1b9256709676f6"
],
"x": 980,
"y": 580,
"wires": [
[
"0f505779c4510777"
]
]
},
{
"id": "64a27a0b.9495fc",
"type": "MSSQL-CN",
"name": "SQL Server",
"server": "localhost",
"encyption": false,
"trustServerCertificate": false,
"database": "YourDatabase",
"useUTC": true,
"connectTimeout": 15000,
"requestTimeout": 15000,
"cancelTimeout": 5000,
"parseJSON": true,
"enableArithAbort": true
}
]