[Updated] @tonymacdonald/node-red-rapt-pull Pull data for one or more RAPT Pill hydrometers into Node-RED

Just published the following package

# @tonymacdonald/node-red-rapt-pull

Simple node to pull data from RAPT cloud for integration into Node-RED flows.

RAPT Pill is a floating digital hydrometer and thermometer produced by KegLand.
It pushes periodic telemetry data to the RAPT cloud via WiFi or indirectly via Bluetooth through a compatible KegLand device.

The rapt-pull node uses the RAPT API v1 to request device or telemetry information from the RAPT cloud so that it can be integrated into a Node-RED flow.

Currently the following API endpoints are supported:

/api/Hydrometers/GetHydrometers
/api/Hydrometers/GetHydrometer
/api/Hydrometers/GetTelemetry

Each instance of the rapt-pull node is configured to invoke one of the supported endpoints.

When configured for GetHydrometers, it will return an array of objects containing details for all hydrometers registered against your RAPT account.
When configured for GetHydrometer, it will return an object containing details about a single hydrometer.
When configured for GetTelemetry, it will return an array of objects containing telemetry information for a specific time period.

By chaining node instances together it is very easy to create a flow to get exactly the information you would like.

If you have a RAPT Pill, give this node a try and let me know what you think.

Upversioned to 0.9.1

Updated to move timestamp of last query into node context and keep separate values for each hydrometer for those with more than one device.
Node status now indicates when a query is in progress.
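
To illustrate the per-hydrometer bookkeeping described in this update, here is a minimal sketch. It is not the node's actual source: `nextQueryWindow`, the `Map` standing in for node context, and the `pill-A`/`pill-B` ids are all hypothetical names chosen for illustration. It only assumes the behaviour noted in the flow comments, namely that the start date defaults to the last time the node was triggered (or the current time) and the end date defaults to the current time.

```javascript
// Hypothetical sketch of per-hydrometer "last query" bookkeeping.
// `store` stands in for Node-RED node context; a Map keeps the sketch standalone.
function nextQueryWindow(store, hydrometerId, now = new Date()) {
    // Start from the previous query time for this device, or "now" on first use.
    const last = store.get(hydrometerId);
    const startDate = last !== undefined ? new Date(last) : now;
    // Remember this query time for this device only.
    store.set(hydrometerId, now.getTime());
    return { startDate: startDate.toISOString(), endDate: now.toISOString() };
}

const store = new Map();
const t0 = new Date("2024-01-01T10:00:00Z");
const t1 = new Date("2024-01-01T11:00:00Z");
const first = nextQueryWindow(store, "pill-A", t0);  // first poll: empty window
const second = nextQueryWindow(store, "pill-A", t1); // covers t0..t1
const other = nextQueryWindow(store, "pill-B", t1);  // unaffected by pill-A
```

Keeping one value per device means a query for one hydrometer does not move the start of the window for another.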

If you are using this node, let me know how you find it.

I'm able to pull data easily with inject and debug nodes. I'd be interested in seeing your full flow. I'm just getting started with APIs. I have a bunch of this built out using webhooks and other stuff built out using MQTT, but since RAPT doesn't support MQTT, I think using the API will ultimately be the best option. I still need to build out the rest of the flow for it.

My dashboards work with data from a local influx db.
I am using the rapt-pull node to get data out of the rapt cloud and into my local db for the dashboard flows to use it.
The following flow is polling rapt-cloud hourly for telemetry info and sending telemetry results to MQTT.

[{"id":"6e0d669da7c138be","type":"tab","label":"Periodic poll for new telemetry data","disabled":false,"info":"","env":[]},{"id":"dd6039a03faaf985","type":"inject","z":"6e0d669da7c138be","name":"","props":[{"p":"payload"}],"repeat":"3600","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":150,"y":220,"wires":[["d242ff28f6ae7e46"]]},{"id":"d242ff28f6ae7e46","type":"rapt-pull","z":"6e0d669da7c138be","account":"","name":"","endpoint":"GetHydrometers","topic":"","split":false,"x":340,"y":220,"wires":[["50fa3f82640b7e95","6ca1cdc2a186bac0"]]},{"id":"50fa3f82640b7e95","type":"debug","z":"6e0d669da7c138be","name":"debug 13","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":320,"y":280,"wires":[]},{"id":"6ca1cdc2a186bac0","type":"rapt-pull","z":"6e0d669da7c138be","account":"","name":"","endpoint":"GetTelemetry","topic":"RaptPillTelemetry","split":true,"x":550,"y":220,"wires":[["372ee63bdf5b7fbe","c5ce8a8218687b79"]]},{"id":"372ee63bdf5b7fbe","type":"debug","z":"6e0d669da7c138be","name":"debug 15","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":560,"y":280,"wires":[]},{"id":"c5ce8a8218687b79","type":"mqtt out","z":"6e0d669da7c138be","name":"","topic":"","qos":"","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"1984f0b2.c48537","x":730,"y":220,"wires":[]},{"id":"9ed3fb813b6adc09","type":"comment","z":"6e0d669da7c138be","name":"Set inject to repeat hourly","info":"","x":170,"y":160,"wires":[]},{"id":"78f0ab7033ba90bb","type":"comment","z":"6e0d669da7c138be","name":"startDate for API request - defaults to the last time this node was triggered or the current time","info":"","x":800,"y":140,"wires":[]},{"id":"34a3d31da66618f9","type":"comment","z":"6e0d669da7c138be","name":"endDate for API request - defaults to current time","info":"","x":660,"y":180,"wires":[]},{"id":"972c1db6e3e0daf0","type":"comment","z":"6e0d669da7c138be","name":"Gets telemetry data since the node was last triggered","info":"","x":680,"y":100,"wires":[]},{"id":"794308d76363e214","type":"comment","z":"6e0d669da7c138be","name":"Periodically poll for new telemetry data","info":"","x":210,"y":100,"wires":[]},{"id":"1984f0b2.c48537","type":"mqtt-broker","name":"pi4broker","broker":"192.168.0.195","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

A second flow is receiving the telemetry events from MQTT and integrating the info into my influx db. I am using the "createdOn" field of the telemetry record to generate a time stamp for the DB entries.

[{"id":"42a59e4053b8f023","type":"tab","label":"RAPT Pill telemetry integration","disabled":false,"info":"","env":[]},{"id":"cca1478f60aae8a0","type":"mqtt in","z":"42a59e4053b8f023","name":"","topic":"RaptPillTelemetry","qos":"2","datatype":"auto-detect","broker":"16804d22.537d83","nl":false,"rap":true,"rh":0,"inputs":0,"x":123.5,"y":310,"wires":[["66904c598f61195b","048452161233272a"]]},{"id":"d6a6d020a69ad085","type":"debug","z":"42a59e4053b8f023","name":"debug 7","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":826.5,"y":268,"wires":[]},{"id":"66904c598f61195b","type":"function","z":"42a59e4053b8f023","name":"format info for storage in temperature and telemetry measurements ","func":"const payload = msg.payload;\nlet myDate = new Date(payload.createdOn);\nvar timestamp = myDate.getTime() * 1000000;\nmsg.timestamp = timestamp;\n//look up name associated with mac address\nlet pillInfo = flow.get(\"pillInfo\")||{};\nlet location = pillInfo[payload.macAddress] || payload.macAddress;\n// send temp info to temperature table\nvar tempkeys = {};\ntempkeys.time = timestamp;\ntempkeys.value = payload.temperature\nvar temptags = {};\ntemptags.location = location;\nvar tempmsg = {payload: [tempkeys,temptags]}\ntempmsg.measurement = \"temperature\";\nnode.send(tempmsg);\n\npayload.time = timestamp;\nmsg.payload = [payload,temptags];\nmsg.measurement = \"telemetry\"\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":477.5,"y":310,"wires":[["d6a6d020a69ad085","273bcb83b64157c8"]]},{"id":"048452161233272a","type":"debug","z":"42a59e4053b8f023","name":"debug 8","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":307.5,"y":263,"wires":[]},{"id":"273bcb83b64157c8","type":"influxdb out","z":"42a59e4053b8f023","influxdb":"9ae97e25.17f048","name":"","measurement":"","precision":"","retentionPolicy":"","database":"database","precisionV18FluxV20":"ms","retentionPolicyV18Flux":"","org":"organisation","bucket":"bucket","x":857.5,"y":309,"wires":[]},{"id":"b3102e38ad0e0da7","type":"mqtt in","z":"42a59e4053b8f023","name":"","topic":"RaptPillInfo","qos":"2","datatype":"auto-detect","broker":"16804d22.537d83","nl":false,"rap":true,"rh":0,"inputs":0,"x":103.5,"y":153,"wires":[["52339b5127d45c98","c16b99c793cb8220"]]},{"id":"52339b5127d45c98","type":"debug","z":"42a59e4053b8f023","name":"debug 9","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":273.5,"y":107,"wires":[]},{"id":"c16b99c793cb8220","type":"function","z":"42a59e4053b8f023","name":"merge pill information into flow variable for name lookup","func":"let payload = msg.payload;\nlet pillInfo = flow.get(\"pillInfo\")||{};\npillInfo[payload.macAddress] = payload.name;\nflow.set(\"pillInfo\", pillInfo);\nreturn msg;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":422.5,"y":154,"wires":[[]]},{"id":"43f0cbb6bd1454c9","type":"comment","z":"42a59e4053b8f023","name":"keep track of pill name info in flow variable","info":"","x":207.5,"y":57,"wires":[]},{"id":"6accd4b777ffe94b","type":"comment","z":"42a59e4053b8f023","name":"persist telemetry info into temperature and telemetry measurements in db","info":"","x":306.5,"y":222,"wires":[]},{"id":"16804d22.537d83","type":"mqtt-broker","name":"pi-4-broker","broker":"localhost","port":"1883","clientid":"","usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willPayload":"","willMsg":{},"sessionExpiry":""},{"id":"9ae97e25.17f048","type":"influxdb","hostname":"127.0.0.1","port":"8086","protocol":"http","database":"temperature","name":"temperature","usetls":false,"tls":"","influxdbVersion":"1.x","url":"","rejectUnauthorized":false}]

Note that I am inserting the full record into a telemetry table and also putting just the temperature into my generic temperature table which contains data from other sensors. It is not necessary to split out the data like this but thought I should mention what I am doing here to avoid confusion.
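
Since the function node's code is hard to read inside the exported JSON, here is roughly the same logic as a standalone sketch. The field names (`createdOn`, `macAddress`, `temperature`) and the `[fields, tags]` payload layout are taken from the flow above; `formatForInflux`, the sample record, and the pill name are made up for illustration. Unlike the function node, this version copies the record instead of modifying `msg.payload` in place and returns both messages rather than calling `node.send`.

```javascript
// Standalone restatement of the "format info for storage" function node.
// pillInfo maps MAC address -> friendly name (kept in flow context in the real flow).
function formatForInflux(payload, pillInfo = {}) {
    // createdOn is parsed to epoch milliseconds, then scaled to nanoseconds.
    const timestamp = new Date(payload.createdOn).getTime() * 1000000;
    // Prefer the friendly pill name; fall back to the MAC address.
    const location = pillInfo[payload.macAddress] || payload.macAddress;
    const tags = { location };
    return [
        // Temperature only, for the generic temperature measurement.
        { measurement: "temperature", payload: [{ time: timestamp, value: payload.temperature }, tags] },
        // Full record, for the telemetry measurement.
        { measurement: "telemetry", payload: [{ ...payload, time: timestamp }, tags] },
    ];
}

// Made-up record for illustration only.
const sample = { createdOn: "2024-01-01T00:00:00Z", macAddress: "AA-BB", temperature: 19.5 };
const [tempMsg, telemetryMsg] = formatForInflux(sample, { "AA-BB": "Fermenter 1" });
```

One thing to be aware of: a nanosecond timestamp is larger than `Number.MAX_SAFE_INTEGER`, so the last few digits are rounded. With millisecond-resolution source data that rounding is far below anything that matters here.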

Hope this helps

Thanks for sending this. My first complication is that the API calls fail once I exceed the maximum permitted rate of 5 per minute. I have 8 Pills right now and I'll be adding more, as I'm monitoring commercial fermentations in a winery. I might just stick with the webhooks, or pull the data for all the hydrometers with one API call and split it out based on the array data.

Thanks for the update. It's great to get the feedback on your real use case.

If you could open an issue for this limitation on my GitHub repo (tonymacdonald2008/node-red-rapt-pull), that would be helpful.

I am travelling this week so won't be able to look at a real fix for a while.

In the meantime, you could try a variation of the following flow. It is modified from my previous flow by adding a split node and a rate-limiting node (@cameo69/node-red-ratelimit).

[{"id":"61758934874e6af8","type":"tab","label":"Flow 1","disabled":false,"info":"","env":[]},{"id":"c0276c0e770e68eb","type":"inject","z":"61758934874e6af8","name":"","props":[{"p":"payload"}],"repeat":"3600","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":130,"y":200,"wires":[["25b3fb03778349a6"]]},{"id":"25b3fb03778349a6","type":"rapt-pull","z":"61758934874e6af8","account":"","name":"","endpoint":"GetHydrometers","topic":"","split":false,"x":320,"y":200,"wires":[["3fed7a268f25799d","3f5b4b774606abfc"]]},{"id":"3fed7a268f25799d","type":"debug","z":"61758934874e6af8","name":"debug 271","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":300,"y":260,"wires":[]},{"id":"dddb76ce226cb8c0","type":"rapt-pull","z":"61758934874e6af8","account":"","name":"","endpoint":"GetTelemetry","topic":"RaptPillTelemetry","split":true,"x":890,"y":200,"wires":[["cba98799e5544b84","5bebb2cecac8ac5c"]]},{"id":"cba98799e5544b84","type":"debug","z":"61758934874e6af8","name":"debug 272","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":900,"y":260,"wires":[]},{"id":"5bebb2cecac8ac5c","type":"mqtt out","z":"61758934874e6af8","name":"","topic":"","qos":"","retain":"","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"1984f0b2.c48537","x":1090,"y":200,"wires":[]},{"id":"e514f889b5a33b1f","type":"comment","z":"61758934874e6af8","name":"Set inject to repeat hourly","info":"","x":150,"y":140,"wires":[]},{"id":"0cdf6044d2c9d3a4","type":"comment","z":"61758934874e6af8","name":"startDate for API request - defaults to the last time this node was triggered or the current time","info":"","x":1140,"y":120,"wires":[]},{"id":"4ae030719d3b624c","type":"comment","z":"61758934874e6af8","name":"endDate for API request - defaults to current time","info":"","x":1000,"y":160,"wires":[]},{"id":"030f8310196f0aca","type":"comment","z":"61758934874e6af8","name":"Gets telemetry data since the node was last triggered","info":"","x":1020,"y":80,"wires":[]},{"id":"f0bc6bdeb7cc208b","type":"comment","z":"61758934874e6af8","name":"Periodically poll for new telemetry data","info":"","x":190,"y":80,"wires":[]},{"id":"3f5b4b774606abfc","type":"split","z":"61758934874e6af8","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","x":490,"y":200,"wires":[["dc863545c18b3e3e"]]},{"id":"dc863545c18b3e3e","type":"rate-limiter","z":"61758934874e6af8","delay_action":"ratelimit","rate":"3","nbRateUnits":"1","rateUnits":"minute","drop_select":"queue","addcurrentcount":false,"name":"","outputs":"1","buffer_size":"0","buffer_drop":"buffer_drop_new","emit_msg_2nd":false,"control_topic":"","version":0.0018,"x":680,"y":200,"wires":[["dddb76ce226cb8c0"]]},{"id":"1984f0b2.c48537","type":"mqtt-broker","name":"pi4broker","broker":"192.168.0.195","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

(updated to set rate limit to 3 per minute)
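
As a rough check on what that rate limit means for polling time, here is a small arithmetic sketch. It assumes the queued messages are released evenly, one every 60/rate seconds, which is a simplification and may not match exactly how the rate-limiter node schedules its queue. `queuedDelaysMs` is a hypothetical helper, not part of either node.

```javascript
// Hypothetical helper: release times for queued requests under an evenly spaced rate limit.
function queuedDelaysMs(requestCount, ratePerMinute) {
    const intervalMs = 60000 / ratePerMinute;
    // Request i is released i intervals after the first one.
    return Array.from({ length: requestCount }, (_, i) => i * intervalMs);
}

// 8 hydrometers at 3 requests per minute.
const delays = queuedDelaysMs(8, 3);
console.log(delays.map((ms) => ms / 1000)); // [0, 20, 40, 60, 80, 100, 120, 140]
```

Under that assumption, 8 hydrometers are all queried within about 2 minutes 20 seconds, which fits comfortably inside an hourly poll and stays below the 5 per minute limit.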