diff --git a/vision_v3/00_watch_for_new_videos/load_missed.py b/vision_v3/00_watch_for_new_videos/load_missed.py
new file mode 100755
index 0000000..e694c2f
--- /dev/null
+++ b/vision_v3/00_watch_for_new_videos/load_missed.py
@@ -0,0 +1,111 @@
+#!/home/thebears/envs/ml_vision_objdet/bin/python
+import os
+
+import subprocess as sp
+import time
+import psutil
+import stat
+import sys
+import logging
+from watchdog.observers.polling import PollingObserver
+from watchdog.events import FileSystemEventHandler
+from threading import Thread
+from CommonCode import wq
+from CommonCode import kwq
+from CommonCode.settings import get_logger, LogColorize
+import argparse
+if not ('__file__' in vars() or '__file__' in globals()):
+    __file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py'
+
+parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
+sys.path.append(parent_dir)
+logger = get_logger(__name__,'/var/log/ml_vision_logs/00_load_missed', stdout=True, systemd=False, level=logging.INFO)
+from common import settings
+
+#default_dir_watch = settings.dir_watch_missed
+default_dir_watch = settings.dir_watch
+
+publish = kwq.producer.send
+topic = kwq.TOPICS.exit_00_videos_to_score_detection
+
+necessary_extensions = [
+    '.oclip_embeds.npz',
+    '.json',
+    '.db_has_sl2_embeds_part' ]
+
+def decide_to_put_in_queue(file_name):
+    if os.path.getsize(file_name) < 1*1000*1000:
+        return False
+    disqualifiers = list()
+    disqualifiers.append(not os.path.exists(file_name))
+    disqualifiers.append("_reduced" in file_name)
+    disqualifiers.append("_trimmed" in file_name)
+#    disqualifiers.append('hummingbird' in file_name)
+
+    disqualified = any(disqualifiers)
+    if file_name.endswith(".mp4") and not disqualified:
+        rt_path, _ = os.path.splitext(file_name)
+        all_exist = all([os.path.exists( rt_path + ext) for ext in necessary_extensions])
+        return (not all_exist)
+    else:
+        return False
+
+
+parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
+sys.path.append(parent_dir)
+def place_file_in_queue(filef):
+    dirname = os.path.dirname(filef)
+    foldmode = stat.S_IMODE(os.stat(dirname).st_mode)
+    if oct(foldmode) != "0o777":
+        os.system(f"sudo chmod 777 {dirname}")
+
+        logging.info(f"SETTING_PERMISSIONS: {dirname}")
+
+    chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)}
+    try:
+
+        os.chmod(filef, 0o777)
+    except Exception as e:
+        pass
+#$        logging.error(e)
+
+    logging.info(f"QUEUE_PUT: {filef}")
+    publish(topic, key=filef, value={"filepath": filef})
+    wq.publish(wq.TOPICS.ml_vision_to_score, filef)
+
+
+def add_files_to_queue(paths, dry_run = False):
+    queued = set()
+    for rt in paths:
+        for root, dirs, files in os.walk(rt):
+            for f in files:
+                new_path = os.path.join(root, f)
+                if decide_to_put_in_queue(new_path):
+                    queued.add(new_path)
+
+
+    for x in queued:
+        if dry_run:
+            print('DRY RUN, CANDIDATES FOR QUEUE: ' + x)
+        else:
+            place_file_in_queue(x)
+
+
+# %%
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        prog="Watch and Fix Permissions And Push to Kafka Queue"
+    )
+
+    parser.add_argument("paths", nargs="*", help="Paths to monitor", default=())
+    parser.add_argument("--dry-run", action='store_true', help='Dry Run')
+    args, _ = parser.parse_known_args()
+    paths = args.paths
+    dry_run = args.dry_run
+
+    if len(paths) == 0:
+        paths = default_dir_watch
+
+    add_files_to_queue(paths, dry_run = dry_run)
+
+
diff --git a/vision_v3/00_watch_for_new_videos/test.py b/vision_v3/00_watch_for_new_videos/test.py
new file mode 100644
index 0000000..b4b8fbf
--- /dev/null
+++ b/vision_v3/00_watch_for_new_videos/test.py
@@ -0,0 +1,11 @@
+from CommonCode import kwq
+publish = kwq.producer.send
+topic = kwq.TOPICS.videos_to_score_detection
+cons = kwq.create_consumer()
+
+# %%
+cons.subscribe([topic])
+
+# %%
+if True:
+    out = cons.poll()
diff --git a/vision_v3/00_watch_for_new_videos/watch_and_fix_permissions.py b/vision_v3/00_watch_for_new_videos/watch_and_fix_permissions.py
new file mode 100755
index 0000000..f0f52a8
--- /dev/null
+++ b/vision_v3/00_watch_for_new_videos/watch_and_fix_permissions.py
@@ -0,0 +1,176 @@
+import os
+import subprocess as sp
+import time
+import psutil
+import stat
+import sys
+import logging
+from watchdog.observers.polling import PollingObserver
+from watchdog.events import FileSystemEventHandler
+from threading import Thread
+from CommonCode import wq
+from CommonCode.util import is_ipython
+import queue
+from CommonCode.settings import get_logger, LogColorize
+import argparse
+
+import subprocess
+from collections import deque
+
+pfm = LogColorize.watch_and_fix_permissions
+if not ('__file__' in vars() or '__file__' in globals()):
+    __file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py'
+
+parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
+sys.path.append(parent_dir)
+
+from common import settings
+default_dir_watch = settings.dir_watch
+
+tasks = queue.Queue()
+logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False, level = logging.INFO)
+
+logger.info(pfm(f"Starting watch_and_fix_permissions.py"))
+
+try:
+    from CommonCode import kwq
+    publish = kwq.producer.send
+    topic = kwq.TOPICS.exit_00_videos_to_score_detection
+except Exception as e:
+    logger.error("ERROR WHEN IMPORTING KAFKA:"+ str(e))
+
+def decide_to_put_in_queue(file_name):
+    disqualifiers = list()
+    disqualifiers.append("_reduced" in file_name)
+    disqualifiers.append("_trimmed" in file_name)
+
+    disqualified = any(disqualifiers)
+
+    if file_name.endswith(".mp4") and not disqualified:
+        return True
+    else:
+        return False
+
+
+
+class logic_execute(Thread):
+    def __init__(self, event):
+        logging.debug(f"EVENT: {event}")
+        Thread.__init__(self)
+        self.event = event
+
+    def run(self):
+        tasks.put(self.event)
+        return
+
+
+class Handler(FileSystemEventHandler):
+    def on_created(self, event):
+        try:
+            self.handle_it(event)
+        except Exception as e:
+            logging.error(e)
+
+    def handle_it(self, event):
+        if event.is_directory:
+            return
+        lclass = logic_execute(event)
+        lclass.start()
+
+
+def observe_path(path):
+    event_handler = Handler()
+    observer = PollingObserver()
+    logger.info(f"Monitoring {path}")
+    observer.schedule(event_handler, path, recursive=True)
+    observer.start()
+    observer.is_alive()
+    return observer
+
+
+
+
+def process_file(filef):
+    logging.debug(f"GOT_FILE: {pfm(filef)}")
+
+    if not decide_to_put_in_queue(filef):
+        return
+
+    dirname = os.path.dirname(filef)
+    foldmode = stat.S_IMODE(os.stat(dirname).st_mode)
+    if oct(foldmode) != "0o777":
+        os.system(f"sudo chmod 777 {dirname}")
+
+        logging.info(f"SETTING_PERMISSIONS: {dirname}")
+
+    chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)}
+
+    try:
+        os.chmod(filef, 0o777)
+    except Exception as e:
+        logging.error(e)
+
+    if decide_to_put_in_queue(filef):
+        logging.info(f"QUEUE_PUT: {pfm(filef)}")
+        wq.publish(wq.TOPICS.ml_vision_to_score, filef)
+        try:
+            future = publish(topic, key=filef, value={"filepath": filef})
+        except Exception as e:
+            logging.error("KAFKA ERROR " + str(e))
+    else:
+        logging.info(f"QUEUE_DO_NOT_PUT: {filef}")
+
+
+def watch_and_decide(paths, queue_max = 5, wait_time = 1):
+    observers = [observe_path(path) for path in paths]
+
+    all_files = dict()
+
+
+    while True:
+        for _ in range(tasks.qsize()):
+            x = tasks.get(block=True)
+            if decide_to_put_in_queue(x.src_path) and x.event_type == 'created':
+                logging.info(f"MONITORING_SIZE: {x.src_path}")
+                all_files[x.src_path] = deque(maxlen=queue_max)
+
+        to_iter_over = list(all_files)
+        for path in to_iter_over:
+            if not os.path.exists(path):
+                all_files.pop(path,'')
+
+            if path in all_files:
+                all_files[path].append(os.path.getsize(path))
+
+        for cf in list(all_files):
+            c_list = list(all_files[cf])
+            c_set = set(c_list)
+            if len(c_set) == 1 and len(c_list) == queue_max:
+                all_files.pop(cf)
+                process_file(cf)
+            else:
+                pass
+
+        time.sleep(wait_time)
+
+
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        prog="Watch and Fix Permissions And Push to Kafka Queue"
+    )
+
+    parser.add_argument("paths", nargs="*", help="Paths to monitor", default=())
+    parser.add_argument("-q", default=5, type=int, help="Number of successive file sizes that are same")
+    parser.add_argument("-t", default=1, type=float, help="Number of seconds to wait between each file size check")
+
+    args, _ = parser.parse_known_args()
+    paths = args.paths
+    queue_max = args.q
+    wait_time = args.t
+
+    if len(paths) == 0:
+        paths = default_dir_watch
+
+    watch_and_decide(paths = paths, queue_max = queue_max, wait_time = wait_time)
diff --git a/vision_v3/00_watch_for_new_videos/watch_and_fix_permissions_old.py b/vision_v3/00_watch_for_new_videos/watch_and_fix_permissions_old.py
new file mode 100755
index 0000000..837a81c
--- /dev/null
+++ b/vision_v3/00_watch_for_new_videos/watch_and_fix_permissions_old.py
@@ -0,0 +1,162 @@
+import os
+import subprocess as sp
+import time
+import psutil
+import stat
+import sys
+import logging
+from watchdog.observers.polling import PollingObserver
+from watchdog.events import FileSystemEventHandler
+from threading import Thread
+from CommonCode import wq
+
+from CommonCode.settings import get_logger, LogColorize
+import argparse
+pfm = LogColorize.watch_and_fix_permissions
+if not ('__file__' in vars() or '__file__' in globals()):
+    __file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py'
+
+parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
+sys.path.append(parent_dir)
+
+from common import settings
+default_dir_watch = settings.dir_watch
+# %%
+logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False, level = logging.INFO)
+
+logger.info(pfm(f"Starting watch_and_fix_permissions.py"))
+
+try:
+    from CommonCode import kwq
+    publish = kwq.producer.send
+    topic = kwq.TOPICS.exit_00_videos_to_score_detection
+except Exception as e:
+    logger.error("ERROR WHEN IMPORTING KAFKA:"+ str(e))
+
+def decide_to_put_in_queue(file_name):
+    disqualifiers = list()
+    disqualifiers.append("_reduced" in file_name)
+    disqualifiers.append("_trimmed" in file_name)
+
+    disqualified = any(disqualifiers)
+
+    if file_name.endswith(".mp4") and not disqualified:
+        return True
+    else:
+        return False
+
+
+
+class logic_execute(Thread):
+    def __init__(self, event):
+        logging.debug(f"EVENT: {event}")
+        Thread.__init__(self)
+        self.event = event
+
+    def run(self):
+        event = self.event
+        if isinstance(event, str):
+            src_path = event
+        else:
+            src_path = event.src_path
+
+        filef = os.path.abspath(os.path.join(os.getcwd(), src_path))
+        logging.debug(f"GOT_FILE: {pfm(filef)}")
+
+        broke_out = False
+        for i in range(120):
+            files = list()
+            for proc in psutil.process_iter():
+                try:
+                    if "ftp" in proc.name():
+                        files.append(proc.open_files())
+                except Exception:
+                    pass
+
+            f = list()
+            for x in files:
+                f.extend(x)
+
+            if any([filef == e.path for e in f]):
+                pass
+            else:
+                if decide_to_put_in_queue(filef):
+                    dirname = os.path.dirname(filef)
+                    foldmode = stat.S_IMODE(os.stat(dirname).st_mode)
+                    if oct(foldmode) != "0o777":
+                        os.system(f"sudo chmod 777 {dirname}")
+
+                        logging.info(f"SETTING_PERMISSIONS: {dirname}")
+                broke_out = True
+                break
+
+            time.sleep(1)
+
+        if not broke_out:
+            logging.info(f"TIMED_OUT: {filef}")
+
+
+        chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)}
+
+        try:
+            os.chmod(filef, 0o777)
+        except Exception as e:
+            logging.error(e)
+
+        if decide_to_put_in_queue(filef):
+            logging.info(f"QUEUE_PUT: {pfm(filef)}")
+            wq.publish(wq.TOPICS.ml_vision_to_score, filef)
+            try:
+                future = publish(topic, key=filef, value={"filepath": filef})
+            except Exception as e:
+                logging.error("KAFKA ERROR " + str(e))
+        else:
+            logging.info(f"QUEUE_DO_NOT_PUT: {filef}")
+
+
+class Handler(FileSystemEventHandler):
+    def on_created(self, event):
+        try:
+            self.handle_it(event)
+        except Exception as e:
+            logging.error(e)
+
+    def handle_it(self, event):
+        if event.is_directory:
+            return
+        lclass = logic_execute(event)
+        lclass.start()
+
+
+def observe_path(path):
+    event_handler = Handler()
+    observer = PollingObserver()
+    logger.info(f"Monitoring {path}")
+    observer.schedule(event_handler, path, recursive=True)
+    observer.start()
+    observer.is_alive()
+    return observer
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        prog="Watch and Fix Permissions And Push to Kafka Queue"
+    )
+
+    parser.add_argument("paths", nargs="*", help="Paths to monitor", default=())
+
+    args, _ = parser.parse_known_args()
+    paths = args.paths
+    if len(paths) == 0:
+        paths = default_dir_watch
+    observers = [observe_path(path) for path in paths]
+
+    try:
+        while True:
+            time.sleep(1)
+    except Exception as e:
+        logging.error(str(e))
+
+    for observer in observers:
+        observer.stop()
+        observer.join()