This commit is contained in:
2025-10-17 14:18:41 -04:00
parent 89036ea4c5
commit ec8bc7127f
4 changed files with 460 additions and 0 deletions

View File

@@ -0,0 +1,111 @@
#!/home/thebears/envs/ml_vision_objdet/bin/python
import os
import subprocess as sp
import time
import psutil
import stat
import sys
import logging
from watchdog.observers.polling import PollingObserver
from watchdog.events import FileSystemEventHandler
from threading import Thread
from CommonCode import wq
from CommonCode import kwq
from CommonCode.settings import get_logger, LogColorize
import argparse
# When run interactively (IPython/Jupyter) __file__ is undefined; fall back
# to the script's known on-disk location so the sys.path setup below works.
if not ('__file__' in vars() or '__file__' in globals()):
    __file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py'
# Make the sibling "common" package importable from the parent directory.
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(parent_dir)
logger = get_logger(__name__,'/var/log/ml_vision_logs/00_load_missed', stdout=True, systemd=False, level=logging.INFO)
from common import settings
#default_dir_watch = settings.dir_watch_missed
default_dir_watch = settings.dir_watch
# Kafka publishing entry points for the detection-scoring topic.
publish = kwq.producer.send
topic = kwq.TOPICS.exit_00_videos_to_score_detection
# Sidecar files that must all exist next to a video before it is considered
# fully processed; if any is missing, the video is (re)queued for scoring.
necessary_extensions = [
    '.oclip_embeds.npz',
    '.json',
    '.db_has_sl2_embeds_part' ]

def decide_to_put_in_queue(file_name):
    """Return True when *file_name* is an .mp4 that still needs scoring.

    A file qualifies when it exists, is at least 1 MB, is not a derived
    ("_reduced"/"_trimmed") rendition, and at least one of the expected
    sidecar outputs (see ``necessary_extensions``) is missing.
    """
    # BUG FIX: the existence check must run BEFORE os.path.getsize();
    # previously getsize() raised FileNotFoundError for vanished files.
    if not os.path.exists(file_name):
        return False
    # Skip tiny files: likely still uploading or truncated.
    if os.path.getsize(file_name) < 1 * 1000 * 1000:
        return False
    # Derived renditions are never queued.
    if "_reduced" in file_name or "_trimmed" in file_name:
        return False
    if not file_name.endswith(".mp4"):
        return False
    rt_path, _ = os.path.splitext(file_name)
    # Queue only if some expected sidecar output is missing.
    return not all(os.path.exists(rt_path + ext) for ext in necessary_extensions)
# NOTE(review): duplicate of the sys.path setup performed near the top of
# the file; appending the same directory twice is harmless but redundant.
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(parent_dir)
def place_file_in_queue(filef):
    """Make *filef* world-accessible and publish it to both work queues.

    Side effects: chmods the containing directory (via sudo) and the file
    itself to 0o777, then publishes the path to the Kafka topic and the
    internal work queue.
    """
    dirname = os.path.dirname(filef)
    foldmode = stat.S_IMODE(os.stat(dirname).st_mode)
    if foldmode != 0o777:
        # Argument-list form (no shell): safe for paths containing spaces
        # or shell metacharacters, unlike the previous os.system() string.
        sp.run(["sudo", "chmod", "777", dirname], check=False)
        logger.info(f"SETTING_PERMISSIONS: {dirname}")
    try:
        os.chmod(filef, 0o777)
    except OSError as e:
        # Best-effort: the file may be owned by another user; the sudo
        # chmod on the directory usually suffices. Log instead of silently
        # swallowing as before.
        logger.warning(f"chmod failed for {filef}: {e}")
    logger.info(f"QUEUE_PUT: {filef}")
    publish(topic, key=filef, value={"filepath": filef})
    wq.publish(wq.TOPICS.ml_vision_to_score, filef)
def add_files_to_queue(paths, dry_run = False):
    """Walk every root in *paths* and queue each video that needs scoring.

    When *dry_run* is true, candidate paths are printed instead of queued.
    """
    candidates = {
        os.path.join(dirpath, fname)
        for rt in paths
        for dirpath, _dirs, filenames in os.walk(rt)
        for fname in filenames
        if decide_to_put_in_queue(os.path.join(dirpath, fname))
    }
    for candidate in candidates:
        if dry_run:
            print('DRY RUN, CANDIDATES FOR QUEUE: ' + candidate)
        else:
            place_file_in_queue(candidate)
