Files
inference_rpi_hailo/run_me.py
The Bears 73a2aead5f bump
2025-10-24 10:39:33 -04:00

240 lines
7.9 KiB
Python

import numpy as np
from functools import partial
from hailo_platform import VDevice, HailoSchedulingAlgorithm, FormatType
import cv2
import numpy as np
def resize_image(img_in, reshape_to_final=True):
if not isinstance(img_in, np.ndarray):
img_in = np.asarray(img_in)
max_l = 640
asp_rat = img_in.shape[0] / img_in.shape[1]
if asp_rat < 1:
output_size = [int(asp_rat * max_l), max_l]
else:
output_size = [max_l, int(max_l / asp_rat)]
im_arr_not_pad = cv2.resize(img_in, output_size[::-1])
pad_amt = [max_l, max_l] - np.asarray(im_arr_not_pad.shape[0:2])
left_pad, top_pad = (pad_amt / 2).astype(np.int64)
right_pad, bottom_pad = pad_amt - [left_pad, top_pad]
im_pass = np.zeros(shape=(max_l, max_l, 3), dtype=np.uint8)
im_pass[left_pad:(max_l - right_pad),
top_pad:(max_l - bottom_pad)] = (im_arr_not_pad)
data = im_pass
if reshape_to_final:
data = np.moveaxis(data, [2], [0])[None, :, :, :]
return data
def model_scoring_callback(completion_info, bindings, data):
if completion_info.exception:
# handle exception
pass
ff = bindings.output().get_buffer()
timeout_ms = 1000
params = VDevice.create_params()
params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
import time
# The vdevice is used as a context manager ("with" statement) to ensure it's released on time.
with VDevice(params) as vdevice:
# Create an infer model from an HEF:
infer_model = vdevice.create_infer_model("yolov11l_inat.hef")
# Configure the infer model and create bindings for it
with infer_model.configure() as configured_infer_model:
bindings = configured_infer_model.create_bindings()
st = time.time()
for i in range(1):
# Set input and output buffers
buffer = inp # np.zeros(infer_model.input().shape).astype(np.uint8)
bindings.input().set_buffer(buffer)
output_array = np.zeros([infer_model.output().shape[0]
]).astype(np.float32)
bindings.output().set_buffer(output_array)
# Run synchronous inference and access the output buffers
configured_infer_model.run([bindings], timeout_ms)
buffer = bindings.output().get_buffer()
# Run asynchronous inference
job = configured_infer_model.run_async(
[bindings],
partial(example_callback, bindings=bindings, data=time.time()),
)
job.wait(timeout_ms)
# %%
import cv2
import time
import multiprocessing
import numpy as np
import ctypes
import shutil
cameras = {
"camera_side": {
'url': "rtsp://admin:marybear@192.168.1.151:554/h264Preview_01_sub",
'resolution': (480, 640, 3)
},
"camera_driveway": {
'url': "rtsp://admin:marybear@192.168.1.152:554/h264Preview_01_sub",
'resolution': (480, 640, 3)
},
"camera_railing": {
'url': "rtsp://admin:marybear@192.168.1.153:554/h264Preview_01_sub",
'resolution': (512, 896, 3)
},
"camera_ptz_right": {
'url': "rtsp://admin:marybear@192.168.1.155:554/h264Preview_01_sub",
'resolution': (360, 640, 3)
},
"camera_wrenwatch": {
'url': "rtsp://admin:marybear@192.168.1.158:554/h264Preview_01_sub",
'resolution': (360, 640, 3)
},
"camera_backyard": {
'url': "rtsp://admin:marybear@192.168.1.162:554/h264Preview_01_sub",
'resolution': (432, 1536, 3),
'split_into_two': True
},
}
# # %%
# import os
# os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = 'rtsp_transport;udp'
# cap = cv2.VideoCapture(cameras['camera_railing']['url'])
# # %%
# while True:
# _, frame = cap.read()
# # %%
# _, frame = cap.read()
# cv2.imwrite('FRAME.jpg', frame)
# # %%
def format_gst_url(rtsp_url):
gst_pipeline = f"rtspsrc location={rtsp_url} latency=50 ! rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! appsink max-buffers=1 drop=true"
return gst_pipeline
for cam_name, details in cameras.items():
array_len = np.prod(details['resolution'])
details['cam_name'] = cam_name
details['img_array'] = multiprocessing.Array(ctypes.c_uint8,
int(array_len), lock = True)
details['img_timestamp'] = multiprocessing.Value(ctypes.c_double)
details['queue'] = multiprocessing.Queue()
details['gst_pipeline_str'] = format_gst_url(details['url'])
import datetime as dt
def rtsp_stream_manager( camera_name, gst_pipeline_str, queue, img_array, img_timestamp):
capture_handle = cv2.VideoCapture(gst_pipeline_str, cv2.CAP_GSTREAMER)
while True:
if not queue.empty():
msg = queue.get_nowait()
if msg == 'get':
ret, frame = capture_handle.read()
with img_timestamp.get_lock(), img_array.get_lock():
if frame is None:
print(f"Read empty frame for {camera_name}")
img_array[:] = 0
img_timestamp.value = 0
else:
print(f"Read frame for {camera_name} at {dt.datetime.now()}")
img_array[:] = frame.flatten()[:]
img_timestamp.value = time.time()
elif msg == 'restart':
print('Restarting')
capture_handle = cv2.VideoCapture(gst_pipeline_str, cv2.CAP_GSTREAMER)
elif msg == 'exit':
print('Exiting')
return
img_scoring_queue = multiprocessing.Queue()
for cam_name, details in cameras.items():
p = multiprocessing.Process(target=rtsp_stream_manager,
args=(cam_name, details['gst_pipeline_str'],
details['queue'], details['img_array'], details['img_timestamp']))
details['process'] = p
for cam_name, details in cameras.items():
details['process'].start()
# %%
for cam_name, details in cameras.items():
details['queue'].put('restart')
# %%
for cam_name, details in cameras.items():
details['queue'].put('get')
if os.path.exists('images/'):
shutil.rmtree('images/')
os.makedirs('images/')
def create_score_message( details, reshaped_image, timestamp):
frames = list()
msg = list()
if details.get('split_into_two', False):
split_point = int(reshaped_image.shape[1]/2)
left_frame = resize_image(reshaped_image[:,:split_point,:], reshape_to_final = False)
right_frame = resize_image(reshaped_image[:,split_point:,:], reshape_to_final = False)
left_frame = cv2.cvtColor(left_frame, cv2.COLOR_BGR2RGB)
right_frame = cv2.cvtColor(right_frame, cv2.COLOR_BGR2RGB)
msg.append({'camera_name': details['cam_name']+'_left', 'frame': left_frame, 'image_timestamp': timestamp})
msg.append({'camera_name': details['cam_name']+'_right', 'frame': right_frame, 'image_timestamp': timestamp})
else:
frame = resize_image(reshaped_image, reshape_to_final = False)
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
msg.append({'camera_name': details['cam_name'], 'frame': frame, 'image_timestamp': timestamp})
return msg
for cam_name, details in cameras.items():
img_array = details['img_array']
img_timestamp = details['img_timestamp']
with img_array.get_lock(), img_timestamp.get_lock():
reshaped_image = np.frombuffer(details['img_array'].get_obj(), dtype=np.uint8).reshape(details['resolution'])
timestamp = img_timestamp.value
for msg in create_score_message(details, reshaped_image, timestamp):
img_scoring_queue.put(msg)
print('Writing for ' + cam_name + f' for {reshaped_image.shape}')
# cv2.imwrite('images/'+ cam_name + '.jpg', reshaped_image)
for x in range(img_scoring_queue.qsize()):
qu = img_scoring_queue.get()
print(qu['camera_name'],qu['frame'].shape)
cv2.imwrite(str(x)+'.jpg', qu['frame'])