Data Fetch / Persistency

Hi,
Im using Node-red in a limited enviroment as im running it in a 4G router, where i cant install all the pallettes as i wish. I figured out a way to still get some basic ones that where provided by the router brand.

Here we go: Im pulling data constantly from various sensors, gps, time, etc. and I have created a string to use it a msg.topic and send it to the web server. So far so Good , everything is working good until i loose connectivity. Then all the data that i dont sent is lost.

I know i need to create a table and for that im using a switch to validate is the message was sent correctly or not, if not. im writing the string into a .txt file with "append to file" option.

The problem is that now i dont know how to make the system send this .txt and the inmediately deleted after the confirmation of receiving from the server. is it possible or there is a better way to store data strings and then republish them when having 4g connectivity again.

Well with the vague description of your setup, it will be hard to give precise answers. So my first questions are

  1. what is the device you are running NR on?
  2. How much usable memory does the device have?
  3. what version of NR and node.js are you using?
  4. can you edit the settings.js file?
  5. is the 'web server on the same device?
  6. can you use the mqtt nodes of your device?

As for knowing when the device comes back on line, you should be able to use an inject node with the Inject once after [0.1] seconds option checked off. This would trigger when NR starts and you could have a flow that read, sends and deletes the saved data.

You also might be able to use persistent context but that means editing the settings.js file.

Hi Zenofmud

I highly appreciate you quick response, here are some clarifications:

1 Device Runing ICR 3232
Datasheet: https://pacificautomation.com.au/content/files/ProductDataSheets/Advantech/Datasheet_ICR3232.pdf
2 Usable memory 2GB in flash and about 500mb in customizable applications (I don’t know how to access to the root files, sudo doesn’t work on the inbuilt command lines, so I have to install just the apps that Advantech has encapsulated).
3. NR 3.0.2 NJS 18.15.0
4. I wish, but no access to the root.
5. Web server is on a separate location, it’s using 4G comms and a htttp request using GET ( I need to install OpenVPN after that and also struggling )
6. I will try tomorrow at the lab and update.

Also, how do I make an inject node to that turns off and on just when detecting if the 4G signal or the ping to a specific IPaddress ? Because I’m getting the data when I do have signal ( when it’s online) but as this device will travel to record sensoring. It will loose signal through the road. I’m thinking in using a switch to verify the http request output and use a switch, if fails
Store the data in a file with a field status:fails in the position x+1, if the http request goes well, then send all the file , make a copy “file 2” and then clean file.

Is that possible or it’s a better way to do it?
need to store it ,

Have a look at this flow, it is specifically designed to store data if it cannot be sent immediately, and then send it later. https://flows.nodered.org/flow/05e6d61f14ef6af763ec4cfd1049ab61

I am not sure why you have exported all those nodes, is it just the Make Request Inject node through to the http request that we are interested in here?

Yes! should i just put it between the last function and the http request and replace mqtt complete to somewith that analize the HTTP delivery estatus? is there a way to do that?

Thanks Colin, i used the link you shared and I figured out how to convert the http request into mqtt and integrated the persistancy flow into my flow. Now its working!!!! You are a legend. Thanks!

You did not need to convert to MQTT, you could just have used the return code from the http node to send OK or FAIL back to the subflow. However MQTT be a better way of doing it anyway.

Unfortunately I have recently realised that the example MQTT flow given in the example is not reliable, but have not yet updated the flow. This does work reliably however:

