commit 62cf036cc61f1d9017fe0377902e0836aeb8fbf2
Author: Ishan S. Patel
Date:   Wed Jul 2 12:08:56 2025 -0400

    YACWC

diff --git a/01_do_obj_det/ml_run.py b/01_do_obj_det/ml_run.py
new file mode 100644
index 0000000..8328553
--- /dev/null
+++ b/01_do_obj_det/ml_run.py
@@ -0,0 +1,7 @@
+from model_runner import ModelRunner
+mr = ModelRunner()
+# %%
+mr.init_model_det()
+mr.init_model_clip()
+# %%
+scored_results = mr.score_video('/home/thebears/source/ml_code/short.mp4')
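For orientation before the implementation below: score_video returns a plain dict. A minimal sketch of inspecting scored_results from ml_run.py, assuming only the keys that score_video populates (scoring_fps, scores, embeds); this snippet is an editor's illustration, not part of the commit:

    # Summarize what score_video returned (illustrative, not in the commit).
    print(scored_results.get("scoring_fps"))         # frames scored per second
    first = scored_results["scores"][0]              # per-frame record
    print(first["frame"], len(first["detections"]))
    for det in first["detections"]:
        # one detection: xyxy box, confidence, class index and label
        print(det["name"], det["score"], det["coords"])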
diff --git a/01_do_obj_det/model_runner.py b/01_do_obj_det/model_runner.py
new file mode 100644
index 0000000..b441528
--- /dev/null
+++ b/01_do_obj_det/model_runner.py
@@ -0,0 +1,310 @@
+import sys
+
+# yolov7 is not packaged; its repo checkout must be importable.
+sys.path.insert(0, "/home/thebears/source/models/yolov7")
+
+import base64 as b64
+import os
+import time
+
+import cv2
+import numpy as np
+import open_clip
+import torch
+import yaml
+from models.experimental import attempt_load
+from pymediainfo import MediaInfo
+from torchvision import transforms
+from utils.general import check_img_size, non_max_suppression
+
+device = torch.device("cuda")
+
+
+# %%
+class ModelRunner:
+    def __init__(self):
+        self.pretrained_name = "webli"
+        self.model_name = "ViT-SO400M-16-SigLIP2-512"
+        self.det_root_path = "/home/thebears/source/model_weights"
+
+    def init_model_clip(self):
+        if hasattr(self, 'clip_preprocess'):
+            return
+
+        model_name = self.model_name
+        pretrained_name = self.pretrained_name
+
+        clip_model, _, clip_preprocess_og = open_clip.create_model_and_transforms(
+            model_name, pretrained=pretrained_name
+        )
+        tokenizer = open_clip.get_tokenizer("hf-hub:timm/" + model_name)
+        clip_model = clip_model.half().to(device)
+        clip_dtype = next(clip_model.parameters()).dtype
+        clip_img_size = clip_preprocess_og.transforms[0].size
+        # Warm up the encoder once so later batches run at steady-state speed.
+        clip_model.encode_image(
+            torch.rand(1, 3, *clip_img_size, dtype=clip_dtype, device=device))
+        # Rebuild the preprocess with only the transforms that apply to
+        # batched tensors; video frames never pass through PIL here.
+        clip_preprocess = transforms.Compose(
+            [clip_preprocess_og.transforms[x] for x in [0, 3]]
+        )
+        self.clip_model = clip_model
+        self.clip_preprocess_og = clip_preprocess_og
+        self.clip_tokenizer = tokenizer
+        self.clip_dtype = clip_dtype
+        self.clip_img_size = clip_img_size
+        self.clip_preprocess = clip_preprocess
+
+    def init_model_det(self):
+        if hasattr(self, 'det_model'):
+            return
+
+        det_root_path = self.det_root_path
+        det_model_weights_root = os.path.join(det_root_path, "yolov7")
+        det_model_weights_path = os.path.join(det_model_weights_root, "best.pt")
+        det_data_yaml_path = os.path.join(det_model_weights_root, "inaturalist.yaml")
+        det_model = attempt_load(det_model_weights_path, map_location=device)
+        det_model = det_model.half().to(device)
+
+        det_dtype = next(det_model.parameters()).dtype
+        det_imgsz = 1280
+        det_stride = int(det_model.stride.max())
+        det_imgsz = check_img_size(det_imgsz, s=det_stride)
+        # Warm-up pass.
+        _ = det_model(
+            torch.zeros(1, 3, det_imgsz, det_imgsz, dtype=det_dtype).to(device)
+        )
+
+        with open(det_data_yaml_path, "r") as ff:
+            det_model_info = yaml.safe_load(ff)
+        det_labels = det_model_info["names"]
+
+        self.det_dtype = det_dtype
+        self.det_imgsz = det_imgsz
+        self.det_stride = det_stride
+        self.det_model_info = det_model_info
+        self.det_labels = det_labels
+        self.det_model = det_model
+
+    def get_det_vid_preprocessor(self, vid_h, vid_w):
+        # Preprocessors are cached per input resolution.
+        if not hasattr(self, "_det_vid_preprocessors"):
+            self._det_vid_preprocessors = dict()
+            self.curr_det_vid_preprocessor = None
+        dict_key = (vid_h, vid_w)
+        det_stride = self.det_stride
+        if dict_key in self._det_vid_preprocessors:
+            self.curr_det_vid_preprocessor = self._det_vid_preprocessors[dict_key]
+            return self.curr_det_vid_preprocessor
+
+        # Scale the long side to det_imgsz, preserving aspect ratio.
+        target_max = self.det_imgsz
+        if vid_h > vid_w:
+            target_h = target_max
+            target_w = target_max * vid_w / vid_h
+        else:
+            target_h = target_max * vid_h / vid_w
+            target_w = target_max
+
+        target_h = int(target_h)
+        target_w = int(target_w)
+
+        # Pad each dimension up to a multiple of the model stride, split as
+        # evenly as possible: pad_amt is (left, top, right, bottom).
+        pad_amt = [None, None, None, None]
+        if target_w % det_stride != 0:
+            off = det_stride - target_w % det_stride
+            new_w = target_w + off
+            pad_diff = new_w - target_w
+            pad_left = round(pad_diff / 2)
+            pad_right = pad_diff - pad_left
+            pad_amt[0] = pad_left
+            pad_amt[2] = pad_right
+        else:
+            pad_amt[0] = 0
+            pad_amt[2] = 0
+
+        if target_h % det_stride != 0:
+            off = det_stride - target_h % det_stride
+            new_h = target_h + off
+            pad_diff = new_h - target_h
+            pad_up = round(pad_diff / 2)
+            pad_down = pad_diff - pad_up
+            pad_amt[1] = pad_up
+            pad_amt[3] = pad_down
+        else:
+            pad_amt[1] = 0
+            pad_amt[3] = 0
+
+        det_vid_preprocess = transforms.Compose(
+            [transforms.Resize((target_h, target_w)), transforms.Pad(pad_amt, fill=127)]
+        )
+
+        self.target_h = target_h
+        self.target_w = target_w
+        self.pad_amt = pad_amt
+
+        self._det_vid_preprocessors[dict_key] = det_vid_preprocess
+        self.curr_det_vid_preprocessor = self._det_vid_preprocessors[dict_key]
+        return self.curr_det_vid_preprocessor
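+
+    # Editor's note (not in the original commit): worked example of the
+    # padding above. A 1920x1080 (w x h) video with det_imgsz=1280 and
+    # stride=32 gives target_w=1280 and target_h=int(1280*1080/1920)=720.
+    # 720 % 32 == 16, so the height is padded by 32-16=16 rows (8 top,
+    # 8 bottom) to 736; the width is already a multiple of 32 and gets no
+    # padding. The model therefore sees 1280x736 inputs for this source.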
+
+    def score_frames_det(self, array_score, det_vid_preprocess=None):
+        det_model = self.det_model
+        if det_vid_preprocess is None:
+            det_vid_preprocess = self.curr_det_vid_preprocessor
+
+        frame_numbers = [x[0] for x in array_score]
+        frame_values = [x[1] for x in array_score]
+        # (N, H, W, C) frames -> (N, C, H, W) float16 on the GPU.
+        frame_as_tensor = (
+            torch.from_numpy(np.stack(frame_values)[:, :, :, 0:3])
+            .to(torch.float16)
+            .to(device)
+            .permute([0, 3, 1, 2])
+        )
+
+        with torch.no_grad():
+            # Resize/pad, scale to [0, 1], and reorder channels (BGR -> RGB).
+            frame_for_model = det_vid_preprocess(frame_as_tensor).div(255)[
+                :, [2, 1, 0], :, :
+            ]
+            det_preds = det_model(frame_for_model)[0]
+            # conf_thres=0.25, iou_thres=0.5
+            det_pred_post_nms = non_max_suppression(det_preds, 0.25, 0.5)
+            det_cpu_pred = [x.detach().cpu().numpy() for x in det_pred_post_nms]
+
+        return {"det": det_cpu_pred, "fr#": frame_numbers}
+
+    def score_frames_clip(self, clip_array_score):
+        frame_numbers = [x[0] for x in clip_array_score]
+        frame_values = [x[1] for x in clip_array_score]
+        frame_as_tensor = (
+            torch.from_numpy(np.stack(frame_values)[:, :, :, 0:3])
+            .to(torch.float16)
+            .to(device)
+            .permute([0, 3, 1, 2])
+        )
+
+        with torch.no_grad():
+            frame_for_clip = self.clip_preprocess(frame_as_tensor[:, [0, 1, 2], :, :])
+            clip_pred = self.clip_model.encode_image(frame_for_clip).detach().cpu().numpy()
+
+        return {"clip": clip_pred, "fr#": frame_numbers}
+
+    def get_video_info(self, file_path):
+        file_info = MediaInfo.parse(file_path)
+        if not file_info.video_tracks:
+            raise ValueError(f"no video track found in {file_path}")
+        video_info = file_info.video_tracks[0]
+        video_info.frame_count = int(video_info.frame_count)
+        return video_info
+
+    def score_video(self, file_to_score, batch_size=6, clip_interval=10):
+        video_info = self.get_video_info(file_to_score)
+        vid_decoder = "h264parse"
+        if video_info.format.lower() == "hevc":
+            vid_decoder = "h265parse"
+
+        gst_cmd = "filesrc location={file_to_score} ! qtdemux name=demux demux.video_0 ! queue ! {vid_decoder} ! nvv4l2decoder ! nvvidconv ! videoscale method=1 add-borders=false ! video/x-raw,width=1280,height=1280 ! appsink sync=false".format(
+            file_to_score=file_to_score, vid_decoder=vid_decoder
+        )
+        cap_handle = cv2.VideoCapture(gst_cmd, cv2.CAP_GSTREAMER)
+        vid_h = video_info.height
+        vid_w = video_info.width
+
+        vid_preprocessor = self.get_det_vid_preprocessor(vid_h, vid_w)
+        target_w = self.target_w
+        target_h = self.target_h
+        pad_amt = self.pad_amt
+
+        array_score = list()
+        final_output = dict()
+        final_output["start_score_time"] = time.time()
+        final_output["num_frames"] = video_info.frame_count
+        frame_numbers = list()
+        det_results = list()
+        clip_results = list()
+        clip_frame_numbers = list()
+
+        for i in range(video_info.frame_count):
+            success, frame_matrix = cap_handle.read()
+            if not success:
+                break
+
+            array_score.append((i, frame_matrix))
+
+            # Detection runs on every frame, in batches of batch_size.
+            if len(array_score) >= batch_size:
+                score_result = self.score_frames_det(array_score, det_vid_preprocess=vid_preprocessor)
+                det_results.extend(score_result["det"])
+                frame_numbers.extend(score_result["fr#"])
+                array_score = list()
+
+            # CLIP embeds every clip_interval-th frame.
+            if not (i % clip_interval):
+                clip_score_result = self.score_frames_clip([(i, frame_matrix)])
+                clip_results.extend(clip_score_result["clip"])
+                clip_frame_numbers.extend(clip_score_result["fr#"])
+
+        # Flush any partial batch.
+        if len(array_score) > 0:
+            score_result = self.score_frames_det(array_score, det_vid_preprocess=vid_preprocessor)
+            det_results.extend(score_result["det"])
+            frame_numbers.extend(score_result["fr#"])
+
+        cap_handle.release()
+
+        final_output["end_score_time"] = time.time()
+        final_output["video"] = {
+            "w": vid_w,
+            "h": vid_h,
+            "path": file_to_score,
+            "target_w": target_w,
+            "target_h": target_h,
+            "pad_amt": pad_amt,
+        }
+
+        try:
+            final_output["scoring_fps"] = final_output["num_frames"] / (
+                final_output["end_score_time"] - final_output["start_score_time"]
+            )
+        except ZeroDivisionError:
+            pass
+
+        final_output["scores"] = list()
+
+        clip_results_as_np = np.asarray(clip_results)
+
+        for frame_number, frame in zip(frame_numbers, det_results):
+            cframe_dict = dict()
+            cframe_dict["frame"] = frame_number
+            cframe_dict["detections"] = list()
+
+            for det in frame:
+                # Each NMS row is (x1, y1, x2, y2, confidence, class index).
+                data = dict()
+                data["coords"] = [float(x) for x in list(det[0:4])]
+                data["score"] = float(det[4])
+                data["idx"] = int(det[5])
+
+                try:
+                    data["name"] = self.det_labels[data["idx"]]
+                except (IndexError, KeyError):
+                    data["name"] = "Code failed"
+
+                cframe_dict["detections"].append(data)
+
+            final_output["scores"].append(cframe_dict)
+
+        # Ship the CLIP matrix as base64 so the result dict is JSON-safe.
+        emb_dict = dict()
+        emb_dict["frame_numbers"] = clip_frame_numbers
+        emb_dict["array_size"] = clip_results_as_np.shape
+        emb_dict["array_dtype"] = str(clip_results_as_np.dtype)
+        emb_dict["array_binary"] = b64.b64encode(clip_results_as_np).decode()
+
+        final_output["embeds"] = emb_dict
+
+        return final_output
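The embeds payload above stores the CLIP matrix as base64-encoded raw bytes alongside its shape and dtype. A sketch of reversing that encoding on the consumer side (variable names are illustrative, not from the commit):

    import base64 as b64
    import numpy as np

    emb = scored_results["embeds"]
    flat = np.frombuffer(b64.b64decode(emb["array_binary"]),
                         dtype=np.dtype(emb["array_dtype"]))
    clip_vectors = flat.reshape(emb["array_size"])  # one row per sampled frame
    # emb["frame_numbers"][k] gives the source frame for clip_vectors[k]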
diff --git a/01_do_obj_det/wait_for_new_messages.py b/01_do_obj_det/wait_for_new_messages.py
new file mode 100644
index 0000000..44611d8
--- /dev/null
+++ b/01_do_obj_det/wait_for_new_messages.py
@@ -0,0 +1,167 @@
+import json
+import os
+import shlex
+import time
+
+from CommonCode import kwq
+from CommonCode.settings import get_logger, LogColorize
+from kafka import TopicPartition
+from kafka.structs import OffsetAndMetadata
+
+pfm = LogColorize.score_obj_det_orin
+logger = get_logger(__name__, '/var/log/ml_vision_logs/01_score_obj_det_orin', stdout=True, systemd=False)
+
+os.system("sudo /usr/bin/systemctl restart --now systemd-journal-upload.service")
+
+logger.info(pfm("Starting wait_for_new_messages.py on orin for scoring object detection"))
+input_topic = kwq.TOPICS.videos_to_score_detection
+topic_produce = kwq.TOPICS.videos_scored_detection
+
+client_id = 'obj_detector_orin_3'
+group_id = client_id
+# %%
+logger.debug("Starting Kafka Consumer")
+
+from deepstream_obj_det import run_inference, target_width, target_height
+
+# Headless box: make sure nothing tries to open a display.
+os.environ.pop("DISPLAY", None)
+
+
+def run_inference_for_file(file_path):
+    start_time = time.time()
+
+    pre_path, _ = os.path.splitext(file_path)
+    det_path = pre_path + '.json.orin'
+    emb_path = pre_path + '.oclip.orin'
+    if os.path.exists(det_path) and os.path.exists(emb_path):
+        return "Already scored"
+
+    if not os.path.exists(file_path):
+        return "Movie does not exist"
+
+    # e.g. /srv/ftp/railing/2025/02/28/railing_00_20250228115800.mp4
+    cmd = f'/usr/bin/ffprobe -v error -select_streams v:0 -count_packets -show_entries stream=nb_read_packets -of csv=p=0 {shlex.quote(file_path)}'
+    try:
+        frames = int(os.popen(cmd).read().strip())
+    except ValueError:
+        frames = 0
+
+    logger.info(f"TOTAL_FRAMES: {frames}")
+    if frames < 30:
+        logger.info(f"TOTAL_FRAMES_SKIPPING: {file_path}")
+        return "FAILED, NOT ENOUGH FRAMES"
+
+    # Block until there is enough free space to write the output files.
+    while True:
+        try:
+            with open(det_path, 'w') as ff:
+                ff.write(' ' * 100)
+        except OSError as e:
+            logger.error(f"NO_SPACE :{det_path}:{e}")
+        else:
+            break
+        time.sleep(5)
+
+    try:
+        os.remove(det_path)
+    except OSError:
+        pass
+
+    logger.info("ENOUGH SPACE, STARTING INFERENCE")
+    detector_results, embedder_results = run_inference(file_path)
+    end_time = time.time()
+
+    obj_det_dict = dict()
+    obj_det_dict['meta'] = {'model_version': 'orin_v1'}
+    obj_det_dict['scoring'] = {'start_time': start_time, 'end_time': end_time}
+    obj_det_dict['json'] = {'path': det_path}
+    obj_det_dict['video'] = {'path': file_path, 'target_w': target_width, 'target_h': target_height}
+
+    # Group the flat per-detection records by frame number.
+    by_frame_num = dict()
+    for sc in detector_results:
+        c_res = dict()
+        c_frame = sc['frame_number']
+        c_res['score'] = sc['score']
+        c_res['L'] = sc['left']
+        c_res['T'] = sc['top']
+        c_res['W'] = sc['width']
+        c_res['H'] = sc['height']
+        c_res['name'] = sc['label']
+        c_res['idx'] = sc['class_id']
+        if c_frame not in by_frame_num:
+            by_frame_num[c_frame] = list()
+        by_frame_num[c_frame].append(c_res)
+
+    obj_det_dict['scores'] = [{'frame': key, 'detections': val} for key, val in by_frame_num.items()]
+
+    with open(det_path, 'w') as ff:
+        json.dump(obj_det_dict, ff, indent=4)
+
+    emb_dict = dict()
+    emb_dict['meta'] = {'model_version': 'ViT-L-16-SigLIP2-512', 'host': 'orin'}
+    emb_dict['scoring'] = {'start_time': start_time, 'end_time': end_time}
+    emb_dict['json'] = {'path': emb_path}
+    emb_dict['video'] = {'path': file_path, 'target_w': target_width, 'target_h': target_height}
+    emb_dict['scores'] = list()
+
+    for c_score in embedder_results:
+        fr_num = c_score['frame_number']
+        vect = c_score['vector']
+        emb_dict['scores'].append({'score': vect, 'frame': fr_num})
+
+    with open(emb_path, 'w') as ff:
+        json.dump(emb_dict, ff, indent=4)
+
+    return "Success"
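+
+# Editor's sketch of the detections JSON written above (illustrative values,
+# not output of this commit):
+# {
+#     "meta": {"model_version": "orin_v1"},
+#     "scoring": {"start_time": 1719936000.0, "end_time": 1719936042.5},
+#     "video": {"path": "...", "target_w": ..., "target_h": ...},
+#     "scores": [{"frame": 0,
+#                 "detections": [{"score": 0.91, "L": 12, "T": 40,
+#                                 "W": 180, "H": 220,
+#                                 "name": "black_bear", "idx": 3}]}]
+# }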
+
+# %%
+consumer = kwq.create_consumer(input_topic, group_id=group_id, client_id=client_id)
+# consumer.subscribe(input_topic)
+
+# Pin to partition 0 explicitly rather than using group subscription.
+c_part = TopicPartition(input_topic, 0)
+consumer.assign([c_part])
+
+c_committed = consumer.committed(c_part)
+logger.info(f"KAFKA_POSITION_IS: {str(consumer.position(c_part))}")
+
+if c_committed is None:
+    logger.info("KAFKA_POSITION_NOT_COMMITTED")
+else:
+    logger.info(f"KAFKA_POSITION_COMMITTED_IS: {c_committed}")
+    consumer.seek(c_part, c_committed)
+logger.info("START POLLING")
+
+# Earlier poll()-based loop, kept for reference:
+# while True:
+#     out = consumer.poll(timeout_ms=5000, update_offsets=False)
+#     msgs = list()
+#     logger.info(f"KAFKA_POSITION_COMMITTED_IS: {str(consumer.committed(c_part))}")
+#     logger.info(f"KAFKA_POSITION_IS: {str(consumer.position(c_part))}")
+#
+#     for k, v in out.items():
+#         msgs.extend(v)
+#     for message in msgs:
+producer = kwq.producer
+for message in consumer:
+    logger.info(f"KAFKA_POSITION_COMMITTED_IS: {str(consumer.committed(c_part))}")
+    logger.info(f"KAFKA_POSITION_IS: {str(consumer.position(c_part))}")
+    logger.info(f"MSG_RECEIVED :{message}")
+    logger.info(f"INFERENCE_START: {pfm(message.key)}")
+    result = run_inference_for_file(message.key)
+    logger.info(f"INFERENCE_DONE:{pfm(result)} {message.key}")
+    # Commit offset + 1: the committed offset is the next message to consume,
+    # so a restart resumes after the last fully processed message.
+    oandm = OffsetAndMetadata(message.offset + 1, '')
+    consumer.commit({c_part: oandm})
+    producer.send(topic_produce, value=message.value, key=message.key)
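For completeness, a sketch of the producing side that would feed this consumer, assuming kwq.producer accepts the video path as the message key the way the loop above consumes it (kwq's serializers are not shown in this commit, so treat the types as illustrative):

    from CommonCode import kwq

    video_path = '/srv/ftp/railing/2025/02/28/railing_00_20250228115800.mp4'
    # The key doubles as the file path that run_inference_for_file() receives.
    kwq.producer.send(kwq.TOPICS.videos_to_score_detection,
                      key=video_path, value=video_path)
    kwq.producer.flush()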