My case is similar to that of Node.send not always works - Developing Nodes - Node-RED Forum (nodered.org). I did not understand what cleanup I should do.
I am trying to build a custom Kafka Consumer node where I am using a function which gets triggered every time a message is received ( Consuming Messages · KafkaJS)
When I start the Node-RED for the first time, everything works fine. Sometimes when I redeploy(Hit the deploy button in UI), the node.send in the eachMessage
stops working even If I was able to get logs above and below the node.send(once stopped it won't work even If I redeploy, unless I restart the whole app). Unfortunately, I am unable to reproduce this consistently. Sometimes it works for even If I redeploy 10 times. sometimes 1 redeploy will be enough to break it.
An example of my code, removed any sensitive stuff, so might not work for you if you try to execute.
const { Kafka } = require("kafkajs");
module.exports = function (RED) {
function consumeKafkaMessage(config) {
let consumer;
RED.nodes.createNode(this, config);
var node = this;
node.init = async function(){
try{
let topic = config.topic;
let fromBegining = config.fromBegining;
let groupId = config.groupId;
const client = new Kafka({
brokers: 'brokers....',
connectionTimeout: 3000
})
let consumerOptions = {} ;
consumerOptions.topic = topic;
consumerOptions.fromBegining = fromBegining;
consumer = client.consumer({ groupId: groupId, allowAutoTopicCreation:false });
await consumer.connect()
let fromBegining = consumerOptions.fromBegining ? consumerOptions.fromBegining : true
await this.subscribe(consumer, consumerOptions.topic, fromBegining)
consumer.run({
eachMessage: async ({ topic, partition, message }) => {
let messageToSend = { "payload" : message.value.toString() }
node.send([null, messageToSend])
console.log("Logging messageToSend has msg id added to it.")
},
}).catch((error) => {
node.send([{payload : error}, null])
});
}catch(error){
node.send([{payload : error}, null])
}
}
node.on('close', async function(){
try{
console.log(`Closing the Node-RED Kafka consumer`);
consumer.disconnect();
}catch(error){
console.log(error)
}
})
node.init();
}
RED.nodes.registerType("kafka_consumer_node", consumeKafkaMessage);
}