Exec and daemon nodes break up a data stream

Morning guys,
Dave's function node is a very good code snippet to get you started! By concatenating the chunks you can easily search for the magic bytes, even across chunk boundaries. I started my multipart-decoder node development the same way, but it very quickly became clear that my Raspberry Pi was getting heavily loaded (see screenshots), for two reasons:

  • For every chunk the ENTIRE buffer is scanned (from the start) to find the index of the magic bytes. That can easily be solved by remembering up to which index you have already searched, and passing that offset to the next 'indexOf' call.
  • For every chunk the ENTIRE buffer (collected so far) is copied, due to the 'concatenate'. This means a huge number of bytes being copied over and over again. I solved that by pushing the chunks into an array, and doing only a single concatenation once all chunks of a buffer have arrived. The disadvantage is that this makes it harder to find magic bytes across chunks in the array. I finally solved that by using another npm library which offers a Buffer-interface facade over N underlying buffers. Caution: multiple npm libraries are available for this, but I found that not all of them handle the byte copying efficiently...
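A minimal sketch of both optimisations, written in Python for clarity (the thread is about Node-RED function nodes, where `Buffer.indexOf` and a multi-buffer facade library play the same roles; the class and names here are my own illustration, not Bart's node):

```python
# Sketch of the two optimisations above:
# 1. remember up to which index the buffer was already searched, and
# 2. collect chunks in a list instead of re-concatenating on every chunk.

SOI = b"\xff\xd8"  # JPEG start-of-image marker, as example "magic bytes"

class ChunkScanner:
    def __init__(self, magic):
        self.magic = magic
        self.chunks = []    # collected chunks; joined only when we search
        self.scanned = 0    # number of bytes already searched
        self.total = 0      # total number of bytes collected

    def feed(self, chunk):
        """Add one chunk; return the absolute index of the magic bytes, or -1."""
        self.chunks.append(chunk)
        self.total += len(chunk)
        # Step the search start back a little, so a marker that straddles
        # the previous chunk boundary is still found.
        start = max(0, self.scanned - (len(self.magic) - 1))
        # For brevity this join still copies; a buffer-facade library (as
        # mentioned above) lets you search N buffers without copying.
        buf = b"".join(self.chunks)
        idx = buf.find(self.magic, start)
        self.scanned = self.total
        return idx

scanner = ChunkScanner(SOI)
print(scanner.feed(b"header \xff"))      # -1: marker not complete yet
print(scanner.feed(b"\xd8 image data"))  # 7: found across the chunk boundary
```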

That is one of the reasons why I started writing a Buffer node last month, to help users accomplish things like this (e.g. for audio and video). But it is not completed yet...
Bart

I thought I'd share my solution.

I wrote an rtsp-to-mqtt converter program in Python 3 using OpenCV 3.3.0 (newer versions also work; I've also run it on 4.1.0). This makes an easy way to get rtsp frames into Node-RED for whatever you need.

Here is the flow:

[{"id":"97ee92da.48ade","type":"image","z":"ab325561.062ff8","name":"","width":"640","x":510,"y":125,"wires":[]},{"id":"2f0af8f5.46b718","type":"mqtt in","z":"ab325561.062ff8","name":"","topic":"MQTTcam/#","qos":"0","broker":"40ff988f.c50d08","x":105,"y":180,"wires":[["b0222fcd.984"]]},{"id":"b0222fcd.984","type":"switch","z":"ab325561.062ff8","name":"","property":"topic","propertyType":"msg","rules":[{"t":"cont","v":"MQTTcam/0","vt":"str"},{"t":"cont","v":"MQTTcam/1","vt":"str"},{"t":"cont","v":"MQTTcam/2","vt":"str"},{"t":"cont","v":"MQTTcam/3","vt":"str"}],"checkall":"true","repair":false,"outputs":4,"x":270,"y":180,"wires":[["97ee92da.48ade"],["aabd2b10.c065a8"],["ade96b08.61b588"],["cfb3ced0.bdeae"]]},{"id":"aabd2b10.c065a8","type":"image","z":"ab325561.062ff8","name":"","width":"640","x":1150,"y":125,"wires":[]},{"id":"ade96b08.61b588","type":"image","z":"ab325561.062ff8","name":"","width":"640","x":510,"y":600,"wires":[]},{"id":"cfb3ced0.bdeae","type":"image","z":"ab325561.062ff8","name":"","width":"640","x":1150,"y":600,"wires":[]},{"id":"40ff988f.c50d08","type":"mqtt-broker","z":"","name":"localhost","broker":"localhost","port":"1883","clientid":"","usetls":false,"compatmode":false,"keepalive":"60","cleansession":true,"birthTopic":"","birthQos":"0","birthPayload":"","closeTopic":"","closeQos":"0","closePayload":"","willTopic":"","willQos":"0","willPayload":""}]

And here is the Python3 script for rtsp to mqtt conversion:
rtsp2mqtt.py.txt (9.9 KB)

Edit the rtspURL array to match your rtsp sources; by default it's set to the two "public" streams we've been using here.
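For anyone who doesn't want to open the attachment, the core idea can be sketched roughly like this (this is NOT the actual script: the URL list, topic helper, and broker host are placeholders of my own, and the real script handles multiple cameras and reconnects):

```python
# Rough sketch of an rtsp-to-MQTT converter loop in the spirit of the
# attached rtsp2mqtt.py (illustrative only; names are placeholders).

RTSP_URLS = [
    "rtsp://user:password@192.168.1.10:554/stream1",  # placeholder source
]

def frame_topic(cam_num):
    # Frames are published on one topic per camera: MQTTcam/0, MQTTcam/1, ...
    return "MQTTcam/" + str(cam_num)

def stream_camera(cam_num, rtsp_url, broker_host="localhost"):
    # Imported here so the sketch can be read without OpenCV or paho-mqtt
    # installed; a real script would import these at the top.
    import cv2
    import paho.mqtt.client as mqtt

    client = mqtt.Client()
    client.connect(broker_host, 1883, 60)
    client.loop_start()

    cap = cv2.VideoCapture(rtsp_url)
    while True:
        ok, frame = cap.read()          # decode the next rtsp frame
        if not ok:
            break                       # stream dropped; real code reconnects
        ok, jpg = cv2.imencode(".jpg", frame)
        if ok:
            # QoS 0 publish of the JPEG bytes for this camera
            client.publish(frame_topic(cam_num), jpg.tobytes(), 0, False)

    cap.release()
    client.loop_stop()
```

The mqtt-in node in the flow above subscribes to MQTTcam/# and the switch node fans the per-camera topics out to the image preview nodes.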

If anyone knows how to reduce the rtsp latency, I'd love to learn how!

My system was serving up ~50 fps from eight rtsp streams. My DVR and Chinese Onvif cams are set for 5 fps, so that accounts for ~30 fps. Presumably the Big Buck Bunny and Beach web streams account for the remainder.


Hi, @wb666greene,
I'm interested in your flow because my cameras are also Onvif (waiting for the final bartbutenaers node-red-contrib-onvif-nodes). I have problems decoding the RTSP stream ("Unknown image format") at 1080p (at 360p it works fine).

My question (sorry, I'm a Raspberry Pi newbie): I guess you run the Python OpenCV script on the Raspberry ... how do you do it? :blush:

Thank you

The Raspberry Pi is not a good choice for 1080p rtsp decoding. I have an i5 "Mini PC" that decodes the rtsp streams and feeds the images to the Pi via MQTT.

I have a "new" version that acts more like the Onvif snapshots: I send an MQTT message requesting an image from a particular camera, and the current image from that camera is returned as an MQTT buffer. It didn't make much difference on my desktop, but the Pi, XU-4, etc. can't keep up with the data flow, and latency grows as buffers pile up in the network layer until it runs out of memory. Making it work somewhat like an Onvif snapshot request solves the problem.

Other than this I'm not sure I understand what you are asking?

I would like to know where you run your OpenCV script (the one that transmits the camera's RTSP stream via MQTT)?
For Node-RED, I understood that it connects to the MQTT server and displays the video stream.

Yes, I think the same. As you add more complex functions, the RPi begins to saturate. That's why I'm also testing [ANNOUNCE] Node-RED Desktop v0.9.1 Released for Win10 or MAC: Node-RED with the power of a PC :star_struck:

I said I run it on an i5 "MiniPC"; I guess I forgot to mention it's running Ubuntu 16.04. I developed it on my i7-6700K desktop, also running Ubuntu 16.04.

Here is the current version:

rtsp2mqttPdemand.txt (13.7 KB)

Basically, you publish a message to the broker on the host running the rtsp2mqtt script:
client.publish(str("sendOne/" + str(camNum)), "", 0, False)

And it publishes the current image from the decoded rtsp stream for that camera on topic MQTTcam/n,
where n is the camera number from the sendOne topic.
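In Python the request/reply round trip could be sketched like this (a hedged illustration using paho-mqtt; the helper and callback names are mine, not from the attached script):

```python
# Sketch of the "on demand" pattern described above: publish an empty
# sendOne/n message, and the decoded frame comes back on MQTTcam/n.
# (Illustrative only; function names are not from rtsp2mqttPdemand.)

def request_topic(cam_num):
    return "sendOne/" + str(cam_num)

def frame_topic(cam_num):
    return "MQTTcam/" + str(cam_num)

def fetch_one_frame(cam_num, broker_host="localhost"):
    # paho-mqtt imported lazily so the sketch reads without it installed
    import paho.mqtt.client as mqtt

    frames = []

    def on_message(client, userdata, msg):
        frames.append(msg.payload)     # JPEG bytes from the rtsp2mqtt host
        client.disconnect()

    client = mqtt.Client()
    client.on_message = on_message
    client.connect(broker_host, 1883, 60)
    client.subscribe(frame_topic(cam_num), 0)
    # Empty payload: the topic itself is the request
    client.publish(request_topic(cam_num), "", 0, False)
    client.loop_forever()              # returns after on_message disconnects
    return frames[0] if frames else None
```

Because a new frame is only sent when requested, at most one image buffer is ever in flight per camera, which is what keeps slow hosts from piling up QOS 0 buffers.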


Ok, I see. It's a little different from what I thought.
Thank you for the clarification, and for sharing.

Also, please note that you can also run Node-RED on Windows and Mac already without using this desktop edition.

Yes, I tested the installation with Windows PowerShell, but it's not as trivial as the Desktop version, which adds functions like: checking for updates at startup, a restart button (after updating the palette, for example), and a standalone window (no need to open a web browser). The latest version includes the persistent global context ... Really nice work by @sakazuki.

"Derived from rtsp2mqtt.py modified to use multiprocessing instead of threading"

Did you notice any difference/improvements? Why did you change? Just curious...

In all my other AI tests threading was significantly better than mp, but for the RTSP stream decoding with OpenCV, mp performed significantly better. The nice thing about having this little i5 "MiniPC" do the RTSP decoding is that the AI resumes after the Lorex DVR reboots (which has been happening a lot lately; failure imminent?).

Apparently, in the AI case, the IPC overhead of mp negates any improvement that mp might have over threading. My current code can use multiple NCS sticks or mix and match CPU AI threads (mostly useless on Pi-class machines), Coral TPU, and NCS/NCS2 threads. It can also mix and match Onvif snapshots, localhost RTSP decoding, and "MQTT cam" input sources.

If you have USB3, the TPU seems fastest; if you have an i5 with AVX/AVX2, the CPU is about the same as an NCS2. Two NCS sticks are about the same as one NCS2. I only have a single NCS2, but mixing NCS and NCS2 gives worse results than two NCS sticks in my testing.

But for RTSP stream decoding there is essentially no IPC overhead: the only IPC is handled by the localhost MQTT broker, which seems really efficient.

The "demand" version solves the issue where QOS 0 MQTT buffers accumulate in the networking layer on the Pi3/4, XU-4, etc., by making the AI camera thread request a frame when it is ready for one (analogous to the Onvif snapshots, which generally give the best throughput on Pi3-class AI hosts). On i5 or better hardware I never noticed this issue.