import os import subprocess as sp import time import psutil import stat import sys import logging from watchdog.observers.polling import PollingObserver from watchdog.events import FileSystemEventHandler from threading import Thread from CommonCode import wq from CommonCode import kwq from CommonCode.settings import get_logger, LogColorize import argparse if not ('__file__' in vars() or '__file__' in globals()): __file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py' parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) sys.path.append(parent_dir) logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False) from common import settings default_dir_watch = settings.dir_watch publish = kwq.producer.send topic = kwq.TOPICS.videos_to_score_detection def decide_to_put_in_queue(file_name): disqualifiers = list() disqualifiers.append("_reduced" in file_name) disqualifiers.append("_trimmed" in file_name) disqualified = any(disqualifiers) if file_name.endswith(".mp4") and not disqualified: return True else: return False parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) sys.path.append(parent_dir) def place_file_in_queue(filef): dirname = os.path.dirname(filef) foldmode = stat.S_IMODE(os.stat(dirname).st_mode) if oct(foldmode) != "0o777": os.system(f"sudo chmod 777 {dirname}") logging.info(f"SETTING_PERMISSIONS: {dirname}") chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)} try: os.chmod(filef, 0o777) except Exception as e: pass #$ logging.error(e) logging.info(f"QUEUE_PUT: {filef}") publish(topic, key=filef, value={"filepath": filef}) wq.publish(wq.TOPICS.ml_vision_to_score, filef) def add_files_to_queue(paths): queued = set() for rt in paths: for root, dirs, files in os.walk(rt): for f in files: new_path = os.path.join(root, f) if decide_to_put_in_queue(new_path): queued.add(new_path) for x in queued: place_file_in_queue(x) # %% if __name__ == "__main__": parser = argparse.ArgumentParser( prog="Watch and Fix Permissions And Push to Kafka Queue" ) parser.add_argument("paths", nargs="*", help="Paths to monitor", default=()) args, _ = parser.parse_known_args() paths = args.paths if len(paths) == 0: paths = default_dir_watch add_files_to_queue(paths)