Node.send() is not working always

My case is similar to that of Node.send not always works - Developing Nodes - Node-RED Forum (nodered.org). I did not understand what cleanup I should do.

I am trying to build a custom Kafka Consumer node where I am using a function which gets triggered every time a message is received ( Consuming Messages · KafkaJS)

When I start the Node-RED for the first time, everything works fine. Sometimes when I redeploy(Hit the deploy button in UI), the node.send in the eachMessage stops working even If I was able to get logs above and below the node.send(once stopped it won't work even If I redeploy, unless I restart the whole app). Unfortunately, I am unable to reproduce this consistently. Sometimes it works for even If I redeploy 10 times. sometimes 1 redeploy will be enough to break it.

An example of my code, removed any sensitive stuff, so might not work for you if you try to execute.

const { Kafka } = require("kafkajs");
module.exports = function (RED) {
    function consumeKafkaMessage(config) {
        let consumer;
        RED.nodes.createNode(this, config);
        var node = this;
        node.init = async function(){
            try{
                let topic =  config.topic;
                let fromBegining =  config.fromBegining;
                let groupId =  config.groupId;
                
                const client = new Kafka({ 
                    brokers: 'brokers....', 
                    connectionTimeout: 3000
                })
                let consumerOptions = {} ;
                consumerOptions.topic = topic;
                consumerOptions.fromBegining = fromBegining;
                
                
                consumer = client.consumer({ groupId: groupId, allowAutoTopicCreation:false });
                await consumer.connect()
                let fromBegining = consumerOptions.fromBegining ? consumerOptions.fromBegining : true
                await this.subscribe(consumer, consumerOptions.topic, fromBegining)
                
                consumer.run({
                    eachMessage: async ({ topic, partition, message }) => {
                        let messageToSend = { "payload" : message.value.toString() }
                        node.send([null, messageToSend])
                        console.log("Logging messageToSend has msg id added to it.")
                    },
                }).catch((error) => {
                    node.send([{payload : error}, null])
                });
            }catch(error){
                node.send([{payload : error}, null])
            }
        }

       node.on('close', async function(){
            try{
                console.log(`Closing the Node-RED Kafka consumer`);
                consumer.disconnect();
            }catch(error){
                console.log(error)
            }
        })
        node.init();
    }
    RED.nodes.registerType("kafka_consumer_node", consumeKafkaMessage);
}

When a node is deployed, it gets destroyed/recreated.
Therefore you should handle .on('close'

https://nodered.org/docs/creating-nodes/node-js#closing-the-node

this.on('close', function(removed, done) {
    if (removed) {
        // This node has been disabled/deleted - might wanna do something special?
    } else {
        // This node is being restarted - might wanna do something else?
    }
    // but normally...
    doCleanup_unsubscribe_closeConnections_EtcEtcEtc(function() {
        done()
    });
});

Thank you for your prompt reply, I have edited my code as you suggested i.e added node.on('close') but unfortunately the issue still persists. I was not clear in my post earlier, the consumer itself is receiving messages every time I redeploy, but the node.send is not passing the message to the next node sometimes.(When the issue starts redeploys won't help me anymore, I need to restart the whole app).

Is there anything I should additionally do in node.on('close') function apart from stopping my kafka consumers?

Let me know, if anything I said is confusing.

the only way this will occur is if the msg is null or undefined.

Have you added console.logs to check this is not happening?

Hello, I think I solved the issue, the message was not null or undefined(I logged it), but it was indeed closing the consumers properly ig.(not 100% sure, I tried redeploying 20-25 times, everything is working fine, I should test it more rigorously though.)

For reference to others, I modified it to

node.on('close', function(done){
    try{
        new Promise(async (resolve,reject)=>{
            await consumer.disconnect();
            resolve();
        })
        .then(()=>{
            done();
        })
        .catch((error)=>{
            done()
        })
        
    }catch(error){
    }
})

Thank you so much for your time @Steve-Mcl!!

But I would like to know, is there a reason why not closing consumers properly is affecting node.send while the consumer itself is delivering the message correctly every time?

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.