# %%
if __name__ == "__main__":
    # CLI entry point: scan the given (or default) directories once and
    # queue every video that still needs scoring.
    parser = argparse.ArgumentParser(
        prog="Watch and Fix Permissions And Push to Kafka Queue"
    )
    parser.add_argument("paths", nargs="*", help="Paths to monitor", default=())
    parser.add_argument("--dry-run", action='store_true', help='Dry Run')
    cli_args, _ = parser.parse_known_args()
    # Fall back to the configured watch directories when none were given.
    monitor_paths = cli_args.paths if len(cli_args.paths) > 0 else default_dir_watch
    add_files_to_queue(monitor_paths, dry_run=cli_args.dry_run)

View File

@@ -0,0 +1,11 @@
# Ad-hoc consumer script (notebook-style "# %%" cells) for manually
# inspecting messages on the detection-scoring topic.
from CommonCode import kwq
publish = kwq.producer.send
topic = kwq.TOPICS.videos_to_score_detection
cons = kwq.create_consumer()
# %%
cons.subscribe([topic])
# %%
# "if True" is a notebook-cell idiom: poll once for any pending records.
if True:
    out = cons.poll()

View File

@@ -0,0 +1,176 @@
import os
import subprocess as sp
import time
import psutil
import stat
import sys
import logging
from watchdog.observers.polling import PollingObserver
from watchdog.events import FileSystemEventHandler
from threading import Thread
from CommonCode import wq
from CommonCode.util import is_ipython
import queue
from CommonCode.settings import get_logger, LogColorize
import argparse
import subprocess
from collections import deque
# Colorizer used to highlight this script's log lines.
pfm = LogColorize.watch_and_fix_permissions
# When run interactively __file__ is undefined; fall back to the script's
# on-disk location so the sys.path setup below still works.
if not ('__file__' in vars() or '__file__' in globals()):
    __file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py'
# Make the sibling "common" package importable.
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(parent_dir)
from common import settings
default_dir_watch = settings.dir_watch
# Filesystem events flow from the watchdog handler threads into this
# queue; the main loop in watch_and_decide() drains it.
tasks = queue.Queue()
logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False, level = logging.INFO)
logger.info(pfm(f"Starting watch_and_fix_permissions.py"))
# Kafka is optional: if the import or producer setup fails, the script
# still runs and publishes only to the internal work queue.
try:
    from CommonCode import kwq
    publish = kwq.producer.send
    topic = kwq.TOPICS.exit_00_videos_to_score_detection
except Exception as e:
    logger.error("ERROR WHEN IMPORTING KAFKA:"+ str(e))
def decide_to_put_in_queue(file_name):
    """Return True when *file_name* is an .mp4 and not a derived rendition."""
    is_derived = ("_reduced" in file_name) or ("_trimmed" in file_name)
    return file_name.endswith(".mp4") and not is_derived
class logic_execute(Thread):
    """Thread that forwards a single watchdog event onto the tasks queue."""

    def __init__(self, event):
        logging.debug(f"EVENT: {event}")
        super().__init__()
        self.event = event

    def run(self):
        # Hand the event to the main loop; the thread then exits.
        tasks.put(self.event)
class Handler(FileSystemEventHandler):
    """Watchdog callback: spawn a worker thread for each created file."""

    def on_created(self, event):
        # Never let an exception escape into the observer thread.
        try:
            self.handle_it(event)
        except Exception as e:
            logging.error(e)

    def handle_it(self, event):
        # Only file creations are processed; directories are ignored.
        if event.is_directory:
            return
        logic_execute(event).start()
def observe_path(path):
    """Start a recursive polling observer on *path* and return it."""
    observer = PollingObserver()
    handler = Handler()
    logger.info(f"Monitoring {path}")
    observer.schedule(handler, path, recursive=True)
    observer.start()
    observer.is_alive()  # result intentionally discarded (no-op probe)
    return observer
