ETL Pipelines with Node-RED?

I'm not quite sure how that would work since the PipeEnd node also creates the pipeline, so it will always be needed. Are you saying that for the PipeEnd to know something is a streaming node, it would check whether a node has that config node as a reference? I.e. the config node would act as a type of marker?

I would definitely not move the implementation of the stream to that config node. The code for creating the stream should be on the node itself, e.g., CsvStream.

This binding of stream creation to the node allows others to create stream nodes without having to touch the original streaming package.

If Joe 'Bob' Bloggs were to implement a StreamBanana node then she would only need to a) add the node id to the _streamPipeline array on 'input' and b) add a createStream function to the node's JS file.

PipeEnd goes through the _streamPipeline array and sequentially calls createStream, finally creating a pipeline from those streams.
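
A minimal sketch of that mechanism in plain NodeJS, outside of Node-RED. Only `_streamPipeline` and `createStream` come from the description above; the `nodes` map (standing in for the runtime's node lookup), the `pipeStart`/`onInput`/`pipeEnd` helpers and the three example nodes are hypothetical and not the package's actual code.

```javascript
const { Readable, Transform, Writable, pipeline } = require('node:stream');

// Hypothetical registry: node id -> node object exposing createStream(msg).
const nodes = new Map();

// Source node: streams the items of msg.payload (an array) one by one.
nodes.set('src', { createStream: (msg) => Readable.from(msg.payload) });

// Transform node: upper-cases every item passing through.
nodes.set('upper', {
  createStream: () => new Transform({
    objectMode: true,
    transform(chunk, _enc, cb) { cb(null, String(chunk).toUpperCase()); }
  })
});

// Sink node: collects the items on msg.result.
nodes.set('sink', {
  createStream: (msg) => {
    msg.result = [];
    return new Writable({
      objectMode: true,
      write(chunk, _enc, cb) { msg.result.push(chunk); cb(); }
    });
  }
});

// Assumed PipeStart behaviour: start with an empty pipeline description.
function pipeStart(msg) {
  msg._streamPipeline = [];
  return msg;
}

// What each streaming node does on 'input': record its own id and pass msg on.
function onInput(nodeId, msg) {
  msg._streamPipeline.push(nodeId);
  return msg;
}

// What PipeEnd does: call createStream on each node in order, then connect
// the resulting streams. The msg handed to createStream is the one PipeEnd saw.
function pipeEnd(msg, done) {
  const streams = msg._streamPipeline.map((id) => nodes.get(id).createStream(msg));
  pipeline(streams, done);
}

const msg = pipeStart({ payload: ['csv', 'jsonl'] });
['src', 'upper', 'sink'].forEach((id) => onInput(id, msg));
pipeEnd(msg, (err) => console.log(err || msg.result));
```

Because the stream is only created inside `pipeEnd`, a third-party node needs nothing from the streaming package beyond following the two conventions.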

There are some subtleties around how to handle evaluateNodeProperty, i.e. when that should be called. My nodes call it when the first message comes through but a second msg is passed to the createStream function. That msg object is the one that the PipeEnd node received. So in theory, the node could call evaluateNodeProperty in createStream ...

Fascinating discussion! I have to admit I've not thought through all the ramifications, but my first thought after looking at your pipe flows above was that the pipestart and pipeend nodes are "just" bounding the stream processing (I know there is way more than that! ;*).

To simplify the node-red diagrams, what about adding streaming support to either "groups" or "subflows"? Yes it would mean extending the core with a different internal processing implementation -- but then you could build whatever flows you want inside that streaming container. The only caveat would be that all those included nodes would have to support streaming mode.

Initially it may be just these new nodes that you have working -- but over time the core nodes could be updated to include streaming support (or verify that they already work with streams?). For instance, the debug node onMessage(...) function takes the incoming msg object and writes it to the sidebar (console, status, whatever) which seems to "inherently" resemble streaming. So mark that node with stream support, and let the core processing listen on the stream and feed each incoming message to it like usual -- no real code changes, I would think?

Perhaps this is greatly over-simplified, but what a huge leap forward for node-red if it supported both paths. Feels similar to how Promise chaining in JS simplified our handling of "callback hell".

OK, it was just a thought. I was thinking that the stream management would all be in the config node. Not to worry. No point in having the complexity of a config node if it is only being used for a flag.

PipeStart just adds an array to the message and PipeEnd executes the streams - so not too much more 🙂

In the ETL flow, I now use subflows to simplify the flow --> a somewhat too colourful comparison of the two flows.

The point is that I have actually created several subflows to encapsulate certain common behaviour --> the Get2Disk node is a subflow that contains the PipeStart and PipeEnd (edit subflow template works) nodes. So it's possible to use subflows and not really necessary to create a new concept of wrapping streaming nodes.

Also in creating subflows, I begin to create "ETL" behaviour (i.e. retrieve and store data) instead of just streaming A to B. There is also a Path2JsonL subflow which assumes there is a path attribute with a .jsonl file that is then streamed into a JsonLStream node --> that's ETL semantics, not streaming.

👍 Brainstorming ideas is great!

Here we begin to mix terminology. I should say that the "streaming" I'm talking about here is the streaming API defined by NodeJS.

Node-RED is a message streaming tool. So it certainly does stream already but it does not support the streaming API from NodeJS.

The streaming API is more about having byte streams interconnected between javascript components with each component doing something with the data on-the-fly. I.e. the entire file is never completely in memory, only chunks of it.

The intention of my work here is "data to message streamification": large CSV/JsonL files are converted into a stream of Node-RED msg objects without generating a large memory footprint or large msg objects flowing through Node-RED. CSV/JsonL files are line based, meaning they consist of lines of independent data, and each line can be represented by a single msg object.
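
As a rough illustration of the idea (not the code of the pipestream nodes), the sketch below uses NodeJS's built-in `readline` module to turn a line-based source into one msg-like object per line. The function name `streamLinesAsMessages` and the `send` callback are hypothetical; inside a real node, `send` would be the node's own send function.

```javascript
const readline = require('node:readline');
const { Readable } = require('node:stream');

// Reads any Readable line by line and hands each non-empty line to `send`
// as its own msg object. Only the current chunk and line are held in memory,
// never the whole file.
async function streamLinesAsMessages(input, send) {
  const rl = readline.createInterface({ input, crlfDelay: Infinity });
  let count = 0;
  for await (const line of rl) {
    if (line.trim() === '') continue; // skip blank lines
    send({ payload: JSON.parse(line) }); // one JsonL record per msg
    count += 1;
  }
  return count; // number of messages sent
}

// Demo input: two JsonL records split at an arbitrary chunk boundary.
// For a real file this would be fs.createReadStream(path) instead.
const demo = Readable.from([
  Buffer.from('{"id":1}\n{"i'),
  Buffer.from('d":2}\n')
]);
streamLinesAsMessages(demo, (msg) => console.log(msg.payload));
```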

Thanks for that clarification... so even though the csv node already has an option to "output each line as individual messages" it still reads the entire .csv file into memory before the first line is sent out?

Yes, it is a severe limitation of most of the nodes. Someone created the "bigxxxxxx" nodes a long time back where I think he tried to work around these issues but this is a better solution for sure.

I have just updated the serverless Node-RED installation to include the pipestream nodes, so the above two links can be used to explore the pipestream nodes in the ETL pipeline, i.e., they are no longer marked in red as missing.

Though the file node can read in a line at a time to then send to the csv node, which knows that the parts come from a single file, so can do the right thing with headers/columns, but yes - other nodes, not so much.

This is all great work, and is something I have wondered about for a long time, it's great to see this progress, but my head scratcher is how to “cross the streams” so to speak, for example how can I search in a stream for a sequence of header bytes and then send a chunk to say mqtt?

I.e., how do I jump from the stream world out to the flow world and indeed maybe back in?

For that, check out the code for the LineStream node. What happens is that there is a ByLine streamer that does minimal buffering before passing on a line to the next stream (in this case the inline Transformer that sends the message).

This node is inserted in a pipeline after something like a FileStream which is a Readable stream.

There are three types of streams that matter here: Writable, Readable and Transformer (`Transform` in the NodeJS API; NodeJS also has Duplex, of which Transform is a special case). All pipelines do something like Readable => Transformer => Writable, whereby there can be as many Transformers as required. Pipelines are just a collection of streams. The LineStream node adds two Transform streams to the pipeline: one to create lines and the other to send a msg object into Node-RED for each line that comes along.
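
The Readable => Transformer => Writable shape can be sketched with plain NodeJS streams as below. This is not the LineStream source: `byLine`, `toMsg` and `runLinePipeline` are hypothetical names, and `send` stands in for whatever hands a msg object to Node-RED. Unlike the behaviour described above, the second Transform here also passes each line on, so further streams could follow it.

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');
const { StringDecoder } = require('node:string_decoder');

// Transform 1: turns arbitrary byte chunks into one string per line.
// Only the unfinished tail of the last chunk is buffered.
function byLine() {
  const decoder = new StringDecoder('utf8');
  let tail = '';
  return new Transform({
    readableObjectMode: true,
    transform(chunk, _enc, cb) {
      const parts = (tail + decoder.write(chunk)).split(/\r?\n/);
      tail = parts.pop(); // may be an incomplete line, keep it for later
      for (const line of parts) this.push(line);
      cb();
    },
    flush(cb) {
      tail += decoder.end();
      if (tail.length > 0) this.push(tail); // last line without a newline
      cb();
    }
  });
}

// Transform 2: the exit from the stream world into the flow world.
// It sends a msg per line and still forwards the line downstream.
function toMsg(send) {
  return new Transform({
    objectMode: true,
    transform(line, _enc, cb) {
      send({ payload: line });
      cb(null, line);
    }
  });
}

// Readable => Transform => Transform => Writable
function runLinePipeline(chunks, send) {
  const sink = new Writable({
    objectMode: true,
    write(_line, _enc, cb) { cb(); } // discard; a file or DB writer could go here
  });
  return pipeline(Readable.from(chunks), byLine(), toMsg(send), sink);
}

runLinePipeline(
  [Buffer.from('id,na'), Buffer.from('me\n1,apple\n')],
  (msg) => console.log(msg)
);
```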

This is all explained with my very high-level understanding of Streaming in JS. I suspect, for example, that the LineStream node is actually wrong because it does not pass on its content - that could be done better. I don't quite understand how to complete a pipeline or how to split pipelines along different paths.

I understand when you say "a line" - but what defines "a line" in non-text data? E.g. video frames? Is that defined in the Readable part?

That example transformer is strictly for line separated textual data.

You'd need to implement one for your specific data format, if you need to split it into meaningful chunks.

I use that for example to decode and create messages from binary data streams, where the data length is encoded in a header for each message.
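
A sketch of what such a transformer could look like. The wire format is an assumption made purely for illustration: each message is preceded by a 4-byte big-endian length header. `frameDecoder` and `encodeFrame` are hypothetical names.

```javascript
const { Transform } = require('node:stream');

// Splits a byte stream into frames. Assumed format: 4-byte big-endian
// length header followed by that many payload bytes.
function frameDecoder() {
  let buffered = Buffer.alloc(0);
  return new Transform({
    readableObjectMode: true, // emit one Buffer per frame, never merged
    transform(chunk, _enc, cb) {
      buffered = Buffer.concat([buffered, chunk]);
      while (buffered.length >= 4) {
        const len = buffered.readUInt32BE(0);
        if (buffered.length < 4 + len) break; // frame incomplete, wait for more
        this.push(buffered.subarray(4, 4 + len));
        buffered = buffered.subarray(4 + len);
      }
      cb();
    },
    flush(cb) {
      // Leftover bytes at the end mean the stream stopped mid-frame.
      cb(buffered.length > 0 ? new Error('stream ended inside a frame') : null);
    }
  });
}

// Helper for the demo: builds one frame in the assumed format.
function encodeFrame(payload) {
  const header = Buffer.alloc(4);
  header.writeUInt32BE(payload.length, 0);
  return Buffer.concat([header, payload]);
}

const decoder = frameDecoder();
decoder.on('data', (frame) => console.log('frame:', frame.toString()));
const bytes = Buffer.concat([
  encodeFrame(Buffer.from('hello')),
  encodeFrame(Buffer.from('mqtt'))
]);
decoder.write(bytes.subarray(0, 2)); // chunk boundaries ignore frame boundaries
decoder.end(bytes.subarray(2));
```

Each emitted frame is a natural point at which to leave the stream world, for example by sending it on as a msg to an MQTT node.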

It's not the Readable that defines (at least in my experience) the meaning of the data, all the Readable does is generate chunks of data until there is no more data.

It's the Transformer that assigns "meaning" to the data, i.e., frame or line or packet or whatever.

In the context of ETL pipelines, I could imagine that a line is defined, then transformed to a CSV object, then the CSV object is modified according to some business logic and finally the CSV object is streamed into Node-RED. Or perhaps the CSV object is streamed directly into some data sink (i.e. database, data warehouse or some message bus). So I could imagine that the ETL pipeline is created, maintained and modified in Node-RED but, when executed, the data is streamed completely past NR and directly from web/data source to database/data store without touching NR.
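
The line => CSV object => business logic => sink chain could look roughly like this when written with async generator stages, which NodeJS's promise-based `pipeline` accepts between streams. Everything here is illustrative: the stage names, the `amount` column, the threshold of 100 and the in-memory sink are made up, and the CSV parsing is deliberately naive (no quoted fields).

```javascript
const { Readable, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Stage 1: line -> CSV object. The first line is taken as the header row.
async function* toCsvObjects(lines) {
  let columns = null;
  for await (const line of lines) {
    const cells = String(line).split(','); // naive: no quoting or escaping
    if (columns === null) { columns = cells; continue; }
    yield Object.fromEntries(columns.map((name, i) => [name, cells[i]]));
  }
}

// Stage 2: hypothetical business rule, keep only rows with amount >= 100.
async function* onlyLargeOrders(rows) {
  for await (const row of rows) {
    if (Number(row.amount) >= 100) yield row;
  }
}

// Stage 3: the data sink. Here an array; in practice a database writer.
function runEtl(lines, sinkRows) {
  const sink = new Writable({
    objectMode: true,
    write(row, _enc, cb) { sinkRows.push(row); cb(); }
  });
  return pipeline(Readable.from(lines), toCsvObjects, onlyLargeOrders, sink);
}

const stored = [];
runEtl(['id,amount', '1,50', '2,150'], stored).then(() => console.log(stored));
```

No msg object is created anywhere in this chain, which is the "past NR" case: Node-RED would only assemble and start the pipeline.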

Just a heads up, the package is now called node-red-streaming and has gotten a more comprehensive readme describing much of what we have discussed here.

The most common requirement in my world is that the source is already CSV, or sometimes multiple CSVs, and the pipeline would convert a line to an object and then apply a "business" transformation - most likely a filter and/or grouping. The final stage would either be output to a new file or direct to HTML display. HTML output is most likely, because otherwise I'd probably be using a different tool to be honest.

For me, I can't think of a reason I'd be streaming anything other than some structured data. There are better tools for handling streaming media I think and I'd only use Node-RED to manage a menu or some other metadata display.
