Problems with JOIN node

I’m having a lot of trouble getting a JOIN node to work properly.

My application involves two systems: an ESP32 development board with an OV2640 camera, which periodically takes a JPEG image; and an RPi (with Node-RED installed), which receives the image and will eventually display it on a web page. Both devices are on the same local Wi-Fi network. A single image is transmitted every 10 minutes or so.

The ESP32 breaks up the image into ~2500-byte chunks and sends each chunk by MQTT as a buffer, with control information embedded in the first few bytes of each buffer. Images are about 40-50kb, so each image takes maybe 30 MQTT messages.
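To make the framing concrete, here is a minimal sketch in JavaScript of how one chunk could be laid out. It simulates the sender side on a PC and is not the actual ESP32 firmware. The helper names `frameChunk` and `frameImage` are hypothetical, and the header field order (picture number, message count, message index, byte count, all in hex) is assumed from the parsing code further down in this thread.

```javascript
// Hypothetical helper: frame one chunk as "<pic>,<count>,<index>,<nBytes>\0<data>".
// All header fields are written in hex, matching the parseInt(..., 16) calls in the thread.
function frameChunk(picNo, msgCount, msgIndex, data) {
    const header = [picNo, msgCount, msgIndex, data.length]
        .map(n => n.toString(16))
        .join(',');
    return Buffer.concat([Buffer.from(header, 'ascii'), Buffer.from([0]), data]);
}

// Hypothetical helper: split an image buffer into framed chunks of at most chunkSize bytes.
function frameImage(picNo, image, chunkSize = 2500) {
    const msgCount = Math.ceil(image.length / chunkSize);
    const frames = [];
    for (let i = 0; i < msgCount; i++) {
        const data = image.subarray(i * chunkSize, (i + 1) * chunkSize);
        frames.push(frameChunk(picNo, msgCount, i, data));
    }
    return frames;
}

// Stand-in for a 45000-byte JPEG; the content does not matter for the framing.
const image = Buffer.alloc(45000, 0xab);
const frames = frameImage(1, image);
console.log(frames.length, frames[0].subarray(0, 10).toString('ascii'));
```

Each frame could then be published as one MQTT message with a buffer payload, with the message index starting at 0.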

Here’s the overall flow:

I am trying to use the Automatic mode of the Join node to reassemble the image chunks into the original file.

Here’s the function code, which prepares the “parts” object so the Join can operate in Automatic mode:

//input is a byte buffer. 
//The first part is a null-terminated info string
//the last part, starting after the null terminator, is image pic data

//First, extract the info string.

//find the end of the text string
for (var txtEnd = 0; msg.payload[txtEnd] != 0; txtEnd++) { };

//Now extract and parse it. The variables are in hex format
var strBuf = msg.payload.slice(0, txtEnd); 
var txt = strBuf.toString('ascii');   // "pictureNumber,msgCount,msgIndex,nBytes"
var txtSplit = txt.split(',');
var picNo = parseInt(txtSplit[0],16);
var msgCount = parseInt(txtSplit[1],16);
var msgIndex = parseInt(txtSplit[2],16);
var nBytes = parseInt(txtSplit[3], 16);

//Get the pic buffer on its own
var bufStart = txtEnd + 1;
var picFragment = msg.payload.slice(bufStart, bufStart + nBytes); // end index is exclusive, so no -1

//build the output message
var msg2 = {};

msg2.payload = picFragment;
msg2.parts = {
    "id": picNo,
    "count": msgCount,
    "index": msgIndex,
    "type": "buffer"
};
    
return msg2;

My problem is when the final chunk is passed to the Join node, I see an error which makes no sense to me: "TypeError: The "list[0]" argument must be an instance of Buffer or Uint8Array. Received undefined". I don’t have a “list” referenced anywhere, and the chunk that the join node is working on is clearly a buffer as shown by the output of Debug “split incoming”.

The Join node only complains when the last message is received. Here’s the Debug output from the final two messages:

Finally, I have created two test flows.

The first joins three binary buffers with manual injection, and works without error.

The second test flow receives non-picture buffers from a trial app on the ESP32, tested with varying numbers and sizes of buffer, and varying delays between. I initially prefixed each sequence with a “reset” message, thinking some of the problem may be due to incomplete buffers from a previous run.

This did initially throw a range of errors, including the one shown, but also sometimes associated with a non-final message. However, once I removed the “reset” message, the second test flow behaved as expected, even with longer buffers than I’m currently using, and at a shorter interval between.

I’m at a loss as to how to proceed, any suggestions???
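On the "list" in the error quoted in the question: `list` is the parameter name of Node.js's own `Buffer.concat(list, totalLength)`, so the message may come from inside the Join node rather than from the function code. That the Join node calls `Buffer.concat` this way is an assumption here, not something confirmed in this thread. The sketch below only shows that plain Node.js produces the same wording when slot 0 of the list was never filled.

```javascript
// Sparse on purpose: slot 0 is never filled, as if the chunk with index 0
// had never been stored for this sequence.
const chunks = [];
chunks[1] = Buffer.from([127, 128]);
chunks[2] = Buffer.from([253, 254, 255]);
const totalLength = 5;

let message = null;
try {
    // With an explicit total length, Node.js type-checks each list entry in turn.
    Buffer.concat(chunks, totalLength);
} catch (err) {
    message = err.message;
}
console.log(message);
```

If that is what is happening, it would be worth checking that the first chunk of every picture arrives with `parts.index` equal to 0 and the same `parts.id` as the rest.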

I would convert the image data to an array and join the arrays. At the end, when the sequence is complete, convert the result to a buffer.
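As a rough sketch of that idea outside Node-RED, using the same three sample fragments as the demo flow: each fragment becomes a plain array of byte values, the arrays are collected in order (the role the Join node plays in the flow), and the collected array of arrays is flattened back into a single buffer.

```javascript
// The three sample fragments used in the demo flow.
const fragments = [
    Buffer.from([1]),
    Buffer.from([127, 128]),
    Buffer.from([253, 254, 255]),
];

// Step 1: spread each buffer into a plain array of numbers.
const asArrays = fragments.map(fragment => [...fragment]);

// Step 2: stands in for the Join node, which would emit an array of these arrays.
const joined = asArrays;

// Step 3: flatten one level and convert back to a Buffer.
const rebuilt = Buffer.from(joined.flat());
console.log([...rebuilt]);
```

Plain arrays use noticeably more memory than buffers, which should be acceptable for images of this size arriving every few minutes.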


[{"id":"cde0785f85b34c2a","type":"function","z":"3f7b4ae8acfceb94","name":"extract data","func":"//input is a byte buffer. \n//The first part is a null-terminated info string\n//the last part, starting after the null terminator, is image pic data\n\n//First, extract the info string.\n\n//find the end of the text string\n/** @type {Buffer} */\nconst payload = msg.payload\nconst txtEnd = payload.findIndex(byte => byte === 0)\nif (txtEnd < 6) {\n    // since 1,1,1,1 + a null is always expected, check txtEnd is not < 6\n    node.error(\"Expected to find string terminator\", msg)\n    return\n}\n\n//Now extract and parse it. The variables are in hex format\nconst strBuf = payload.subarray(0, txtEnd);\nconst txt = strBuf.toString('ascii');   // \"pictureNumber,msgCount,nBytes,msgIndex\"\nconst txtSplit = txt.split(',').map(e => parseInt(e, 16))\nconst [picNo, msgCount, msgIndex, nBytes] = txtSplit\n\n//Get the pic buffer on its own\nconst bufStart = txtEnd + 1\nconst picFragment = payload.subarray(bufStart, bufStart + nBytes)\n\n//build the output message\nconst msg2 = {\n    payload: [...picFragment],\n}\n\nif (msgCount === (msgIndex + 1)) {\n    msg2.complete = true\n}\n\nreturn msg2;","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":950,"y":480,"wires":[["a0e6bb5dc605ffa5"]]},{"id":"a0e6bb5dc605ffa5","type":"join","z":"3f7b4ae8acfceb94","name":"","mode":"custom","build":"array","property":"payload","propertyType":"msg","key":"topic","joiner":"\\n","joinerType":"str","useparts":false,"accumulate":false,"timeout":"","count":"","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":950,"y":520,"wires":[["9d3ea4dcfd0b58b3"]]},{"id":"9d3ea4dcfd0b58b3","type":"function","z":"3f7b4ae8acfceb94","name":"flatten to buffer","func":"const flat = msg.payload.flat()\nmsg.payload = Buffer.from(flat)\nreturn msg;\n","outputs":1,"timeout":0,"noerr":0,"initialize":"","finalize":"","libs":[],"x":980,"y":560,"wires":[["d007f3d489dab112"]]},{"id":"d007f3d489dab112","type":"debug","z":"3f7b4ae8acfceb94","name":"image data (should be [1,127,128,253,254,255]","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"payload","targetType":"msg","statusVal":"","statusType":"auto","x":880,"y":600,"wires":[]},{"id":"93892a0e0855afcd","type":"junction","z":"3f7b4ae8acfceb94","x":830,"y":480,"wires":[["cde0785f85b34c2a"]]},{"id":"936fb43a5b436662","type":"group","z":"3f7b4ae8acfceb94","name":"using buffer-maker from the \\n node-red-contrib-buffer-parser pkg \\n to simulate your data","style":{"label":true,"color":"#3f3f3f"},"nodes":["a0e10efdbe9e1db5","676f28a9e0511233","feb0215eb1b6b0f0","29171ad15050388a","0a2808b4a3be35d5","eafea7611436d593"],"x":394,"y":367,"w":352,"h":194},{"id":"a0e10efdbe9e1db5","type":"inject","z":"3f7b4ae8acfceb94","g":"936fb43a5b436662","name":"sample 1","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":500,"y":440,"wires":[["676f28a9e0511233"]]},{"id":"676f28a9e0511233","type":"buffer-maker","z":"3f7b4ae8acfceb94","g":"936fb43a5b436662","name":"1","specification":"spec","specificationType":"ui","items":[{"name":"item1","type":"ascii","length":-1,"dataType":"str","data":"1,3,0,1"},{"name":"item2","type":"byte","length":1,"dataType":"num","data":"0"},{"name":"item3","type":"byte","length":1,"dataType":"num","data":"1"}],"swap1":"","swap2":"","swap3":"","swap1Type":"swap","swap2Type":"swap","swap3Type":"swap","msgProperty":"payload","msgPropertyType":"str","x":630,"y":440,"wires":[["93892a0e0855afcd"]]},{"id":"feb0215eb1b6b0f0","type":"buffer-maker","z":"3f7b4ae8acfceb94","g":"936fb43a5b436662","name":"127,128","specification":"spec","specificationType":"ui","items":[{"name":"item1","type":"ascii","length":-1,"dataType":"str","data":"1,3,1,2"},{"name":"item2","type":"byte","length":1,"dataType":"num","data":"0"},{"name":"item3","type":"byte","length":1,"dataType":"num","data":"127"},{"name":"item4","type":"byte","length":1,"dataType":"num","data":"128"}],"swap1":"","swap2":"","swap3":"","swap1Type":"swap","swap2Type":"swap","swap3Type":"swap","msgProperty":"payload","msgPropertyType":"str","x":640,"y":480,"wires":[["93892a0e0855afcd"]]},{"id":"29171ad15050388a","type":"buffer-maker","z":"3f7b4ae8acfceb94","g":"936fb43a5b436662","name":"253,254,255","specification":"spec","specificationType":"ui","items":[{"name":"item1","type":"ascii","length":-1,"dataType":"str","data":"1,3,2,3"},{"name":"item2","type":"byte","length":1,"dataType":"num","data":"0"},{"name":"item3","type":"byte","length":1,"dataType":"num","data":"253"},{"name":"item4","type":"byte","length":1,"dataType":"num","data":"254"},{"name":"item5","type":"byte","length":1,"dataType":"num","data":"255"}],"swap1":"","swap2":"","swap3":"","swap1Type":"swap","swap2Type":"swap","swap3Type":"swap","msgProperty":"payload","msgPropertyType":"str","x":650,"y":520,"wires":[["93892a0e0855afcd"]]},{"id":"0a2808b4a3be35d5","type":"inject","z":"3f7b4ae8acfceb94","g":"936fb43a5b436662","name":"sample 2","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":500,"y":480,"wires":[["feb0215eb1b6b0f0"]]},{"id":"eafea7611436d593","type":"inject","z":"3f7b4ae8acfceb94","g":"936fb43a5b436662","name":"sample 3","props":[{"p":"payload"},{"p":"topic","vt":"str"}],"repeat":"","crontab":"","once":false,"onceDelay":0.1,"topic":"","payload":"","payloadType":"date","x":500,"y":520,"wires":[["29171ad15050388a"]]}]

Extract data fn

//input is a byte buffer. 
//The first part is a null-terminated info string
//the last part, starting after the null terminator, is image pic data

//First, extract the info string.

//find the end of the text string
/** @type {Buffer} */
const payload = msg.payload
const txtEnd = payload.findIndex(byte => byte === 0)
if (txtEnd < 6) {
    // since 1,1,1,1 + a null is always expected, check txtEnd is not < 6
    node.error("Expected to find string terminator", msg)
    return
}

//Now extract and parse it. The variables are in hex format
const strBuf = payload.subarray(0, txtEnd);
const txt = strBuf.toString('ascii');   // "pictureNumber,msgCount,msgIndex,nBytes"
const txtSplit = txt.split(',').map(e => parseInt(e, 16))
const [picNo, msgCount, msgIndex, nBytes] = txtSplit

//Get the pic buffer on its own
const bufStart = txtEnd + 1
const picFragment = payload.subarray(bufStart, bufStart + nBytes)

//build the output message
const msg2 = {
    payload: [...picFragment],
}

if (msgCount === (msgIndex + 1)) {
    msg2.complete = true
}

return msg2;

Thanks Steve, that approach seems to be working, although the data has been corrupted somewhere along the way and is no longer recognisable as an image file when saved to the RPi. It could be my encoding algorithm on the ESP32. I'm investigating and will report back!
Chris.
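For anyone chasing similar corruption, one quick sanity check is to look at the first and last two bytes of the reassembled buffer before it is written to disk. A JPEG stream starts with the bytes FF D8 and normally ends with FF D9. The helper name `looksLikeJpeg` below is hypothetical, and the check is only a sketch: it says nothing about the bytes in between, and it assumes the camera adds no trailing bytes after the end marker.

```javascript
// Hypothetical helper: true if the buffer starts with the JPEG start-of-image
// marker (FF D8) and ends with the end-of-image marker (FF D9).
function looksLikeJpeg(buf) {
    return Buffer.isBuffer(buf) &&
        buf.length >= 4 &&
        buf[0] === 0xff && buf[1] === 0xd8 &&
        buf[buf.length - 2] === 0xff && buf[buf.length - 1] === 0xd9;
}

// Made-up byte sequences for illustration only.
const plausible = Buffer.from([0xff, 0xd8, 0x00, 0x11, 0x22, 0xff, 0xd9]);
const truncated = Buffer.from([0xff, 0xd8, 0x00, 0x11, 0x22]);

console.log(looksLikeJpeg(plausible), looksLikeJpeg(truncated));
```

In a function node this could run on `msg.payload` right after the flatten step. If the check passes there but the saved file is still unreadable, the problem is more likely downstream of the reassembly.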

All working well now. My remaining issues were the encoding setting in the File Save node (it needed "default"; I was using Binary) and some file permissions on the RPi. @Steve-Mcl's approach did the trick, thanks again.
