Files
deepstream/new_temp.py
2025-06-13 18:34:45 -04:00

80 lines
2.4 KiB
Python

import sys

import gi
gi.require_version('Gst', '1.0')
#gi.require_version('GstNv', '1.0')
from gi.repository import GLib, GObject, Gst
# Initialize the GStreamer library once, at import time, before main() builds
# any element or pipeline. None means no command-line arguments are passed in.
Gst.init(None)
def _make_element(factory_name, name):
    """Create a GStreamer element, raising RuntimeError if it cannot be made."""
    element = Gst.ElementFactory.make(factory_name, name)
    if element is None:
        # ElementFactory.make returns None when the plugin is not available.
        raise RuntimeError(
            "Unable to create element {!r} ({!r})".format(factory_name, name)
        )
    return element


def _link(upstream, downstream):
    """Link two elements, raising RuntimeError if the link is refused."""
    if not upstream.link(downstream):
        raise RuntimeError(
            "Unable to link {} -> {}".format(
                upstream.get_name(), downstream.get_name()
            )
        )


def _on_pad_added(_decodebin, src_pad, sink_pad):
    """Link a newly exposed decodebin pad to *sink_pad* if it carries video."""
    if sink_pad.is_linked():
        # Only one decoded stream can feed the queue; ignore later pads.
        return

    caps = src_pad.get_current_caps() or src_pad.query_caps(None)
    structure = None
    if caps is not None and caps.get_size() > 0:
        structure = caps.get_structure(0)
    # Skip pads known to be non-video (e.g. an audio track in the container).
    if structure is not None and not structure.get_name().startswith("video/"):
        return

    result = src_pad.link(sink_pad)
    if result != Gst.PadLinkReturn.OK:
        print(
            "Failed to link decodebin pad {}: {}".format(
                src_pad.get_name(), result
            ),
            file=sys.stderr,
        )


def _on_bus_message(_bus, message, loop):
    """Stop the main loop when the pipeline reports end-of-stream or an error."""
    if message.type == Gst.MessageType.EOS:
        loop.quit()
    elif message.type == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print("Pipeline error: {} ({})".format(err, debug), file=sys.stderr)
        loop.quit()


def main():
    """Build and run the decode -> tee -> resize -> nvinfer -> fakesink pipeline.

    The pipeline reads the sample MP4, decodes it, and sends one tee branch
    through nvvideoconvert (300x300 NVMM caps) into nvinfer and a fakesink.
    It runs until end-of-stream, a pipeline error, or Ctrl-C, and is always
    returned to the NULL state afterwards.

    Raises:
        RuntimeError: if an element cannot be created, two elements cannot be
            linked, or the pipeline refuses to go to PLAYING.
    """
    pipeline = Gst.Pipeline()

    # Source element: file reader feeding decodebin.
    source_bin = _make_element("filesrc", "file-source")
    source_bin.set_property("location", '/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4')
    decodebin = _make_element("decodebin", "decode-bin")

    # Queue for the decodebin output, followed by the splitter.
    queue1 = _make_element("queue", "queue1")
    tee = _make_element("tee", "splitter")

    # Resizer branch.
    queue_resizer = _make_element("queue", "resizer-queue")
    resizer = _make_element("nvvideoconvert", "video-converter")
    capsfilter = _make_element("capsfilter", "caps-filter")
    capsfilter.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), width=300, height=300"))

    # Model inference element.
    # NOTE(review): DeepStream's nvinfer normally expects batched buffers from
    # an upstream nvstreammux; this pipeline has none -- confirm it is intended.
    nvinfer = _make_element("nvinfer", "primary-inference")
    nvinfer.set_property("config-file-path", "path_to_config_file.txt")

    # Sink that discards the inference output.
    sink = _make_element("fakesink", "fakesink")
    sink.set_property('enable-last-sample', False)
    sink.set_property('sync', False)

    # Every element, including the source and decoder, must be in the pipeline
    # before it is linked; elements without a common parent cannot be linked.
    for element in (source_bin, decodebin, queue1, tee,
                    queue_resizer, resizer, capsfilter, nvinfer, sink):
        pipeline.add(element)

    _link(source_bin, decodebin)
    # decodebin exposes its source pads only once the stream type is known.
    decodebin.connect("pad-added", _on_pad_added, queue1.get_static_pad("sink"))
    _link(queue1, tee)

    # Link the tee to the resizer branch through a requested source pad.
    tee_src_pad1 = tee.get_request_pad("src_%u")
    if tee_src_pad1 is None:
        raise RuntimeError("Unable to request a source pad from the tee")
    queue_resizer_sink_pad = queue_resizer.get_static_pad("sink")
    if tee_src_pad1.link(queue_resizer_sink_pad) != Gst.PadLinkReturn.OK:
        raise RuntimeError("Unable to link tee -> resizer-queue")
    _link(queue_resizer, resizer)
    _link(resizer, capsfilter)
    _link(capsfilter, nvinfer)
    _link(nvinfer, sink)

    # Watch the bus so EOS or an error actually ends the main loop.
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", _on_bus_message, loop)

    try:
        if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
            raise RuntimeError("Unable to set the pipeline to the PLAYING state")
        loop.run()
    except KeyboardInterrupt:
        pass
    finally:
        # Always release the pipeline's resources, even on failure.
        bus.remove_signal_watch()
        pipeline.set_state(Gst.State.NULL)
# Run the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()