def process_file(filef):
    """Fix permissions on a settled file and publish it for scoring.

    Called by watch_and_decide() once a file's size has been stable;
    non-video and derived ("_reduced"/"_trimmed") files are skipped.
    """
    logging.debug(f"GOT_FILE: {pfm(filef)}")
    if not decide_to_put_in_queue(filef):
        # Previously this message lived in a dead else-branch (the second
        # decide check below the early return was always True); log it here.
        logging.info(f"QUEUE_DO_NOT_PUT: {filef}")
        return
    dirname = os.path.dirname(filef)
    foldmode = stat.S_IMODE(os.stat(dirname).st_mode)
    if foldmode != 0o777:
        # Argument-list form (no shell): safe for paths containing spaces
        # or shell metacharacters, unlike the previous os.system() string.
        sp.run(["sudo", "chmod", "777", dirname], check=False)
        logging.info(f"SETTING_PERMISSIONS: {dirname}")
    try:
        os.chmod(filef, 0o777)
    except OSError as e:
        # Best-effort; the directory chmod above usually suffices.
        logging.error(e)
    # decide_to_put_in_queue() is pure, so re-checking here (as the original
    # did) was redundant: publish unconditionally.
    logging.info(f"QUEUE_PUT: {pfm(filef)}")
    wq.publish(wq.TOPICS.ml_vision_to_score, filef)
    try:
        publish(topic, key=filef, value={"filepath": filef})
    except Exception as e:
        # Kafka may be unavailable (see optional import at module top).
        logging.error("KAFKA ERROR " + str(e))
def watch_and_decide(paths, queue_max = 5, wait_time = 1):
    """Main loop: watch *paths* and queue files once their size settles.

    A file is handed to process_file() after its size has been identical
    for *queue_max* consecutive samples taken *wait_time* seconds apart.
    Runs forever; never returns.
    """
    observers = [observe_path(path) for path in paths]
    # Maps file path -> deque of the last queue_max observed sizes.
    all_files = dict()
    while True:
        # Drain creation events accumulated by the handler threads.
        for _ in range(tasks.qsize()):
            x = tasks.get(block=True)
            if decide_to_put_in_queue(x.src_path) and x.event_type == 'created':
                logging.info(f"MONITORING_SIZE: {x.src_path}")
                all_files[x.src_path] = deque(maxlen=queue_max)
        # Sample current sizes; drop entries whose file disappeared.
        to_iter_over = list(all_files)
        for path in to_iter_over:
            if not os.path.exists(path):
                all_files.pop(path,'')
            if path in all_files:
                all_files[path].append(os.path.getsize(path))
        # A full deque holding a single distinct value means the size has
        # been stable for queue_max samples: treat the upload as finished.
        for cf in list(all_files):
            c_list = list(all_files[cf])
            c_set = set(c_list)
            if len(c_set) == 1 and len(c_list) == queue_max:
                all_files.pop(cf)
                process_file(cf)
            else:
                pass
        time.sleep(wait_time)
if __name__ == "__main__":
    # CLI entry point: watch the given (or default) directories forever.
    parser = argparse.ArgumentParser(
        prog="Watch and Fix Permissions And Push to Kafka Queue"
    )
    parser.add_argument("paths", nargs="*", help="Paths to monitor", default=())
    parser.add_argument("-q", default=5, type=int, help="Number of successive file sizes that are same")
    parser.add_argument("-t", default=1, type=float, help="Number of seconds to wait between each file size check")
    cli, _ = parser.parse_known_args()
    # Fall back to the configured watch directories when none are given.
    monitor_paths = cli.paths if len(cli.paths) > 0 else default_dir_watch
    watch_and_decide(paths=monitor_paths, queue_max=cli.q, wait_time=cli.t)

View File

@@ -0,0 +1,162 @@
import os
import subprocess as sp
import time
import psutil
import stat
import sys
import logging
from watchdog.observers.polling import PollingObserver
from watchdog.events import FileSystemEventHandler
from threading import Thread
from CommonCode import wq
from CommonCode.settings import get_logger, LogColorize
import argparse
# Colorizer used to highlight this script's log lines.
pfm = LogColorize.watch_and_fix_permissions
# When run interactively __file__ is undefined; fall back to the script's
# on-disk location so the sys.path setup below still works.
if not ('__file__' in vars() or '__file__' in globals()):
    __file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py'
# Make the sibling "common" package importable.
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(parent_dir)
from common import settings
default_dir_watch = settings.dir_watch
# %%
logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False, level = logging.INFO)
logger.info(pfm(f"Starting watch_and_fix_permissions.py"))
# Kafka is optional: if the import or producer setup fails, the script
# still runs and publishes only to the internal work queue.
try:
    from CommonCode import kwq
    publish = kwq.producer.send
    topic = kwq.TOPICS.exit_00_videos_to_score_detection
except Exception as e:
    logger.error("ERROR WHEN IMPORTING KAFKA:"+ str(e))
