Author: The Bears
Date: 2025-11-03 12:59:33 -05:00
parent f045d8fec5
commit 47871c5370
125 changed files with 6554 additions and 12007 deletions

utils.py

@@ -1,7 +1,7 @@
import asyncio
from common_code.settings import LogColorize
import concurrent.futures
from common_code import kwq
import string
from random import choices
from urllib import parse
@@ -10,6 +10,7 @@ import requests
import cv2
import queue
import base64
import logging
import struct
import re
@@ -29,21 +30,35 @@ import pickle
import json
import redis
import os
pfm = LogColorize.score_obj_det_embed
publish = kwq.publish
# %%
with open('/home/thebears/source/infer/species_list','r') as sl:
with open('/home/thebears/source/infer/species_list', 'r') as sl:
species_list = [x for x in sl.read().split('\n') if len(x) > 0]
r = redis.Redis('localhost',port=6379, db=14)
r = redis.Redis('localhost', port=6379, db=14)
logger = logging.getLogger('live_inference')
def get_snap( url, username, password, proxies = None, timeout=5, save_image = None, camera_name = 'N/A', width = None, height = None):
def get_snap(url,
username,
password,
proxies=None,
timeout=5,
save_image=None,
camera_name='N/A',
width=None,
height=None,
kafka_queue=None):
data = {
'cmd': 'Snap',
'channel': 0,
'rs': ''.join(choices(string.ascii_uppercase + string.digits, k=10)),
'snapType':'sub',
'snapType': 'sub',
'user': username,
'password': password,
}
@@ -53,25 +68,43 @@ def get_snap( url, username, password, proxies = None, timeout=5, save_image = N
parms = parse.urlencode(data, safe="!").encode("utf-8")
try:
response = requests.get(url, proxies=proxies, params=parms, timeout=timeout)
response = requests.get(url,
proxies=proxies,
params=parms,
timeout=timeout)
if response.status_code == 200:
rearr = np.frombuffer(bytearray(response.content), dtype=np.uint8)
img_bgr = cv2.imdecode(rearr,cv2.IMREAD_COLOR)
img_bgr = cv2.imdecode(rearr, cv2.IMREAD_COLOR)
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
if save_image is not None:
os.makedirs(os.path.dirname(save_image), exist_ok=True)
cv2.imwrite( save_image, img_bgr)
logging.info(f'{camera_name}: Wrote image to {save_image}')
if kafka_queue is None:
os.makedirs(os.path.dirname(save_image), exist_ok=True)
cv2.imwrite(save_image, img_bgr)
logging.info(f'{camera_name}: Wrote image to {save_image}')
else:
retval, buffer_jp = cv2.imencode('.jpg', img_bgr)
# bbj = base64.b64encode(buffer_jp).decode()
kafka_queue.put({
'type': 'image',
'data': {
'image_bytes': buffer_jp,
'timestamp': time.time(),
'camera_name': camera_name
}
})
logging.info(f'{camera_name}: Wrote image to kafka queue')
logging.info(f'{camera_name}: Got image of {img_rgb.shape}')
return img_rgb
else:
logging.info(f'{camera_name}: Got response code of {response.status_code}')
logging.info(
f'{camera_name}: Got response code of {response.status_code}')
except Exception as e:
logging.error(f'{camera_name} failure: {str(e)}')
# %%
def resize_image(img_in, reshape_to_final=False):
if not isinstance(img_in, np.ndarray):
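With the branch added above, a caller that supplies both save_image and a kafka_queue gets the encoded JPEG pushed onto the queue instead of written to disk, while the RGB frame is still returned. A minimal caller sketch, assuming a multiprocessing queue stands in for kafka_queue; the URL, credentials, and camera name are placeholders, not values from this commit.

# Sketch only: placeholder URL/credentials; a multiprocessing queue as kafka_queue.
import multiprocessing as mp

kafka_results_queue = mp.Queue()   # drained by kafka_poster elsewhere
frame_rgb = get_snap('http://192.0.2.10/cgi-bin/api.cgi',
                     'admin', 'secret',
                     camera_name='front_yard',
                     save_image='images/front_yard/latest.jpg',
                     kafka_queue=kafka_results_queue)
# Because kafka_queue is not None, the JPEG bytes are queued for Kafka rather
# than written to latest.jpg; the RGB array is still returned for scoring.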
@@ -99,14 +132,49 @@ def resize_image(img_in, reshape_to_final=False):
return data
def model_scoring_callback(completion_info, bindings, data):
def kafka_poster(kafka_results_queue):
while True:
msg = kafka_results_queue.get()
msg_type = msg.get('type', 'n/a')
if msg_type in {'objdet'}:
data = msg['data']
cam_name = data['camera_name']
topic_name = f'streaming_{cam_name}_{msg_type}'
kwq.kafka_client.send(topic=topic_name,
key=data['timestamp'],
value=data)
elif msg_type in {'image'}:
data = msg['data']
cam_name = data['camera_name']
topic_name = f'streaming_{cam_name}_{msg_type}'
kwq.kafka_client.send(topic=topic_name,
key=data['timestamp'],
value=data['image_bytes'],
serialize_value=False)
def model_scoring_callback(completion_info, bindings, data, kafka_queue):
if completion_info.exception:
pass
ff = bindings.output().get_buffer()
camera_name = data['camera_name']
timestamp = data['image_timestamp']
hash_value = data['image_hash']
dump_model_results_to_json( camera_name, timestamp, ff, hash_value)
ret_obj = dump_model_results_to_json(camera_name, timestamp, ff,
hash_value)
max_score = 0.0
for sc in ret_obj['scores']:
max_score = max( max_score , sc['score'])
ret_obj['max_score'] = max_score
if max_score > 0.25:
oi = data['og_image']
retval, buffer_jp = cv2.imencode('.jpg',cv2.cvtColor(oi, cv2.COLOR_RGB2BGR))
ret_obj['image_bytes'] = base64.b64encode(buffer_jp).decode()
kafka_queue.put({'type': 'objdet', 'data': ret_obj})
def round_floats(obj, decimals=4):
if isinstance(obj, float):
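kafka_poster above is a blocking consumer loop, so it needs its own thread or process; this diff does not show where it is launched. A hedged wiring sketch, assuming a daemon thread and a shared multiprocessing queue (the same object later passed around as kafka_queue / kafka_results_queue).

# Sketch only: run kafka_poster on a daemon thread; the queue is shared with
# get_snap and model_scoring_callback as their kafka_queue argument.
import multiprocessing as mp
import threading

kafka_results_queue = mp.Queue()
threading.Thread(target=kafka_poster,
                 args=(kafka_results_queue,),
                 daemon=True).start()   # loop blocks on queue.get() forever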
@@ -118,30 +186,43 @@ def round_floats(obj, decimals=4):
return obj
def dump_model_results_to_json(camera_name, timestamp, output_array, hash_value):
has_scores = {idx:x for idx,x in enumerate(output_array) if len(x) > 0}
def dump_model_results_to_json(camera_name,
timestamp,
output_array,
hash_value,
no_write=True,
return_dict=True):
has_scores = {idx: x for idx, x in enumerate(output_array) if len(x) > 0}
score_dict = {}
score_dict['camera_name'] = camera_name
score_dict['timestamp'] = timestamp
score_dict['scores'] = list()
score_dict['image_hash'] = hash_value
for idx, sc in has_scores.items():
for r in sc:
score_dict['scores'].append({'idx': idx, 'species': species_list[idx], 'boxes':r[0:4].tolist(), 'score': r[4].tolist()})
score_dict['scores'].append({
'idx': idx,
'species': species_list[idx],
'boxes': r[0:4].tolist(),
'score': r[4].tolist()
})
rnd_score_dict = round_floats(score_dict)
if return_dict and no_write:
return rnd_score_dict
json_str = json.dumps(rnd_score_dict)
if not no_write:
rt_path = '/home/thebears/source/infer/scores/'
os.makedirs(rt_path, exist_ok=True)
with open(rt_path + camera_name, 'a') as ff:
ff.write(json_str)
ff.write('\n')
json_str = json.dumps(round_floats(score_dict))
rt_path = '/home/thebears/source/infer/scores/'
os.makedirs(rt_path, exist_ok = True)
with open(rt_path + camera_name,'a') as ff:
ff.write(json_str)
ff.write('\n')
def run_model(img_scoring_queue):
def run_model(img_scoring_queue, kafka_results_queue):
timeout_ms = 1000
logger.info('Starting model scoring process')
params = VDevice.create_params()
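With the new no_write=True and return_dict=True defaults, dump_model_results_to_json returns the rounded score dict and only appends to the per-camera file under scores/ when no_write=False. A small usage sketch; the detection values, camera name, and hash are fabricated for illustration, and it assumes species_list is non-empty.

# Sketch only: fabricated detections; each row is [x1, y1, x2, y2, score].
fake_output = [np.empty((0, 5), dtype=np.float32) for _ in species_list]
fake_output[0] = np.array([[0.10, 0.20, 0.55, 0.60, 0.91]], dtype=np.float32)

result = dump_model_results_to_json('front_yard_left',   # placeholder camera
                                    time.time(),
                                    fake_output,
                                    'deadbeef')           # placeholder hash
# Default path: returns the rounded dict and writes nothing to disk; pass
# no_write=False to keep the old append to scores/<camera_name>.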
@@ -155,24 +236,32 @@ def run_model(img_scoring_queue):
try:
# Use get with timeout for multiprocessing queue
res = img_scoring_queue.get(timeout=1.0)
r.set('model_inference_heartbeat',time.time())
r.set('model_inference_heartbeat', time.time())
inp = res['frame']
res_send = {'camera_name': res['camera_name'], 'image_timestamp': res['image_timestamp'], 'image_hash':res['image_hash']}
res_send = {
'camera_name': res['camera_name'],
'image_timestamp': res['image_timestamp'],
'image_hash': res['image_hash'],
'og_image': res['og_image']
}
logger.info(f'Running inference for {res_send}')
r.set('model_inference_started',str(res_send))
r.set('model_inference_started', str(res_send))
bindings.input().set_buffer(inp)
output_array = np.zeros([infer_model.output().shape[0]]).astype(np.float32)
output_array = np.zeros([infer_model.output().shape[0]
]).astype(np.float32)
bindings.output().set_buffer(output_array)
configured_infer_model.run([bindings], timeout_ms)
job = configured_infer_model.run_async(
[bindings],
partial(model_scoring_callback, bindings=bindings, data=res_send),
)
r.set('model_inference_finished',str(res_send))
partial(model_scoring_callback,
bindings=bindings,
data=res_send,
kafka_queue=kafka_results_queue))
r.set('model_inference_finished', str(res_send))
try:
job.wait(timeout_ms)
except Exception as e:
@@ -181,11 +270,25 @@ def run_model(img_scoring_queue):
# Handle both queue.Empty and multiprocessing timeout
continue
class SnapManager():
def __init__(self, ip, url_api, username, password, camera_name, msg_queue=None, img_scoring_queue=None, split_into_two=False, resolution = None, **kwargs):
def __init__(self,
ip,
url_api,
username,
password,
camera_name,
msg_queue=None,
img_scoring_queue=None,
kafka_queue=None,
split_into_two=False,
resolution=None,
**kwargs):
self.ip = ip
self.url_api = url_api
self.username = username
self.kafka_queue = kafka_queue
self.password = password
self.camera_name = camera_name
self.split_into_two = split_into_two
@@ -198,20 +301,32 @@ class SnapManager():
msg = list()
if self.split_into_two:
split_point = int(image.shape[1] / 2)
left_frame = resize_image(image[:, :split_point, :])
right_frame = resize_image(image[:, split_point:, :])
left_og_image = image[:, :split_point, :]
right_og_image = image[:, split_point:, :]
left_frame = resize_image(left_og_image)
right_frame = resize_image(right_og_image)
msg.append({
'camera_name': self.camera_name + '_left',
'frame': left_frame,
'image_timestamp': timestamp,
'image_hash': hashlib.sha1(left_frame.tobytes()).hexdigest()
'camera_name':
self.camera_name + '_left',
'frame':
left_frame,
'image_timestamp':
timestamp,
'image_hash':
hashlib.sha1(left_frame.tobytes()).hexdigest(),
'og_image': left_og_image
})
msg.append({
'camera_name': self.camera_name + '_right',
'frame': right_frame,
'image_timestamp': timestamp,
'image_hash': hashlib.sha1(right_frame.tobytes()).hexdigest()
'camera_name':
self.camera_name + '_right',
'frame':
right_frame,
'image_timestamp':
timestamp,
'image_hash':
hashlib.sha1(right_frame.tobytes()).hexdigest(),
'og_image': right_og_image
})
else:
frame = resize_image(image)
@@ -219,18 +334,26 @@ class SnapManager():
'camera_name': self.camera_name,
'frame': frame,
'image_timestamp': timestamp,
'image_hash': hashlib.sha1(frame.tobytes()).hexdigest()
'image_hash': hashlib.sha1(frame.tobytes()).hexdigest(),
'og_image':image
})
return msg
def capture_and_prepare(self, save_image = None):
def capture_and_prepare(self, save_image=None):
if self.resolution is not None:
width = self.resolution[1]
height = self.resolution[0]
else:
width = None
height = None
img = get_snap(self.url_api,self.username, self.password, camera_name = self.camera_name, save_image= save_image, width = width, height = height)
img = get_snap(self.url_api,
self.username,
self.password,
camera_name=self.camera_name,
save_image=save_image,
kafka_queue=self.kafka_queue,
width=width,
height=height)
if img is not None:
timestamp = time.time()
return self.format_image_for_model(img, timestamp)
@@ -244,21 +367,26 @@ class SnapManager():
break
elif msg.startswith('get'):
if '+save' in msg:
save_image = 'images/'+self.camera_name +'/'+ str(time.time()) + '.jpg'
save_image = 'images/' + self.camera_name + '/' + str(
time.time()) + '.jpg'
else:
save_image = None
logger.info(f'Processing capture for {self.camera_name}')
model_msgs = self.capture_and_prepare(save_image = save_image)
model_msgs = self.capture_and_prepare(
save_image=save_image)
for model_msg in model_msgs:
# Use put_nowait for multiprocessing queue to avoid blocking
try:
self.img_scoring_queue.put_nowait(model_msg)
except:
# Queue full, skip this message
logger.warning(f"Model queue full, dropping message from {self.camera_name}")
logger.warning(
f"Model queue full, dropping message from {self.camera_name}"
)
except queue.Empty:
pass
def start_snap_manager(**kwargs):
obj = SnapManager(**kwargs)
obj.run_forever()
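Taken together, the commit threads a Kafka results queue through capture (get_snap / SnapManager), scoring (run_model / model_scoring_callback), and publishing (kafka_poster). A hedged end-to-end sketch of one way these pieces could be wired; the camera address, credentials, queue size, and five-second trigger cadence are placeholders and assumptions, not taken from this commit.

# Sketch only: placeholder camera details, queue size, and polling cadence.
import multiprocessing as mp
import threading
import time

img_scoring_queue = mp.Queue(maxsize=32)
kafka_results_queue = mp.Queue()

# Publish detections and images to Kafka from a background thread.
threading.Thread(target=kafka_poster,
                 args=(kafka_results_queue,), daemon=True).start()

# One scoring process pulling frames and pushing results onto the Kafka queue.
mp.Process(target=run_model,
           args=(img_scoring_queue, kafka_results_queue), daemon=True).start()

# One SnapManager process per camera, driven by 'get' / 'get+save' commands.
cmd_queue = mp.Queue()
mp.Process(target=start_snap_manager,
           kwargs=dict(ip='192.0.2.10',
                       url_api='http://192.0.2.10/cgi-bin/api.cgi',
                       username='admin', password='secret',
                       camera_name='front_yard',
                       msg_queue=cmd_queue,
                       img_scoring_queue=img_scoring_queue,
                       kafka_queue=kafka_results_queue,
                       split_into_two=True),
           daemon=True).start()

while True:
    cmd_queue.put('get')        # 'get+save' would also persist the raw snapshot
    time.sleep(5)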