Files
inference_rpi_hailo/utils.py
The Bears ad10b3ceed YACWC
2025-10-31 21:57:14 -04:00

265 lines
9.6 KiB
Python

import asyncio
from common_code.settings import LogColorize
import concurrent.futures
import string
from random import choices
from urllib import parse
from io import BytesIO
import requests
import cv2
import queue
import logging
import struct
import re
import pickle
import datetime as dt
from functools import partial
import cv2
import time
import multiprocessing
import threading
import numpy as np
import ctypes
import shutil
import hashlib
from hailo_platform import VDevice, HailoSchedulingAlgorithm, FormatType
import pickle
import json
import redis
import os
# Module-level alias for a LogColorize helper (not referenced in the visible
# part of this file).
pfm = LogColorize.score_obj_det_embed
# %%
# Class-index -> species-name lookup: one name per line, empty lines skipped.
with open('/home/thebears/source/infer/species_list', 'r') as species_file:
    species_list = [name for name in species_file.read().split('\n') if name]
# Redis client that run_model uses for its heartbeat / status keys.
r = redis.Redis('localhost', port=6379, db=14)
# Shared logger for this module.
logger = logging.getLogger('live_inference')
def get_snap(url, username, password, proxies=None, timeout=5, save_image=None, camera_name='N/A', width=None, height=None):
    """Fetch one snapshot over HTTP and return it as an RGB image array.

    Issues a GET request carrying a ``cmd=Snap`` query and decodes the
    response body with OpenCV. All failures are logged instead of raised.

    Args:
        url: Endpoint of the camera's HTTP API.
        username: Sent as the ``user`` query parameter.
        password: Sent as the ``password`` query parameter.
        proxies: Optional proxies mapping forwarded to ``requests.get``.
        timeout: Request timeout in seconds.
        save_image: Optional file path; when given, the decoded frame is also
            written there (parent directories are created as needed).
        camera_name: Label used only in log messages.
        width: Requested width; only sent when ``height`` is also given.
        height: Requested height; only sent when ``width`` is also given.

    Returns:
        The decoded image as an RGB ``numpy.ndarray``, or ``None`` when the
        request fails, the status code is not 200, or the body is not a
        decodable image.
    """
    # NOTE(review): this function logs through the root ``logging`` module
    # while the rest of the file uses the module-level ``logger``; left as-is
    # so log routing does not change.
    data = {
        'cmd': 'Snap',
        'channel': 0,
        # Fresh random token per request; presumably a cache-buster -- TODO confirm.
        'rs': ''.join(choices(string.ascii_uppercase + string.digits, k=10)),
        'snapType': 'sub',
        'user': username,
        'password': password,
    }
    if width is not None and height is not None:
        data['width'] = width
        data['height'] = height
    # Keep "!" unescaped in the encoded query string.
    parms = parse.urlencode(data, safe="!").encode("utf-8")
    try:
        response = requests.get(url, proxies=proxies, params=parms, timeout=timeout)
        if response.status_code != 200:
            logging.info(f'{camera_name}: Got response code of {response.status_code}')
            return None

        # frombuffer reads the response bytes directly; no intermediate copy.
        rearr = np.frombuffer(response.content, dtype=np.uint8)
        img_bgr = cv2.imdecode(rearr, cv2.IMREAD_COLOR)
        if img_bgr is None:
            # imdecode signals an undecodable payload by returning None.
            logging.error(f'{camera_name} failure: response body could not be decoded as an image')
            return None
        img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

        if save_image is not None:
            # dirname is '' for a bare filename and makedirs('') raises, which
            # used to discard an otherwise good frame.
            save_dir = os.path.dirname(save_image)
            if save_dir:
                os.makedirs(save_dir, exist_ok=True)
            # imwrite reports failure through its return value, not an exception.
            if cv2.imwrite(save_image, img_bgr):
                logging.info(f'{camera_name}: Wrote image to {save_image}')
            else:
                logging.error(f'{camera_name}: Failed to write image to {save_image}')

        logging.info(f'{camera_name}: Got image of {img_rgb.shape}')
        return img_rgb
    except Exception as e:
        # Best-effort capture: a failing camera must not take the caller down.
        logging.error(f'{camera_name} failure: {str(e)}')
    return None
