import asyncio
import concurrent.futures
import ctypes
import datetime as dt
import hashlib
import json
import logging
import multiprocessing
import os
import pickle
import queue
import re
import shutil
import string
import struct
import threading
import time
from functools import partial
from io import BytesIO
from random import choices
from urllib import parse

import cv2
import numpy as np
import redis
import requests
from hailo_platform import VDevice, HailoSchedulingAlgorithm, FormatType

from common_code.settings import LogColorize

pfm = LogColorize.score_obj_det_embed

# %%
with open('/home/thebears/source/infer/species_list', 'r') as sl:
    species_list = [x for x in sl.read().split('\n') if len(x) > 0]

r = redis.Redis('localhost', port=6379, db=14)
logger = logging.getLogger('live_inference')


def get_snap(url, username, password, proxies=None, timeout=5,
             save_image=None, camera_name='N/A'):
    """Fetch a single snapshot from the camera's Snap API and return it as an RGB array."""
    data = {
        'cmd': 'Snap',
        'channel': 0,
        'rs': ''.join(choices(string.ascii_uppercase + string.digits, k=10)),
        'snapType': 'sub',
        'user': username,
        'password': password,
    }
    params = parse.urlencode(data, safe="!").encode("utf-8")
    try:
        response = requests.get(url, proxies=proxies, params=params, timeout=timeout)
        if response.status_code == 200:
            rearr = np.frombuffer(bytearray(response.content), dtype=np.uint8)
            img_bgr = cv2.imdecode(rearr, cv2.IMREAD_COLOR)
            img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
            if save_image is not None:
                os.makedirs(os.path.dirname(save_image), exist_ok=True)
                cv2.imwrite(save_image, img_bgr)
                logger.info(f'{camera_name}: Wrote image to {save_image}')
            logger.info(f'{camera_name}: Got image of {img_rgb.shape}')
            return img_rgb
    except Exception as e:
        logger.error(f'{camera_name} failure: {str(e)}')
        raise


# %%
def resize_image(img_in, reshape_to_final=False):
    """Resize to fit inside 640x640 while preserving aspect ratio, then pad to a square."""
    if not isinstance(img_in, np.ndarray):
        img_in = np.asarray(img_in)
    max_l = 640
    asp_rat = img_in.shape[0] / img_in.shape[1]
    if asp_rat < 1:
        output_size = [int(asp_rat * max_l), max_l]
    else:
        output_size = [max_l, int(max_l / asp_rat)]
    im_arr_not_pad = cv2.resize(img_in, output_size[::-1])
    # Split the leftover padding evenly between the two sides of each axis.
    pad_amt = [max_l, max_l] - np.asarray(im_arr_not_pad.shape[0:2])
    left_pad, top_pad = (pad_amt / 2).astype(np.int64)
    right_pad, bottom_pad = pad_amt - [left_pad, top_pad]
    im_pass = np.zeros(shape=(max_l, max_l, 3), dtype=np.uint8)
    im_pass[left_pad:(max_l - right_pad), top_pad:(max_l - bottom_pad)] = im_arr_not_pad
    data = im_pass
    if reshape_to_final:
        # HWC -> NCHW with a leading batch dimension.
        data = np.moveaxis(data, [2], [0])[None, :, :, :]
    return data


def model_scoring_callback(completion_info, bindings, data):
    """Async inference callback: read the output buffer and persist the detections."""
    if completion_info.exception:
        logger.error(f'Inference failed: {completion_info.exception}')
        return
    output_buffer = bindings.output().get_buffer()
    camera_name = data['camera_name']
    timestamp = data['image_timestamp']
    hash_value = data['image_hash']
    dump_model_results_to_json(camera_name, timestamp, output_buffer, hash_value)


def round_floats(obj, decimals=4):
    """Recursively round floats in nested dicts/lists to keep the JSON output compact."""
    if isinstance(obj, float):
        return round(obj, decimals)
    elif isinstance(obj, dict):
        return {k: round_floats(v, decimals) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [round_floats(item, decimals) for item in obj]
    return obj


def dump_model_results_to_json(camera_name, timestamp, output_array, hash_value):
    """Append one JSON line of detections for this frame to the camera's score file."""
    has_scores = {idx: x for idx, x in enumerate(output_array) if len(x) > 0}
    score_dict = {
        'timestamp': timestamp,
        'scores': [],
        'image_hash': hash_value,
    }
    for idx, sc in has_scores.items():
        for row in sc:
            score_dict['scores'].append({
                'idx': idx,
                'species': species_list[idx],
                'boxes': row[0:4].tolist(),
                'score': row[4].tolist(),
            })
    json_str = json.dumps(round_floats(score_dict))
    with open('/home/thebears/source/infer/scores/' + camera_name, 'a') as ff:
        ff.write(json_str)
        ff.write('\n')


def run_model(img_scoring_queue):
    """Scoring process: pull prepared frames off the queue and run Hailo inference."""
    timeout_ms = 1000
    logger.info('Starting model scoring process')
    params = VDevice.create_params()
    params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN
    with VDevice(params) as vdevice:
        infer_model = vdevice.create_infer_model("yolov11l_inat.hef")
        logger.info('Loaded model')
        with infer_model.configure() as configured_infer_model:
            bindings = configured_infer_model.create_bindings()
            while True:
                try:
                    # Use get with a timeout so the loop stays responsive.
                    res = img_scoring_queue.get(timeout=1.0)
                    r.set('model_inference_heartbeat', time.time())
                    inp = res['frame']
                    res_send = {'camera_name': res['camera_name'],
                                'image_timestamp': res['image_timestamp'],
                                'image_hash': res['image_hash']}
                    logger.info(f'Running inference for {res_send}')
                    r.set('model_inference_started', str(res_send))
                    bindings.input().set_buffer(inp)
                    output_array = np.zeros([infer_model.output().shape[0]]).astype(np.float32)
                    bindings.output().set_buffer(output_array)
                    # Run inference once, asynchronously; the callback writes out the scores.
                    job = configured_infer_model.run_async(
                        [bindings],
                        partial(model_scoring_callback, bindings=bindings, data=res_send),
                    )
                    try:
                        job.wait(timeout_ms)
                    except Exception as e:
                        logger.error(str(e))
                    r.set('model_inference_finished', str(res_send))
                except queue.Empty:
                    # Nothing to score right now; poll again.
                    continue
                except Exception as e:
                    # Keep the scoring loop alive if a single frame fails.
                    logger.error(f'Scoring loop error: {e}')


class SnapManager:
    """Per-camera worker: waits for 'get' commands, grabs a snapshot, and queues frames."""

    def __init__(self, ip, url_api, username, password, camera_name,
                 msg_queue=None, img_scoring_queue=None, split_into_two=False, **kwargs):
        self.ip = ip
        self.url_api = url_api
        self.username = username
        self.password = password
        self.camera_name = camera_name
        self.split_into_two = split_into_two
        self.msg_queue = msg_queue
        self.img_scoring_queue = img_scoring_queue
        logger.info(f"{self.camera_name}: initialized")

    def format_image_for_model(self, image, timestamp):
        """Resize (and optionally split) a snapshot into model-ready messages."""
        msg = list()
        if self.split_into_two:
            # Wide images are split down the middle and scored as two separate frames.
            split_point = int(image.shape[1] / 2)
            left_frame = resize_image(image[:, :split_point, :])
            right_frame = resize_image(image[:, split_point:, :])
            msg.append({
                'camera_name': self.camera_name + '_left',
                'frame': left_frame,
                'image_timestamp': timestamp,
                'image_hash': hashlib.sha1(left_frame.tobytes()).hexdigest()
            })
            msg.append({
                'camera_name': self.camera_name + '_right',
                'frame': right_frame,
                'image_timestamp': timestamp,
                'image_hash': hashlib.sha1(right_frame.tobytes()).hexdigest()
            })
        else:
            frame = resize_image(image)
            msg.append({
                'camera_name': self.camera_name,
                'frame': frame,
                'image_timestamp': timestamp,
                'image_hash': hashlib.sha1(frame.tobytes()).hexdigest()
            })
        return msg

    def capture_and_prepare(self):
        img = get_snap(self.url_api, self.username, self.password,
                       camera_name=self.camera_name)
        if img is not None:
            timestamp = time.time()
            return self.format_image_for_model(img, timestamp)
        return []

    def run_forever(self):
        while True:
            try:
                msg = self.msg_queue.get(timeout=0.1)
                if msg == 'exit':
                    break
                if msg == 'get':
                    logger.info(f'Processing capture for {self.camera_name}')
                    model_msgs = self.capture_and_prepare()
                    for model_msg in model_msgs:
                        # Use put_nowait so a stalled scoring process cannot block capture.
                        try:
                            self.img_scoring_queue.put_nowait(model_msg)
                        except queue.Full:
                            logger.warning(f"Model queue full, dropping message from {self.camera_name}")
            except queue.Empty:
                pass
            except Exception as e:
                # A failed capture should not kill this camera's loop.
                logger.error(f'{self.camera_name}: capture failed: {e}')


def start_snap_manager(**kwargs):
    obj = SnapManager(**kwargs)
    obj.run_forever()
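

# %%
# The pieces above are never wired together in this module, so the following is a
# minimal sketch of one way to run them: one process per camera running
# SnapManager.run_forever, one scoring process running run_model, and a shared
# multiprocessing queue between them. The CAMERAS list, queue size, and the
# 5-second polling cadence are illustrative assumptions, not part of the original code.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)

    # Hypothetical camera configuration; replace with real URLs and credentials.
    CAMERAS = [
        {'ip': '192.168.1.10', 'url_api': 'http://192.168.1.10/cgi-bin/api.cgi',
         'username': 'viewer', 'password': 'secret', 'camera_name': 'front_yard'},
    ]

    img_scoring_queue = multiprocessing.Queue(maxsize=16)
    msg_queues = []

    # One capture process per camera.
    for cam in CAMERAS:
        msg_queue = multiprocessing.Queue()
        msg_queues.append(msg_queue)
        multiprocessing.Process(
            target=start_snap_manager,
            kwargs=dict(cam, msg_queue=msg_queue, img_scoring_queue=img_scoring_queue),
            daemon=True,
        ).start()

    # One scoring process consuming every camera's frames.
    multiprocessing.Process(target=run_model, args=(img_scoring_queue,), daemon=True).start()

    # Periodically ask every camera for a snapshot.
    try:
        while True:
            for msg_queue in msg_queues:
                msg_queue.put('get')
            time.sleep(5)
    except KeyboardInterrupt:
        for msg_queue in msg_queues:
            msg_queue.put('exit')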