ETL Pipelines with Node-RED?

Hi There,

So when someone asks me whether Node-RED can be used for ETL (Extract, Transform, Load) pipelines, I say no - because of the size of the data involved: sending a 1GB message through Node-RED isn't for the faint-hearted, and data pipelines can get large.

But then I came across Open Sanctions, which has open-sourced all its data crawlers, so I thought I'd have a go at emulating in NR what they do in Python.

Anyway, originally I wanted to investigate handling all the various formats (zip, tar.gz, json, csv, jsonl, bz2, xlsx) they have to deal with and whether NR could handle them too. It definitely can. Now I'm trying to find a structure that makes it easy to add new crawlers and that also represents the ETL ideas.

So far I have really only implemented the 'E' and a bit of the 'T' for a subset of all their crawlers, but I must say, it's very straightforward to do in NR.

My question is really: has anyone else done this, and secondly, what are some of the learnings from having done ETL with NR? Are there any example flows for doing ETL with NR, or is it something that shouldn't be done with NR at all?

For those interested, my flow is available at FlowHub.org in static form and over here in the Node-RED static-but-a-bit-dynamic-only-in-the-browser instance.

Cheers and I hope it helps :slight_smile:

To me, this is one of the limitations of Node-RED and I really would like it to be stronger.

I've done ETL but tend to start with a CSV and use a library to do the transform in a function node.

I also started creating a JavaScript library to do some reshaping of JSON data which is a common ETL requirement. Mostly about being able to reshape tabular data into custom groupings.
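A rough sketch of the kind of reshaping I mean (the field names are just made up for the example):

```javascript
// Reshape flat tabular rows into custom groupings.
// The 'region' and 'amount' field names are purely illustrative.
function groupBy(rows, keyFn) {
    return rows.reduce((acc, row) => {
        const key = keyFn(row);
        (acc[key] = acc[key] || []).push(row);
        return acc;
    }, {});
}

const rows = [
    { region: "EU", amount: 10 },
    { region: "US", amount: 20 },
    { region: "EU", amount: 5 }
];

const grouped = groupBy(rows, r => r.region);
// => { EU: [ {...}, {...} ], US: [ {...} ] }
```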

1 Like

Agree. Question is how to make that happen ... certainly msg cloning would need to be more selective. I had an issue joining nine messages, each of which wrote a file to disk. After the write, the msg.payload of each msg was the contents of the file. Joining nine of them blew the stack, so I had to remove the payload before joining the nine msgs (I just needed the join to know when all the writes were complete).
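The workaround was literally a one-liner in a function node placed just before the join (a sketch of the idea, not my exact flow):

```javascript
// Strip the file contents before the join - the join only needs
// to know that the write completed, not the huge payload.
delete msg.payload;
return msg;
```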

A level above that (it's also where I'm at) is considering which formats are missing in NR. I discovered that there is a JSON Lines format for streaming JSON-formatted data - it's not valid JSON, since it's a list of newline-separated JSON objects (without commas and surrounding brackets).

So a JsonL node might be useful.
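Until there is such a node, a function node can cover the non-streaming case - a minimal sketch, assuming the whole JSONL file has already arrived as a single string payload:

```javascript
// Parse JSON Lines: one JSON object per line, no commas or brackets.
// Skips blank lines; throws if any line is not valid JSON.
msg.payload = msg.payload
    .split("\n")
    .filter(line => line.trim().length > 0)
    .map(line => JSON.parse(line));
return msg;
```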

Just hit a max-string-length-of-500MB issue in jszip (used in the zip node) - which makes handling large JSON files (>3GB) rather difficult.

2 Likes

Here are two nodes that would make ETL with Node-RED simpler:

  • web2disk - web requests are written directly to disk

What I'm doing now is linking an http request node to a write file node, which has the side effect of generating a 600MB message (since the file happens to be 600MB). That msg object is generated by the http request node and then passed to the write file node. Plus the msg would be cloned again for each additional wire connected to the http request node (think debug node).

  • disk2stream - semantic knowledge of content to generate msg streams

So this node would take a file on disk, unzip/untar/ungzip it, recognise its contents and then stream the data as individual messages. For example, CSV is a perfect candidate for message streaming: grab the header line, and each subsequent line becomes a message whose payload is an object using the header fields as keys (see the sketch below).

If the content isn't streamable, then a big blob would be returned - not ideal, but better than the current approach: read file (giant msg), JSON parser (more memory), and then many wires that each clone the object.
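For the CSV case, the inside of such a node could look roughly like this - plain Node.js with the built-in readline module, deliberately naive about quoting, and the path is just a placeholder:

```javascript
const fs = require("fs");
const readline = require("readline");

// Stream a CSV file line by line: the first line supplies the keys,
// every following line becomes one small object (i.e. one msg payload).
async function* csvRecords(path) {
    const rl = readline.createInterface({
        input: fs.createReadStream(path),
        crlfDelay: Infinity
    });
    let headers = null;
    for await (const line of rl) {
        if (line.trim() === "") continue;
        const cells = line.split(","); // naive: no quoted-field handling
        if (!headers) {
            headers = cells;
            continue;
        }
        yield Object.fromEntries(headers.map((h, i) => [h, cells[i]]));
    }
}

// Usage: each record would be sent on as its own message, e.g.
// for await (const record of csvRecords("data.csv")) { node.send({ payload: record }); }
```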

Also, one other thing I have started to do is make more node.status calls - because operations take so much longer, it's nice to see where in the flow the message is.
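For example, in a function node that does one of the slow steps, something as simple as this helps (the status texts are just examples):

```javascript
// Show progress in the editor while a long-running step works.
node.status({ fill: "blue", shape: "dot", text: "processing..." });

// ... do the slow work on msg.payload here ...

node.status({ fill: "green", shape: "dot", text: "done" });
return msg;
```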

Anyway my 2 cents ...

1 Like

Sounds like some solid ideas to me.

Unfortunately, JavaScript lacks the choice of tools that Python has for dealing with datasets - though many packages claim to offer similar features, they don't really deliver or plain just don't work. That's where my mind has been.

But getting more efficient large-file handling would be a great start.

2 Likes

Exactly ... and because I can't simply have an idea and not implement it straight away, Sunday evening is going to be spent in front of the laptop :wink:

Step by step, once I have these nodes, there will be the next hurdles.

That is true, but what JS does have is good streaming APIs (at least from what I have seen of JS) - one can take advantage of that.

I'm actually quite interested to see how Node-RED will react once a million messages a second go through it :wink:

1 Like

So I've created an initial version and it basically does what I want:

[image: web2disk2]

including a progress indicator which turns NR into a ..... dashboard! :slight_smile:

Since this time I did RTFM, I found an extremely simple example in the got docs that does it.
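The core of it is roughly this - an untested sketch assuming got v11 (made available to a function node via the setup tab), with placeholder URL and file path:

```javascript
const fs = require("fs");
const { pipeline } = require("stream");
const got = require("got");

// Stream the HTTP response straight to disk - no 600MB msg object -
// and report download progress as it goes.
const download = got.stream("https://example.org/big-dataset.zip");

download.on("downloadProgress", p => {
    // p.percent is 0..1 when the server sends a content-length
    node.status({ fill: "blue", shape: "dot",
                  text: `${Math.round(p.percent * 100)}%` });
});

pipeline(download, fs.createWriteStream("/tmp/big-dataset.zip"), err => {
    if (err) { node.error(err); return; }
    node.status({ fill: "green", shape: "dot", text: "saved" });
    node.send({ payload: "/tmp/big-dataset.zip" });
});
```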

1 Like

Don't you want this?

You probably want Ky instead, by the same people. It's smaller, works in the browser too, and is more stable since it's built upon Fetch.

1 Like

:+1:

Since they still maintain Got, I'll stick to that for the time being, but I will have a look at using Ky for the in-browser Node-RED story ... that might be better than using $.get(...)

The file reader node can deliver one line or chunk at a time, which can help when reading large files. Not sure if the kazoo library could handle chunks but it would be worth a look.

Thanks for the tip, didn't see that :+1:

But most of the files I'm dealing with are archives, and what I want to do is stream-gunzip and then stream-untar ... then each file in the tar becomes a message. I could go a step further and analyse the type of file and stream each line instead of the file, but I'll keep it at file level for the moment.
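The core of what I'm doing looks roughly like this - assuming the tar-stream package (my pick, other libraries exist) plus Node's built-in zlib, made available to the node, with a placeholder path:

```javascript
const fs = require("fs");
const zlib = require("zlib");
const tar = require("tar-stream");   // assumption: tar-stream is installed

// Stream-gunzip a .tgz and emit one message per file inside the archive.
const extract = tar.extract();

extract.on("entry", (header, stream, next) => {
    const chunks = [];
    stream.on("data", c => chunks.push(c));
    stream.on("end", () => {
        node.send({
            filename: header.name,
            payload: Buffer.concat(chunks)
        });
        next(); // ask for the next entry
    });
});

extract.on("finish", () =>
    node.status({ fill: "green", shape: "dot", text: "done" }));

fs.createReadStream("/tmp/archive.tgz")
    .pipe(zlib.createGunzip())
    .pipe(extract);
```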

got a nice example archive that I'm working with: a million files in a 600MB tgz!

the message counter on the debug node isn't keeping up .... :wink:

Don't use a forked debug node. Better to create your own counter in an inline function node :slight_smile:
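Something like this, say (a sketch - the counter sits in the node's context, so it survives between messages):

```javascript
// Count messages without forking the flow to a debug node.
const count = (context.get("count") || 0) + 1;
context.set("count", count);

// Only repaint the status every 1000 messages to keep the editor responsive.
if (count % 1000 === 0) {
    node.status({ fill: "blue", shape: "ring", text: `${count} msgs` });
}
return msg;
```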

But yes, other input node types that stream tar or zip would be good.

1 Like

current situation:

[animation: etltarball]

one file, one message ... though not according to the debug message counter :frowning:

tar and tgz are already working (see gif) ... looking for a good streaming library for zip files ...

Edit: btw, memory is flat and one core is burning - so the streaming is hitting the spot. All that is now required is multi-core support ....

I think that, realistically, you would need something with a C/C++ library for that. That would be the most efficient anyway. Multi-threading in Node.js itself is pretty limited AFAIK.

1 Like

Out of interest, what is deno? I'm not personally involved in the JS scene but have heard of it. I ask because I wonder whether it will be more multi-threaded.

Deno is a new alternative to node.js. https://deno.com

It still uses the V8 JavaScript engine, so it's probably no more multi-threaded than node.js. It has other advantages though: better support for modern standard JavaScript, better security, etc.

But it is early days for it. We will see what happens with server JavaScript. Suffice it to say that node.js now has some competition which should, in time, be a good thing.

1 Like

All of my experiences using Node-RED for ETL have been very positive -- the ability to quickly move data from one place to another is tailor-made for flowgrams, imo. However, my projects never involved such large files... instead I've only dealt with SQL datasets and/or REST APIs, so I'm inherently dealing with individual records. The challenge with large queries is to avoid connection timeouts, which generally requires smart (manual) paging of the data sets.

This is key to long-running flows, along with the ability to restart after a failure. This is the one case where I might use flow context, since I'm the only one running the ETL flows: checkpoint the last record index in a flow variable so I can inject it into the query logic.
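In function-node terms the checkpoint itself is tiny (the property and variable names here are just examples):

```javascript
// After a batch has been processed successfully, remember how far we got
// so a restart can pick up from there.
flow.set("lastRecordIndex", msg.lastProcessedIndex);
return msg;
```

On restart, another function node reads it back with flow.get("lastRecordIndex") || 0 and builds the next query from that.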

The goal for me was to run the node-red server as fast as possible without blowing up the memory stack -- the best practice I settled on was to use the semaphore nodes. So before the "transform" step, the flow uses semaphore-take to pick up a ticket (or wait until one becomes available), and after the data is loaded it uses semaphore-leave to return the ticket to the pool.

This makes it easy to tune the flow for maximum throughput. Make the semaphore pool larger for machines with more memory/cpu, or even drop the pool size to 1 if the records need to be processed sequentially.

2 Likes

When I end up messing with large files (like video or large log files), I often use tools outside of Node-RED to feed them in. Things like pv on Linux can both rate-limit and show progress; I then pipe that to a socket in Node-RED to process the chunks.

Just to give an update on getting my data through NR:

[screenshot: Screen Shot 2024-01-08 at 00.23.37]

It was in fact:

tar tfz cz_business_register.xml.tgz  | wc -l
 1180669

one too many but that was only because the node sends a complete msg at the end :wink:

Nothing crashed, no memory leak, it just did the job - even though it might have taken a little longer than I'd like.

Even decoding and transforming the XML, it all worked.

Hats off to all who made Node-RED such a solid piece of software :+1:

And hats off to all who made Node-RED so flexible that I could bang together a streaming node and get it working and integrated in no time at all!

Edit: the ETL nodes involved were created in Node-RED, for Node-RED :wink:

1 Like

Run it again without any branches in the hot path (to avoid the message cloning)

If you need to measure time for comparison, use something like the flow-timer subflow node at the initiation point and stop it upon completion (i.e. not in the hot path).