# %%
def resize_image(img_in, reshape_to_final=False, max_l=640):
    """Scale an image to fit a square canvas and centre it on black padding.

    The longer side is resized to ``max_l`` pixels, the shorter side keeps the
    aspect ratio, and the result is pasted in the middle of a zero-filled
    ``(max_l, max_l, 3)`` uint8 canvas.

    Args:
        img_in: Image as an array (or anything ``np.asarray`` accepts) with
            rows on axis 0 and columns on axis 1. A 2-D (single channel)
            image is replicated across the three canvas channels.
        reshape_to_final: When true, move the channel axis first and add a
            leading batch axis, giving shape ``(1, 3, max_l, max_l)``.
        max_l: Side length of the square output canvas in pixels.

    Returns:
        The padded uint8 array, ``(max_l, max_l, 3)`` or
        ``(1, 3, max_l, max_l)`` depending on ``reshape_to_final``.

    Raises:
        ValueError: If the input has zero rows or zero columns.
    """
    if not isinstance(img_in, np.ndarray):
        img_in = np.asarray(img_in)

    in_rows, in_cols = img_in.shape[0], img_in.shape[1]
    if in_rows == 0 or in_cols == 0:
        raise ValueError(f'cannot resize an empty image of shape {img_in.shape}')

    asp_rat = in_rows / in_cols
    # max(1, ...) keeps extremely thin images from collapsing to a 0-pixel side.
    if asp_rat < 1:
        out_rows, out_cols = max(1, int(asp_rat * max_l)), max_l
    else:
        out_rows, out_cols = max_l, max(1, int(max_l / asp_rat))

    # cv2.resize takes its target size as (width, height).
    resized = cv2.resize(img_in, (out_cols, out_rows))
    if resized.ndim == 2:
        # Give grayscale input a channel axis so it broadcasts onto the canvas.
        resized = resized[:, :, None]

    # Centre the resized image; any odd leftover pixel goes to the bottom/right.
    top = (max_l - resized.shape[0]) // 2
    left = (max_l - resized.shape[1]) // 2
    canvas = np.zeros(shape=(max_l, max_l, 3), dtype=np.uint8)
    canvas[top:top + resized.shape[0], left:left + resized.shape[1]] = resized

    data = canvas
    if reshape_to_final:
        data = np.moveaxis(data, [2], [0])[None, :, :, :]
    return data
def model_scoring_callback(completion_info, bindings, data):
    """Completion callback for an async inference job: persist the detections.

    Args:
        completion_info: Completion object supplied by the inference runtime;
            its ``exception`` attribute is truthy when the job failed.
        bindings: Bindings whose output buffer holds the inference result.
        data: Dict with ``camera_name``, ``image_timestamp`` and
            ``image_hash`` describing the frame that was scored.

    Returns:
        None. On success the result is appended to the camera's scores file
        via ``dump_model_results_to_json``; on failure the error is logged
        and nothing is written.
    """
    if completion_info.exception:
        # A failed job leaves no trustworthy output buffer, so do not write it
        # out as if it were a real (empty) detection result.
        logger.error(f'Inference failed for {data}: {completion_info.exception}')
        return
    ff = bindings.output().get_buffer()
    camera_name = data['camera_name']
    timestamp = data['image_timestamp']
    hash_value = data['image_hash']
    dump_model_results_to_json(camera_name, timestamp, ff, hash_value)
def round_floats(obj, decimals=4):
    """Return a copy of ``obj`` with every float rounded to ``decimals`` places.

    Dicts and lists are walked recursively (dict keys are left untouched).
    Any other value, including tuples and non-float numbers, is returned
    unchanged.
    """
    if isinstance(obj, dict):
        return {key: round_floats(value, decimals) for key, value in obj.items()}
    if isinstance(obj, list):
        return [round_floats(element, decimals) for element in obj]
    if isinstance(obj, float):
        return round(obj, decimals)
    return obj
