Files
deepstream/random_stuff.py
2025-06-17 14:08:34 -04:00

254 lines
8.0 KiB
Python

def element_to_string(element, include_props=True):
    """Render a GStreamer element as a gst-launch style description.

    The element is written as its factory name, optionally followed by
    ``name=value`` pairs for every readable property whose current value
    differs from the property's default.  When the element is a
    ``Gst.Bin`` its children are rendered recursively and appended as
    ``( child ! child ... )``.

    Children are listed in topological order (sources first).  Actual pad
    links are not inspected, so a branching topology is only approximated
    by the linear ``!`` chain.

    Args:
        element: The ``Gst.Element`` to describe.
        include_props: When true, append the non-default properties of the
            element and of every nested child.

    Returns:
        The description as a single string.
    """
    # get_factory() is nullable (elements not created from a registered
    # factory have none), so fall back to the Python type name.
    factory = element.get_factory()
    parts = [
        factory.get_name() if factory is not None else type(element).__name__
    ]

    if include_props:
        for prop in element.list_properties():
            try:
                value = element.get_property(prop.name)
            except TypeError:
                # Raised for properties that cannot be read (e.g. write-only
                # ones); there is no value to print for those.
                continue
            # Only values that differ from the default are interesting.
            if value != prop.default_value:
                parts.append(f"{prop.name}={value}")

    result = " ".join(parts)

    # For containers like bins, recursively process children.
    if isinstance(element, Gst.Bin):
        # iterate_sorted() walks from the sinks towards the sources, so the
        # list is reversed to read upstream -> downstream like a launch line.
        children = [
            element_to_string(child, include_props)
            for child in element.iterate_sorted()
        ]
        children.reverse()
        if children:
            result += " ( " + " ! ".join(children) + " )"
    return result
# Example usage (needs `Gst` imported and `Gst.init(None)` called first):
# pipeline = Gst.parse_launch("videotestsrc pattern=ball ! videoconvert ! autovideosink")
# print(element_to_string(pipeline))
def pipeline_to_string(pipeline):
    """Describe the top-level elements of a pipeline as ``a ! b ! c``.

    Only factory names are emitted; element properties are not included,
    and nested bins are not descended into.

    Args:
        pipeline: The object to describe.  Anything that is not a
            ``Gst.Pipeline`` (including ``None``) yields an empty string.

    Returns:
        The factory names of the pipeline's direct children joined with
        ``" ! "``, ordered from sources to sinks, or ``""`` when there is
        nothing to describe.
    """
    if pipeline is None or not isinstance(pipeline, Gst.Pipeline):
        return ""

    names = []
    for element in pipeline.iterate_sorted():
        # get_factory() is nullable (elements not created from a registered
        # factory have none), so fall back to the Python type name.
        factory = element.get_factory()
        names.append(
            factory.get_name()
            if factory is not None
            else type(element).__name__
        )

    # iterate_sorted() walks from the sinks towards the sources; reverse so
    # the result reads upstream -> downstream like a launch line.
    names.reverse()
    return " ! ".join(names)
# Example usage: build a small test pipeline to feed to the serializer below.
import gi
# Select the 1.0 Gst introspection namespace before importing it.
gi.require_version('Gst', '1.0')
from gi.repository import Gst
# Initialize GStreamer; None means no argument list is handed over.
Gst.init(None)
# Create a pipeline from a gst-launch style textual description.
pipeline_description = "videotestsrc pattern=3 ! videoconvert ! autovideosink"
pipeline = Gst.parse_launch(pipeline_description)
def get_pipeline_string(element):
    """Describe the direct children of a bin as a gst-launch style string.

    Each child is rendered as its factory name followed by space-separated
    ``name=value`` pairs for every property that can be read.  Children are
    joined with ``" ! "`` in topological order (sources first).  Nested bins
    are not descended into.

    Args:
        element: A ``Gst.Bin`` (for example a ``Gst.Pipeline``).  A falsy
            value such as ``None`` yields an empty string.

    Returns:
        The description string, or ``""`` when there is nothing to describe.
    """
    if not element:
        return ""

    iterator = element.iterate_sorted()
    pipeline_parts = []
    while True:
        result, elem = iterator.next()
        if result == Gst.IteratorResult.RESYNC:
            # The bin changed while iterating: rewind and start over so the
            # result is not a mix of stale and fresh children.
            iterator.resync()
            pipeline_parts.clear()
            continue
        if result != Gst.IteratorResult.OK:
            # DONE or ERROR: nothing more can be read.
            break

        # get_factory() is nullable (elements not created from a registered
        # factory have none), so fall back to the Python type name.
        factory = elem.get_factory()
        tokens = [
            factory.get_name() if factory is not None else type(elem).__name__
        ]
        for prop in elem.list_properties():
            try:
                tokens.append(f"{prop.name}={elem.get_property(prop.name)}")
            except Exception:
                # Deliberately best-effort: skip properties that cannot be
                # retrieved or formatted instead of failing the whole dump.
                continue
        # Launch syntax separates properties with spaces, not commas.
        pipeline_parts.append(" ".join(tokens))

    # iterate_sorted() walks from the sinks towards the sources; reverse so
    # the result reads upstream -> downstream like a launch line.
    pipeline_parts.reverse()
    return " ! ".join(pipeline_parts)
