#!/home/thebears/envs/ml_vision_objdet/bin/python import os import subprocess as sp import time import psutil import stat import sys import logging from watchdog.observers.polling import PollingObserver from watchdog.events import FileSystemEventHandler from threading import Thread from CommonCode import wq from CommonCode import kwq from CommonCode.settings import get_logger, LogColorize import argparse if not ('__file__' in vars() or '__file__' in globals()): __file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py' parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) sys.path.append(parent_dir) logger = get_logger(__name__,'/var/log/ml_vision_logs/00_load_missed', stdout=True, systemd=False, level=logging.INFO) from common import settings #default_dir_watch = settings.dir_watch_missed default_dir_watch = settings.dir_watch publish = kwq.producer.send topic = kwq.TOPICS.exit_00_videos_to_score_detection necessary_extensions = [ '.oclip_embeds.npz', '.json', '.db_has_sl2_embeds_part' ] def decide_to_put_in_queue(file_name): if os.path.getsize(file_name) < 1*1000*1000: return False disqualifiers = list() disqualifiers.append(not os.path.exists(file_name)) disqualifiers.append("_reduced" in file_name) disqualifiers.append("_trimmed" in file_name) # disqualifiers.append('hummingbird' in file_name) disqualified = any(disqualifiers) if file_name.endswith(".mp4") and not disqualified: rt_path, _ = os.path.splitext(file_name) all_exist = all([os.path.exists( rt_path + ext) for ext in necessary_extensions]) return (not all_exist) else: return False parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) sys.path.append(parent_dir) def place_file_in_queue(filef): dirname = os.path.dirname(filef) foldmode = stat.S_IMODE(os.stat(dirname).st_mode) if oct(foldmode) != "0o777": os.system(f"sudo chmod 777 {dirname}") logging.info(f"SETTING_PERMISSIONS: {dirname}") chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)} try: os.chmod(filef, 0o777) except Exception as e: pass #$ logging.error(e) logging.info(f"QUEUE_PUT: {filef}") publish(topic, key=filef, value={"filepath": filef}) wq.publish(wq.TOPICS.ml_vision_to_score, filef) def add_files_to_queue(paths, dry_run = False): queued = set() for rt in paths: for root, dirs, files in os.walk(rt): for f in files: new_path = os.path.join(root, f) if decide_to_put_in_queue(new_path): queued.add(new_path) for x in queued: if dry_run: print('DRY RUN, CANDIDATES FOR QUEUE: ' + x) else: place_file_in_queue(x) # %% if __name__ == "__main__": parser = argparse.ArgumentParser( prog="Watch and Fix Permissions And Push to Kafka Queue" ) parser.add_argument("paths", nargs="*", help="Paths to monitor", default=()) parser.add_argument("--dry-run", action='store_true', help='Dry Run') args, _ = parser.parse_known_args() paths = args.paths dry_run = args.dry_run if len(paths) == 0: paths = default_dir_watch add_files_to_queue(paths, dry_run = dry_run)