def dump_model_results_to_json(camera_name, timestamp, output_array, hash_value, rt_path='/home/thebears/source/infer/scores/'):
    """Append one frame's detections as a single JSON line to a per-camera file.

    Args:
        camera_name: Used as the file name inside ``rt_path``.
        timestamp: Stored under the ``timestamp`` key.
        output_array: Sequence indexed by class id; each entry is iterated as
            a collection of detection rows whose first four values are stored
            as ``boxes`` and whose fifth is stored as ``score``. (Presumably
            per-class arrays of shape (n, 5) -- TODO confirm against the
            model's output format.)
        hash_value: Stored under the ``image_hash`` key.
        rt_path: Directory that receives the scores files; created if missing.

    Returns:
        None. Floats are rounded with ``round_floats`` before serialising.
    """
    scores = []
    for idx, detections in enumerate(output_array):
        # A class id beyond the species list would otherwise raise IndexError
        # and lose every detection of this frame.
        species = species_list[idx] if idx < len(species_list) else f'unknown_{idx}'
        # "det", not "r": the module-level name ``r`` is the Redis client.
        for det in detections:
            scores.append({'idx': idx, 'species': species, 'boxes': det[0:4].tolist(), 'score': det[4].tolist()})

    score_dict = {'timestamp': timestamp, 'scores': scores, 'image_hash': hash_value}
    json_str = json.dumps(round_floats(score_dict))

    os.makedirs(rt_path, exist_ok=True)
    # One write per record keeps the line and its newline together.
    with open(os.path.join(rt_path, camera_name), 'a') as ff:
        ff.write(json_str + '\n')
def run_model(img_scoring_queue, hef_path="yolov11l_inat.hef"):
    """Run the inference loop: pull frames from a queue and score them forever.

    Opens a Hailo virtual device with round-robin scheduling, loads the model
    at ``hef_path`` and then loops indefinitely. For every queue item it
    updates the Redis keys ``model_inference_heartbeat``,
    ``model_inference_started`` and ``model_inference_finished`` and runs the
    model; results are persisted by ``model_scoring_callback``.

    Args:
        img_scoring_queue: Queue of dicts with ``frame``, ``camera_name``,
            ``image_timestamp`` and ``image_hash`` keys.
        hef_path: Path of the compiled model file to load.

    Returns:
        Never returns normally; the loop only ends if an exception that is
        not an ``Exception`` subclass (e.g. KeyboardInterrupt) propagates.
    """
    timeout_ms = 1000
    logger.info('Starting model scoring process')
    params = VDevice.create_params()
    params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
    with VDevice(params) as vdevice:
        infer_model = vdevice.create_infer_model(hef_path)
        logger.info('Loaded model')
        with infer_model.configure() as configured_infer_model:
            bindings = configured_infer_model.create_bindings()
            while True:
                try:
                    # Use get with timeout for multiprocessing queue
                    res = img_scoring_queue.get(timeout=1.0)
                    r.set('model_inference_heartbeat', time.time())
                    inp = res['frame']
                    res_send = {'camera_name': res['camera_name'], 'image_timestamp': res['image_timestamp'], 'image_hash': res['image_hash']}
                    logger.info(f'Running inference for {res_send}')
                    r.set('model_inference_started', str(res_send))
                    bindings.input().set_buffer(inp)
                    # Allocate directly as float32 instead of float64 + astype copy.
                    output_array = np.zeros([infer_model.output().shape[0]], dtype=np.float32)
                    bindings.output().set_buffer(output_array)
                    # NOTE(review): the frame is scored synchronously here and
                    # then again asynchronously below; only the async callback
                    # uses the result. Confirm whether both runs are intended.
                    configured_infer_model.run([bindings], timeout_ms)
                    job = configured_infer_model.run_async(
                        [bindings],
                        partial(model_scoring_callback, bindings=bindings, data=res_send),
                    )
                    # NOTE(review): this marker is written before job.wait()
                    # returns, i.e. before the async job is known to be done.
                    r.set('model_inference_finished', str(res_send))
                    try:
                        job.wait(timeout_ms)
                    except Exception as e:
                        logger.error(str(e))
                except queue.Empty:
                    # No frame arrived within the timeout; poll again.
                    continue
                except Exception:
                    # Stay best-effort, but make the failure visible instead of
                    # silently dropping the frame.
                    logger.exception('Model inference loop error; skipping frame')
                    continue