# Convert the pipeline built above to a string and print it.
pipeline_string = get_pipeline_string(pipeline)
print(f"Pipeline String: {pipeline_string}")
# NOTE(review): the lines below repeat the import/init/pipeline setup that
# already appears earlier in this file.
import gi
# Select the 1.0 Gst introspection namespace before importing it.
gi.require_version('Gst', '1.0')
from gi.repository import Gst
# Initialize GStreamer; None means no argument list is handed over.
Gst.init(None)
# Create a pipeline from a gst-launch style textual description.
pipeline_description = "videotestsrc pattern=3 ! videoconvert ! autovideosink"
pipeline = Gst.parse_launch(pipeline_description)
def get_pipeline_string(element):
    """Describe the direct children of a bin as a gst-launch style string.

    Each child is rendered as its factory name followed by space-separated
    ``name=value`` pairs for every property that can be read.  Children are
    joined with ``" ! "`` in topological order (sources first).  Nested bins
    are not descended into.

    Args:
        element: A ``Gst.Bin`` (for example a ``Gst.Pipeline``).  A falsy
            value such as ``None`` yields an empty string.

    Returns:
        The description string, or ``""`` when there is nothing to describe.
    """
    if not element:
        return ""

    iterator = element.iterate_sorted()
    pipeline_parts = []
    while True:
        result, elem = iterator.next()
        if result == Gst.IteratorResult.RESYNC:
            # The bin changed while iterating: rewind and start over so the
            # result is not a mix of stale and fresh children.
            iterator.resync()
            pipeline_parts.clear()
            continue
        if result != Gst.IteratorResult.OK:
            # DONE or ERROR: nothing more can be read.
            break

        # get_factory() is nullable (elements not created from a registered
        # factory have none), so fall back to the Python type name.
        factory = elem.get_factory()
        tokens = [
            factory.get_name() if factory is not None else type(elem).__name__
        ]
        for prop in elem.list_properties():
            try:
                tokens.append(f"{prop.name}={elem.get_property(prop.name)}")
            except Exception:
                # Deliberately best-effort: skip properties that cannot be
                # retrieved or formatted instead of failing the whole dump.
                continue
        # Launch syntax separates properties with spaces, not commas.
        pipeline_parts.append(" ".join(tokens))

    # iterate_sorted() walks from the sinks towards the sources; reverse so
    # the result reads upstream -> downstream like a launch line.
    pipeline_parts.reverse()
    return " ! ".join(pipeline_parts)
