This commit is contained in:
The Bears
2025-10-31 20:57:56 -04:00
parent e79a9f0458
commit 09ed1b8c97
15 changed files with 3278 additions and 659 deletions

292
utils.py
View File

@@ -1,112 +1,220 @@
import asyncio
import numpy as np
from common_code.settings import LogColorize
import concurrent.futures
import string
from random import choices
from urllib import parse
from io import BytesIO
import requests
import cv2
import queue
import logging
import struct
import re
import pickle
import pickle
import datetime as dt
from functools import partial
import cv2
import time
import numpy
import multiprocessing
import threading
import numpy as np
import ctypes
import shutil
import hashlib
from hailo_platform import VDevice, HailoSchedulingAlgorithm, FormatType
import pickle
import json
import redis
import os
pfm = LogColorize.score_obj_det_embed
with open('/home/thebears/source/infer/species_list','r') as sl:
species_list = [x for x in sl.read().split('\n') if len(x) > 0]
r = redis.Redis('localhost',port=6379, db=14)
logger = logging.getLogger('live_inference')
def resize_image(img_in, reshape_to_final=False):
if not isinstance(img_in, np.ndarray):
img_in = np.asarray(img_in)
max_l = 640
asp_rat = img_in.shape[0] / img_in.shape[1]
if asp_rat < 1:
output_size = [int(asp_rat * max_l), max_l]
else:
output_size = [max_l, int(max_l / asp_rat)]
im_arr_not_pad = cv2.resize(img_in, output_size[::-1])
pad_amt = [max_l, max_l] - np.asarray(im_arr_not_pad.shape[0:2])
left_pad, top_pad = (pad_amt / 2).astype(np.int64)
right_pad, bottom_pad = pad_amt - [left_pad, top_pad]
im_pass = np.zeros(shape=(max_l, max_l, 3), dtype=np.uint8)
im_pass[left_pad:(max_l - right_pad),
top_pad:(max_l - bottom_pad)] = (im_arr_not_pad)
data = im_pass
if reshape_to_final:
data = np.moveaxis(data, [2], [0])[None, :, :, :]
return data
class StreamManager():
def __init__(self, rtsp_url, resolution, camera_name = 'N/A'):
self.cam_name = cam_name
def model_scoring_callback(completion_info, bindings, data):
if completion_info.exception:
pass
ff = bindings.output().get_buffer()
camera_name = data['camera_name']
timestamp = data['image_timestamp']
hash_value = data['image_hash']
dump_model_results_to_json( camera_name, timestamp, ff, hash_value)
def round_floats(obj, decimals=4):
if isinstance(obj, float):
return round(obj, decimals)
elif isinstance(obj, dict):
return {k: round_floats(v, decimals) for k, v in obj.items()}
elif isinstance(obj, list):
return [round_floats(item, decimals) for item in obj]
return obj
self.array_len = np.prod(resolution)
self.cam_name = cam_name
self.img_array = multiprocessing.Array(ctypes.c_uint8,
int(array_len),
lock=True)
self.img_timestamp = multiprocessing.Value(ctypes.c_double)
self.queue = multiprocessing.Queue()
self.rtsp_url = format_ffmpeg_decode_url(rtsp_url).split(" ")
self.process_func = partial(stream_wrapper,
self.resolution,
self.rtsp_url,
camera_name=cam_name,
queue=self.queue,
img_array=self.img_array,
img_timestamp=self.img_timestamp))
def dump_model_results_to_json(camera_name, timestamp, output_array, hash_value):
has_scores = {idx:x for idx,x in enumerate(output_array) if len(x) > 0}
score_dict = {}
score_dict['timestamp'] = timestamp
score_dict['scores'] = list()
score_dict['image_hash'] = hash_value
for idx, sc in has_scores.items():
for r in sc:
score_dict['scores'].append({'idx': idx, 'species': species_list[idx], 'boxes':r[0:4].tolist(), 'score': r[4].tolist()})
def get_next_bitmap( byte_stream, shape):
numel = np.prod(shape)
string_find = b'BM' + struct.pack('<I',numel+54)
fcf = [m.start() for m in re.finditer(re.escape(string_find), byte_stream)]
for start_index in fcf:
end_index = start_index + 60+numel
frame_data_with_header = byte_stream[start_index:end_index]
header = frame_data_with_header[:60]
frame = frame_data_with_header[60:]
nf = np.frombuffer(frame, dtype=np.uint8)
if len(nf) != numel:
return None, None
pic = np.reshape(nf, shape)
pic = pic[::-1,:,:]
if len(nf) == numel:
return pic
json_str = json.dumps(round_floats(score_dict))
def stream_wrapper( shape, cmd, camera_name = None, queue = None, img_array = None, img_timestamp = None):
print('Starting wrapper')
func = read_stream( shape, cmd, camera_name = camera_name, queue = queue, img_array = img_array, img_timestamp = img_timestamp)
asyncio.run(func)
with open('/home/thebears/source/infer/scores/' + camera_name,'a') as ff:
ff.write(json_str)
ff.write('\n')
async def read_stream( shape, cmd, camera_name = None, queue = None, img_array = None, img_timestamp = None):
print('Starting stream')
byte_buffer = b''
bytes_read = (np.prod(shape)+60)*2
print(cmd)
process = await asyncio.create_subprocess_exec(*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
try:
while True:
# Read up to 65KB at a time
chunk = await process.stdout.read(bytes_read)
byte_buffer += chunk
diff = len(byte_buffer) - bytes_read
# print(len(byte_buffer))
if diff > 0:
byte_buffer = byte_buffer[diff::]
def run_model(img_scoring_queue):
timeout_ms = 1000
logger.info('Starting model scoring process')
params = VDevice.create_params()
params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
with VDevice(params) as vdevice:
infer_model = vdevice.create_infer_model("yolov11l_inat.hef")
logger.info('Loaded model')
with infer_model.configure() as configured_infer_model:
bindings = configured_infer_model.create_bindings()
while True:
try:
# Use get with timeout for multiprocessing queue
res = img_scoring_queue.get(timeout=1.0)
r.set('model_inference_heartbeat',time.time())
inp = res['frame']
res_send = {'camera_name': res['camera_name'], 'image_timestamp': res['image_timestamp'], 'image_hash':res['image_hash']}
logger.info(f'Running inference for {res_send}')
r.set('model_inference_started',str(res_send))
bindings.input().set_buffer(inp)
output_array = np.zeros([infer_model.output().shape[0]]).astype(np.float32)
bindings.output().set_buffer(output_array)
if not queue.empty():
msg = queue.get_nowait()
print('got message! ' + str( msg))
configured_infer_model.run([bindings], timeout_ms)
job = configured_infer_model.run_async(
[bindings],
partial(model_scoring_callback, bindings=bindings, data=res_send),
)
r.set('model_inference_finished',str(res_send))
try:
job.wait(timeout_ms)
except Exception as e:
logger.error(str(e))
except:
# Handle both queue.Empty and multiprocessing timeout
continue
class SnapManager():
def __init__(self, ip, url_api, username, password, camera_name, msg_queue=None, img_scoring_queue=None, split_into_two=False, **kwargs):
self.ip = ip
self.url_api = url_api
self.username = username
self.password = password
self.camera_name = camera_name
self.split_into_two = split_into_two
n self.msg_queue = msg_queue
self.img_scoring_queue = img_scoring_queue
logger.info(f"{self.camera_name}: initialized")
def format_image_for_model(self, image, timestamp):
msg = list()
if self.split_into_two:
split_point = int(image.shape[1] / 2)
left_frame = resize_image(image[:, :split_point, :])
right_frame = resize_image(image[:, split_point:, :])
msg.append({
'camera_name': self.camera_name + '_left',
'frame': left_frame,
'image_timestamp': timestamp,
'image_hash': hashlib.sha1(left_frame.tobytes()).hexdigest()
})
msg.append({
'camera_name': self.camera_name + '_right',
'frame': right_frame,
'image_timestamp': timestamp,
'image_hash': hashlib.sha1(right_frame.tobytes()).hexdigest()
})
else:
frame = resize_image(image)
msg.append({
'camera_name': self.camera_name,
'frame': frame,
'image_timestamp': timestamp,
'image_hash': hashlib.sha1(frame.tobytes()).hexdigest()
})
return msg
def capture_and_prepare(self):
img = get_snap(self.username, self.password, self.url_api, self.camera_name)
if img is not None:
timestamp = time.time()
return self.format_image_for_model(img, timestamp)
return []
def run_forever(self):
while True:
try:
msg = self.msg_queue.get(timeout=0.1)
if msg == 'exit':
return
break
if msg == 'save_image':
if msg == 'get':
print('doing get!')
frame = get_next_bitmap(byte_buffer, shape)
with img_timestamp.get_lock(), img_array.get_lock():
if frame is None:
print(f"Read empty frame for {camera_name}")
img_array[:] = 0
img_timestamp.value = 0
else:
print(f"Read frame for {camera_name} at {dt.datetime.now()}")
img_array[:] = frame.flatten()[:]
img_timestamp.value = time.time()
logger.info(f'Processing capture for {self.camera_name}')
model_msgs = self.capture_and_prepare()
for model_msg in model_msgs:
# Use put_nowait for multiprocessing queue to avoid blocking
try:
self.img_scoring_queue.put_nowait(model_msg)
except:
# Queue full, skip this message
logger.warning(f"Model queue full, dropping message from {self.camera_name}")
except queue.Empty:
pass
except asyncio.CancelledError:
print('Cancelled Error')
process.kill()
await process.wait()
def format_gst_url(rtsp_url):
gst_pipeline = f"rtspsrc location={rtsp_url} latency=50 ! rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! appsink max-buffers=1 drop=true"
return gst_pipeline
def format_ffmpeg_decode_url(rtsp_url):
cmd = f"ffmpeg -rtsp_transport tcp -i {rtsp_url} -f image2pipe -vcodec bmp -an pipe:1"
return cmd
def start_snap_manager(**kwargs):
obj = SnapManager(**kwargs)
obj.run_forever()