Modbus, Node-RED, and Kafka

Greetings y'all,
I'm attempting to use Node-RED to read data from a Modbus TCP server and push that information out through a Kafka broker to connect with cloud IoT. I'm doing my tests using Modbus Monitor XPF to simulate a Modbus TCP server. I'm able to connect to the server and see the data coming out in the Modbus Read node as well as a debug node. But when I push into a Kafka Producer, no information is ingested in Kafka from the Modbus Flex Getter. I am able to inject strings and timestamps directly into the Kafka Producer and see them come out of a consumer, so I know my Kafka broker is functioning.

I was thinking that it's an issue with the format of the data from the Modbus server, so I've tried to convert it to JSON and XML, with no success.

[{"id":"5efd9028.4f955","type":"modbus-flex-getter","z":"66f42dcd.c0a3b4","name":"Modbus XPF","showStatusActivities":false,"showErrors":false,"logIOActivities":false,"server":"4f580569.a8bb3c","useIOFile":false,"ioFile":"","useIOForPayload":false,"emptyMsgOnFail":false,"keepMsgProperties":false,"x":430,"y":440,"wires":[[],["3cdd200.1212fe","50b06365.d43bbc","9b657e74.88fb9"]]},{"id":"a2bacfbe.45ce9","type":"function","z":"66f42dcd.c0a3b4","name":"","func":"var fc=3;\nvar sa=1;\nvar addresses=1;\n//var slave_ip=msg.payload.slave_ip;\n//msg.slave_ip=\"192.0.0.1:10502\"\nmsg.payload={value: msg.payload, 'fc':fc, 'unitid':1, 'address':sa, 'quantity': addresses };\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":280,"y":440,"wires":[["5efd9028.4f955"]]},{"id":"3cdd200.1212fe","type":"modbus-response","z":"66f42dcd.c0a3b4","name":"0","registerShowMax":20,"x":690,"y":420,"wires":[]},{"id":"9b657e74.88fb9","type":"debug","z":"66f42dcd.c0a3b4","name":"0","active":false,"tosidebar":true,"console":true,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":990,"y":420,"wires":[]},{"id":"c1f232b2.f3b06","type":"inject","z":"66f42dcd.c0a3b4","name":"","props":[{"p":"payload"}],"repeat":"3","crontab":"","once":true,"onceDelay":"3","topic":"","payload":"","payloadType":"date","x":110,"y":440,"wires":[["a2bacfbe.45ce9"]]},{"id":"d9334325.26521","type":"Kafka Producer","z":"66f42dcd.c0a3b4","name":"Mod","broker":"b5d9f4b1.568958","topic":"Modbus1","topicSlash2dot":false,"requireAcks":1,"ackTimeoutMs":100,"partitionerType":0,"key":"","partition":0,"attributes":0,"connectionType":"Producer","convertFromJson":false,"x":990,"y":600,"wires":[]},{"id":"2f19f8dd.bb7878","type":"debug","z":"66f42dcd.c0a3b4","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"false","statusVal":"","statusType":"auto","x":390,"y":640,"wires":[]},{"id":"863e10e9.ffce2","type":"Kafka Consumer","z":"66f42dcd.c0a3b4","name":"","broker":"b5d9f4b1.568958","regex":false,"topics":[{"topic":"Modbus1","offset":0,"partition":0}],"groupId":"kafka-node-group","autoCommit":"true","autoCommitIntervalMs":5000,"fetchMaxWaitMs":100,"fetchMinBytes":1,"fetchMaxBytes":1048576,"fromOffset":0,"encoding":"utf8","keyEncoding":"utf8","connectionType":"Consumer","convertToJson":false,"x":130,"y":640,"wires":[["2f19f8dd.bb7878"]]},{"id":"50b06365.d43bbc","type":"function","z":"66f42dcd.c0a3b4","name":"","func":"const buf= Buffer.from(msg.payload.buffer)\n//const buf = Buffer.from(msg.payload.buffer);\nconst value = buf.readUInt16BE();\nmsg.value = value;\nreturn msg;","outputs":1,"noerr":0,"initialize":"","finalize":"","x":640,"y":520,"wires":[["c9ccab13.c3a958"]]},{"id":"c9ccab13.c3a958","type":"json","z":"66f42dcd.c0a3b4","name":"","property":"payload","action":"","pretty":false,"x":810,"y":520,"wires":[["d9334325.26521","9b657e74.88fb9"]]},{"id":"4f580569.a8bb3c","type":"modbus-client","name":"ModBus Monitor Server","clienttype":"tcp","bufferCommands":true,"stateLogEnabled":true,"queueLogEnabled":true,"tcpHost":"127.0.0.1","tcpPort":"10502","tcpType":"DEFAULT","serialPort":"/dev/ttyUSB","serialType":"RTU-BUFFERD","serialBaudrate":"9600","serialDatabits":"8","serialStopbits":"1","serialParity":"none","serialConnectionDelay":"100","unit_id":"1","commandDelay":"1","clientTimeout":"1000","reconnectOnTimeout":true,"reconnectTimeout":"500","parallelUnitIdsAllowed":true},{"id":"b5d9f4b1.568958","type":"Kafka Broker","name":"Local IP","hosts":[{"host":"127.0.0.1","port":9092}],"hostsEnvVar":"","connectTimeout":"10000","requestTimeout":"30000","autoConnect":"true","idleConnection":"5","reconnectOnIdle":"true","maxAsyncRequests":"10","checkInterval":"10","selfSign":true,"usetls":false,"useCredentials":false}]

I hope someone out there might have an idea as to why this isn't working.

Cheers,

Tark

I have no idea what data formats Kafka requires or supports, but have you tried formatting and sending the data in other formats (string, byte array, hex, binary), depending on what Kafka supports?

Add debug nodes and carefully compare the data from your inject node with the data that fails; there must be a difference.