"""Fan one decoded video stream out to two DeepStream ``nvstreammux`` branches.

Pipeline layout::

    source bin (uridecodebin) -> tee -+-> queue -> nvstreammux -> fakesink
                                      +-> queue -> nvstreammux -> fakesink

The fakesinks are placeholders that keep each branch terminated; replace them
with nvinfer, nvtracker, nvvideoconvert, etc. when building a real pipeline.
"""

import sys

import gi

gi.require_version('Gst', '1.0')
# GObject is no longer used below; the import is kept so that nothing relying
# on this module's original import list breaks.
from gi.repository import GObject, Gst, GLib  # noqa: E402,F401

# Initialize GStreamer
Gst.init(None)

# Placeholder input; pass a real URI on the command line or to main().
DEFAULT_URI = "file:///path/to/video.mp4"

# Output resolution (width, height) of each nvstreammux branch.
MUX_RESOLUTIONS = ((1920, 1080), (1280, 720))

# nvstreammux "batched-push-timeout" value, in microseconds according to the
# Gst-nvstreammux plugin documentation.
BATCHED_PUSH_TIMEOUT = 4000000


def create_source_bin(uri, source_id=0):
    """Build a bin wrapping a ``uridecodebin`` with a single "src" ghost pad.

    The ghost pad is created without a target because uridecodebin only
    exposes its pads once the stream has been inspected; ``cb_newpad``
    attaches the first video pad to it later.

    Args:
        uri: URI handed to uridecodebin (for example ``file:///abs/path.mp4``).
        source_id: Index used to build the bin name ``source-bin-NN``.

    Returns:
        The configured ``Gst.Bin``, or ``None`` if any step failed (a message
        is written to stderr in that case).
    """
    bin_name = "source-bin-%02d" % source_id
    source_bin = Gst.Bin.new(bin_name)
    if source_bin is None:
        sys.stderr.write("Unable to create source bin\n")
        return None

    uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if uri_decode_bin is None:
        sys.stderr.write("Unable to create uri decode bin\n")
        return None
    uri_decode_bin.set_property("uri", uri)
    # Decoded pads appear dynamically; hook them up as they are added.
    uri_decode_bin.connect("pad-added", cb_newpad, source_bin)
    source_bin.add(uri_decode_bin)

    ghost_pad = Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
    if ghost_pad is None or not source_bin.add_pad(ghost_pad):
        sys.stderr.write("Failed to add ghost pad in source bin\n")
        return None

    return source_bin


def cb_newpad(decoder, pad, source_bin):
    """Handle uridecodebin's "pad-added" signal.

    Exposes the first video pad through the source bin's "src" ghost pad.
    Non-video pads and any additional video pads are ignored.

    Args:
        decoder: The uridecodebin that emitted the signal (unused).
        pad: The newly added decoder source pad.
        source_bin: The bin owning the "src" ghost pad.
    """
    caps = pad.get_current_caps()
    if caps is None:
        # Caps may not be negotiated yet; fall back to what the pad can do.
        caps = pad.query_caps(None)
    if caps is None or caps.get_size() == 0:
        return

    media_type = caps.get_structure(0).get_name()
    if not media_type.startswith("video"):
        return

    ghost_pad = source_bin.get_static_pad("src")
    if ghost_pad is None:
        sys.stderr.write("Source bin has no ghost pad named 'src'\n")
        return
    if ghost_pad.get_target() is not None:
        # A video stream is already exposed; keep the first one.
        return

    # A ghost pad proxies an internal pad via set_target(); two source pads
    # cannot be joined with link().
    # NOTE(review): nvstreammux presumably requires NVMM buffers from an
    # NVIDIA decoder -- confirm whether a "memory:NVMM" caps-feature check is
    # wanted here.
    if not ghost_pad.set_target(pad):
        sys.stderr.write(
            "Failed to link decoder src pad to source bin ghost pad\n")


