import asyncio from common_code.settings import LogColorize import concurrent.futures from common_code import kwq import string from random import choices from urllib import parse from io import BytesIO import requests import cv2 import queue import base64 import logging import struct import re import pickle import datetime as dt from functools import partial import cv2 import time import multiprocessing import threading import numpy as np import ctypes import shutil import hashlib from hailo_platform import VDevice, HailoSchedulingAlgorithm, FormatType import pickle import json import redis import os pfm = LogColorize.score_obj_det_embed publish = kwq.publish # %% with open('/home/thebears/source/infer/species_list', 'r') as sl: species_list = [x for x in sl.read().split('\n') if len(x) > 0] r = redis.Redis('localhost', port=6379, db=14) logger = logging.getLogger('live_inference') def get_snap(url, username, password, proxies=None, timeout=5, save_image=None, camera_name='N/A', width=None, height=None, kafka_queue=None): data = { 'cmd': 'Snap', 'channel': 0, 'rs': ''.join(choices(string.ascii_uppercase + string.digits, k=10)), 'snapType': 'main', 'user': username, 'password': password, } if width is not None and height is not None and data['snapType'] == 'sub': data['width'] = width data['height'] = height parms = parse.urlencode(data, safe="!").encode("utf-8") try: response = requests.get(url, proxies=proxies, params=parms, timeout=timeout) if response.status_code == 200: rearr = np.frombuffer(bytearray(response.content), dtype=np.uint8) img_bgr = cv2.imdecode(rearr, cv2.IMREAD_COLOR) img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) if save_image is not None: if kafka_queue is None: os.makedirs(os.path.dirname(save_image), exist_ok=True) cv2.imwrite(save_image, img_bgr) logging.info(f'{camera_name}: Wrote image to {save_image}') else: retval, buffer_jp = cv2.imencode('.jpg', img_bgr) # bbj = base64.b64encode(buffer_jp).decode() kafka_queue.put({ 'type': 'image', 'data': { 'image_bytes': buffer_jp, 'timestamp': time.time(), 'camera_name': camera_name } }) logging.info(f'{camera_name}: Wrote image to kafka queue') logging.info(f'{camera_name}: Got image of {img_rgb.shape}') return img_rgb else: logging.info( f'{camera_name}: Got response code of {response.status_code}') except Exception as e: logging.error(f'{camera_name} failure: {str(e)}') # %% def resize_image(img_in, reshape_to_final=False): if not isinstance(img_in, np.ndarray): img_in = np.asarray(img_in) max_l = 640 asp_rat = img_in.shape[0] / img_in.shape[1] if asp_rat < 1: output_size = [int(asp_rat * max_l), max_l] else: output_size = [max_l, int(max_l / asp_rat)] im_arr_not_pad = cv2.resize(img_in, output_size[::-1]) pad_amt = [max_l, max_l] - np.asarray(im_arr_not_pad.shape[0:2]) left_pad, top_pad = (pad_amt / 2).astype(np.int64) right_pad, bottom_pad = pad_amt - [left_pad, top_pad] im_pass = np.zeros(shape=(max_l, max_l, 3), dtype=np.uint8) im_pass[left_pad:(max_l - right_pad), top_pad:(max_l - bottom_pad)] = (im_arr_not_pad) data = im_pass if reshape_to_final: data = np.moveaxis(data, [2], [0])[None, :, :, :] return data def kafka_poster(kafka_results_queue): while True: msg = kafka_results_queue.get() msg_type = msg.get('type', 'n/a') try: if msg_type in {'objdet'}: data = msg['data'] cam_name = data['camera_name'] topic_name = f'streaming_{cam_name}_{msg_type}' kwq.kafka_client.send(topic=topic_name, key=data['timestamp'], value=data) elif msg_type in {'image'}: data = msg['data'] cam_name = data['camera_name'] topic_name = f'streaming_{cam_name}_{msg_type}' kwq.kafka_client.send(topic=topic_name, key=data['timestamp'], value=data['image_bytes'], serialize_value=False) except Exception as e: logger.error(f'Error when publishing to {topic_name}: {str(e)}') def model_scoring_callback(completion_info, bindings, data, kafka_queue): if completion_info.exception: pass ff = bindings.output().get_buffer() camera_name = data['camera_name'] timestamp = data['image_timestamp'] hash_value = data['image_hash'] ret_obj = dump_model_results_to_json(camera_name, timestamp, ff, hash_value) max_score = 0.0 for sc in ret_obj['scores']: max_score = max( max_score , sc['score']) ret_obj['max_score'] = max_score if max_score > 0.25: oi = data['og_image'] retval, buffer_jp = cv2.imencode('.jpg',cv2.cvtColor(oi, cv2.COLOR_RGB2BGR)) ret_obj['image_bytes'] = base64.b64encode(buffer_jp).decode() kafka_queue.put({'type': 'objdet', 'data': ret_obj}) def round_floats(obj, decimals=4): if isinstance(obj, float): return round(obj, decimals) elif isinstance(obj, dict): return {k: round_floats(v, decimals) for k, v in obj.items()} elif isinstance(obj, list): return [round_floats(item, decimals) for item in obj] return obj def dump_model_results_to_json(camera_name, timestamp, output_array, hash_value, no_write=True, return_dict=True): has_scores = {idx: x for idx, x in enumerate(output_array) if len(x) > 0} score_dict = {} score_dict['camera_name'] = camera_name score_dict['timestamp'] = timestamp score_dict['scores'] = list() score_dict['image_hash'] = hash_value for idx, sc in has_scores.items(): for r in sc: score_dict['scores'].append({ 'idx': idx, 'species': species_list[idx], 'boxes': r[0:4].tolist(), 'score': r[4].tolist() }) rnd_score_dict = round_floats(score_dict) if return_dict and no_write: return rnd_score_dict json_str = json.dumps(rnd_score_dict) if not no_write: rt_path = '/home/thebears/source/infer/scores/' os.makedirs(rt_path, exist_ok=True) with open(rt_path + camera_name, 'a') as ff: ff.write(json_str) ff.write('\n') def run_model(img_scoring_queue, kafka_results_queue): timeout_ms = 1000 logger.info('Starting model scoring process') params = VDevice.create_params() params.scheduling_algorithm = HailoSchedulingAlgorithm.ROUND_ROBIN with VDevice(params) as vdevice: infer_model = vdevice.create_infer_model("yolov11l_inat.hef") logger.info('Loaded model') with infer_model.configure() as configured_infer_model: bindings = configured_infer_model.create_bindings() while True: try: # Use get with timeout for multiprocessing queue res = img_scoring_queue.get(timeout=1.0) r.set('model_inference_heartbeat', time.time()) inp = res['frame'] res_send = { 'camera_name': res['camera_name'], 'image_timestamp': res['image_timestamp'], 'image_hash': res['image_hash'], 'og_image': res['og_image'] } logger.info(f'Running inference for {res_send}') r.set('model_inference_started', str(res_send)) bindings.input().set_buffer(inp) output_array = np.zeros([infer_model.output().shape[0] ]).astype(np.float32) bindings.output().set_buffer(output_array) configured_infer_model.run([bindings], timeout_ms) job = configured_infer_model.run_async( [bindings], partial(model_scoring_callback, bindings=bindings, data=res_send, kafka_queue=kafka_results_queue)) r.set('model_inference_finished', str(res_send)) try: job.wait(timeout_ms) except Exception as e: logger.error(str(e)) except: # Handle both queue.Empty and multiprocessing timeout continue class SnapManager(): def __init__(self, ip, url_api, username, password, camera_name, msg_queue=None, img_scoring_queue=None, kafka_queue=None, split_into_two=False, resolution=None, **kwargs): self.ip = ip self.url_api = url_api self.username = username self.kafka_queue = kafka_queue self.password = password self.camera_name = camera_name self.split_into_two = split_into_two self.msg_queue = msg_queue self.img_scoring_queue = img_scoring_queue self.resolution = resolution logger.info(f"{self.camera_name}: initialized") def format_image_for_model(self, image, timestamp): msg = list() if self.split_into_two: split_point = int(image.shape[1] / 2) left_og_image = image[:, :split_point, :] right_og_image = image[:, split_point:, :] left_frame = resize_image(left_og_image) right_frame = resize_image(right_og_image) msg.append({ 'camera_name': self.camera_name + '_left', 'frame': left_frame, 'image_timestamp': timestamp, 'image_hash': hashlib.sha1(left_frame.tobytes()).hexdigest(), 'og_image': left_og_image }) msg.append({ 'camera_name': self.camera_name + '_right', 'frame': right_frame, 'image_timestamp': timestamp, 'image_hash': hashlib.sha1(right_frame.tobytes()).hexdigest(), 'og_image': right_og_image }) else: frame = resize_image(image) msg.append({ 'camera_name': self.camera_name, 'frame': frame, 'image_timestamp': timestamp, 'image_hash': hashlib.sha1(frame.tobytes()).hexdigest(), 'og_image':image }) return msg def capture_and_prepare(self, save_image=None): if self.resolution is not None: width = self.resolution[1] height = self.resolution[0] else: width = None height = None img = get_snap(self.url_api, self.username, self.password, camera_name=self.camera_name, save_image=save_image, kafka_queue=self.kafka_queue, width=width, height=height) if img is not None: timestamp = time.time() return self.format_image_for_model(img, timestamp) return [] def run_forever(self): while True: try: msg = self.msg_queue.get(timeout=0.1) if msg == 'exit': break elif msg.startswith('get'): if '+save' in msg: save_image = 'images/' + self.camera_name + '/' + str( time.time()) + '.jpg' else: save_image = None logger.info(f'Processing capture for {self.camera_name}') model_msgs = self.capture_and_prepare( save_image=save_image) for model_msg in model_msgs: # Use put_nowait for multiprocessing queue to avoid blocking try: self.img_scoring_queue.put_nowait(model_msg) except: # Queue full, skip this message logger.warning( f"Model queue full, dropping message from {self.camera_name}" ) except queue.Empty: pass def start_snap_manager(**kwargs): obj = SnapManager(**kwargs) obj.run_forever()