MQTT Broker in Erlang based on Node-RED code

Hi There,

To prove the point that it's not that hard to duplicate Node-RED in Erlang, here is an Erlang-Red flow based off the work of @cymplecy creating an MQTT Broker in Node-RED.

The flow can also be viewed in the serverless Node-RED client - can view the Erlang code that went into this. It does really work in Erlang-Red and I'm actually using the Erlang MQTT libraries to connect to the broker (via the MQTT nodes).

What I aimed to do in creating this clone was to keep as much Node-RED code as possible, i.e. the change node now supports the buffer type. Binary (0b0101010) values are supported so that they can be use in conjunction with the Buffer type.

I also learned that bit shifting and binary operations are incredible simple in Erlang if and only if one understands the freaky syntax! Erlang was made for creating code of the back of a specification so it's no great wonder.

[{"id":"dc897f402c53697f","type":"tab","label":"[.breadboard] MQTT broker as flow","disabled":false,"info":"","env":[{"name":"ERED_PENDING","value":"true","type":"bool"}]},{"id":"22a2dd03796decc5","type":"group","z":"dc897f402c53697f","name":"subscription storage","style":{"label":true},"nodes":["8ac017d985b99893","fa000edf71c93622","6a238463862c37b4","6c126f348b04a739","d939b07bf691fd5a","99bd40cbdbe8b052","2d5dd30a1086ad59","c6d2c1944f1d09ba","d75bca3f19214d01","a79b7ede76991cbf","fad25e160067c8db","d904b3ba7ae1718d","099a121e094d2b7f","4147297a999d02ff","e333563e6cdd31ec"],"x":320,"y":23,"w":864,"h":298},{"id":"d4b9a5571b27f3b8","type":"group","z":"dc897f402c53697f","name":"Client","style":{"label":true},"nodes":["8b955da886c2db79","8a7d115c75d00cb2","4d0ffeae2a76b8b0","c4ab9a480d2b5300","eb63f1261f783d43","f8bb574706073725","c548723aabcb824d","f8e2d913c7a2e082","49784831a30cafba","b25834b00dc17621","94fd08bb1f03e37f","5453a29ad144ac8f","518fff983ad277ba","1a3b6d773654e9b5","8fd3cccdbffacd09"],"x":82.66669464111328,"y":1165.6666107177734,"w":838.0000381469727,"h":512.6667423248291},{"id":"3e1927fe4ebc89d8","type":"group","z":"dc897f402c53697f","name":"Helper code","style":{"label":true},"nodes":["023c321774b79ff6","e560cccf959d60d9","9c4a057d265f1bc2"],"x":19,"y":54,"w":272,"h":192},{"id":"6f9a495d75186980","type":"group","z":"dc897f402c53697f","name":"MQTT broker 3.1.1 only for QoS 0","style":{"label":true},"nodes":["98c9651dbea8582c","38440ad2e824817e","e4321e5c01b16c27","3addc59737a0ed8a","1e8e2648ed67d5c6","5c5cf4cf97698822","618215b887917904","c078fde4eaa67651","d354d045491e70f7","03561c2e4510609a","539c9d2c592d393b","59d1f5eaff7bc315","07e8b93b1a84af24","b8a6812cb8f0e9c5","88419b83727f1149","26d3a84f0b5c547e","5f3ddf0b4c97e847","2e29be5597431a04","462f8d53f850d053","d06e1719fc770910","da3f84ead8ae89dc","2343c09d75092ba6","46100fb0a08b0c5c","73a8064f83d4db2a","0c8ae20f5bca7f97","81ad923686dfe8d9","e396e5ee280efc86","4012ad0d8a2cfb0c","d4d4146f6450bcaa","8f829e2401b0dad4","2293bce4f75e3f8f","944d14965ae42f8e","e2da45201ee4031b","44cd3d3f4ee84f1c","f5abc9793ac1fc62","f5800594558eaf91","7f8884244ee2814f","f634f001c334a8ce","2f2bab9efe07d5ae","80c12efc3d845fa3","03e661a4f7ee610f","77b3e857bdd8c765","68f0a79c209e819c","ded413082f65543c"],"x":53.333221435546875,"y":435.99999237060547,"w":2329.0001525878906,"h":634.833381652832},{"id":"ded413082f65543c","type":"group","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"Send to client","style":{"label":true},"nodes":["af9f5d0d7cea3ddd","7d8a1c95eebd10b5","86d40617a04bb66d","cde7df7a94f9c8a2"],"x":2034.3333740234375,"y":461.99999237060547,"w":322,"h":142},{"id":"80c12efc3d845fa3","type":"junction","z":"dc897f402c53697f","g":"6f9a495d75186980","x":1263.666633605957,"y":620.8333892822266,"wires":[["03e661a4f7ee610f"]]},{"id":"03e661a4f7ee610f","type":"junction","z":"dc897f402c53697f","g":"6f9a495d75186980","x":1303.0476151704788,"y":566.7381461858749,"wires":[["d354d045491e70f7"]]},{"id":"77b3e857bdd8c765","type":"junction","z":"dc897f402c53697f","g":"6f9a495d75186980","x":1313.0476031303406,"y":872.1667095422745,"wires":[["e396e5ee280efc86","4012ad0d8a2cfb0c"]]},{"id":"8fd3cccdbffacd09","type":"junction","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","x":403.66668701171875,"y":1253.3332786560059,"wires":[["b25834b00dc17621","4d0ffeae2a76b8b0"]]},{"id":"cde7df7a94f9c8a2","type":"junction","z":"dc897f402c53697f","g":"ded413082f65543c","x":2150.3333740234375,"y":562.9999923706055,"wires":[["7d8a1c95eebd10b5","af9f5d0d7cea3ddd"]]},{"id":"68f0a79c209e819c","type":"junction","z":"dc897f402c53697f","g":"6f9a495d75186980","x":1311.7142199277878,"y":789.6429119110107,"wires":[["da3f84ead8ae89dc","2343c09d75092ba6"]]},{"id":"98c9651dbea8582c","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"Incoming","active":false,"tosidebar":true,"console":false,"tostatus":true,"complete":"true","targetType":"full","statusVal":"payload","statusType":"msg","x":368.99999237060547,"y":835.8333311080933,"wires":[]},{"id":"38440ad2e824817e","type":"tcp in","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"","server":"server","host":"","port":"2883","datamode":"stream","datatype":"buffer","newline":"","topic":"","trim":false,"base64":false,"tls":"","x":139.33322143554688,"y":789.8331909179688,"wires":[["98c9651dbea8582c","44cd3d3f4ee84f1c"]]},{"id":"8b955da886c2db79","type":"inject","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"Publish  top=Test","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"Test","payload":"What From client on broker machine","payloadType":"str","x":208.66669464111328,"y":1637.3333530426025,"wires":[["8a7d115c75d00cb2"]]},{"id":"8a7d115c75d00cb2","type":"mqtt out","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"MQTT Out","topic":"","qos":"0","retain":"false","respTopic":"","contentType":"","userProps":"","correl":"","expiry":"","broker":"da2f91c287ad12f7","x":770.3333320617676,"y":1636.9999980926514,"wires":[]},{"id":"e4321e5c01b16c27","type":"change","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"set CONNACK","rules":[{"t":"set","p":"payload","pt":"msg","to":"[\"0b00100000\",2,0,0]","tot":"bin"}],"action":"","property":"","from":"","to":"","reg":false,"x":1123.666633605957,"y":566.8333892822266,"wires":[["03e661a4f7ee610f","539c9d2c592d393b"]]},{"id":"3addc59737a0ed8a","type":"switch","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"Route \\n packet type","property":"controlbyte","propertyType":"msg","rules":[{"t":"eq","v":"0b00000001","vt":"num"},{"t":"eq","v":"0b00001100","vt":"num"},{"t":"eq","v":"0b00000011","vt":"num"},{"t":"eq","v":"0b00001000","vt":"num"},{"t":"eq","v":"0b00001010","vt":"num"},{"t":"eq","v":"0b00001110","vt":"num"},{"t":"else"}],"checkall":"true","repair":false,"outputs":7,"x":559.3333129882812,"y":754.8333740234375,"wires":[["1e8e2648ed67d5c6"],["618215b887917904"],["26d3a84f0b5c547e"],["5f3ddf0b4c97e847"],["0c8ae20f5bca7f97"],["46100fb0a08b0c5c"],["5c5cf4cf97698822"]]},{"id":"1e8e2648ed67d5c6","type":"change","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"CONNECT","rules":[{"p":"mqttPacket.type","pt":"msg","t":"set","to":"CONNECT","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":893.666633605957,"y":566.8333892822266,"wires":[["e4321e5c01b16c27","03561c2e4510609a"]]},{"id":"5c5cf4cf97698822","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"Unexpected","active":false,"tosidebar":true,"console":false,"tostatus":true,"complete":"true","targetType":"full","statusVal":"","statusType":"counter","x":658.6666107177734,"y":979.1666650772095,"wires":[]},{"id":"618215b887917904","type":"change","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"PINGREQ","rules":[{"p":"mqttPacket.type","pt":"msg","t":"set","to":"PINGREQ","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":883.666633605957,"y":620.8333892822266,"wires":[["c078fde4eaa67651","59d1f5eaff7bc315"]]},{"id":"c078fde4eaa67651","type":"change","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"Set PINGACK","rules":[{"t":"set","p":"payload","pt":"msg","to":"[\"0b11010000\",0]","tot":"bin"}],"action":"","property":"","from":"","to":"","reg":false,"x":1123.666633605957,"y":620.8333892822266,"wires":[["80c12efc3d845fa3","07e8b93b1a84af24"]]},{"id":"d354d045491e70f7","type":"link out","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"link out 3","mode":"link","links":["86d40617a04bb66d"],"x":1376.3333129882812,"y":566.8333740234375,"wires":[]},{"id":"03561c2e4510609a","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"debug 352","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"\"CONNECT rcvd\"","targetType":"jsonata","statusVal":"","statusType":"auto","x":1016.6666259765625,"y":514.8333740234375,"wires":[],"l":false},{"id":"539c9d2c592d393b","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"debug 353","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"\"CONNACK sent\"","targetType":"jsonata","statusVal":"","statusType":"auto","x":1376.3333129882812,"y":479.8333740234375,"wires":[],"l":false},{"id":"59d1f5eaff7bc315","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"debug 354","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"\"PINGREQ rcvd\"","targetType":"jsonata","statusVal":"","statusType":"auto","x":1016.6666259765625,"y":656.8333740234375,"wires":[],"l":false},{"id":"07e8b93b1a84af24","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"debug 355","active":false,"tosidebar":true,"console":false,"tostatus":false,"complete":"\"PINGRESP sent\"","targetType":"jsonata","statusVal":"","statusType":"auto","x":1308.6666870117188,"y":648.8333740234375,"wires":[],"l":false},{"id":"b8a6812cb8f0e9c5","type":"change","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"PUBLISH","rules":[{"p":"mqttPacket.type","pt":"msg","t":"set","to":"PUBLISH","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":883.666633605957,"y":721.3333892822266,"wires":[["88419b83727f1149","8f829e2401b0dad4"]]},{"id":"88419b83727f1149","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"debug 356","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"\"PUBLISH rcvd\"","targetType":"jsonata","statusVal":"","statusType":"auto","x":1016.6666259765625,"y":772.8333740234375,"wires":[],"l":false},{"id":"26d3a84f0b5c547e","type":"switch","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"Only QoS0","property":"qos","propertyType":"msg","rules":[{"t":"eq","v":"0","vt":"num"}],"checkall":"true","repair":false,"outputs":1,"x":893.666633605957,"y":680.8333892822266,"wires":[["b8a6812cb8f0e9c5"]]},{"id":"4d0ffeae2a76b8b0","type":"mqtt in","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"MQTT In","topic":"","qos":"2","datatype":"auto-detect","broker":"da2f91c287ad12f7","nl":false,"rap":true,"rh":0,"inputs":1,"x":526.9999618530273,"y":1229.6666946411133,"wires":[["eb63f1261f783d43"]]},{"id":"c4ab9a480d2b5300","type":"inject","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"Subscribe to Test","props":[{"p":"action","v":"subscribe","vt":"str"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"Test","x":208.66669464111328,"y":1253.3332786560059,"wires":[["8fd3cccdbffacd09"]]},{"id":"eb63f1261f783d43","type":"debug","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"Subscribtions","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","targetType":"full","statusVal":"","statusType":"auto","x":745.3332214355469,"y":1229.6666946411133,"wires":[]},{"id":"5f3ddf0b4c97e847","type":"change","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"SUBSCRIBE","rules":[{"p":"mqttPacket.type","pt":"msg","t":"set","to":"SUBSCRIBE","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":893.666633605957,"y":820.3333892822266,"wires":[["2e29be5597431a04","944d14965ae42f8e"]]},{"id":"2e29be5597431a04","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"debug 359","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"\"SUBSCRIBE rcvd\"","targetType":"jsonata","statusVal":"","statusType":"auto","x":1016.6666259765625,"y":860.8333892822266,"wires":[],"l":false},{"id":"f8bb574706073725","type":"inject","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"Unsubscribe","props":[{"p":"action","v":"unsubscribe","vt":"str"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"Test","x":198.66669464111328,"y":1299.9999465942383,"wires":[["8fd3cccdbffacd09"]]},{"id":"c548723aabcb824d","type":"inject","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"Connect","props":[{"p":"action","v":"connect","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":188.66669464111328,"y":1206.6666107177734,"wires":[["8fd3cccdbffacd09"]]},{"id":"f8e2d913c7a2e082","type":"inject","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"Disconnect","props":[{"p":"action","v":"disconnect","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":198.66669464111328,"y":1346.6666145324707,"wires":[["8fd3cccdbffacd09"]]},{"id":"462f8d53f850d053","type":"delay","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"","pauseType":"delay","timeout":"2","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":1525.3332595825195,"y":665.8333950042725,"wires":[["d06e1719fc770910","d4d4146f6450bcaa"]]},{"id":"d06e1719fc770910","type":"link out","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"link out 6","mode":"link","links":["86d40617a04bb66d"],"x":1700.3331394195557,"y":665.8334045410156,"wires":[]},{"id":"da3f84ead8ae89dc","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"debug 363","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"\"SUBACK sent\"","targetType":"jsonata","statusVal":"","statusType":"auto","x":1438.0000610351562,"y":789.5000610351562,"wires":[],"l":false},{"id":"2343c09d75092ba6","type":"link out","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"link out 7","mode":"link","links":["86d40617a04bb66d"],"x":1376.3333129882812,"y":761.8333740234375,"wires":[]},{"id":"46100fb0a08b0c5c","type":"change","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"DISCONNECT","rules":[{"p":"mqttPacket.type","pt":"msg","t":"set","to":"DISCONNECT","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":903.666633605957,"y":980.8333892822266,"wires":[["73a8064f83d4db2a","2293bce4f75e3f8f"]]},{"id":"73a8064f83d4db2a","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"debug 364","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"\"DISCONNECT rcvd\"","targetType":"jsonata","statusVal":"","statusType":"auto","x":1099.666633605957,"y":980.8333892822266,"wires":[],"l":false},{"id":"0c8ae20f5bca7f97","type":"change","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"UNSUBSCRIBE","rules":[{"p":"mqttPacket.type","pt":"msg","t":"set","to":"UNSUBSCRIBE","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":903.666633605957,"y":901.3333892822266,"wires":[["81ad923686dfe8d9","f634f001c334a8ce"]]},{"id":"81ad923686dfe8d9","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"debug 365","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"\"UNSUBSCRIBE rcvd\"","targetType":"jsonata","statusVal":"","statusType":"auto","x":1016.6666259765625,"y":940.8333892822266,"wires":[],"l":false},{"id":"e396e5ee280efc86","type":"link out","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"link out 8","mode":"link","links":["86d40617a04bb66d"],"x":1376.3333129882812,"y":847.8333740234375,"wires":[]},{"id":"4012ad0d8a2cfb0c","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"debug 366","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"\"UNSUBACK sent\"","targetType":"jsonata","statusVal":"","statusType":"auto","x":1376.3333129882812,"y":887.8333740234375,"wires":[],"l":false},{"id":"d4d4146f6450bcaa","type":"debug","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"debug 371","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"\"Message sent to subscriber\"","targetType":"jsonata","statusVal":"","statusType":"auto","x":1631.9998054504395,"y":620.8334045410156,"wires":[],"l":false},{"id":"49784831a30cafba","type":"inject","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":188.66669464111328,"y":1449.3333253860474,"wires":[["518fff983ad277ba"]]},{"id":"b25834b00dc17621","type":"delay","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"","pauseType":"delay","timeout":"2","timeoutUnits":"seconds","rate":"1","nbRateUnits":"1","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":516.0000457763672,"y":1384.6667385101318,"wires":[["518fff983ad277ba"]]},{"id":"94fd08bb1f03e37f","type":"inject","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":188.66669464111328,"y":1529.666660308838,"wires":[["1a3b6d773654e9b5"]]},{"id":"af9f5d0d7cea3ddd","type":"tcp out","z":"dc897f402c53697f","g":"ded413082f65543c","name":"","host":"","port":"","beserver":"reply","base64":false,"end":false,"tls":"","x":2260.3333740234375,"y":562.9999923706055,"wires":[]},{"id":"7d8a1c95eebd10b5","type":"debug","z":"dc897f402c53697f","g":"ded413082f65543c","name":"Sent","active":false,"tosidebar":true,"console":false,"tostatus":true,"complete":"true","targetType":"full","statusVal":"payload","statusType":"msg","x":2260.3333740234375,"y":502.99999237060547,"wires":[]},{"id":"86d40617a04bb66d","type":"link in","z":"dc897f402c53697f","g":"ded413082f65543c","name":"TCP reply","links":["d354d045491e70f7","d06e1719fc770910","2343c09d75092ba6","e396e5ee280efc86","7f8884244ee2814f"],"x":2075.3333740234375,"y":562.9999923706055,"wires":[["cde7df7a94f9c8a2"]]},{"id":"8f829e2401b0dad4","type":"function","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"publish mqtt payload","func":"#{ <<\"payload\">> := InPayload } = Msg,\n\n{RemLength, VarHeaderAndPubPayload} = mqtt_packet_helpers:remlength(InPayload),\n\n{TopicName, PubPayload} = mqtt_packet_helpers:topicname(VarHeaderAndPubPayload),\n\nSep = <<\" : \">>,\n\nnode_helpers:status_with_clear(Msg, NodeDef, \n                     <<TopicName/bytes, Sep/bytes, PubPayload/bytes>>, green, dot, 5),\n\n%% PubAck packet - no response for QoS zero packets\n%% node_helpers:msg_send_on_port(1, Msg#{ <<\"payload\">> => binary_to_list(<<4:4,0:4,1:8,0:8,0:8>>) }, NodeDef),\n\n%% port 2 goes to the code that sends this packet onto all subscribres\nnode_helpers:msg_send_on_port(2, Msg#{ <<\"topicName\">> => TopicName, <<\"mqttpayload\">> => InPayload }, NodeDef ),\n\n%% Send no other messages\nundefined\n","outputs":2,"timeout":0,"noerr":126,"initialize":"","finalize":"","libs":[],"x":1170.666633605957,"y":721.3333892822266,"wires":[["462f8d53f850d053"],["f5abc9793ac1fc62"]]},{"id":"8ac017d985b99893","type":"erlmodule","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"","module_name":"store_subscriptions","code":"","x":964,"y":159,"wires":[]},{"id":"fa000edf71c93622","type":"erlstatemachine","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"","scope":["8ac017d985b99893"],"emit_on_state_change":false,"x":943,"y":64,"wires":[["6a238463862c37b4"]]},{"id":"023c321774b79ff6","type":"erlmodule","z":"dc897f402c53697f","g":"3e1927fe4ebc89d8","name":"","module_name":"node_helpers","code":"","x":115,"y":95,"wires":[]},{"id":"e560cccf959d60d9","type":"erlmodule","z":"dc897f402c53697f","g":"3e1927fe4ebc89d8","name":"","module_name":"show_subscription_details","code":"","x":155,"y":150,"wires":[]},{"id":"6a238463862c37b4","type":"link out","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"link out 1","mode":"return","links":[],"x":1143,"y":64,"wires":[]},{"id":"6c126f348b04a739","type":"link in","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"get_subscriptions","links":[],"x":361,"y":107.2,"wires":[["d939b07bf691fd5a"]]},{"id":"d939b07bf691fd5a","type":"change","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"get subscriptions","rules":[{"t":"set","p":"action","pt":"msg","to":"subscriptions","tot":"str"},{"t":"set","p":"payload","pt":"msg","to":"ignored","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":609,"y":107,"wires":[["fa000edf71c93622"]]},{"id":"5453a29ad144ac8f","type":"function","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"subs to status","func":"show_subscription_details:show(Msg,NodeDef),\nMsg","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":814.6667327880859,"y":1515.0000743865967,"wires":[[]]},{"id":"518fff983ad277ba","type":"link call","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"","links":["6c126f348b04a739"],"linkType":"static","timeout":"30","x":674.6666870117188,"y":1449.3333253860474,"wires":[["5453a29ad144ac8f"]]},{"id":"99bd40cbdbe8b052","type":"link in","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"remove all subscriptions","links":[],"x":361,"y":150.39999999999998,"wires":[["2d5dd30a1086ad59"]]},{"id":"2d5dd30a1086ad59","type":"change","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"remove all subscriptions","rules":[{"t":"set","p":"action","pt":"msg","to":"clear","tot":"str"},{"t":"set","p":"payload","pt":"msg","to":"ignored","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":612,"y":150,"wires":[["fa000edf71c93622"]]},{"id":"1a3b6d773654e9b5","type":"link call","z":"dc897f402c53697f","g":"d4b9a5571b27f3b8","name":"","links":["99bd40cbdbe8b052"],"linkType":"static","timeout":"30","x":431.0000457763672,"y":1529.666660308838,"wires":[["518fff983ad277ba"]]},{"id":"c6d2c1944f1d09ba","type":"change","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"unsubscribe session from all topics","rules":[{"t":"set","p":"action","pt":"msg","to":"unsubscribe_session_id","tot":"str"},{"t":"set","p":"payload","pt":"msg","to":"_session.id","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":606,"y":193,"wires":[["fa000edf71c93622"]]},{"id":"d75bca3f19214d01","type":"link in","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"unsubscribe_session_id","links":[],"x":361,"y":193.59999999999997,"wires":[["c6d2c1944f1d09ba"]]},{"id":"2293bce4f75e3f8f","type":"link call","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"","links":["d75bca3f19214d01"],"linkType":"static","timeout":"30","x":1166.6666259765625,"y":1029.8333740234375,"wires":[[]]},{"id":"944d14965ae42f8e","type":"function","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"create subscribe payload","func":"%%\n%% WARNING: This code assumes a single topic that is successfully subscribed to.\n%%\n\n#{ <<\"payload\">> := InPayload } = Msg,\n\n{RemLength, VarHeaderAndPubPayload} = mqtt_packet_helpers:remlength(InPayload),\n\n{PacketIdentifier, Topics} = mqtt_packet_helpers:packetidentifier(VarHeaderAndPubPayload),\n\n%% create the acknowledge packet and send it back\n%% NOTE: the last zero is the success status of the subscription: successful for QoS 0.\nM = Msg#{ \n     <<\"payload\">> => <<9:4, 0:4, 3:8, PacketIdentifier:16/bits, 0:8>>\n},\n\nnode_helpers:msg_send_on_port(1, M, NodeDef),\n\n{ok, TopicName, _} = mqtt_packet_helpers:poptopic(Topics),\n\nnode_helpers:status_with_clear(Msg, NodeDef, TopicName, green, ring, 5),\n\n%% create the details for updating the subscription list.\nMsg2 = node_helpers:msg_update(M, \"mqttPacket.subscribe\", #{ \n   <<\"MessageId\">> => PacketIdentifier,\n   <<\"topicName\">> => TopicName\n}),\n\nnode_helpers:msg_send_on_port(2, Msg2, NodeDef),\n\n%% Send no message, they have been sent.\nundefined","outputs":2,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":1166.666633605957,"y":820.3333892822266,"wires":[["68f0a79c209e819c"],["e2da45201ee4031b"]]},{"id":"e2da45201ee4031b","type":"link call","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"","links":["fad25e160067c8db"],"linkType":"static","timeout":"30","x":1593.6666870117188,"y":823.8333740234375,"wires":[[]]},{"id":"a79b7ede76991cbf","type":"change","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"subscribe ssession to topic","rules":[{"t":"set","p":"action","pt":"msg","to":"subscribe","tot":"str"},{"t":"set","p":"payload","pt":"msg","to":"[$$._session.id, $$.mqttPacket.subscribe.topicName]","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":653,"y":236,"wires":[["fa000edf71c93622"]]},{"id":"fad25e160067c8db","type":"link in","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"subscribe_session_id_to_topic","links":[],"x":361,"y":236.79999999999995,"wires":[["a79b7ede76991cbf"]]},{"id":"44cd3d3f4ee84f1c","type":"function","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"set control byte","func":"maps:merge(Msg, mqtt_packet_helpers:controlbyte(Msg) )\n","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":353.99999237060547,"y":754.8333740234375,"wires":[["3addc59737a0ed8a"]]},{"id":"9c4a057d265f1bc2","type":"erlmodule","z":"dc897f402c53697f","g":"3e1927fe4ebc89d8","name":"","module_name":"mqtt_packet_helpers","code":"","x":145,"y":205,"wires":[]},{"id":"d904b3ba7ae1718d","type":"change","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"subscriptions for topic","rules":[{"t":"set","p":"action","pt":"msg","to":"subs_for_topic","tot":"str"},{"t":"set","p":"payload","pt":"msg","to":"$$.topicName","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":709,"y":279,"wires":[["fa000edf71c93622"]]},{"id":"099a121e094d2b7f","type":"link in","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"sessions_for_topic","links":[],"x":361,"y":280,"wires":[["d904b3ba7ae1718d"]]},{"id":"f5abc9793ac1fc62","type":"link call","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"","links":["099a121e094d2b7f"],"linkType":"static","timeout":"30","x":1554.6666870117188,"y":727.8333711624146,"wires":[["f5800594558eaf91"]]},{"id":"f5800594558eaf91","type":"function","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"send payload out","func":"#{ <<\"payload\">> := Sessions, <<\"mqttpayload\">> := Payload } = Msg,\n\n[ node_helpers:msg_send(Msg#{ <<\"payload\">> => Payload, <<\"_session\">> => #{\n    <<\"id\">> => SessionId,\n    <<\"type\">> => <<\"tcp\">>,\n    <<\"status\">>  => <<\"connected\">>\n }}, NodeDef) || SessionId <- Sessions ],\n\nundefined\n\n","outputs":1,"timeout":0,"noerr":57,"initialize":"","finalize":"","libs":[],"x":1789.6666870117188,"y":727.8333711624146,"wires":[["7f8884244ee2814f"]]},{"id":"7f8884244ee2814f","type":"link out","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"link out 2","mode":"link","links":["86d40617a04bb66d"],"x":1971.6666870117188,"y":727.8333711624146,"wires":[]},{"id":"f634f001c334a8ce","type":"function","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"ack & prepare sub deletion","func":"#{ <<\"payload\">> := InPayload } = Msg,\n\n{RemLength, VarHeaderAndPubPayload} = mqtt_packet_helpers:remlength(InPayload),\n\n{PacketIdentifier, Topics} = mqtt_packet_helpers:packetidentifier(VarHeaderAndPubPayload),\n\n%% 0b1011 ==> 11 --> UnsubAck\nnode_helpers:msg_send_on_port(1, Msg#{ <<\"payload\">> => <<11:4,0:4,2:8,PacketIdentifier:16/bits>> }, NodeDef),\n\nTopicHandler = fun(TopicName) -> \n  node_helpers:status_with_clear(Msg, NodeDef, TopicName, green, ring, 5),\n  node_helpers:msg_send_on_port(2, Msg#{<<\"payload\">> => TopicName}, NodeDef)\nend,\n\nmqtt_packet_helpers:iterate_through_topics(Topics, TopicHandler),\n\n%% Send no messages\nundefined","outputs":2,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":1155.166633605957,"y":901.3333892822266,"wires":[["77b3e857bdd8c765"],["2f2bab9efe07d5ae"]]},{"id":"4147297a999d02ff","type":"change","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"unsubscribe ssession from topic","rules":[{"t":"set","p":"action","pt":"msg","to":"unsubscribe","tot":"str"},{"t":"set","p":"payload","pt":"msg","to":"[$$._session.id, $$.payload]","tot":"jsonata"}],"action":"","property":"","from":"","to":"","reg":false,"x":566,"y":64,"wires":[["fa000edf71c93622"]]},{"id":"e333563e6cdd31ec","type":"link in","z":"dc897f402c53697f","g":"22a2dd03796decc5","name":"unsubscribe_session_from_topic","links":[],"x":361,"y":64,"wires":[["4147297a999d02ff"]]},{"id":"2f2bab9efe07d5ae","type":"link call","z":"dc897f402c53697f","g":"6f9a495d75186980","name":"","links":["e333563e6cdd31ec"],"linkType":"static","timeout":"30","x":1612.6666870117188,"y":907.8333740234375,"wires":[[]]},{"id":"da2f91c287ad12f7","type":"mqtt-broker","name":"Homemade","broker":"localhost","port":"2883","clientid":"","autoConnect":true,"usetls":false,"protocolVersion":"4","keepalive":"60","cleansession":true,"autoUnsubscribe":true,"birthTopic":"","birthQos":"0","birthRetain":"false","birthPayload":"","birthMsg":{},"closeTopic":"","closeQos":"0","closeRetain":"false","closePayload":"","closeMsg":{},"willTopic":"","willQos":"0","willRetain":"false","willPayload":"","willMsg":{},"userProps":"","sessionExpiry":""}]
1 Like