# Convert the pipeline built above to a string and print it.
pipeline_string = get_pipeline_string(pipeline)
print(f"Pipeline String: {pipeline_string}")
# NOTE(review): the lines below repeat the import/init/pipeline setup that
# already appears earlier in this file.
import gi
# Select the 1.0 Gst introspection namespace before importing it.
gi.require_version('Gst', '1.0')
from gi.repository import Gst
# Initialize GStreamer; None means no argument list is handed over.
Gst.init(None)
# Create a pipeline from a gst-launch style textual description.
pipeline_description = "videotestsrc pattern=3 ! videoconvert ! autovideosink"
pipeline = Gst.parse_launch(pipeline_description)
def get_pipeline_string(element):
    """Describe the direct children of a bin as a gst-launch style string.

    Each child is rendered as its factory name followed by space-separated
    ``name=value`` pairs for every property that can be read.  Children are
    joined with ``" ! "`` in topological order (sources first).  Nested bins
    are not descended into.

    Args:
        element: A ``Gst.Bin`` (for example a ``Gst.Pipeline``).  A falsy
            value such as ``None`` yields an empty string.

    Returns:
        The description string, or ``""`` when there is nothing to describe.
    """
    if not element:
        return ""

    iterator = element.iterate_sorted()
    pipeline_parts = []
    while True:
        result, elem = iterator.next()
        if result == Gst.IteratorResult.RESYNC:
            # The bin changed while iterating: rewind and start over so the
            # result is not a mix of stale and fresh children.
            iterator.resync()
            pipeline_parts.clear()
            continue
        if result != Gst.IteratorResult.OK:
            # DONE or ERROR: nothing more can be read.
            break

        # get_factory() is nullable (elements not created from a registered
        # factory have none), so fall back to the Python type name.
        factory = elem.get_factory()
        tokens = [
            factory.get_name() if factory is not None else type(elem).__name__
        ]
        for prop in elem.list_properties():
            try:
                tokens.append(f"{prop.name}={elem.get_property(prop.name)}")
            except Exception:
                # Deliberately best-effort: skip properties that cannot be
                # retrieved or formatted instead of failing the whole dump.
                continue
        # Launch syntax separates properties with spaces, not commas.
        pipeline_parts.append(" ".join(tokens))

    # iterate_sorted() walks from the sinks towards the sources; reverse so
    # the result reads upstream -> downstream like a launch line.
    pipeline_parts.reverse()
    return " ! ".join(pipeline_parts)
# Convert the pipeline built above to a string and print it.
pipeline_string = get_pipeline_string(pipeline)
print(f"Pipeline String: {pipeline_string}")
# NOTE(review): the lines below repeat the import/init/pipeline setup that
# already appears earlier in this file.
import gi
# Select the 1.0 Gst introspection namespace before importing it.
gi.require_version('Gst', '1.0')
from gi.repository import Gst
# Initialize GStreamer; None means no argument list is handed over.
Gst.init(None)
# Create a pipeline from a gst-launch style textual description.
pipeline_description = "videotestsrc pattern=3 ! videoconvert ! autovideosink"
pipeline = Gst.parse_launch(pipeline_description)
def get_pipeline_string(element):
    """Describe the direct children of a bin as a gst-launch style string.

    Each child is rendered as its factory name followed by space-separated
    ``name=value`` pairs for every property that can be read.  Children are
    joined with ``" ! "`` in topological order (sources first).  Nested bins
    are not descended into.

    Args:
        element: A ``Gst.Bin`` (for example a ``Gst.Pipeline``).  A falsy
            value such as ``None`` yields an empty string.

    Returns:
        The description string, or ``""`` when there is nothing to describe.
    """
    if not element:
        return ""

    iterator = element.iterate_sorted()
    pipeline_parts = []
    while True:
        result, elem = iterator.next()
        if result == Gst.IteratorResult.RESYNC:
            # The bin changed while iterating: rewind and start over so the
            # result is not a mix of stale and fresh children.
            iterator.resync()
            pipeline_parts.clear()
            continue
        if result != Gst.IteratorResult.OK:
            # DONE or ERROR: nothing more can be read.
            break

        # get_factory() is nullable (elements not created from a registered
        # factory have none), so fall back to the Python type name.
        factory = elem.get_factory()
        tokens = [
            factory.get_name() if factory is not None else type(elem).__name__
        ]
        for prop in elem.list_properties():
            try:
                tokens.append(f"{prop.name}={elem.get_property(prop.name)}")
            except Exception:
                # Deliberately best-effort: skip properties that cannot be
                # retrieved or formatted instead of failing the whole dump.
                continue
        # Launch syntax separates properties with spaces, not commas.
        pipeline_parts.append(" ".join(tokens))

    # iterate_sorted() walks from the sinks towards the sources; reverse so
    # the result reads upstream -> downstream like a launch line.
    pipeline_parts.reverse()
    return " ! ".join(pipeline_parts)