def _make_element(factory_name, element_name):
    """Create a named element or raise RuntimeError if the factory fails."""
    element = Gst.ElementFactory.make(factory_name, element_name)
    if element is None:
        raise RuntimeError(
            "Unable to create element '%s' (factory '%s')"
            % (element_name, factory_name))
    return element


def _request_pad(element, pad_name):
    """Request a pad by (template) name, raising RuntimeError on failure."""
    # request_pad_simple() replaces get_request_pad() from GStreamer 1.20 on;
    # fall back to the old name on earlier versions.
    request = getattr(element, "request_pad_simple", None)
    if request is None:
        request = element.get_request_pad
    pad = request(pad_name)
    if pad is None:
        raise RuntimeError(
            "Unable to get pad '%s' from %s" % (pad_name, element.get_name()))
    return pad


def _link_pads(src_pad, sink_pad):
    """Link two pads, raising RuntimeError unless the link succeeds."""
    if src_pad.link(sink_pad) != Gst.PadLinkReturn.OK:
        raise RuntimeError(
            "Unable to link pad %s to pad %s"
            % (src_pad.get_name(), sink_pad.get_name()))


def _add_mux_branch(pipeline, tee, index, width, height):
    """Add one ``tee -> queue -> nvstreammux -> fakesink`` branch."""
    queue = _make_element("queue", "queue%d" % index)
    streammux = _make_element("nvstreammux", "stream-muxer%d" % index)
    sink = _make_element("fakesink", "fakesink%d" % index)

    streammux.set_property("width", width)
    streammux.set_property("height", height)
    streammux.set_property("batch-size", 1)
    streammux.set_property("batched-push-timeout", BATCHED_PUSH_TIMEOUT)

    for element in (queue, streammux, sink):
        pipeline.add(element)

    _link_pads(_request_pad(tee, "src_%u"), queue.get_static_pad("sink"))
    # nvstreammux only has request sink pads, so ask for sink_0 explicitly.
    _link_pads(queue.get_static_pad("src"), _request_pad(streammux, "sink_0"))
    if not streammux.link(sink):
        raise RuntimeError(
            "Unable to link %s to %s" % (streammux.get_name(), sink.get_name()))


def _on_bus_message(bus, message, loop):
    """Stop the main loop on end-of-stream or on a pipeline error."""
    if message.type == Gst.MessageType.EOS:
        print("End of stream")
        loop.quit()
    elif message.type == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()


def main(uri=DEFAULT_URI):
    """Build the pipeline, run it until EOS, error or Ctrl-C, then clean up.

    Args:
        uri: URI of the media to decode.

    Returns:
        0 after the main loop has run, 1 if the pipeline could not be built
        or started.
    """
    # Create the main pipeline
    pipeline = Gst.Pipeline()

    # Create the source bin
    source_bin = create_source_bin(uri, 0)
    if source_bin is None:
        return 1

    try:
        # Create a tee element to split the stream
        tee = _make_element("tee", "tee")
        pipeline.add(source_bin)
        pipeline.add(tee)
        if not source_bin.link(tee):
            raise RuntimeError("Unable to link source bin to tee")

        # One queue -> nvstreammux -> fakesink branch per resolution
        for index, (width, height) in enumerate(MUX_RESOLUTIONS, start=1):
            _add_mux_branch(pipeline, tee, index, width, height)
    except RuntimeError as err:
        sys.stderr.write("%s\n" % err)
        return 1

    # Watch the bus so EOS and errors stop the loop instead of hanging forever.
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", _on_bus_message, loop)

    # Start the pipeline
    if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
        sys.stderr.write("Unable to set the pipeline to the PLAYING state\n")
        pipeline.set_state(Gst.State.NULL)
        return 1

    # Run the main loop
    try:
        loop.run()
    except KeyboardInterrupt:
        print("Exiting...")
    finally:
        # Clean up even when interrupted
        pipeline.set_state(Gst.State.NULL)
    return 0


if __name__ == "__main__":
    # Optional first command-line argument overrides the default URI.
    sys.exit(main(*sys.argv[1:2]))