From 3dea3786e8d22a9db165d66e9765c6eb3b344169 Mon Sep 17 00:00:00 2001 From: "Ishan S. Patel" Date: Fri, 18 Jul 2025 13:03:16 -0400 Subject: [PATCH] YACWC --- 00_watch_for_new_videos/load_missed.py | 75 +++++++++ .../watch_and_fix_permissions.py | 142 ++++++++++++++++++ 01_do_obj_det | 1 + .../remove_without_nuggets.py | 110 ++++++++++++++ 03_put_into_vectordb/put_into_vectordb.py | 115 ++++++++++++++ common/settings.py | 9 ++ old_01_do_obj_det | 1 + 7 files changed, 453 insertions(+) create mode 100755 00_watch_for_new_videos/load_missed.py create mode 100755 00_watch_for_new_videos/watch_and_fix_permissions.py create mode 160000 01_do_obj_det create mode 100755 02_remove_without_nuggets/remove_without_nuggets.py create mode 100644 03_put_into_vectordb/put_into_vectordb.py create mode 100644 common/settings.py create mode 160000 old_01_do_obj_det diff --git a/00_watch_for_new_videos/load_missed.py b/00_watch_for_new_videos/load_missed.py new file mode 100755 index 0000000..730fae8 --- /dev/null +++ b/00_watch_for_new_videos/load_missed.py @@ -0,0 +1,75 @@ +import os +import subprocess as sp +import time +import psutil +import stat +import sys +import logging +from watchdog.observers.polling import PollingObserver +from watchdog.events import FileSystemEventHandler +from threading import Thread +from CommonCode import wq +from CommonCode import kwq +from CommonCode.settings import get_logger, LogColorize +import argparse + + +publish = kwq.producer.send +topic = kwq.TOPICS.videos_to_score_detection + +def decide_to_put_in_queue(file_name): + disqualifiers = list() + disqualifiers.append("_reduced" in file_name) + disqualifiers.append("_trimmed" in file_name) + + disqualified = any(disqualifiers) + + if file_name.endswith(".mp4") and not disqualified: + return True + else: + return False + +def place_file_in_queue(filef): + dirname = os.path.dirname(filef) + foldmode = stat.S_IMODE(os.stat(dirname).st_mode) + if oct(foldmode) != "0o777": + os.system(f"sudo chmod 777 {dirname}") + + logging.info(f"SETTING_PERMISSIONS: {dirname}") + + chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)} + try: + os.chmod(filef, 0o777) + except Exception as e: + logging.error(e) + + logging.info(f"QUEUE_PUT: {filef}") + publish(topic, key=filef, value={"filepath": filef}) + + +# %% +paths = ['/srv/ftp/sidefeeder','/srv/ftp/ptz'] +queued = set() +for rt in paths: + for root, dirs, files in os.walk(rt): + for f in files: + new_path = os.path.join(root, f) + if decide_to_put_in_queue(new_path): + queued.add(new_path) + + +for x in queued: + place_file_in_queue(x) + + +# %% +if __name__ == "__main__": + parser = argparse.ArgumentParser( + prog="Watch and Fix Permissions And Push to Kafka Queue" + ) + parser.add_argument("paths", nargs="+", help="Paths to monitor") + args = parser.parse_args() + paths = args.paths + + + diff --git a/00_watch_for_new_videos/watch_and_fix_permissions.py b/00_watch_for_new_videos/watch_and_fix_permissions.py new file mode 100755 index 0000000..13e54d2 --- /dev/null +++ b/00_watch_for_new_videos/watch_and_fix_permissions.py @@ -0,0 +1,142 @@ +import os +import subprocess as sp +import time +import psutil +import stat +import sys +import logging +from watchdog.observers.polling import PollingObserver +from watchdog.events import FileSystemEventHandler +from threading import Thread +from CommonCode import wq +from CommonCode import kwq +from CommonCode.settings import get_logger, LogColorize +import argparse +pfm = LogColorize.watch_and_fix_permissions + +logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False) + +logger.info(pfm(f"Starting watch_and_fix_permissions.py")) + +publish = kwq.producer.send +topic = kwq.TOPICS.videos_to_score_detection +def decide_to_put_in_queue(file_name): + disqualifiers = list() + disqualifiers.append("_reduced" in file_name) + disqualifiers.append("_trimmed" in file_name) + + disqualified = any(disqualifiers) + + if file_name.endswith(".mp4") and not disqualified: + return True + else: + return False + + + +class logic_execute(Thread): + def __init__(self, event): + logging.debug(f"EVENT: {event}") + Thread.__init__(self) + self.event = event + + def run(self): + event = self.event + if isinstance(event, str): + src_path = event + else: + src_path = event.src_path + + filef = os.path.abspath(os.path.join(os.getcwd(), src_path)) + logging.debug(f"GOT_FILE: {filef}") + + while True: + files = list() + for proc in psutil.process_iter(): + try: + if "ftp" in proc.name(): + files.append(proc.open_files()) + except Exception: + pass + + f = list() + for x in files: + f.extend(x) + + if any([filef == e.path for e in f]): + pass + else: + + + + if decide_to_put_in_queue(filef): + + dirname = os.path.dirname(filef) + foldmode = stat.S_IMODE(os.stat(dirname).st_mode) + logging.info(foldmode) + if oct(foldmode) != "0o777": + os.system(f"sudo chmod 777 {dirname}") + + logging.info(f"SETTING_PERMISSIONS: {dirname}") + + + break + + time.sleep(1) + + chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)} + try: + os.chmod(filef, 0o777) + except Exception as e: + logging.error(e) + + if decide_to_put_in_queue(filef): + logging.info(f"QUEUE_PUT: {pfm(filef)}") + publish(topic, key=filef, value={"filepath": filef}) + wq.publish(wq.TOPICS.ml_vision_to_score, filef) + else: + logging.info(f"QUEUE_DO_NOT_PUT: {filef}") + + +class Handler(FileSystemEventHandler): + def on_created(self, event): + try: + self.handle_it(event) + except Exception as e: + logging.error(e) + + def handle_it(self, event): + if event.is_directory: + return + lclass = logic_execute(event) + lclass.start() + + +def observe_path(path): + event_handler = Handler() + observer = PollingObserver() + logger.info(f"Monitoring {path}") + observer.schedule(event_handler, path, recursive=True) + observer.start() + observer.is_alive() + return observer + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + prog="Watch and Fix Permissions And Push to Kafka Queue" + ) + parser.add_argument("paths", nargs="+", help="Paths to monitor") + args = parser.parse_args() + paths = args.paths + observers = [observe_path(path) for path in paths] + + try: + while True: + time.sleep(1) + except Exception as e: + logging.error(str(e)) + + for observer in observers: + observer.stop() + observer.join() diff --git a/01_do_obj_det b/01_do_obj_det new file mode 160000 index 0000000..8625930 --- /dev/null +++ b/01_do_obj_det @@ -0,0 +1 @@ +Subproject commit 8625930f38cac08578435042c18080073c5e4ed9 diff --git a/02_remove_without_nuggets/remove_without_nuggets.py b/02_remove_without_nuggets/remove_without_nuggets.py new file mode 100755 index 0000000..4d0d869 --- /dev/null +++ b/02_remove_without_nuggets/remove_without_nuggets.py @@ -0,0 +1,110 @@ +#!/usr/bin/python +import glob +import argparse +import os +import json +from CommonCode.util import exit_if_not_ipython, get_cset_for_file_matching, is_ipython +from CommonCode import kwq +from CommonCode.settings import get_logger, LogColorize +from kafka import TopicPartition +from kafka.structs import OffsetAndMetadata +from pprint import pprint + +pfm = LogColorize.remove_without_nuggets +logger = get_logger(__name__,'/var/log/ml_vision_logs/02_remove_without_nuggets', stdout=True, systemd=False) + +input_topic = kwq.TOPICS.videos_scored_detection + +client_id='obj_detector_remove_without_nuggets' +group_id = client_id + +consumer = kwq.create_consumer(input_topic, group_id = group_id, client_id = client_id) +c_part = TopicPartition(input_topic, 0) +consumer.assign([c_part]) + +producer = kwq.producer + + +class RESULT_TYPE(): + NOTHING = 0 + HAS_OBJS = 1 + KEPT = 2 + REMOVED = 3 + NO_JSON = 4 + + +def exec_file_remove_logic(file_path): + result = RESULT_TYPE.NOTHING + cset = get_cset_for_file_matching(file_path) + logger.info(f"EXECUTING_LOGIC :{file_path}") + json_check = '.json.orin' + if '.has_objs' in cset: + logger.info(f"HAS_OBJS :{file_path}") + result = RESULT_TYPE.HAS_OBJS + return result + + + if json_check in cset and os.path.exists(cset[ json_check]): + logger.info(f"HAS_JSON :{file_path}") + if os.path.getsize(cset[json_check]) == 0: + det_data = {'scores':[]} + else: + with open(cset[json_check],'r') as jj: + det_data = json.load(jj) + + thresh = 0.2 + frames_with_dets_thresh = 0.1 + + all_scores = list() + for x in det_data['scores']: + sc_in = [y['score'] for y in x['detections']] + sc_in.append(0) + all_scores.append(max(sc_in)) + + frames_above_thresh = sum([x>thresh for x in all_scores ]) + num_frames = len(det_data['scores']) + + ratio_with_frames = frames_above_thresh / (num_frames + 1) + if ratio_with_frames > frames_with_dets_thresh: + cpath = os.path.splitext(cset['.mp4'])[0] + '.has_objs.orin' + with open(cpath,'w') as cc: + pass + + + logger.info(f"HAS_OBJECTS_DETECTED_WILL_NOT_REMOVE :{pfm(file_path)}") + result = RESULT_TYPE.KEPT + + else: + logger.info(f"OBJECTS_NOT_DETECTED_WILL_REMOVE :{pfm(file_path)}") + for ff, cf in cset.items(): + if cf.endswith(json_check) or cf.endswith('.jpg'): + pass + else: + logger.info(f"REMOVING :{cf}") + os.remove(cf) + + result = RESULT_TYPE.REMOVED + + else: + result = RESULT_TYPE.NO_JSON + logger.info(f"NO_JSON_DONT_REMOVE :{pfm(file_path)}") + + + return result + + +for msg in consumer: + key = msg.key + value = msg.value + + purge_reason = exec_file_remove_logic(value['filepath']) + d_send = {'value':msg.value, 'key':msg.key} + if purge_reason in [RESULT_TYPE.KEPT, RESULT_TYPE.HAS_OBJS]: + producer.send(kwq.TOPICS.videos_with_nuggets, **d_send) + elif purge_reason == RESULT_TYPE.REMOVED: + producer.send(kwq.TOPICS.videos_without_nuggets, **d_send) + elif purge_reason == RESULT_TYPE.NO_JSON: + producer.send(kwq.TOPICS.videos_no_json, **d_send) + +# %% + diff --git a/03_put_into_vectordb/put_into_vectordb.py b/03_put_into_vectordb/put_into_vectordb.py new file mode 100644 index 0000000..7d0ba00 --- /dev/null +++ b/03_put_into_vectordb/put_into_vectordb.py @@ -0,0 +1,115 @@ +import glob +from pymilvus import MilvusClient +from pymilvus.client.types import LoadState + +import argparse +import os +import json +from CommonCode.util import exit_if_not_ipython, get_cset_for_file_matching, is_ipython +from CommonCode import kwq +from CommonCode.settings import get_logger, LogColorize +from kafka import TopicPartition +from kafka.structs import OffsetAndMetadata +from pprint import pprint +import numpy as np +pfm = LogColorize.embeds_in_db +logger = get_logger(__name__,'/var/log/ml_vision_logs/03_put_into_vectordb', stdout=True, systemd=False) + +input_topic = kwq.TOPICS.videos_with_nuggets + +client_id='embedding_place_in_db_1' +group_id = client_id + +consumer = kwq.create_consumer(input_topic, group_id = group_id, client_id = client_id) +c_part = TopicPartition(input_topic, 0) +consumer.assign([c_part]) + + +producer = kwq.producer +model_type = 'ViT-L-16-SigLIP2-512' + +def get_db_embed_done_path(vpath): + return os.path.splitext(vpath)[0]+'.oclip.orin.indb' + + +#video_file_to_upload='/srv/ftp/ptz/2025/04/14/PTZBackRight_00_20250414063817.mp4' + + +def get_vec_path(vpath): + return os.path.splitext(vpath)[0]+'.oclip.orin' + +def get_date(vpath): + split_entries = os.path.splitext(vpath)[0].split('/') + return ''.join(split_entries[-4:-1]) + +def get_camera_name(vpath): + split_entries = os.path.splitext(vpath)[0].split('/') + return split_entries[split_entries.index('ftp')+1] + +def upload_vector_file(video_file_to_upload): + client = MilvusClient( + uri="http://localhost:19530" + ) + + db_done_path = get_db_embed_done_path(video_file_to_upload) + if os.path.exists(db_done_path): + print('Already exists in DB, skipping upload') +# return + + video_file_to_upload = get_vec_path(video_file_to_upload) + + with open(video_file_to_upload,'r') as jj: + vecs = json.load(jj) + + + embeds = [x['score'] for x in vecs['scores']] + fr_nums = [x['frame'] for x in vecs['scores']] + + fname_root = video_file_to_upload.rsplit('/',1)[-1].split('.')[0] + fc = fname_root.split('_')[-1] + +# data = list() + filepath = video_file_to_upload.replace('/srv/ftp/','').replace('/mergedfs/ftp','').split('.')[-0] + + data_v2 = list() + date = get_date(filepath) + for embed, frame_num in zip(embeds, fr_nums): + fg = '{0:05g}'.format(frame_num) + id_num = int(fc+fg) + embeds_as_np = np.asarray(embed, dtype=np.float16) + to_put_2 = dict(primary_id= id_num, filepath=filepath, frame_number = int(frame_num), so400m=embeds_as_np, date=str(date)) + data_v2.append(to_put_2) + + cam_name = get_camera_name(video_file_to_upload) + client.insert(collection_name = f'nuggets_{cam_name}_so400m_siglip2', data=data_v2) + client.close() + + + with open(db_done_path,'w') as ff: + ff.write("") + + print(f'Inserting into DB, {video_file_to_upload}') + + + +for msg in consumer: + key = msg.key + value = msg.value + file_path = value['filepath'] + success = False + try: + upload_vector_file(value['filepath']) + success = True + logger.info(f"SUCCESS_UPLOADING :{pfm(file_path)}") + except Exception as e: + logger.info(f"ERROR_UPLOADING :{pfm(file_path)} + {e}") + + d_send = {'value':msg.value, 'key':msg.key} + + if success: + send_topic = kwq.TOPICS.videos_embedding_in_db + else: + send_topic = kwq.TOPICS.videos_embedding_in_db_fail + + + producer.send(send_topic, **d_send) diff --git a/common/settings.py b/common/settings.py new file mode 100644 index 0000000..534c496 --- /dev/null +++ b/common/settings.py @@ -0,0 +1,9 @@ +dir_watch = [ + "/srv/ftp/hummingbird", + "/srv/ftp/ptz", + "/srv/ftp/railing/", + "/srv/ftp/sidefeeder", + "/srv/ftp_tcc/leopards1", + "/srv/ftp_tcc/leopard", +] + diff --git a/old_01_do_obj_det b/old_01_do_obj_det new file mode 160000 index 0000000..8625930 --- /dev/null +++ b/old_01_do_obj_det @@ -0,0 +1 @@ +Subproject commit 8625930f38cac08578435042c18080073c5e4ed9