From 36b7784b4e1f33f2dfb30fd90fe308de663e629a Mon Sep 17 00:00:00 2001 From: "Ishan S. Patel" Date: Sat, 4 Oct 2025 11:45:45 -0400 Subject: [PATCH] YACWC --- 00_watch_for_new_videos/load_missed.py | 10 +- 00_watch_for_new_videos/test.py | 11 ++ .../watch_and_fix_permissions.py | 162 ++++++++++-------- .../watch_and_fix_permissions_old.py | 162 ++++++++++++++++++ common/__pycache__/settings.cpython-312.pyc | Bin 296 -> 406 bytes common/__pycache__/settings.cpython-313.pyc | Bin 296 -> 406 bytes common/settings.py | 9 + 01_do_obj_det => orin/01_do_obj_det | 0 .../remove_without_nuggets.py | 0 .../put_into_vectordb.py | 0 old_01_do_obj_det => orin/old_01_do_obj_det | 0 11 files changed, 281 insertions(+), 73 deletions(-) create mode 100644 00_watch_for_new_videos/test.py create mode 100755 00_watch_for_new_videos/watch_and_fix_permissions_old.py rename 01_do_obj_det => orin/01_do_obj_det (100%) rename {02_remove_without_nuggets => orin/02_remove_without_nuggets}/remove_without_nuggets.py (100%) rename {03_put_into_vectordb => orin/03_put_into_vectordb}/put_into_vectordb.py (100%) rename old_01_do_obj_det => orin/old_01_do_obj_det (100%) diff --git a/00_watch_for_new_videos/load_missed.py b/00_watch_for_new_videos/load_missed.py index e922005..c7cccde 100755 --- a/00_watch_for_new_videos/load_missed.py +++ b/00_watch_for_new_videos/load_missed.py @@ -1,3 +1,4 @@ +#!/home/thebears/envs/ml_vision_objdet/bin/python import os import subprocess as sp @@ -18,13 +19,13 @@ if not ('__file__' in vars() or '__file__' in globals()): parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) sys.path.append(parent_dir) -logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False) +logger = get_logger(__name__,'/var/log/ml_vision_logs/00_load_missed', stdout=True, systemd=False, level=logging.INFO) from common import settings -default_dir_watch = settings.dir_watch +default_dir_watch = settings.dir_watch_missed publish = kwq.producer.send -topic = kwq.TOPICS.videos_to_score_detection +topic = kwq.TOPICS.exit_00_videos_to_score_detection necessary_extensions = [ '.oclip_embeds.npz', @@ -32,10 +33,13 @@ necessary_extensions = [ '.db_has_sl2_embeds_part' ] def decide_to_put_in_queue(file_name): + if os.path.getsize(file_name) < 1*1000*1000: + return False disqualifiers = list() disqualifiers.append(not os.path.exists(file_name)) disqualifiers.append("_reduced" in file_name) disqualifiers.append("_trimmed" in file_name) +# disqualifiers.append('hummingbird' in file_name) disqualified = any(disqualifiers) if file_name.endswith(".mp4") and not disqualified: diff --git a/00_watch_for_new_videos/test.py b/00_watch_for_new_videos/test.py new file mode 100644 index 0000000..b4b8fbf --- /dev/null +++ b/00_watch_for_new_videos/test.py @@ -0,0 +1,11 @@ +from CommonCode import kwq +publish = kwq.producer.send +topic = kwq.TOPICS.videos_to_score_detection +cons = kwq.create_consumer() + +# %% +cons.subscribe([topic]) + +# %% +if True: + out = cons.poll() diff --git a/00_watch_for_new_videos/watch_and_fix_permissions.py b/00_watch_for_new_videos/watch_and_fix_permissions.py index 82a1c8d..f0f52a8 100755 --- a/00_watch_for_new_videos/watch_and_fix_permissions.py +++ b/00_watch_for_new_videos/watch_and_fix_permissions.py @@ -9,9 +9,14 @@ from watchdog.observers.polling import PollingObserver from watchdog.events import FileSystemEventHandler from threading import Thread from CommonCode import wq -from CommonCode import kwq +from CommonCode.util import is_ipython +import queue from CommonCode.settings import get_logger, LogColorize import argparse + +import subprocess +from collections import deque + pfm = LogColorize.watch_and_fix_permissions if not ('__file__' in vars() or '__file__' in globals()): __file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py' @@ -21,13 +26,19 @@ sys.path.append(parent_dir) from common import settings default_dir_watch = settings.dir_watch -# %% -logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False) + +tasks = queue.Queue() +logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False, level = logging.INFO) logger.info(pfm(f"Starting watch_and_fix_permissions.py")) -publish = kwq.producer.send -topic = kwq.TOPICS.videos_to_score_detection +try: + from CommonCode import kwq + publish = kwq.producer.send + topic = kwq.TOPICS.exit_00_videos_to_score_detection +except Exception as e: + logger.error("ERROR WHEN IMPORTING KAFKA:"+ str(e)) + def decide_to_put_in_queue(file_name): disqualifiers = list() disqualifiers.append("_reduced" in file_name) @@ -49,61 +60,8 @@ class logic_execute(Thread): self.event = event def run(self): - event = self.event - if isinstance(event, str): - src_path = event - else: - src_path = event.src_path - - filef = os.path.abspath(os.path.join(os.getcwd(), src_path)) - logging.debug(f"GOT_FILE: {filef}") - - while True: - files = list() - for proc in psutil.process_iter(): - try: - if "ftp" in proc.name(): - files.append(proc.open_files()) - except Exception: - pass - - f = list() - for x in files: - f.extend(x) - - if any([filef == e.path for e in f]): - pass - else: - - - - if decide_to_put_in_queue(filef): - - dirname = os.path.dirname(filef) - foldmode = stat.S_IMODE(os.stat(dirname).st_mode) - logging.info(foldmode) - if oct(foldmode) != "0o777": - os.system(f"sudo chmod 777 {dirname}") - - logging.info(f"SETTING_PERMISSIONS: {dirname}") - - - break - - time.sleep(1) - - chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)} - try: - os.chmod(filef, 0o777) - except Exception as e: - logging.error(e) - - if decide_to_put_in_queue(filef): - logging.info(f"QUEUE_PUT: {pfm(filef)}") - publish(topic, key=filef, value={"filepath": filef}) - wq.publish(wq.TOPICS.ml_vision_to_score, filef) - else: - logging.info(f"QUEUE_DO_NOT_PUT: {filef}") + tasks.put(self.event) + return class Handler(FileSystemEventHandler): @@ -130,25 +88,89 @@ def observe_path(path): return observer + + +def process_file(filef): + logging.debug(f"GOT_FILE: {pfm(filef)}") + + if not decide_to_put_in_queue(filef): + return + + dirname = os.path.dirname(filef) + foldmode = stat.S_IMODE(os.stat(dirname).st_mode) + if oct(foldmode) != "0o777": + os.system(f"sudo chmod 777 {dirname}") + + logging.info(f"SETTING_PERMISSIONS: {dirname}") + + chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)} + + try: + os.chmod(filef, 0o777) + except Exception as e: + logging.error(e) + + if decide_to_put_in_queue(filef): + logging.info(f"QUEUE_PUT: {pfm(filef)}") + wq.publish(wq.TOPICS.ml_vision_to_score, filef) + try: + future = publish(topic, key=filef, value={"filepath": filef}) + except Exception as e: + logging.error("KAFKA ERROR " + str(e)) + else: + logging.info(f"QUEUE_DO_NOT_PUT: {filef}") + + +def watch_and_decide(paths, queue_max = 5, wait_time = 1): + observers = [observe_path(path) for path in paths] + + all_files = dict() + + + while True: + for _ in range(tasks.qsize()): + x = tasks.get(block=True) + if decide_to_put_in_queue(x.src_path) and x.event_type == 'created': + logging.info(f"MONITORING_SIZE: {x.src_path}") + all_files[x.src_path] = deque(maxlen=queue_max) + + to_iter_over = list(all_files) + for path in to_iter_over: + if not os.path.exists(path): + all_files.pop(path,'') + + if path in all_files: + all_files[path].append(os.path.getsize(path)) + + for cf in list(all_files): + c_list = list(all_files[cf]) + c_set = set(c_list) + if len(c_set) == 1 and len(c_list) == queue_max: + all_files.pop(cf) + process_file(cf) + else: + pass + + time.sleep(wait_time) + + + + if __name__ == "__main__": parser = argparse.ArgumentParser( prog="Watch and Fix Permissions And Push to Kafka Queue" ) parser.add_argument("paths", nargs="*", help="Paths to monitor", default=()) + parser.add_argument("-q", default=5, type=int, help="Number of successive file sizes that are same") + parser.add_argument("-t", default=1, type=float, help="Number of seconds to wait between each file size check") args, _ = parser.parse_known_args() paths = args.paths + queue_max = args.q + wait_time = args.t + if len(paths) == 0: paths = default_dir_watch - observers = [observe_path(path) for path in paths] - - try: - while True: - time.sleep(1) - except Exception as e: - logging.error(str(e)) - for observer in observers: - observer.stop() - observer.join() + watch_and_decide(paths = paths, queue_max = queue_max, wait_time = wait_time) diff --git a/00_watch_for_new_videos/watch_and_fix_permissions_old.py b/00_watch_for_new_videos/watch_and_fix_permissions_old.py new file mode 100755 index 0000000..837a81c --- /dev/null +++ b/00_watch_for_new_videos/watch_and_fix_permissions_old.py @@ -0,0 +1,162 @@ +import os +import subprocess as sp +import time +import psutil +import stat +import sys +import logging +from watchdog.observers.polling import PollingObserver +from watchdog.events import FileSystemEventHandler +from threading import Thread +from CommonCode import wq + +from CommonCode.settings import get_logger, LogColorize +import argparse +pfm = LogColorize.watch_and_fix_permissions +if not ('__file__' in vars() or '__file__' in globals()): + __file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py' + +parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +sys.path.append(parent_dir) + +from common import settings +default_dir_watch = settings.dir_watch +# %% +logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False, level = logging.INFO) + +logger.info(pfm(f"Starting watch_and_fix_permissions.py")) + +try: + from CommonCode import kwq + publish = kwq.producer.send + topic = kwq.TOPICS.exit_00_videos_to_score_detection +except Exception as e: + logger.error("ERROR WHEN IMPORTING KAFKA:"+ str(e)) + +def decide_to_put_in_queue(file_name): + disqualifiers = list() + disqualifiers.append("_reduced" in file_name) + disqualifiers.append("_trimmed" in file_name) + + disqualified = any(disqualifiers) + + if file_name.endswith(".mp4") and not disqualified: + return True + else: + return False + + + +class logic_execute(Thread): + def __init__(self, event): + logging.debug(f"EVENT: {event}") + Thread.__init__(self) + self.event = event + + def run(self): + event = self.event + if isinstance(event, str): + src_path = event + else: + src_path = event.src_path + + filef = os.path.abspath(os.path.join(os.getcwd(), src_path)) + logging.debug(f"GOT_FILE: {pfm(filef)}") + + broke_out = False + for i in range(120): + files = list() + for proc in psutil.process_iter(): + try: + if "ftp" in proc.name(): + files.append(proc.open_files()) + except Exception: + pass + + f = list() + for x in files: + f.extend(x) + + if any([filef == e.path for e in f]): + pass + else: + if decide_to_put_in_queue(filef): + dirname = os.path.dirname(filef) + foldmode = stat.S_IMODE(os.stat(dirname).st_mode) + if oct(foldmode) != "0o777": + os.system(f"sudo chmod 777 {dirname}") + + logging.info(f"SETTING_PERMISSIONS: {dirname}") + broke_out = True + break + + time.sleep(1) + + if not broke_out: + logging.info(f"TIMED_OUT: {filef}") + + + chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)} + + try: + os.chmod(filef, 0o777) + except Exception as e: + logging.error(e) + + if decide_to_put_in_queue(filef): + logging.info(f"QUEUE_PUT: {pfm(filef)}") + wq.publish(wq.TOPICS.ml_vision_to_score, filef) + try: + future = publish(topic, key=filef, value={"filepath": filef}) + except Exception as e: + logging.error("KAFKA ERROR " + str(e)) + else: + logging.info(f"QUEUE_DO_NOT_PUT: {filef}") + + +class Handler(FileSystemEventHandler): + def on_created(self, event): + try: + self.handle_it(event) + except Exception as e: + logging.error(e) + + def handle_it(self, event): + if event.is_directory: + return + lclass = logic_execute(event) + lclass.start() + + +def observe_path(path): + event_handler = Handler() + observer = PollingObserver() + logger.info(f"Monitoring {path}") + observer.schedule(event_handler, path, recursive=True) + observer.start() + observer.is_alive() + return observer + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + prog="Watch and Fix Permissions And Push to Kafka Queue" + ) + + parser.add_argument("paths", nargs="*", help="Paths to monitor", default=()) + + args, _ = parser.parse_known_args() + paths = args.paths + if len(paths) == 0: + paths = default_dir_watch + observers = [observe_path(path) for path in paths] + + try: + while True: + time.sleep(1) + except Exception as e: + logging.error(str(e)) + + for observer in observers: + observer.stop() + observer.join() diff --git a/common/__pycache__/settings.cpython-312.pyc b/common/__pycache__/settings.cpython-312.pyc index cdb4d8aca3498b3c6262ee234454ea6071c7cd20..e535a686ecabbf14d18397148c8fc220cd1541ad 100644 GIT binary patch delta 243 zcmZ3%G>w_>G%qg~0|NuY3F8|Xfs7ORBxFPw7#OBAq%)*2EMkmeNMTH2N@t28F(x=$99z=9MRwBxh6!!o`vjld~%mi&Bt; zQi?LmQp*!7ibw~7#RFCnQn2WWERDPb>9+zGvjkJi;GiJRx*4Bx$l>eenx(7 zs(wjEYEo)qQL%n-e(A*f+IpM}3=E763=G9G3=9k(m>C%vKQVFfGBt31kYQkzy}=^$ Rft6W==^}$b5jz6|0|0B_M*IK( delta 107 zcmbQnyn>1EG%qg~0|NuY)8?v-jSLg{Bt&=^7#OBAq%)*2EMkmesASY+(qxw_>GcPX}0|NuY3F8|Xfs7ORBxFPw7#OB9=raT}1Tq#e1TzLR=`$5ErZa1@ zXmU=p@?rcnalROn;jfAF#p|nt^oxti^wUZT^vjD<^U4!Tk~69V;bKXN$=Q{OMJY%^ zDMguOspW~4MNA9~3`NWg3=DpnOt&~wGK=EDx^D@1EGcPX}0|NuY)8?v-jSLg{Bt&=^7#OB9=raT}1Tq#eq%&$VX|hdp@?or+ z*e?zu#Q2T;G#PJkreqeymnW7aXG~UNbdzIaU|?WmU|=W)Y5%~?$jJDKiJh0Jk+X<} Hfq?-4QNI^U diff --git a/common/settings.py b/common/settings.py index 6f862bf..6e52ebc 100644 --- a/common/settings.py +++ b/common/settings.py @@ -5,5 +5,14 @@ dir_watch = [ "/srv/ftp/sidefeeder", "/srv/ftp_tcc/leopards1", "/srv/ftp_tcc/leopards2", + "/srv/ftp/wrenwatch", + "/srv/ftp/backyard", + "/srv/ftp/driveway" +] + + +dir_watch_missed = [ + "/srv/ftp_tcc/leopards1", + "/srv/ftp_tcc/leopards2", ] diff --git a/01_do_obj_det b/orin/01_do_obj_det similarity index 100% rename from 01_do_obj_det rename to orin/01_do_obj_det diff --git a/02_remove_without_nuggets/remove_without_nuggets.py b/orin/02_remove_without_nuggets/remove_without_nuggets.py similarity index 100% rename from 02_remove_without_nuggets/remove_without_nuggets.py rename to orin/02_remove_without_nuggets/remove_without_nuggets.py diff --git a/03_put_into_vectordb/put_into_vectordb.py b/orin/03_put_into_vectordb/put_into_vectordb.py similarity index 100% rename from 03_put_into_vectordb/put_into_vectordb.py rename to orin/03_put_into_vectordb/put_into_vectordb.py diff --git a/old_01_do_obj_det b/orin/old_01_do_obj_det similarity index 100% rename from old_01_do_obj_det rename to orin/old_01_do_obj_det