Files
pipelines/00_watch_for_new_videos/load_missed.py
2025-10-17 14:16:08 -04:00

112 lines
3.3 KiB
Python
Executable File

#!/home/thebears/envs/ml_vision_objdet/bin/python
import os
import subprocess as sp
import time
import psutil
import stat
import sys
import logging
from watchdog.observers.polling import PollingObserver
from watchdog.events import FileSystemEventHandler
from threading import Thread
from CommonCode import wq
from CommonCode import kwq
from CommonCode.settings import get_logger, LogColorize
import argparse
# `__file__` can be undefined (for example in an interactive session); fall
# back to a hard-coded location so the path arithmetic below still works.
# Only the directory part of this fallback is used.
if '__file__' not in vars() and '__file__' not in globals():
    __file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py'
# Put the directory one level above this script on the import path.
_script_dir = os.path.dirname(__file__)
parent_dir = os.path.abspath(os.path.join(_script_dir, os.pardir))
sys.path.append(parent_dir)
# Module-level logger built by the project helper at INFO level.
# NOTE(review): the second argument looks like a log-file location and
# stdout/systemd look like output toggles -- confirm against get_logger.
logger = get_logger(__name__,'/var/log/ml_vision_logs/00_load_missed', stdout=True, systemd=False, level=logging.INFO)
# Project-local import; presumably resolvable only because parent_dir was
# appended to sys.path just above -- keep this import after that append.
from common import settings
#default_dir_watch = settings.dir_watch_missed
# Value scanned when no paths are given on the command line.
# NOTE(review): add_files_to_queue iterates over it -- confirm whether
# settings.dir_watch is a single path or a collection of paths.
default_dir_watch = settings.dir_watch
# Short aliases for the kwq producer's send callable and the topic that
# place_file_in_queue publishes to.
publish = kwq.producer.send
topic = kwq.TOPICS.exit_00_videos_to_score_detection
# Suffixes appended to a video's extension-less path; a video is only
# queued when at least one of these companion files is missing.
necessary_extensions = ['.oclip_embeds.npz', '.json', '.db_has_sl2_embeds_part']
def decide_to_put_in_queue(file_name):
    """Return True when *file_name* is a video that still needs processing.

    A file qualifies when all of the following hold:

    * its name ends with ``.mp4``;
    * its name contains neither ``_reduced`` nor ``_trimmed``;
    * it exists and is at least 1,000,000 bytes;
    * at least one companion file (the path without ``.mp4`` plus each
      suffix in ``necessary_extensions``) is missing.

    Args:
        file_name: Path of the candidate file.

    Returns:
        bool: True if the file should be queued, False otherwise. A file
        that cannot be stat-ed (deleted meanwhile, broken symlink,
        permission problem) yields False instead of raising.
    """
    # Cheap name-based checks first, so non-videos are never stat-ed.
    if not file_name.endswith(".mp4"):
        return False
    if "_reduced" in file_name or "_trimmed" in file_name:
        return False

    try:
        size = os.path.getsize(file_name)
    except OSError:
        # The file vanished or is unreadable; there is nothing to queue.
        return False
    if size < 1 * 1000 * 1000:
        return False

    rt_path, _ = os.path.splitext(file_name)
    all_exist = all(os.path.exists(rt_path + ext) for ext in necessary_extensions)
    return not all_exist
# Make the directory one level above this script importable. The entry
# may already be on sys.path, so only append it when it is absent to
# avoid duplicate search-path entries.
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
if parent_dir not in sys.path:
    sys.path.append(parent_dir)
def place_file_in_queue(filef):
    """Open up permissions for a video file and publish it to the queues.

    The containing directory is switched to mode 777 through ``sudo chmod``
    when its permission bits differ, the file itself is chmod-ed to 777 on
    a best-effort basis, and the path is then sent to the ``kwq`` producer
    topic and to the ``wq`` ``ml_vision_to_score`` topic.

    Args:
        filef: Path of the video file to enqueue.
    """
    # A bare file name has an empty dirname; use the current directory then.
    dirname = os.path.dirname(filef) or "."
    foldmode = stat.S_IMODE(os.stat(dirname).st_mode)
    if foldmode != 0o777:
        # Argument list instead of a shell string, so directory names with
        # spaces or shell metacharacters are passed through verbatim.
        sp.run(["sudo", "chmod", "777", dirname], check=False)
        logger.info(f"SETTING_PERMISSIONS: {dirname}")

    try:
        os.chmod(filef, 0o777)
    except OSError as e:
        # Best effort only: a failed chmod must not stop the file from
        # being queued, but the reason is recorded rather than dropped.
        logger.warning(f"CHMOD_FAILED: {filef}: {e}")

    logger.info(f"QUEUE_PUT: {filef}")
    publish(topic, key=filef, value={"filepath": filef})
    wq.publish(wq.TOPICS.ml_vision_to_score, filef)
def add_files_to_queue(paths, dry_run=False):
    """Walk directory trees and queue every video that still needs work.

    Args:
        paths: Directories to scan recursively. Either an iterable of
            paths or a single ``str`` / ``os.PathLike``; a single path is
            treated as a one-element collection rather than being
            iterated character by character.
        dry_run: When True, only print the candidates instead of
            queueing them.

    Returns:
        None.
    """
    if isinstance(paths, (str, os.PathLike)):
        paths = [paths]

    # A set, so overlapping roots do not queue the same file twice.
    queued = set()
    for rt in paths:
        for root, _dirs, files in os.walk(rt):
            for f in files:
                new_path = os.path.join(root, f)
                if decide_to_put_in_queue(new_path):
                    queued.add(new_path)

    # Sorted so the processing order is reproducible between runs.
    for x in sorted(queued):
        if dry_run:
            print('DRY RUN, CANDIDATES FOR QUEUE: ' + x)
        else:
            place_file_in_queue(x)
# %%
if __name__ == "__main__":
    # Command-line entry point: scan the given paths (or the configured
    # default when none are given) and queue the qualifying videos.
    cli = argparse.ArgumentParser(
        prog="Watch and Fix Permissions And Push to Kafka Queue"
    )
    cli.add_argument("paths", nargs="*", help="Paths to monitor", default=())
    cli.add_argument("--dry-run", action='store_true', help='Dry Run')
    # parse_known_args: unrecognised arguments are ignored, not rejected.
    args, _ = cli.parse_known_args()
    dry_run = args.dry_run
    paths = args.paths if len(args.paths) != 0 else default_dir_watch
    add_files_to_queue(paths, dry_run=dry_run)