This commit is contained in:
2025-06-16 14:08:32 -04:00
parent 6778393bfd
commit f31aa282dd
3 changed files with 196 additions and 252 deletions

View File

@@ -16,7 +16,7 @@ import argparse
pfm = LogColorize.watch_and_fix_permissions
logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False)
Gst.debug_set_default_threshold(Gst.DebugLevel.INFO)
Gst.debug_set_default_threshold(Gst.DebugLevel.ERROR)
os.environ.pop("DISPLAY",':0')
target_width_detect = 1280
target_height_detect = 720
@@ -26,6 +26,99 @@ target_width_embed = 512
target_height_embed = 512
MUXER_BATCH_TIMEOUT_USEC = 1000000
def print_pipeline_structure(pipeline):
"""
Recursively prints elements in the pipeline and their properties.
"""
if not isinstance(pipeline, Gst.Pipeline):
print("Not a valid GStreamer pipeline.")
return
def _print_element_properties(element, indent=0):
spaces = " " * indent
print(spaces + f"Element: {element.get_name()} (Type: {element.get_factory().get_name()})")
# Print its properties
for prop in element.list_properties():
try:
val = element.get_property(prop.name)
if val != prop.default_value: # Display only non-default properties
print(spaces + f" - {prop.name}: {val}")
except:
pass
def _print_pipeline_structure(element, indent=0):
spaces = " " * indent
children = element.children if hasattr(element, 'children') else []
if len(children) > 0:
print(spaces + f"[{element.get_name()}]")
for child in children:
_print_pipeline_structure(child, indent + 2)
else:
_print_element_properties(element, indent)
print("\nPipeline Structure:")
print("===================")
_print_pipeline_structure(pipeline)
print("===================\n")
def get_detailed_pipeline_string(pipeline):
"""Generate a more detailed pipeline string with properties"""
if not isinstance(pipeline, Gst.Pipeline):
return None
def get_element_string(element):
# Get element factory name
factory = element.get_factory()
if factory:
element_str = factory.get_name()
else:
element_str = element.get_name()
# Add properties
props = []
for prop in element.list_properties():
# Skip some properties that are typically not set in command line
if prop.name in ('name', 'parent'):
continue
try:
val = element.get_property(prop.name)
if val is not None and val != prop.default_value:
# Format value appropriately based on type
if isinstance(val, str):
props.append(f"{prop.name}=\"{val}\"")
elif isinstance(val, bool):
props.append(f"{prop.name}={str(val).lower()}")
else:
props.append(f"{prop.name}={val}")
except:
# Skip properties that can't be read
pass
if props:
element_str += " " + " ".join(props)
return element_str
result = []
# Simple approach - just gets top-level elements
iterator = pipeline.iterate_elements()
while True:
ret, element = iterator.next()
if ret != Gst.IteratorResult.OK:
break
result.append(get_element_string(element))
return " ! ".join(result)
def embed_results_probe(pad,info,u_data, list_add, frame_num = 0):
gst_buffer = info.get_buffer()
print("HEY I AM PROBING EMBEDDINGS")
@@ -243,7 +336,9 @@ def create_source_bin(uri):
return nbin
def run_inference(file_path):
#def run_inference(file_path):
if True:
file_path = '/home/thebears/local/source/short.mp4'
os.environ.pop("DISPLAY",None)
if not file_path.startswith('file://'):
file_path = 'file://'+file_path
@@ -421,97 +516,7 @@ def run_inference(file_path):
pass
# cleanup
pipeline.set_state(Gst.State.NULL)
return detector_list, embed_list
def print_pipeline_structure(pipeline):
"""
Recursively prints elements in the pipeline and their properties.
"""
if not isinstance(pipeline, Gst.Pipeline):
print("Not a valid GStreamer pipeline.")
return
def _print_element_properties(element, indent=0):
spaces = " " * indent
print(spaces + f"Element: {element.get_name()} (Type: {element.get_factory().get_name()})")
# Print its properties
for prop in element.list_properties():
try:
val = element.get_property(prop.name)
if val != prop.default_value: # Display only non-default properties
print(spaces + f" - {prop.name}: {val}")
except:
pass
def _print_pipeline_structure(element, indent=0):
spaces = " " * indent
children = element.children if hasattr(element, 'children') else []
if len(children) > 0:
print(spaces + f"[{element.get_name()}]")
for child in children:
_print_pipeline_structure(child, indent + 2)
else:
_print_element_properties(element, indent)
print("\nPipeline Structure:")
print("===================")
_print_pipeline_structure(pipeline)
print("===================\n")
def get_detailed_pipeline_string(pipeline):
"""Generate a more detailed pipeline string with properties"""
if not isinstance(pipeline, Gst.Pipeline):
return None
def get_element_string(element):
# Get element factory name
factory = element.get_factory()
if factory:
element_str = factory.get_name()
else:
element_str = element.get_name()
# Add properties
props = []
for prop in element.list_properties():
# Skip some properties that are typically not set in command line
if prop.name in ('name', 'parent'):
continue
try:
val = element.get_property(prop.name)
if val is not None and val != prop.default_value:
# Format value appropriately based on type
if isinstance(val, str):
props.append(f"{prop.name}=\"{val}\"")
elif isinstance(val, bool):
props.append(f"{prop.name}={str(val).lower()}")
else:
props.append(f"{prop.name}={val}")
except:
# Skip properties that can't be read
pass
if props:
element_str += " " + " ".join(props)
return element_str
result = []
# Simple approach - just gets top-level elements
iterator = pipeline.iterate_elements()
while True:
ret, element = iterator.next()
if ret != Gst.IteratorResult.OK:
break
result.append(get_element_string(element))
return " ! ".join(result)
# return detector_list, embed_list