import asyncio
import ctypes
import datetime as dt
import multiprocessing
import pickle
import re
import struct
import time
from functools import partial
from queue import Empty as _QueueEmpty

import numpy
import numpy as np

# NOTE(review): part of this file was lost in extraction (everything between
# ``struct.pack('`` in get_next_bitmap and the buffer-trimming statement of the
# stream worker).  The pieces marked "reconstructed" below were rebuilt from
# the surviving fragments and must be confirmed against the real source.

# Size of the BMP file header (14 bytes) plus BITMAPINFOHEADER (40 bytes).
# NOTE(review): assumes ffmpeg emits this header layout -- confirm.
_BMP_HEADER_LEN = 54

# Number of bytes requested from the decoder's stdout per read (reconstructed).
_READ_CHUNK = 65536


class StreamManager():
    """Hold the shared-memory objects and worker entry point for one camera.

    Attributes set by the constructor:
        cam_name:      the camera name passed in.
        resolution:    the resolution/shape passed in.
        array_len:     number of elements in a frame (product of resolution).
        img_array:     lock-protected shared uint8 buffer of ``array_len`` bytes.
        img_timestamp: lock-protected shared double.
        queue:         multiprocessing queue handed to the worker.
        rtsp_url:      the ffmpeg command line split on single spaces.
        process_func:  ``stream_wrapper`` pre-bound with all of the above.
    """

    def __init__(self, rtsp_url, resolution, camera_name='N/A'):
        # The original referenced undefined names (cam_name, array_len) and
        # never stored the resolution it later read from self.
        self.cam_name = camera_name
        self.resolution = resolution
        self.array_len = np.prod(resolution)
        self.img_array = multiprocessing.Array(
            ctypes.c_uint8, int(self.array_len), lock=True)
        self.img_timestamp = multiprocessing.Value(ctypes.c_double)
        self.queue = multiprocessing.Queue()
        # Argument vector for the decoder process.  Splitting on " " means a
        # URL containing spaces would be broken into several arguments.
        self.rtsp_url = format_ffmpeg_decode_url(rtsp_url).split(" ")
        self.process_func = partial(
            stream_wrapper,
            self.resolution,
            self.rtsp_url,
            camera_name=camera_name,
            queue=self.queue,
            img_array=self.img_array,
            img_timestamp=self.img_timestamp,
        )


def get_next_bitmap(byte_stream, shape):
    """Return the first complete bitmap found in ``byte_stream``, or None.

    The stream is searched for ``b'BM'`` followed by the packed file size.

    NOTE(review): only the first two statements of this function survived
    extraction; everything after ``struct.pack(`` is reconstructed.  The
    reconstruction assumes an uncompressed image whose pixel data is exactly
    ``prod(shape)`` bytes preceded by a 54-byte header (no row padding), and
    returns the pixel bytes reshaped to ``shape`` without reordering rows.
    Confirm all of this against the real source.

    Args:
        byte_stream: bytes-like buffer of raw decoder output.
        shape: shape of one frame; its product is the pixel byte count.

    Returns:
        A ``numpy.uint8`` array of shape ``shape`` (an independent copy), or
        ``None`` when no complete frame is present in the buffer.
    """
    numel = int(np.prod(shape))
    string_find = b'BM' + struct.pack('<I', numel + _BMP_HEADER_LEN)

    start = byte_stream.find(string_find)
    if start < 0:
        return None

    data_start = start + _BMP_HEADER_LEN
    if len(byte_stream) < data_start + numel:
        # Header seen but the pixel payload has not fully arrived yet.
        return None

    pixels = np.frombuffer(
        byte_stream, dtype=np.uint8, count=numel, offset=data_start)
    # Copy so the result does not alias (or pin) the caller's buffer.
    return pixels.reshape(shape).copy()


def _publish_frame(frame, img_array, img_timestamp, camera_name):
    """Write ``frame`` (or zeros when it is None) into the shared buffers."""
    with img_timestamp.get_lock(), img_array.get_lock():
        # Writing through a NumPy view works for both the scalar fill and the
        # frame copy; slice-assigning a bare 0 to the ctypes array would raise.
        shared = np.frombuffer(img_array.get_obj(), dtype=np.uint8)
        if frame is None:
            print(f"Read empty frame for {camera_name}")
            shared[:] = 0
            img_timestamp.value = 0
        else:
            print(f"Read frame for {camera_name} at {dt.datetime.now()}")
            shared[:] = frame.ravel()
            img_timestamp.value = time.time()


async def stream_wrapper(shape, rtsp_url, camera_name, queue, img_array,
                         img_timestamp):
    """Run the decoder process and serve frame requests from ``queue``.

    Messages understood on ``queue``:
        'exit': stop the worker.
        'get':  extract a frame from the buffered decoder output and publish
                it to ``img_array`` / ``img_timestamp`` (zeros and timestamp 0
                when no frame could be extracted).

    NOTE(review): the signature, the process start-up and the read/trim part
    of the loop are reconstructed -- the original text was lost.  Parameter
    names follow the call in ``StreamManager.__init__`` and the names used in
    the surviving body; the second positional parameter's name and any
    defaults are guesses.  Confirm against the real source.

    Args:
        shape: frame shape passed to ``get_next_bitmap``.
        rtsp_url: decoder command as an argument list.
        camera_name: label used in log output.
        queue: multiprocessing queue carrying control messages.
        img_array: shared uint8 array receiving frame bytes.
        img_timestamp: shared double receiving the capture time.
    """
    process = await asyncio.create_subprocess_exec(
        *rtsp_url,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.DEVNULL,
    )
    # Two frame-lengths of history always contain one complete frame.
    max_buffer = 2 * (int(np.prod(shape)) + _BMP_HEADER_LEN)
    byte_buffer = b''

    try:
        while True:
            chunk = await process.stdout.read(_READ_CHUNK)
            if not chunk:
                # Decoder closed its stdout; nothing more will arrive.
                break
            byte_buffer += chunk

            # Keep only the most recent bytes.
            diff = len(byte_buffer) - max_buffer
            if diff > 0:
                byte_buffer = byte_buffer[diff::]

            try:
                msg = queue.get_nowait()
            except _QueueEmpty:
                continue

            print('got message! ' + str(msg))
            if msg == 'exit':
                return
            if msg == 'get':
                print('doing get!')
                frame = get_next_bitmap(byte_buffer, shape)
                _publish_frame(frame, img_array, img_timestamp, camera_name)
    except asyncio.CancelledError:
        print('Cancelled Error')
    finally:
        # Stop the decoder on every exit path, not only on cancellation.
        if process.returncode is None:
            process.kill()
            await process.wait()


def format_gst_url(rtsp_url):
    """Return a GStreamer pipeline description string for ``rtsp_url``."""
    gst_pipeline = f"rtspsrc location={rtsp_url} latency=50 ! rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! appsink max-buffers=1 drop=true"
    return gst_pipeline


def format_ffmpeg_decode_url(rtsp_url):
    """Return the ffmpeg command line that writes BMP frames to stdout."""
    cmd = f"ffmpeg -rtsp_transport tcp -i {rtsp_url} -f image2pipe -vcodec bmp -an pipe:1"
    return cmd