[{"id":"3418138ef87c2d58","type":"group","z":"bdd7be38.d3b55","name":"Publishing to MQTT","style":{"label":true},"nodes":["ec54a008d153a091","47e0719cb18a797c","61492772f2839300","2a5700a4463e0a7f","844908ca97278be0","375b6c0ca76abb1d","3480fc2423ad1876","b00a2aa01d8b2d6a","31c942bc3963d973","d37624f402e6140b","1f372fba20b3cc84","25c8d90294cda906","ce51d73fb66b9b19","125e0647eafec278","b1f22cc84dbac0ff"],"x":34,"y":3819,"w":1172,"h":322},{"id":"ae9c6995a156ff5c","type":"subflow","name":"Delivery subflow","info":"","category":"","in":[{"x":50,"y":81,"wires":[{"id":"0b0e1e2df36d2d3a"}]}],"out":[{"x":420,"y":80,"wires":[{"id":"0b0e1e2df36d2d3a","port":0}]}],"env":[{"name":"controlProperty","type":"str","value":"control","ui":{"label":{"en-US":"Property for OK or FAIL"},"type":"input","opts":{"types":["str","env"]}}},{"name":"OKValue","type":"str","value":"OK","ui":{"label":{"en-US":"Value of success"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"FAILValue","type":"str","value":"FAIL","ui":{"label":{"en-US":"Value for failure"},"type":"input","opts":{"types":["str","num","bool","env"]}}},{"name":"retrySecs","type":"num","value":"60","ui":{"label":{"en-US":"Retry period (secs)"},"type":"input","opts":{"types":["num","env"]}}},{"name":"maxQueue","type":"num","value":"100","ui":{"label":{"en-US":"Max messages in queue"},"type":"input","opts":{"types":["str","num","env"]}}},{"name":"contextStore","type":"str","value":"default","ui":{"label":{"en-US":"Context Store to use"},"type":"input","opts":{"types":["str","env"]}}}],"color":"#DDAA99","status":{"x":420,"y":160,"wires":[{"id":"89ab963214b94f59","port":0}]}},{"id":"e6a7f648cf7f127b","type":"inject","z":"ae9c6995a156ff5c","name":"Retry ","props":[{"p":"${controlProperty}","v":"__trigger","vt":"str"}],"repeat":"${retrySecs}","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":100,"y":181,"wires":[["0b0e1e2df36d2d3a"]]},{"id":"89ab963214b94f59","type":"status","z":"ae9c6995a156ff5c","name":"","scope":["0b0e1e2df36d2d3a"],"x":300,"y":160,"wires":[[]]},{"id":"0b0e1e2df36d2d3a","type":"function","z":"ae9c6995a156ff5c","name":"State machine","func":"let store = env.get(\"contextStore\")\nif (store === \"default\") store = null\nlet stat = context.get(\"stat\", store) || {state: \"initial\", queue: []}\n// can't use a switch here I think as need to compare against env value\nconst control = msg[env.get(\"controlProperty\")]\n\nif (control === env.get(\"OKValue\")) {\n    handleOK(stat)\n} else if (control === env.get(\"FAILValue\")) {\n    handleFAIL(stat)\n} else if (control === \"__trigger\") {\n    handleTrigger(stat)\n} else {\n    // no valid control value so must be incoming message\n    handleMessage(msg, stat)\n}\n//node.warn(`state: ${stat.state}`)\n// decide what to do next based on the new state\nswitch (stat.state) {\n    case \"initial\":\n    case \"waitingForMsg\":\n        sendMsg(stat)   // send next message if any\n        break;\n        \n    case \"waitingForTrigger\":\n    case \"waitingForOKFail\":\n        // do nothing\n        break;\n}\nnode.status( `${stat.queue.length} ${stat.state}` )\ncontext.set(\"stat\", stat, store)\nreturn null;\n\n// Called when message to be queued is received\nfunction handleMessage(msg, stat) {\n    //node.warn(\"handleMessage\")\n    // push a clone onto the queue\n    stat.queue.push(RED.util.cloneMessage(msg))\n    // limit number in queue\n    const max = Number(env.get(\"maxQueue\"))\n    if (!isNaN(max) && max > 0) {\n        // max length hit, remove oldest\n        if (stat.queue.length > max) stat.queue.shift()\n    }\n    // Simulate a trigger event to handle any state change needed\n    handleTrigger(stat)\n}\n\n// Called to send the next message off the queue if any, but leaves it on queue\nfunction sendMsg(stat) {\n    //node.warn(\"sendMsg\")\n    let thisMsg = stat.queue[0]\n    if (thisMsg) {\n        // send a clone\n        //node.warn(\"sending\")\n        node.send(RED.util.cloneMessage(thisMsg))\n        stat.state = \"waitingForOKFail\"\n    } else {\n        // nothing in queue\n        stat.state = \"waitingForMsg\"\n    }\n}\n\n// Called when OK response received\nfunction handleOK(stat) {\n    //node.warn(\"handleOK\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // OK received so drop the top message \n        stat.queue.shift()\n        // set the state to waiting for message, which will allow the next one to be sent\n        stat.state = \"waitingForMsg\"\n    } else {\n        node.warn(\"Ignoring unnexpected OK\")\n    }\n}\n\n// Called when FAIL response received\nfunction handleFAIL(stat) {\n    //node.warn(\"handleFAIL\")\n    // ignore if in wrong state\n    if (stat.state === \"waitingForOKFail\") {\n        // FAIL received so go to waiting for trigger state\n        stat.state = \"waitingForTrigger\"\n    } else {\n        node.warn(\"Ignoring unnexpected FAIL\")\n    }\n}\n\n// Called when a trigger message is received or after a new incoming message is queued\nfunction handleTrigger(stat) {\n    //node.warn(\"handleTrigger\")\n    if (stat.state === \"waitingForTrigger\") {\n        //node.warn(\"state to waiting\")\n        // set it to watitingForMsg in order to trigger send \n        stat.state = \"waitingForMsg\"\n    }\n    // ignore for other states\n}","outputs":1,"noerr":0,"initialize":"","finalize":"","x":280,"y":80,"wires":[[]]},{"id":"ec54a008d153a091","type":"change","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"FAIL","rules":[{"t":"set","p":"control","pt":"msg","to":"FAIL","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":790,"y":4000,"wires":[["47e0719cb18a797c","4f07d289bb428c70","d9b6ec0ccd06bc61"]]},{"id":"47e0719cb18a797c","type":"link out","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"","links":["61492772f2839300"],"x":1165,"y":4000,"wires":[]},{"id":"61492772f2839300","type":"link in","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"","links":["47e0719cb18a797c"],"x":115,"y":3960,"wires":[["d37624f402e6140b"]]},{"id":"2a5700a4463e0a7f","type":"comment","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"Send MQTT messages to this link","info":"","x":190,"y":3860,"wires":[]},{"id":"844908ca97278be0","type":"mqtt out","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"","topic":"","qos":"1","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"e3d45312.a3103","x":690,"y":3900,"wires":[]},{"id":"375b6c0ca76abb1d","type":"trigger","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"Timeout 5 secs","op1":"","op2":"0","op1type":"nul","op2type":"num","duration":"5","extend":false,"overrideDelay":false,"units":"s","reset":"","bytopic":"all","topic":"topic","outputs":1,"x":640,"y":4000,"wires":[["ec54a008d153a091"]]},{"id":"3480fc2423ad1876","type":"comment","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"Adjust timeout time if necessary","info":"","x":670,"y":3960,"wires":[]},{"id":"b00a2aa01d8b2d6a","type":"complete","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"MQTT Complete","scope":["844908ca97278be0"],"uncaught":false,"x":160,"y":4100,"wires":[["31c942bc3963d973","543334d86d6e60da","25c8d90294cda906"]]},{"id":"31c942bc3963d973","type":"change","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":380,"y":4000,"wires":[["375b6c0ca76abb1d"]]},{"id":"d37624f402e6140b","type":"subflow:ae9c6995a156ff5c","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"Guaranteed delivery test","env":[{"name":"maxQueue","value":"0","type":"num"}],"x":390,"y":3900,"wires":[["375b6c0ca76abb1d","844908ca97278be0","ce51d73fb66b9b19","9d88a9ced9f72851"]]},{"id":"1f372fba20b3cc84","type":"join","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"key","joiner":"\\n","joinerType":"str","accumulate":true,"timeout":"","count":"1","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":750,"y":4100,"wires":[["125e0647eafec278","b95330923bc779b7"]]},{"id":"25c8d90294cda906","type":"change","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"key: completed, save topic and payload","rules":[{"t":"set","p":"key","pt":"msg","to":"completed","tot":"str"},{"t":"move","p":"payload","pt":"msg","to":"payload.data","tot":"msg"},{"t":"set","p":"payload.topic","pt":"msg","to":"topic","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":500,"y":4100,"wires":[["1f372fba20b3cc84"]]},{"id":"ce51d73fb66b9b19","type":"change","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"key: sending, save topic and payload","rules":[{"t":"set","p":"key","pt":"msg","to":"sending","tot":"str"},{"t":"move","p":"payload","pt":"msg","to":"payload.data","tot":"msg"},{"t":"set","p":"payload.topic","pt":"msg","to":"topic","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":490,"y":4060,"wires":[["1f372fba20b3cc84","5279276f081efde0"]]},{"id":"125e0647eafec278","type":"function","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"Check messages match","func":"/** Check that the message that is has been sent (completed) is the one that has just been passed to the MQTT node (sending)\n * payload.sending contains last msg sent to MQTT node\n * payload.completed contains msg indicated completed by MQTT node\n * in each case the payload contains topic of message and payload value in data\n */ \nconst previousKey = context.get(\"previousKey\") || \"\"\nconst thisKey = msg.key\n\n//node.warn(typeof msg.payload.sending.data)\nif (thisKey === \"completed\"  &&  previousKey === \"sending\") {\n    let equal = false           // whether messages are equivalent\n    // topics must be the same\n    if (msg.payload.sending.topic === msg.payload.completed.topic) {\n        if (typeof msg.payload.sending.data === \"string\") {\n            equal = compareDirect(msg.payload.sending.data, msg.payload.completed.data)\n        } else if (Buffer.isBuffer(msg.payload.sending.data)) {\n            equal = compareBuffers(msg.payload.sending.data, msg.payload.completed.data)\n        } else {\n            equal = compareJSON(msg.payload.sending.data, msg.payload.completed.data)\n        }\n    }\n    if (equal) {\n        msg.control = \"OK\"\n    } else {\n        msg.control = \"FAIL\"\n    }\n} else {\n    // out of sequence so ignore\n    msg = null\n}\ncontext.set(\"previousKey\", thisKey)\nreturn msg;\n\nfunction compareJSON(sending, completed) {\n    // compares JSON version of payload sent with that from the Complete node\n    return completed === JSON.stringify(sending)\n}\n\nfunction compareDirect(sending, completed) {\n    // directly compares payload sent with that from the Complete node\n    return completed === sending \n}\n\nfunction compareBuffers(sending, completed) {\n    // compares buffer payload sent with that from the Complete node\n    let answer = false\n    // check completed is also a buffer\n    if (Buffer.isBuffer(completed)) {\n        answer = Buffer.compare(sending, completed) === 0\n    }\n    return answer \n}","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":950,"y":4100,"wires":[["47e0719cb18a797c","4f07d289bb428c70","d9b6ec0ccd06bc61"]]},{"id":"b1f22cc84dbac0ff","type":"link in","z":"bdd7be38.d3b55","g":"3418138ef87c2d58","name":"Send to MQTT","links":["5106df9ead6196ac"],"x":140,"y":3900,"wires":[["d37624f402e6140b"]],"l":true},{"id":"e3d45312.a3103","type":"mqtt-broker","name":"","broker":"owl2.local","port":"1883","clientid":"tigger","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"autoUnsubscribe":true,"birthTopic":"tydwr/tigger/LWT","birthQos":"0","birthRetain":"true","birthPayload":"Online","birthMsg":{},"closeTopic":"tydwr/tigger/LWT","closeQos":"0","closePayload":"Offline","closeMsg":{},"willTopic":"tydwr/tigger/LWT","willQos":"0","willPayload":"Offline","willMsg":{},"sessionExpiry":""}]

Thanks Colin! You are the best!
That last flow works really smooth and reliable.
While troubleshooting comms between my router and my mosquitto server, i realized that the QoS value for my project was 0 as i need to keep it alive all the time when losing comms. Hope it helps someone else with the same problem.

I don't know what you mean by that. QoS is nothing to do with keeping alive.


This was the message from the server, my system is sending messages every 7 secs all day and night, and after 1 day it stoped receiving data until i chaged QoS value to 0, now is working fine. Probably is something else causing the problem with another QoS value?

Which server, node red or MQTT or what?
Where exactly were you seeing those messages?

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.