# rtsp2mqtt.py
#
# OpenCV Python code to grab rtsp stream frames and distribute as MQTT buffers.
#
# only tested with python3
# starting as:
#   python3 rtsp2mqtt.py 2>/dev/null
# is useful to stop seeing the warnings

import cv2
from imutils.video import FPS
import os
import signal
import datetime
import time
import paho.mqtt.client as mqtt
# threading stuff
from queue import Empty, Full, Queue
from threading import Lock, Thread


# edit this to match your cameras
rtspURL = [  # no reason sources can't be mixed.
    ##"rtsp://admin:xyzzy@Security.DVR.IP:554/cam/realmonitor?channel=4&subtype=0",  # security DVR
    ##"rtsp://admin:xyzzy@Security.DVR.IP:554/cam/realmonitor?channel=6&subtype=0",  # security DVR
    "rtsp://184.72.239.149/vod/mp4:BigBuckBunny_175k.mov",  # web stream
    "rtsp://b1.dnsdojo.com:1935/live/sys3.stream",  # larger web stream beach scene
    ##"rtsp://192.168.2.156:554/user=admin_password=tlJwpbo6_channel=1_stream=0.sdp?real_stream",  # China "Onvif" netcams
    ##"rtsp://192.168.2.126:554/user=admin_password=tlJwpbo6_channel=1_stream=0.sdp?real_stream",
    ##"rtsp://192.168.2.53:554/user=admin_password=tlJwpbo6_channel=1_stream=0.sdp?real_stream",
    ##"rtsp://admin:admin@192.168.2.219/media/video1"##,
]
Nrtsp = len(rtspURL)

QUIT = False  # True exits main loop and all threads


def _timestamp(fmt="%Y-%m-%d %H:%M:%S"):
    """Return the current local time rendered with strftime format *fmt*."""
    return datetime.datetime.now().strftime(fmt)


# Boilerplate code to setup signal handler for graceful shutdown on Linux
def sigint_handler(signal, frame):
    """SIGINT handler: log the event and set QUIT so every loop winds down.

    The parameter names are kept from the original code; note that ``signal``
    shadows the ``signal`` module inside this function only.
    """
    global QUIT
    print('caught SIGINT, normal exit. -- ' + _timestamp("%Y-%m-%d %H:%M:%S"))
    #quitQ.put(True)
    QUIT = True


# The handler was defined but never installed; register it so Ctrl-C actually
# takes the graceful-shutdown path instead of relying on KeyboardInterrupt.
signal.signal(signal.SIGINT, sigint_handler)


# flags removes toolbar and status bar from window
windowName = list()
for i in range(Nrtsp):
    windowName.append('rtsp_' + str(i))
    # this will fail for CV without QT for highgui
    cv2.namedWindow(windowName[i], flags=cv2.WINDOW_GUI_NORMAL + cv2.WINDOW_AUTOSIZE)

# Tile the windows on the machines whose home directories are recognised;
# on any other machine the windows stay wherever the window manager put them.
if os.path.exists("/home/dan/ncs") or os.path.exists("/home/wally") or os.path.exists("/home/pi") or os.path.exists("/home/ai"):
    top = 40
    left = 1
    Xshift = 643
    Yshift = 392
    if os.path.exists("/home/dan/ncs") or os.path.exists("/home/pi") or os.path.exists("/home/ai"):
        Nrows = 3  # 1920x1080 display
        top = 25
        left = 0
    else:
        Nrows = 5  # my 2160p display
    for i in range(Nrtsp):
        col = int(i / Nrows)
        row = i % Nrows
        cv2.moveWindow(windowName[i], left + col * Xshift, top + row * Yshift)
cv2.waitKey(1)