def decide_to_put_in_queue(file_name):
    """Return True for .mp4 paths that are not derived renditions."""
    if not file_name.endswith(".mp4"):
        return False
    for marker in ("_reduced", "_trimmed"):
        if marker in file_name:
            return False
    return True
class logic_execute(Thread):
    """Worker thread: wait for the FTP upload of one file to finish, then
    fix permissions and publish the file to the scoring queues."""

    def __init__(self, event):
        # *event* is either a watchdog event object or a plain path string.
        logging.debug(f"EVENT: {event}")
        Thread.__init__(self)
        self.event = event

    def run(self):
        event = self.event
        # Accept both raw path strings and watchdog event objects.
        if isinstance(event, str):
            src_path = event
        else:
            src_path = event.src_path
        filef = os.path.abspath(os.path.join(os.getcwd(), src_path))
        logging.debug(f"GOT_FILE: {pfm(filef)}")
        broke_out = False
        # Poll up to ~120 times, one second apart, until no ftp process
        # holds the file open (i.e. the upload appears complete).
        for i in range(120):
            files = list()
            for proc in psutil.process_iter():
                try:
                    if "ftp" in proc.name():
                        files.append(proc.open_files())
                except Exception:
                    # Processes can disappear or deny access mid-scan.
                    pass
            # Flatten the per-process lists of open-file records.
            f = list()
            for x in files:
                f.extend(x)
            if any([filef == e.path for e in f]):
                # Still being written by an ftp process; keep waiting.
                pass
            else:
                # Upload finished: make the directory world-writable first,
                # but only for files we intend to queue.
                if decide_to_put_in_queue(filef):
                    dirname = os.path.dirname(filef)
                    foldmode = stat.S_IMODE(os.stat(dirname).st_mode)
                    if oct(foldmode) != "0o777":
                        # NOTE(review): shell-string chmod breaks on paths
                        # with spaces — consider sp.run([...]) instead.
                        os.system(f"sudo chmod 777 {dirname}")
                        logging.info(f"SETTING_PERMISSIONS: {dirname}")
                broke_out = True
                break
            time.sleep(1)
        if not broke_out:
            # The file was still held open after the full polling window.
            logging.info(f"TIMED_OUT: {filef}")
        # NOTE(review): chand is built but never used — presumably left
        # over from an earlier handler API; confirm before removing.
        chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)}
        try:
            os.chmod(filef, 0o777)
        except Exception as e:
            logging.error(e)
        if decide_to_put_in_queue(filef):
            logging.info(f"QUEUE_PUT: {pfm(filef)}")
            wq.publish(wq.TOPICS.ml_vision_to_score, filef)
            # Kafka publish is best-effort; the internal queue publish
            # above has already succeeded.
            try:
                future = publish(topic, key=filef, value={"filepath": filef})
            except Exception as e:
                logging.error("KAFKA ERROR " + str(e))
        else:
            logging.info(f"QUEUE_DO_NOT_PUT: {filef}")
class Handler(FileSystemEventHandler):
    """Watchdog callback: spawn a worker thread per created file."""

    def on_created(self, event):
        # Shield the observer thread from any handler failure.
        try:
            self.handle_it(event)
        except Exception as e:
            logging.error(e)

    def handle_it(self, event):
        # Directories are ignored; only file creations are handled.
        if event.is_directory:
            return
        logic_execute(event).start()
def observe_path(path):
    """Start a recursive polling observer on *path* and return it."""
    observer = PollingObserver()
    handler = Handler()
    logger.info(f"Monitoring {path}")
    observer.schedule(handler, path, recursive=True)
    observer.start()
    observer.is_alive()  # result intentionally discarded (no-op probe)
    return observer
if __name__ == "__main__":
    # CLI entry point: watch the given (or default) directories until
    # interrupted, then shut the observers down cleanly.
    parser = argparse.ArgumentParser(
        prog="Watch and Fix Permissions And Push to Kafka Queue"
    )
    parser.add_argument("paths", nargs="*", help="Paths to monitor", default=())
    args, _ = parser.parse_known_args()
    paths = args.paths
    if len(paths) == 0:
        paths = default_dir_watch
    observers = [observe_path(path) for path in paths]
    try:
        # Observers run in background threads; keep the main thread alive.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # BUG FIX: KeyboardInterrupt derives from BaseException, so the
        # previous "except Exception" never caught Ctrl-C and the observer
        # cleanup below never ran on interactive shutdown.
        pass
    except Exception as e:
        logging.error(str(e))
    for observer in observers:
        observer.stop()
        observer.join()