Hi any easy way to add queueing of messages with persistence on the hard drive?
I need something that removes messages from the queue on a successful http 201 code.
Have you checked the flows library to see if a custom node exists?
I looked at:
node-red-contrib-simple-message-queue
node-red-contrib-queue-gate
node-red-contrib-msg-queue
I'm not doing great at finding it.
They don't seem to have that build in. Have you seen any that have it in?
Honestly, can't say I've looked. A quick search for the keyword "persist" brought back quite a few entries. Couldn't immediately see anything that does exactly what you've said but I would imagine that some of them could be slotted into a suitable flow.
I'm afraid it is a case of trying a bunch to see if you can get the effect you want.
I've for a while been thinking about a more comprehensive context store backed by a DB and with eventing so that you could have a node that listens for an update. I started looking at GunDB which seemed promising and I even created a quick and dirty experimental node for it - but it is quite hard to work with.
You could probably also create a flow that does what you want with the built-in file based context store. Just watch out for memory use and the default 20sec write delay (which can be changed).
Though actually I have to update that flow, I have made some significant improvements and simplifications. Here is the latest version with the example showing email sending.
[{"id":"149380c1.63e107","type":"subflow","name":"Delivery subflow master copy","info":"","category":"","in":[{"x":60,"y":80,"wires":[{"id":"6a3f78ab.f6b8e"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"6a3f78ab.f6b8e","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"ed779289.25b5d8","port":0}]}},{"id":"6a3f78ab.f6b8e","type":"function","z":"149380c1.63e107","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n handleFAIL(stat)\n} else if (control === \"__trigger\") {\n handleTrigger(stat)\n} else {\n // no valid control value so must be incoming message\n handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n case \"initial\":\n case \"waitingForMsg\":\n sendMsg(stat) // send next message if any\n break;\n \n case \"waitingForTrigger\":\n case \"waitingForOKFail\":\n // do nothing\n break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n //node.warn(\"handleMessage\")\n // push a clone onto the queue\n stat.queue.push(RED.util.cloneMessage(msg))\n // limit number in queue\n const max = Number(env.get(\"maxQueue\"))\n if (!isNaN(max) && max > 0) {\n // max length hit, remove oldest\n if (stat.queue.length > max) stat.queue.shift()\n }\n // Simulate a trigger event to handle any state change needed\n handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n //node.warn(\"sendMsg\")\n let thisMsg = stat.queue[0]\n if (thisMsg) {\n // send a clone\n //node.warn(\"sending\")\n node.send(RED.util.cloneMessage(thisMsg))\n stat.state = \"waitingForOKFail\"\n } else {\n // nothing in queue\n stat.state = \"waitingForMsg\"\n }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n //node.warn(\"handleOK\")\n // ignore if in wrong state\n if (stat.state === \"waitingForOKFail\") {\n // OK received so drop the top message \n stat.queue.shift()\n // set the state to waiting for message, which will allow the next one to be sent\n stat.state = \"waitingForMsg\"\n } else {\n node.warn(\"Ignoring unnexpected OK\")\n }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n //node.warn(\"handleFAIL\")\n // ignore if in wrong state\n if (stat.state === \"waitingForOKFail\") {\n // FAIL received so go to waiting for trigger state\n stat.state = \"waitingForTrigger\"\n } else {\n node.warn(\"Ignoring unnexpected FAIL\")\n }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n //node.warn(\"handleTrigger\")\n if (stat.state === \"waitingForTrigger\") {\n //node.warn(\"state to waiting\")\n // set it to watitingForMsg in order to trigger send \n stat.state = \"waitingForMsg\"\n }\n // ignore for other states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","x":280,"y":80,"wires":[[]]},{"id":"602725f6.15eee4","type":"inject","z":"149380c1.63e107","name":"Retry ","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":110,"y":180,"wires":[["6a3f78ab.f6b8e"]]},{"id":"ed779289.25b5d8","type":"status","z":"149380c1.63e107","name":"","scope":["6a3f78ab.f6b8e"],"x":300,"y":160,"wires":[[]]},{"id":"5117df7b.f0f1c8","type":"link out","z":"5325318f.e22788","name":"","links":["abf7063b.6236f"],"x":695,"y":320,"wires":[]},{"id":"f11ec525.632c28","type":"change","z":"5325318f.e22788","name":"OK","rules":[{"t":"set","p":"control","pt":"msg","to":"OK","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":530,"y":320,"wires":[["5117df7b.f0f1c8"]]},{"id":"a2c8d2c2.b5b55","type":"change","z":"5325318f.e22788","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":530,"y":360,"wires":[["5117df7b.f0f1c8"]]},{"id":"7f85d193.f4b8c","type":"inject","z":"5325318f.e22788","name":"Send test email","props":[{"p":"payload"},{"p":"topic","vt":"str"},{"p":"to","v":"someone@gmail.com","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"email subject","payload":"This is the email contents","payloadType":"str","x":140,"y":40,"wires":[["1d0aae54.522262"]]},{"id":"abf7063b.6236f","type":"link in","z":"5325318f.e22788","name":"","links":["5117df7b.f0f1c8"],"x":155,"y":280,"wires":[["60d7d8a0.88859"]]},{"id":"41a50eb7.1646d","type":"e-mail","z":"5325318f.e22788","server":"your server","port":"465","secure":true,"tls":false,"name":"","dname":"","x":330,"y":260,"wires":[]},{"id":"a3d8d998.887698","type":"complete","z":"5325318f.e22788","name":"","scope":["41a50eb7.1646d"],"uncaught":false,"x":350,"y":320,"wires":[["f11ec525.632c28"]]},{"id":"caf8c63f.c19c8","type":"catch","z":"5325318f.e22788","name":"","scope":["41a50eb7.1646d"],"uncaught":false,"x":330,"y":360,"wires":[["a2c8d2c2.b5b55"]]},{"id":"7e7c9c9e.b67d24","type":"link in","z":"5325318f.e22788","name":"Email delivery","links":["9affd2f8.4da0e8","dd379354.00ddd8"],"x":135,"y":180,"wires":[["60d7d8a0.88859","6f44414a.9777a8"]]},{"id":"9affd2f8.4da0e8","type":"link out","z":"5325318f.e22788","name":"","links":["7e7c9c9e.b67d24"],"x":475,"y":40,"wires":[]},{"id":"60d7d8a0.88859","type":"subflow:149380c1.63e107","z":"5325318f.e22788","name":"Guaranteed delivery","env":[{"name":"maxQueue","value":"10","type":"num"}],"x":340,"y":180,"wires":[["41a50eb7.1646d","84354f19.d4d188"]]},{"id":"549df035.bae4a8","type":"comment","z":"5325318f.e22788","name":"Send email messages to this link","info":"","x":150,"y":140,"wires":[]},{"id":"1d0aae54.522262","type":"function","z":"5325318f.e22788","name":"Message counter","func":"let count = context.get(\"count\") || 0\nmsg.payload = `Message number ${count}`\ncontext.set(\"count\", count + 1)\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":350,"y":40,"wires":[["9affd2f8.4da0e8"]]},{"id":"6f44414a.9777a8","type":"debug","z":"5325318f.e22788","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":160,"y":400,"wires":[]},{"id":"84354f19.d4d188","type":"debug","z":"5325318f.e22788","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":240,"y":480,"wires":[]}]
To use it for your purpose replace the email node with your http code and on success or failure (however you decide that) feed a message into the appropriate change node, where the Catch and Complete nodes are now. Double click the subflow and you can enter the retry period to use for retrying failed messages, the max number of messages that will be queued and the context store to use. If you want the queue to survive node-red restart then use a persistent store here obviously.
Thanks brilliant.
This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.