# *** RTSP Sampling Thread
#******************************************************************************************************************
# rtsp stream sampling thread
##rtspLock = Lock()   # make rtsp frame grabs be atomic, seems openCV may not be completely thread safe.
def rtsp_thread(inframe, camn, URL):
    """Continuously read frames from one RTSP stream and queue them.

    Args:
        inframe: Queue the frames are put on, without blocking; when the
            queue is full the frame is dropped and counted.
        camn: camera index, used only in log messages here.
            NOTE(review): the original comment said "negative camn means
            round-robin stream sampling in a single thread", but nothing in
            this function implements that.
        URL: stream URL handed to cv2.VideoCapture.

    Runs until the module-level QUIT flag becomes true. The first failed read
    is only logged; every following failure releases and re-opens the stream.
    """
    ocnt = 0        # number of frames dropped because the queue was full
    Error = False   # True while a failed read is being recovered from
    Error2 = False  # True after an exception, until a re-open succeeds
    print("[INFO] RTSP stream sampling thread" + str(camn) + " is starting..." + _timestamp(" %Y-%m-%d %H:%M:%S"))
    Rcap = cv2.VideoCapture(URL)
    Rcap.set(cv2.CAP_PROP_BUFFERSIZE, 2)  # doesn't throw error or warning in python3, but not sure it is actually honored
    print("[INFO] RTSP stream sampling thread" + str(camn) + " is running..." + _timestamp(" %Y-%m-%d %H:%M:%S"))
    while not QUIT:
        # grab the frame
        try:
            if Rcap.grab():
                ret, frame = Rcap.retrieve()
            else:
                # A failed grab used to `continue`, which spun forever on a dead
                # (or never opened) stream without reaching the re-open logic
                # below. Treat it like any other failed read instead.
                ret, frame = False, None
            if ret:
                # path for successful frame grab, following test is in case error recovery is in progress
                if Error:  # log when it recovers
                    print('[$$$$$$] RTSP Camera' + str(camn) + ' has recovered: ' + URL[0:33] + ' --- ' + _timestamp(" %Y-%m-%d %H:%M:%S"))
                    Error = False  # after getting a good frame, enable logging of next error
            else:
                frame = None
                if not Error:
                    Error = True
                    print('[Error!] RTSP Camera' + str(camn) + ': ' + URL[0:33] + _timestamp(" %Y-%m-%d %H:%M:%S"))
                else:
                    # try closing the stream and reopening it, I have one straight from China that does this error regularly
                    Rcap.release()
                    time.sleep(1.0)
                    print('[******] RTSP stream' + str(camn) + ' closing and re-opening stream ' + URL[0:33] + ' --- ' + _timestamp(" %Y-%m-%d %H:%M:%S"))
                    Rcap = cv2.VideoCapture(URL)
                    if Rcap.isOpened():
                        continue  # avoid the sleep if stream is re-opened.
                    else:
                        print('[Error!] RTSP stream' + str(camn) + ' re-open failed! $$$ ' + URL[0:33] + ' --- ' + _timestamp(" %Y-%m-%d %H:%M:%S"))
        except Exception as e:
            # this appears to fix the Besder camera problem where it drops out for minutes every 5-12 hours, will it work for rtsp stream?
            frame = None
            if not Error2:  # suppress the zillions of sequential error messages while it recovers
                Error2 = True
                # unlike the Besder camera snapshot errors, these never seem to recover on their own
                print('[Error_2] RTSP stream' + str(camn) + ': ' + str(e) + " " + URL[0:33] + ' --- ' + _timestamp(" %Y-%m-%d %H:%M:%S"))
            else:
                # try closing the stream and reopening it, I have one straight from China that does this error regularly
                Rcap.release()
                time.sleep(1.0)
                Rcap = cv2.VideoCapture(URL)
                if Rcap.isOpened():
                    Error2 = False
                    continue  # avoid the sleep if stream is re-opened.
                else:
                    # fresh timestamp: this message used to print a stale one
                    print('[Error_2] RTSP stream' + str(camn) + ' re-open failed! ### ' + URL[0:30] + ' --- ' + _timestamp(" %Y-%m-%d %H:%M:%S"))
        if QUIT:
            break
        # time.sleep(2.0)   # let other threads have more time while this camera recovers
        try:
            if frame is not None and not QUIT:
                inframe.put(frame, False)  # no block if queue full, go grab fresher frame
                ## retv, img_as_jpg = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])  # for sending image as mqtt buffer, 10X+ less data being sent.
                ## if not retv:
                ##     print("[INFO] thread{} conversion of np array to jpg in buffer failed!", str(camn))
                ##     img_as_jpg = None
                ## else:
                ##     client.publish(str("MQTTcam/" + str(camn)), bytearray(img_as_jpg), 0, False)
        except Full:  # queue is full, drop this frame
            if QUIT:
                break
            # a large drop count for rtsp streams is not a bad thing as we are
            # trying to keep the input buffers nearly empty to reduce latency.
            ocnt = ocnt + 1
    Rcap.release()
    print("RTSP stream sampling thread" + str(camn) + " is exiting, dropped frames " + str(ocnt) + " times.")


