Overlay HTTP Request over MQTT Pub and Sub

Hello. What I am trying to accomplish is expose an HTTP endpoint for user to issue POST. Posted data gets manipulated and published to MQTT broker. Once broker processes message it publishes a message with an error code. I would like to pass this error code back to HTTP client as response. Topic subscription could receive many different data, not relevant to the operation. I would like to set a five second timeout, while parsing received data on topic. If I see the data I am after I would respond to HTTP request, otherwise timeout and return non-200 status.

I understand that I have to hold on to msg.resp and but I'm not sure how to orchestrate this.

UPDATE

Not sure that saving msg.res to flow.res is a good idea. Are all of these messages within the same instance of the flow???

[{"id":"579a368c.4e87f8","type":"tab","label":"ZPL Ingress","disabled":false,"info":""},{"id":"e617fe72.32aa3","type":"mqtt in","z":"579a368c.4e87f8","name":"","topic":"l99/ccs/evt/read","qos":"1","datatype":"json","broker":"caf333e3.df7ec","x":100,"y":60,"wires":[["744fba68.4371e4"]]},{"id":"394f1739.d3b1f8","type":"http in","z":"579a368c.4e87f8","name":"","url":"/l99/zpl","method":"post","upload":false,"swaggerDoc":"","x":90,"y":280,"wires":[["fb54feaf.3cb19","c288367d.49b708"]]},{"id":"bbebe0bc.4033b","type":"http response","z":"579a368c.4e87f8","name":"","statusCode":"200","headers":{},"x":820,"y":180,"wires":[]},{"id":"5db9f64b.4fe668","type":"mqtt out","z":"579a368c.4e87f8","name":"","topic":"l99/ccs/cmd/write","qos":"1","retain":"false","broker":"caf333e3.df7ec","x":630,"y":360,"wires":[]},{"id":"29302751.21ca58","type":"template","z":"579a368c.4e87f8","name":"","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"[\n    {\n        \"value\": \"{{payload}}\", \n        \"name\": \"%Z56.0\"\n    }\n]","output":"json","x":440,"y":360,"wires":[["5db9f64b.4fe668"]]},{"id":"fb54feaf.3cb19","type":"base64","z":"579a368c.4e87f8","name":"","action":"str","property":"payload","x":280,"y":360,"wires":[["29302751.21ca58"]]},{"id":"744fba68.4371e4","type":"change","z":"579a368c.4e87f8","name":"","rules":[{"t":"set","p":"mqtt_hit","pt":"msg","to":"payload[address = '%Z56.1'] ? \t    payload[address = '%Z56.1'][0].value : \t    -1","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":310,"y":60,"wires":[["5e2e899d.8a6cd8"]]},{"id":"364d27d.8bb63d8","type":"wait-paths","z":"579a368c.4e87f8","name":"","paths":"[\"mqtt_path\",\"http_path\"]","timeout":"2000","finalTimeout":"3000","x":370,"y":180,"wires":[["47e84ac1.497fa4"]]},{"id":"c288367d.49b708","type":"change","z":"579a368c.4e87f8","name":"","rules":[{"t":"set","p":"paths['http_path']","pt":"msg","to":"1","tot":"num"},{"t":"set","p":"pathsCorrelationId","pt":"msg","to":"666","tot":"num"},{"t":"set","p":"res","pt":"flow","to":"res","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":300,"y":280,"wires":[["364d27d.8bb63d8"]]},{"id":"47e84ac1.497fa4","type":"change","z":"579a368c.4e87f8","name":"","rules":[{"t":"set","p":"res","pt":"msg","to":"res","tot":"flow"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":180,"wires":[["bbebe0bc.4033b"]]},{"id":"e1097953.bfd548","type":"http response","z":"579a368c.4e87f8","name":"","statusCode":"500","headers":{},"x":820,"y":240,"wires":[]},{"id":"5e2e899d.8a6cd8","type":"switch","z":"579a368c.4e87f8","name":"","property":"mqtt_hit","propertyType":"msg","rules":[{"t":"gte","v":"0","vt":"num"}],"checkall":"true","repair":false,"outputs":1,"x":490,"y":60,"wires":[["6ba7cd35.7c0c04"]]},{"id":"6ba7cd35.7c0c04","type":"change","z":"579a368c.4e87f8","name":"","rules":[{"t":"set","p":"paths['mqtt_path']","pt":"msg","to":"1","tot":"num"},{"t":"set","p":"pathsCorrelationId","pt":"msg","to":"666","tot":"num"}],"action":"","property":"","from":"","to":"","reg":false,"x":660,"y":60,"wires":[["364d27d.8bb63d8"]]},{"id":"e3f9f5f3.c588c8","type":"catch","z":"579a368c.4e87f8","name":"","scope":["364d27d.8bb63d8"],"uncaught":false,"x":425.7142791748047,"y":208.57142639160156,"wires":[["11667a1.d384d86"]]},{"id":"11667a1.d384d86","type":"change","z":"579a368c.4e87f8","name":"","rules":[{"t":"set","p":"res","pt":"msg","to":"res","tot":"flow"}],"action":"","property":"","from":"","to":"","reg":false,"x":650,"y":240,"wires":[["e1097953.bfd548"]]},{"id":"caf333e3.df7ec","type":"mqtt-broker","z":"","name":"mosquitto","broker":"mosquitto","port":"1883","clientid":"nr_pgm1003_zplingress","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

OK, so this works as expected. Quite a lot of nodes.

[{"id":"579a368c.4e87f8","type":"tab","label":"ZPL Ingress","disabled":false,"info":""},{"id":"e617fe72.32aa3","type":"mqtt in","z":"579a368c.4e87f8","name":"","topic":"l99/ccs/evt/read","qos":"1","datatype":"json","broker":"caf333e3.df7ec","x":100,"y":60,"wires":[["744fba68.4371e4"]]},{"id":"394f1739.d3b1f8","type":"http in","z":"579a368c.4e87f8","name":"","url":"/pstprint","method":"post","upload":false,"swaggerDoc":"","x":140,"y":540,"wires":[["3f8957ae.cd4d88"]]},{"id":"bbebe0bc.4033b","type":"http response","z":"579a368c.4e87f8","name":"","statusCode":"200","headers":{},"x":1460,"y":180,"wires":[]},{"id":"5db9f64b.4fe668","type":"mqtt out","z":"579a368c.4e87f8","name":"","topic":"l99/ccs/cmd/write","qos":"1","retain":"false","broker":"caf333e3.df7ec","x":1050,"y":560,"wires":[]},{"id":"29302751.21ca58","type":"template","z":"579a368c.4e87f8","name":"mqtt cmd","field":"payload","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"[\n    {\n        \"value\": \"{{{payload}}}\", \n        \"name\": \"%Z56.0\"\n    }\n]","output":"json","x":860,"y":560,"wires":[["5db9f64b.4fe668","dd5437fd.a6d0f8"]]},{"id":"744fba68.4371e4","type":"change","z":"579a368c.4e87f8","name":"match mqtt evt","rules":[{"t":"set","p":"_aux_mqtt_hit","pt":"msg","to":"payload[address = '%Z56.1'] ? \t    payload[address = '%Z56.1'][0].value : \t    -1","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":300,"y":60,"wires":[["5e2e899d.8a6cd8"]]},{"id":"364d27d.8bb63d8","type":"wait-paths","z":"579a368c.4e87f8","name":"","paths":"[\"_aux_mqtt_path\",\"_aux_http_path\"]","timeout":"2000","finalTimeout":"2000","x":730,"y":180,"wires":[["47e84ac1.497fa4"]]},{"id":"c288367d.49b708","type":"change","z":"579a368c.4e87f8","name":"activate path, save http.res","rules":[{"t":"set","p":"_aux_res","pt":"flow","to":"res","tot":"msg"},{"t":"set","p":"_aux_msgid","pt":"flow","to":"_msgid","tot":"msg"},{"t":"set","p":"paths['_aux_http_path']","pt":"msg","to":"1","tot":"num"},{"t":"set","p":"pathsCorrelationId","pt":"msg","to":"666","tot":"num"}],"action":"","property":"","from":"","to":"","reg":false,"x":740,"y":480,"wires":[["3f310b86.29b7b4"]]},{"id":"47e84ac1.497fa4","type":"change","z":"579a368c.4e87f8","name":"","rules":[{"t":"set","p":"res","pt":"msg","to":"_aux_res","tot":"flow"}],"action":"","property":"","from":"","to":"","reg":false,"x":1090,"y":180,"wires":[["e5551ef5.af111"]]},{"id":"e1097953.bfd548","type":"http response","z":"579a368c.4e87f8","name":"","statusCode":"408","headers":{},"x":1460,"y":280,"wires":[]},{"id":"5e2e899d.8a6cd8","type":"switch","z":"579a368c.4e87f8","name":"evt matched","property":"_aux_mqtt_hit","propertyType":"msg","rules":[{"t":"gte","v":"0","vt":"num"}],"checkall":"true","repair":false,"outputs":1,"x":510,"y":60,"wires":[["6ba7cd35.7c0c04"]]},{"id":"6ba7cd35.7c0c04","type":"change","z":"579a368c.4e87f8","name":"activate path","rules":[{"t":"set","p":"paths['_aux_mqtt_path']","pt":"msg","to":"1","tot":"num"},{"t":"set","p":"pathsCorrelationId","pt":"msg","to":"666","tot":"num"}],"action":"","property":"","from":"","to":"","reg":false,"x":690,"y":60,"wires":[["364d27d.8bb63d8","b30e4226.205e9"]]},{"id":"e3f9f5f3.c588c8","type":"catch","z":"579a368c.4e87f8","name":"","scope":["364d27d.8bb63d8"],"uncaught":false,"x":752.7500114440918,"y":211.25000381469727,"wires":[["446b1f88.a9fea"]]},{"id":"11667a1.d384d86","type":"change","z":"579a368c.4e87f8","name":"","rules":[{"t":"set","p":"res","pt":"msg","to":"_aux_res","tot":"flow"}],"action":"","property":"","from":"","to":"","reg":false,"x":1090,"y":280,"wires":[["7651a2ac.3660cc"]]},{"id":"f5af90a8.db59b","type":"change","z":"579a368c.4e87f8","name":"reset flow.res","rules":[{"t":"set","p":"_aux_res","pt":"flow","to":"null","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":1470,"y":220,"wires":[[]]},{"id":"88563d0c.92177","type":"change","z":"579a368c.4e87f8","name":"reset flow.res","rules":[{"t":"set","p":"_aux_res","pt":"flow","to":"null","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":1470,"y":320,"wires":[[]]},{"id":"d708a1cb.ce348","type":"function","z":"579a368c.4e87f8","name":"encode b64","func":"msg.payload = Buffer.from(msg.payload).toString(\"base64\");\nreturn msg;","outputs":1,"noerr":0,"x":690,"y":560,"wires":[["29302751.21ca58"]]},{"id":"3218cd31.730902","type":"switch","z":"579a368c.4e87f8","name":"http lock","property":"_aux_res","propertyType":"flow","rules":[{"t":"null"},{"t":"else"}],"checkall":"true","repair":false,"outputs":2,"x":460,"y":540,"wires":[["d708a1cb.ce348","c288367d.49b708"],["16d565e8.44269a"]]},{"id":"53a1c2bf.75dfcc","type":"http response","z":"579a368c.4e87f8","name":"","statusCode":"409","headers":{},"x":1040,"y":620,"wires":[]},{"id":"e5551ef5.af111","type":"template","z":"579a368c.4e87f8","name":"set header","field":"headers","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"{\n    \"X-NR-MSGID-MQTT\": \"{{_msgid}}\",\n    \"X-NR-MSGID-HTTP\": \"{{flow._aux_msgid}}\"\n}","output":"json","x":1270,"y":180,"wires":[["bbebe0bc.4033b","f5af90a8.db59b"]]},{"id":"7651a2ac.3660cc","type":"template","z":"579a368c.4e87f8","name":"set header","field":"headers","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"{\n    \"X-NR-MSGID-MQTT\": \"{{_msgid}}\",\n    \"X-NR-MSGID-HTTP\": \"{{flow._aux_msgid}}\"\n}","output":"json","x":1270,"y":280,"wires":[["e1097953.bfd548","88563d0c.92177"]]},{"id":"bd82f536.c8c028","type":"template","z":"579a368c.4e87f8","name":"set header","field":"headers","fieldType":"msg","format":"handlebars","syntax":"mustache","template":"{\n    \"X-NR-MSGID-MQTT\": \"\",\n    \"X-NR-MSGID-HTTP\": \"{{_msgid}}\"\n}","output":"json","x":870,"y":620,"wires":[["53a1c2bf.75dfcc"]]},{"id":"446b1f88.a9fea","type":"switch","z":"579a368c.4e87f8","name":"http.res exists","property":"_aux_res","propertyType":"flow","rules":[{"t":"nnull"}],"checkall":"true","repair":false,"outputs":1,"x":900,"y":280,"wires":[["11667a1.d384d86"]]},{"id":"cad8eecb.069e7","type":"debug","z":"579a368c.4e87f8","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"_msgid","targetType":"msg","x":1150,"y":480,"wires":[]},{"id":"b30e4226.205e9","type":"debug","z":"579a368c.4e87f8","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"_msgid","targetType":"msg","x":890,"y":60,"wires":[]},{"id":"3f8957ae.cd4d88","type":"semaphore-take","z":"579a368c.4e87f8","config":"f15b809b.2ece","name":"block","x":310,"y":540,"wires":[["3218cd31.730902"]]},{"id":"3f310b86.29b7b4","type":"semaphore-leave","z":"579a368c.4e87f8","config":"f15b809b.2ece","name":"release","x":960,"y":480,"wires":[["364d27d.8bb63d8","cad8eecb.069e7"]]},{"id":"16d565e8.44269a","type":"semaphore-leave","z":"579a368c.4e87f8","config":"f15b809b.2ece","name":"release","x":700,"y":620,"wires":[["bd82f536.c8c028"]]},{"id":"dd5437fd.a6d0f8","type":"debug","z":"579a368c.4e87f8","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","x":1070,"y":520,"wires":[]},{"id":"caf333e3.df7ec","type":"mqtt-broker","z":"","name":"mosquitto","broker":"mosquitto","port":"1883","clientid":"nr_pgm1003_zplingress","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""},{"id":"f15b809b.2ece","type":"semaphore-config","z":"","name":"http_mutex","capacity":"1"}]
1 Like

Hi
What are these missing nodes?

What are these missing nodes?

looks like

Ta
And also
node-red-contrib-semaphore