Msg.flush adding message into the queue

I have a flow in which there are nearly 20 messages coming from a node and i split them to process individually so i set a rate limit of approx 20 seconds. however, some messages are empty, and hence dont take any amount of time, so i need to wait for 20 seconds unnecessarily,
hence i added a msg.flush set to 1, to advance to next message immediately, the trouble is, the msg.flush itself adds a message into the queue, and although till the end of the original message length the flow works at best speed possible, the rate limit continues to send out the message recieved from msg.flush and the flow gives error as it does not have expected key parameters in the message.
i now have optimised by including a switch node that looks for a key parameter and stops flow if not recieved.
is there a better method.
Note : injecting msg.flush from an inject node without any payload works perfectly. i tried deleting the message property before msg.flush, but there are so many properties, it is difficult to delete all of them (is there a *.* delete for this step?)

[{"id":"76b8bd875937cd50","type":"group","z":"114ac201cf643935","style":{"stroke":"#999999","stroke-opacity":"1","fill":"none","fill-opacity":"1","label":true,"label-position":"nw","color":"#a4a4a4"},"nodes":["8dddce38c5ba775f","a6486c81eb4a409c","f24eaf38e8d2b698","90a3c38879e4403f","dc7b1a312f68ac14","567058326adf6d55","f04246036fe31be2","5c58f9430337ea1f","056248a6508b9eee","6fff3809badcd06c","ed46dca5f1f7bf03","7b688b20f13efa15","87c6c5c32d271be5"],"x":194,"y":179,"w":832,"h":322},{"id":"8dddce38c5ba775f","type":"inject","z":"114ac201cf643935","g":"76b8bd875937cd50","name":"","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"topic1","payload":"{\"item\":\"one\",\"item-2\":\"two\",\"item-3\":\"three\",\"item-4\":\"four\",\"item-5\":\"five\"}","payloadType":"json","x":290,"y":220,"wires":[["a6486c81eb4a409c","f24eaf38e8d2b698"]]},{"id":"a6486c81eb4a409c","type":"debug","z":"114ac201cf643935","g":"76b8bd875937cd50","name":"debug 2822","active":false,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"msg","x":910,"y":220,"wires":[]},{"id":"f24eaf38e8d2b698","type":"junction","z":"114ac201cf643935","g":"76b8bd875937cd50","x":380,"y":280,"wires":[["90a3c38879e4403f"]]},{"id":"90a3c38879e4403f","type":"split","z":"114ac201cf643935","g":"76b8bd875937cd50","name":"","splt":"\\n","spltType":"str","arraySplt":1,"arraySpltType":"len","stream":false,"addname":"","property":"payload","x":690,"y":280,"wires":[["dc7b1a312f68ac14","567058326adf6d55"]]},{"id":"dc7b1a312f68ac14","type":"debug","z":"114ac201cf643935","g":"76b8bd875937cd50","name":"debug 2823","active":false,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"auto","x":910,"y":280,"wires":[]},{"id":"567058326adf6d55","type":"delay","z":"114ac201cf643935","g":"76b8bd875937cd50","name":"","pauseType":"rate","timeout":"5","timeoutUnits":"seconds","rate":"1","nbRateUnits":"5","rateUnits":"second","randomFirst":"1","randomLast":"5","randomUnits":"seconds","drop":false,"allowrate":false,"outputs":1,"x":720,"y":340,"wires":[["6fff3809badcd06c"]]},{"id":"f04246036fe31be2","type":"change","z":"114ac201cf643935","g":"76b8bd875937cd50","name":"Flush 1","rules":[{"t":"delete","p":"payload","pt":"msg"},{"t":"set","p":"flush","pt":"msg","to":"1","tot":"num"}],"action":"","property":"","from":"","to":"","reg":false,"x":480,"y":340,"wires":[["567058326adf6d55"]]},{"id":"5c58f9430337ea1f","type":"change","z":"114ac201cf643935","g":"76b8bd875937cd50","name":"Reset","rules":[{"t":"set","p":"reset","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":470,"y":460,"wires":[["567058326adf6d55"]]},{"id":"056248a6508b9eee","type":"change","z":"114ac201cf643935","g":"76b8bd875937cd50","name":"Flush All","rules":[{"t":"set","p":"flush","pt":"msg","to":"true","tot":"bool"}],"action":"","property":"","from":"","to":"","reg":false,"x":480,"y":400,"wires":[["567058326adf6d55"]]},{"id":"6fff3809badcd06c","type":"debug","z":"114ac201cf643935","g":"76b8bd875937cd50","name":"debug 2824","active":false,"tosidebar":true,"console":false,"tostatus":true,"complete":"payload","targetType":"msg","statusVal":"payload","statusType":"msg","x":910,"y":340,"wires":[]},{"id":"ed46dca5f1f7bf03","type":"inject","z":"114ac201cf643935","g":"76b8bd875937cd50","name":"","props":[{"p":"payload"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"hello","payloadType":"str","x":290,"y":340,"wires":[["f04246036fe31be2"]]},{"id":"7b688b20f13efa15","type":"inject","z":"114ac201cf643935","g":"76b8bd875937cd50","name":"","props":[],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":290,"y":460,"wires":[["5c58f9430337ea1f"]]},{"id":"87c6c5c32d271be5","type":"inject","z":"114ac201cf643935","g":"76b8bd875937cd50","name":"","props":[],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","x":290,"y":400,"wires":[["056248a6508b9eee"]]}]

in the attached example, the inject node with a payload 'hello' simulates my exact flow, but with more number of properties. injecting the 'hello' payload doesn't add to the queue, since i have a delete msg.payload before msg.flush in the change node.

Instead of using the complexities of flushing, I would remove the empty items (ie. the items you are not interested in) before the split ?

1 Like

I think you answered you own problem, you have to delete any other properties or in a function node

return {flush:1}
1 Like

yes. did the trick!

1 Like

sequence of events for any future visit

I might try adding a switch node to filter the empty messages first. IE, if msg.payload is empty, have the switch send it to output 1, which is connected to nothing. Then add a rule for "Otherwise" to output two, which then feeds the rate limit.

the empty 'message' isn't generated until i process the output from the split node first.
bulk message--> split -->rate limt-->process-->may be empty/may not be empty-->if empty waits till rate limit is sending next message(this is where i need flush)

I read this as, there is an empty message reaching the rate limit node, and you're adding a flush to remove it. If the empty message is queued by rate limit, which is why it would be flushed, it seems more sensible to me to eliminate it before the rate limit, rather than add several more layers of complexity to deal with it after the rate limit. If the empty message isn't generated until after rate limit, I'm clearly not understanding, but if it's solved, doesn't really matter.

there is no empty message before the rate limit. all are valid messages, because of the length of the payload incoming, the time taken to process is different for each message, so to avoid conflict in database access, i have given sufficient time (rate limit) to allow the message to process. however, some messages are either very short, or empty, after they are processed (not before the rate limit node), and hence I flush to expedite.
I have no way of knowing whether a message will be empty or not, before processing. (or even short for that matter, which may not require the entire wait period)

You can use the flow shown here instead of your rate limit -> process. It does not use a fixed rate limit but releases the next message when the previous one has been completed however long it takes, so it will automatically adapt the rate to maximise the throughput.

That is precisely how I have done now by following @E1cid's instruction. Using a function node to send flush:1 ( I already had the same logic, however, along with flush :1, I was sending bunch of other unintended messages, so it was getting added in queue.

Are you using flush with all messages or just empty ones?

All, so if a message processes, faster, it doesn't need to wait till the rate limit period set in the node.

Exactly. Set the time in the rate limit to longer than it should ever take, so the normal operation is to use the flush to release the next one as soon as possible. The rate limit time is just in case the previous message gets lost somehow. That sort of thing can happen if you deploy while the messages are going through.

1 Like

I have put 2.5 times, but it really doesn't matter with the flush in place!

Wish I had seen that post earlier, but i developed that logic from scratch (self pat on my back :wink: ) .
my only struggle was the flush message was adding into the queue as i was unaware how to delete a whole lot of messages(topic/payload/cron/and several other messages added along the flow) return {flush:1} was new to me.

1 Like

Understood better now with further explanation. :+1:

1 Like

When i see my original post now, it does seem misleading that empty messages are BEFORE the rate limit node. I should read my post after composing and before sending.

1 Like