# allocate queues (one two-slot queue per camera)
inframe = list()
for i in range(Nrtsp):
    inframe.append(Queue(2))


# *** connect to MQTT broker
def on_publish(client, userdata, mid):
    """MQTT publish callback; intentionally does nothing."""
    #print("mid: " + str(mid))      # don't think I need to care about this for now, print for initial tests
    pass


def on_disconnect(client, userdata, rc):
    """MQTT disconnect callback; logs when the result code rc is non-zero."""
    if rc != 0:
        print("Unexpected MQTT disconnection!" + _timestamp(" ... %Y-%m-%d %H:%M:%S"))


def on_connect(client, userdata, flags, rc):
    """MQTT connect callback; intentionally does nothing."""
    #client.subscribe("MQTTcam/#")
    pass


def on_message(client, userdata, msg):
    """MQTT message callback; intentionally does nothing."""
    pass


print("[INFO] connecting to MQTT broker...")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.on_publish = on_publish
client.on_disconnect = on_disconnect
client.connect("localhost", 1883, 60)
client.loop_start()


# *** start camera reading threads
o = list()
print("[INFO] starting " + str(Nrtsp) + " RTSP Camera Sampling Threads ...")
for i in range(Nrtsp):
    o.append(Thread(target=rtsp_thread, args=(inframe[i], i, rtspURL[i])))
    o[i].start()

print("[INFO] starting the FPS counter...")
fps = FPS().start()
while not QUIT:
    try:
        for i in range(Nrtsp):
            # get a frame; a single non-blocking get replaces the empty()/get() pair
            try:
                frame = inframe[i].get(False)
            except Empty:
                continue
            if frame is None:
                continue
            # for sending image as mqtt buffer, 10X+ less data being sent.
            retv, img_as_jpg = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
            if not retv:
                print("[INFO] conversion of np array to jpg in buffer failed!")
            else:
                client.publish(str("MQTTcam/" + str(i)), bytearray(img_as_jpg), 0, False)
            cv2.imshow(windowName[i], cv2.resize(frame, (640, 360)))
            fps.update()  # update the FPS counter
            key = cv2.waitKey(1) & 0xFF
            if key == ord("q"):  # if the `q` key was pressed, leave the main loop
                # `break` alone only leaves this inner `for`; setting QUIT also
                # ends the enclosing `while` and stops the camera threads.
                QUIT = True
                break
    except KeyboardInterrupt:
        QUIT = True
        break
    except Exception as e:
        print('EXCEPTION! ' + str(e))
        break

# stop the FPS counter timer
fps.stop()
QUIT = True  # make sure the camera threads leave their loops
# display FPS information
print("[INFO] elapsed time: {:.2f}".format(fps.elapsed()))
print("[INFO] approx. FPS: {:.2f}".format(fps.fps()))

for i in range(Nrtsp):
    o[i].join()
print("[INFO] All Camera Threads have exited ...")
print("[INFO] Stopping mqtt client thread ...")
client.loop_stop()