Guaranteed delivery of SQL insert data

I am using the Guaranteed delivery of data flow (https://flows.nodered.org/flow/05e6d61f14ef6af763ec4cfd1049ab61) in my workflow on Node-RED v3.1.0. It seemed to be working OK, buffering data bound for SQL. I let a test run over the weekend, and the buffered data was not sent to the SQL server after the connection was re-established. Could someone sense-check the function node that sends the control signal to the Guaranteed delivery node? I have the max queue set to 6500 and am using the default context storage. Here is part of my flow:

[
    {
        "id": "149380c1.63e107",
        "type": "subflow",
        "name": "Delivery subflow",
        "info": "",
        "category": "",
        "in": [
            {
                "x": 60,
                "y": 80,
                "wires": [
                    {
                        "id": "6a3f78ab.f6b8e"
                    }
                ]
            }
        ],
        "out": [
            {
                "x": 420,
                "y": 80,
                "wires": [
                    {
                        "id": "6a3f78ab.f6b8e",
                        "port": 0
                    }
                ]
            }
        ],
        "env": [
            {
                "name": "controlProperty",
                "type": "str",
                "value": "control",
                "ui": {
                    "label": {
                        "en-US": "Property for OK or FAIL"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "str",
                            "env"
                        ]
                    }
                }
            },
            {
                "name": "OKValue",
                "type": "str",
                "value": "OK",
                "ui": {
                    "label": {
                        "en-US": "Value of success"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "str",
                            "num",
                            "bool",
                            "env"
                        ]
                    }
                }
            },
            {
                "name": "FAILValue",
                "type": "str",
                "value": "FAIL",
                "ui": {
                    "label": {
                        "en-US": "Value for failure"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "str",
                            "num",
                            "bool",
                            "env"
                        ]
                    }
                }
            },
            {
                "name": "retrySecs",
                "type": "num",
                "value": "60",
                "ui": {
                    "label": {
                        "en-US": "Retry period (secs)"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "num",
                            "env"
                        ]
                    }
                }
            },
            {
                "name": "maxQueue",
                "type": "num",
                "value": "100",
                "ui": {
                    "label": {
                        "en-US": "Max messages in queue"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "str",
                            "num",
                            "env"
                        ]
                    }
                }
            },
            {
                "name": "contextStore",
                "type": "str",
                "value": "default",
                "ui": {
                    "label": {
                        "en-US": "Context Store to use"
                    },
                    "type": "input",
                    "opts": {
                        "types": [
                            "str",
                            "env"
                        ]
                    }
                }
            }
        ],
        "color": "#DDAA99",
        "status": {
            "x": 420,
            "y": 160,
            "wires": [
                {
                    "id": "ed779289.25b5d8",
                    "port": 0
                }
            ]
        }
    },
    {
        "id": "6a3f78ab.f6b8e",
        "type": "function",
        "z": "149380c1.63e107",
        "name": "State machine",
        "func": "let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n    handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n    handleFAIL(stat)\n} else if (control === \"__trigger\") {\n    handleTrigger(stat)\n} else {\n    // no valid control value so must be incoming message\n    handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n    case \"initial\":\n    case \"waitingForMsg\":\n        sendMsg(stat)   // send next message if any\n        break;\n        \n    case \"waitingForTrigger\":\n    case \"waitingForOKFail\":\n        // do nothing\n        break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n    //node.warn(\"handleMessage\")\n    // push a clone onto the queue\n    stat.queue.push(RED.util.cloneMessage(msg))\n    // limit number in queue\n    const max = Number(env.get(\"maxQueue\"))\n    if (!isNaN(max) && max > 0) {\n        // max length hit, remove oldest\n        if (stat.queue.length > max) stat.queue.shift()\n    }\n    // Simulate a trigger event to handle any state change needed\n    handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n    //node.warn(\"sendMsg\")\n    let thisMsg = stat.queue[0]\n    if (thisMsg) {\n        // send a clone\n        //node.warn(\"sending\")\n        node.send(RED.util.cloneMessage(thisMsg))\n        stat.state = \"waitingForOKFail\"\n    } else {\n        // nothing in queue\n        stat.state = 
\"waitingForMsg\"\n    }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n    //node.warn(\"handleOK\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // OK received so drop the top message \n        stat.queue.shift()\n        // set the state to waiting for message, which will allow the next one to be sent\n        stat.state = \"waitingForMsg\"\n    } else {\n        node.warn(\"Ignoring unnexpected OK\")\n    }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n    //node.warn(\"handleFAIL\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // FAIL received so go to waiting for trigger state\n        stat.state = \"waitingForTrigger\"\n    } else {\n        node.warn(\"Ignoring unnexpected FAIL\")\n    }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n    //node.warn(\"handleTrigger\")\n    if (stat.state === \"waitingForTrigger\") {\n        //node.warn(\"state to waiting\")\n        // set it to watitingForMsg in order to trigger send \n        stat.state = \"waitingForMsg\"\n    }\n    // ignore for other states\n}",
        "outputs": 1,
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "x": 280,
        "y": 80,
        "wires": [
            []
        ]
    },
    {
        "id": "602725f6.15eee4",
        "type": "inject",
        "z": "149380c1.63e107",
        "name": "Retry ",
        "props": [
            {
                "p": "${controlProperty}",
                "v": "__trigger",
                "vt": "str"
            }
        ],
        "repeat": "${retrySecs}",
        "crontab": "",
        "once": false,
        "onceDelay": 0.1,
        "topic": "",
        "x": 110,
        "y": 180,
        "wires": [
            [
                "6a3f78ab.f6b8e"
            ]
        ]
    },
    {
        "id": "ed779289.25b5d8",
        "type": "status",
        "z": "149380c1.63e107",
        "name": "",
        "scope": [
            "6a3f78ab.f6b8e"
        ],
        "x": 300,
        "y": 160,
        "wires": [
            []
        ]
    },
    {
        "id": "4e6e6f4553eb185d",
        "type": "subflow:149380c1.63e107",
        "z": "b28f7a389dc2457a",
        "name": "Guaranteed delivery",
        "env": [
            {
                "name": "maxQueue",
                "value": "6500",
                "type": "num"
            }
        ],
        "x": 1020,
        "y": 500,
        "wires": [
            [
                "1c1b9256709676f6"
            ]
        ]
    },
    {
        "id": "1c1b9256709676f6",
        "type": "MSSQL",
        "z": "b28f7a389dc2457a",
        "mssqlCN": "64a27a0b.9495fc",
        "name": "MSSQL",
        "outField": "payload",
        "returnType": "0",
        "throwErrors": "0",
        "query": "",
        "modeOpt": "",
        "modeOptType": "query",
        "queryOpt": "payload",
        "queryOptType": "msg",
        "paramsOpt": "",
        "paramsOptType": "none",
        "rows": "",
        "rowsType": "msg",
        "parseMustache": false,
        "params": [],
        "x": 1220,
        "y": 500,
        "wires": [
            [
                "0f505779c4510777"
            ]
        ]
    },
    {
        "id": "0f505779c4510777",
        "type": "function",
        "z": "b28f7a389dc2457a",
        "name": "MSSQL Status ",
        "func": "// Prepare Control for Delivery\nif (msg.status && msg.status.text) {\n    const statusText = msg.status.text;\n    let control;\n\n    if (statusText === \"requesting\") {\n        control = \"OK\"; // Connected\n    } else if (statusText === \"Error\" && msg.status.fill === \"red\") {\n        control = \"FAIL\"; // Disconnected, only when fill is red\n    } else {\n        control = \"__trigger\"; // Retry trigger for other cases\n    }\n\n    // Check if status object has a 'fill' property\n    if (msg.status.fill === \"green\" || msg.status.fill === \"blue\") {\n        control = \"OK\"; // Set control to 'OK' for green or blue status\n    }\n\n    msg.control = control;\n} else {\n    msg.control = \"__trigger\"; // Default to retry trigger if status is missing\n}\n\nreturn msg;\n",
        "outputs": 1,
        "timeout": "",
        "noerr": 0,
        "initialize": "",
        "finalize": "",
        "libs": [],
        "x": 1180,
        "y": 580,
        "wires": [
            [
                "4e6e6f4553eb185d"
            ]
        ]
    },
    {
        "id": "ba1b071dde105f6d",
        "type": "status",
        "z": "b28f7a389dc2457a",
        "name": "",
        "scope": [
            "1c1b9256709676f6"
        ],
        "x": 980,
        "y": 580,
        "wires": [
            [
                "0f505779c4510777"
            ]
        ]
    },
    {
        "id": "64a27a0b.9495fc",
        "type": "MSSQL-CN",
        "name": "SQL Server",
        "server": "localhost",
        "encyption": false,
        "trustServerCertificate": false,
        "database": "YourDatabase",
        "useUTC": true,
        "connectTimeout": 15000,
        "requestTimeout": 15000,
        "cancelTimeout": 5000,
        "parseJSON": true,
        "enableArithAbort": true
    }
]

Does the SQL node trigger Complete and Error nodes? If so, you would be better off using those.

No, currently I am only using the Status node to trigger the Guaranteed delivery node. I will try creating Complete and Error nodes to go after my MSSQL node.

I meant Complete and Catch. Link them to the SQL node. If it is operating as it should, it will send a Complete message on success and a Catch message on failure, so you can use exactly the flow shown in the example for the email node.
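As a rough illustration of that wiring, the feedback messages could be built as below. This is a minimal sketch, not the linked example itself: `onComplete` and `onCatch` are hypothetical names, written as plain functions so they can be tried outside Node-RED, and they assume the subflow's defaults (`controlProperty` of `control`, `OKValue` of `OK`, `FAILValue` of `FAIL`).

```javascript
// Hypothetical helpers: in Node-RED each body would live in a function node
// (or be replaced by a change node) wired after the Complete / Catch node.

// After the Complete node: tell the subflow the last message was delivered.
function onComplete(msg) {
    // Only the control property is needed; the payload is deliberately dropped.
    return { control: "OK" };
}

// After the Catch node: tell the subflow the last message failed.
function onCatch(msg) {
    // msg.error is populated by the Catch node; it is kept here for logging only.
    return { control: "FAIL", error: msg.error };
}
```

Both outputs would be wired back into the input of the Guaranteed delivery subflow, replacing the Status-based function.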

There is no need to send __trigger as you are doing in your function. Just set an appropriate retry time in the subflow properties; the subflow's own Retry inject node sends the trigger for you.

If you are using mssql-plus, you can set the option to throw errors, which can then be handled by a Catch node.

Does it trigger Complete when it succeeds, but not when it fails?

I forget. But when in throw mode, it ONLY outputs upon success, meaning there are enough signals to reliably control a queue. (I know, because I used that in my previous job, where it handled many millions of operations.)
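To see why one success signal and one failure signal are enough, the subflow's state machine can be exercised outside Node-RED. The following is a condensed sketch of the "State machine" function from the flow above, not a replacement for it: `createQueue`, `step` and `sent` are hypothetical names, with `sent` standing in for `node.send()` and a plain object standing in for the context store.

```javascript
// Condensed, standalone version of the subflow's state machine (assumed
// defaults: control property "control", values "OK" / "FAIL" / "__trigger").
function createQueue(maxQueue) {
    const stat = { state: "initial", queue: [] };
    const sent = []; // stands in for node.send()

    function step(msg) {
        const control = msg.control;
        if (control === "OK") {
            // Success: drop the delivered message, allow the next one to go.
            if (stat.state === "waitingForOKFail") {
                stat.queue.shift();
                stat.state = "waitingForMsg";
            }
        } else if (control === "FAIL") {
            // Failure: keep the message and wait for the retry trigger.
            if (stat.state === "waitingForOKFail") stat.state = "waitingForTrigger";
        } else if (control === "__trigger") {
            if (stat.state === "waitingForTrigger") stat.state = "waitingForMsg";
        } else {
            // Ordinary message: queue it, dropping the oldest if over the limit.
            stat.queue.push(msg);
            if (maxQueue > 0 && stat.queue.length > maxQueue) stat.queue.shift();
            if (stat.state === "waitingForTrigger") stat.state = "waitingForMsg";
        }
        // Send the head of the queue (without removing it) when allowed.
        if (stat.state === "initial" || stat.state === "waitingForMsg") {
            if (stat.queue.length > 0) {
                sent.push(stat.queue[0]);
                stat.state = "waitingForOKFail";
            } else {
                stat.state = "waitingForMsg";
            }
        }
        return stat;
    }

    return { step, stat, sent };
}

const q = createQueue(6500);
q.step({ payload: "INSERT 1" });  // sent immediately, now waiting for OK/FAIL
q.step({ payload: "INSERT 2" });  // queued behind INSERT 1
q.step({ control: "FAIL" });      // INSERT 1 stays queued, waiting for trigger
q.step({ control: "__trigger" }); // INSERT 1 is re-sent
q.step({ control: "OK" });        // INSERT 1 dropped, INSERT 2 sent
```

Note that an OK arriving before the insert has actually succeeded would remove the head of the queue just the same, so the success signal should only be raised once the SQL node has finished.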