class SnapManager():
    """Per-camera worker: takes snapshots on command and queues them for scoring.

    Commands arrive as strings on ``msg_queue``; prepared frames are pushed to
    ``img_scoring_queue`` as dicts with ``camera_name``, ``frame``,
    ``image_timestamp`` and ``image_hash`` keys.
    """

    def __init__(self, ip, url_api, username, password, camera_name, msg_queue=None, img_scoring_queue=None, split_into_two=False, resolution=None, **kwargs):
        """Store the camera settings and queues.

        Args:
            ip: Camera address (stored only).
            url_api: URL passed to ``get_snap``.
            username: Credential passed to ``get_snap``.
            password: Credential passed to ``get_snap``.
            camera_name: Label used in logs, file paths and queued messages.
            msg_queue: Queue the command strings are read from.
            img_scoring_queue: Queue that receives the prepared frames.
            split_into_two: When true, each snapshot is cut into a left and a
                right half that are scored separately.
            resolution: Optional ``(height, width)`` pair requested from the
                camera.
            **kwargs: Accepted and ignored so callers can pass a wider config.
        """
        self.ip = ip
        self.url_api = url_api
        self.username = username
        self.password = password
        self.camera_name = camera_name
        self.split_into_two = split_into_two
        self.msg_queue = msg_queue
        self.img_scoring_queue = img_scoring_queue
        self.resolution = resolution
        logger.info(f"{self.camera_name}: initialized")

    @staticmethod
    def _build_model_msg(camera_name, frame, timestamp):
        """Return the queue message for one prepared frame, including its SHA-1."""
        return {
            'camera_name': camera_name,
            'frame': frame,
            'image_timestamp': timestamp,
            'image_hash': hashlib.sha1(frame.tobytes()).hexdigest(),
        }

    def format_image_for_model(self, image, timestamp):
        """Resize a snapshot (or its two halves) into model-ready messages.

        Args:
            image: Image array with columns on axis 1 and channels on axis 2.
            timestamp: Value stored as ``image_timestamp`` in every message.

        Returns:
            A list with one message, or two (``<camera>_left`` and
            ``<camera>_right``) when ``split_into_two`` is set.
        """
        if not self.split_into_two:
            return [self._build_model_msg(self.camera_name, resize_image(image), timestamp)]

        split_point = image.shape[1] // 2
        left_frame = resize_image(image[:, :split_point, :])
        right_frame = resize_image(image[:, split_point:, :])
        return [
            self._build_model_msg(self.camera_name + '_left', left_frame, timestamp),
            self._build_model_msg(self.camera_name + '_right', right_frame, timestamp),
        ]

    def capture_and_prepare(self, save_image=None):
        """Take one snapshot and convert it into model messages.

        Args:
            save_image: Optional path forwarded to ``get_snap`` for saving the
                raw frame.

        Returns:
            The list from ``format_image_for_model``, or an empty list when no
            image could be fetched.
        """
        if self.resolution is not None:
            # resolution is ordered (height, width).
            width = self.resolution[1]
            height = self.resolution[0]
        else:
            width = None
            height = None
        img = get_snap(self.url_api, self.username, self.password, camera_name=self.camera_name, save_image=save_image, width=width, height=height)
        if img is None:
            return []
        timestamp = time.time()
        return self.format_image_for_model(img, timestamp)

    def run_forever(self):
        """Process commands from ``msg_queue`` until ``'exit'`` is received.

        A message starting with ``'get'`` triggers a capture; if it also
        contains ``'+save'`` the raw frame is written under
        ``images/<camera_name>/``. Other messages are ignored.
        """
        while True:
            try:
                msg = self.msg_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            if msg == 'exit':
                break
            if not isinstance(msg, str):
                # A malformed command must not kill the camera loop.
                logger.warning(f'{self.camera_name}: ignoring non-string message {msg!r}')
                continue
            if not msg.startswith('get'):
                continue

            if '+save' in msg:
                save_image = f'images/{self.camera_name}/{time.time()}.jpg'
            else:
                save_image = None
            logger.info(f'Processing capture for {self.camera_name}')
            model_msgs = self.capture_and_prepare(save_image=save_image)
            for model_msg in model_msgs:
                # Use put_nowait for multiprocessing queue to avoid blocking
                try:
                    self.img_scoring_queue.put_nowait(model_msg)
                except queue.Full:
                    # Queue full, skip this message
                    logger.warning(f"Model queue full, dropping message from {self.camera_name}")
                except Exception:
                    # Still best-effort, but do not mislabel other errors as
                    # "queue full".
                    logger.exception(f'{self.camera_name}: failed to queue frame for scoring')
def start_snap_manager(**kwargs):
    """Construct a SnapManager from ``kwargs`` and run its command loop.

    Blocks until the manager's ``run_forever`` returns.
    """
    SnapManager(**kwargs).run_forever()