This commit is contained in:
2025-10-04 11:45:45 -04:00
parent 028149b786
commit 36b7784b4e
11 changed files with 281 additions and 73 deletions

View File

@@ -1,3 +1,4 @@
#!/home/thebears/envs/ml_vision_objdet/bin/python
import os
import subprocess as sp
@@ -18,13 +19,13 @@ if not ('__file__' in vars() or '__file__' in globals()):
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(parent_dir)
logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False)
logger = get_logger(__name__,'/var/log/ml_vision_logs/00_load_missed', stdout=True, systemd=False, level=logging.INFO)
from common import settings
default_dir_watch = settings.dir_watch
default_dir_watch = settings.dir_watch_missed
publish = kwq.producer.send
topic = kwq.TOPICS.videos_to_score_detection
topic = kwq.TOPICS.exit_00_videos_to_score_detection
necessary_extensions = [
'.oclip_embeds.npz',
@@ -32,10 +33,13 @@ necessary_extensions = [
'.db_has_sl2_embeds_part' ]
def decide_to_put_in_queue(file_name):
if os.path.getsize(file_name) < 1*1000*1000:
return False
disqualifiers = list()
disqualifiers.append(not os.path.exists(file_name))
disqualifiers.append("_reduced" in file_name)
disqualifiers.append("_trimmed" in file_name)
# disqualifiers.append('hummingbird' in file_name)
disqualified = any(disqualifiers)
if file_name.endswith(".mp4") and not disqualified:

View File

@@ -0,0 +1,11 @@
# Interactive scratch script (notebook-cell style, see `# %%` markers):
# attach a Kafka consumer to the detection topic and poll it manually.
from CommonCode import kwq
publish = kwq.producer.send
topic = kwq.TOPICS.videos_to_score_detection
cons = kwq.create_consumer()
# %%
cons.subscribe([topic])
# %%
# `if True:` is a cell guard for interactive execution; poll() blocks until
# records (or the consumer timeout) arrive.
if True:
    out = cons.poll()

View File

@@ -9,9 +9,14 @@ from watchdog.observers.polling import PollingObserver
from watchdog.events import FileSystemEventHandler
from threading import Thread
from CommonCode import wq
from CommonCode import kwq
from CommonCode.util import is_ipython
import queue
from CommonCode.settings import get_logger, LogColorize
import argparse
import subprocess
from collections import deque
pfm = LogColorize.watch_and_fix_permissions
if not ('__file__' in vars() or '__file__' in globals()):
__file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py'
@@ -21,13 +26,19 @@ sys.path.append(parent_dir)
from common import settings
default_dir_watch = settings.dir_watch
# %%
logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False)
tasks = queue.Queue()
logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False, level = logging.INFO)
logger.info(pfm(f"Starting watch_and_fix_permissions.py"))
publish = kwq.producer.send
topic = kwq.TOPICS.videos_to_score_detection
try:
from CommonCode import kwq
publish = kwq.producer.send
topic = kwq.TOPICS.exit_00_videos_to_score_detection
except Exception as e:
logger.error("ERROR WHEN IMPORTING KAFKA:"+ str(e))
def decide_to_put_in_queue(file_name):
disqualifiers = list()
disqualifiers.append("_reduced" in file_name)
@@ -49,61 +60,8 @@ class logic_execute(Thread):
self.event = event
def run(self):
event = self.event
if isinstance(event, str):
src_path = event
else:
src_path = event.src_path
filef = os.path.abspath(os.path.join(os.getcwd(), src_path))
logging.debug(f"GOT_FILE: {filef}")
while True:
files = list()
for proc in psutil.process_iter():
try:
if "ftp" in proc.name():
files.append(proc.open_files())
except Exception:
pass
f = list()
for x in files:
f.extend(x)
if any([filef == e.path for e in f]):
pass
else:
if decide_to_put_in_queue(filef):
dirname = os.path.dirname(filef)
foldmode = stat.S_IMODE(os.stat(dirname).st_mode)
logging.info(foldmode)
if oct(foldmode) != "0o777":
os.system(f"sudo chmod 777 {dirname}")
logging.info(f"SETTING_PERMISSIONS: {dirname}")
break
time.sleep(1)
chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)}
try:
os.chmod(filef, 0o777)
except Exception as e:
logging.error(e)
if decide_to_put_in_queue(filef):
logging.info(f"QUEUE_PUT: {pfm(filef)}")
publish(topic, key=filef, value={"filepath": filef})
wq.publish(wq.TOPICS.ml_vision_to_score, filef)
else:
logging.info(f"QUEUE_DO_NOT_PUT: {filef}")
tasks.put(self.event)
return
class Handler(FileSystemEventHandler):
@@ -130,25 +88,89 @@ def observe_path(path):
return observer
def process_file(filef):
    """Fix permissions on *filef* and publish it to the scoring queues.

    Expects an absolute path to a finished video file.  Side effects:
    chmods the file and (via sudo) its parent directory, then publishes
    to both the wq and kwq (Kafka) work queues.
    """
    logging.debug(f"GOT_FILE: {pfm(filef)}")
    if not decide_to_put_in_queue(filef):
        return
    # Make the containing directory world-writable so later pipeline stages
    # can write sibling artifacts next to the video.
    dirname = os.path.dirname(filef)
    foldmode = stat.S_IMODE(os.stat(dirname).st_mode)
    if oct(foldmode) != "0o777":
        os.system(f"sudo chmod 777 {dirname}")
        logging.info(f"SETTING_PERMISSIONS: {dirname}")
    # NOTE(review): `chand` is built but never used — candidate for removal.
    chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)}
    try:
        os.chmod(filef, 0o777)
    except Exception as e:
        logging.error(e)
    # NOTE(review): second decide_to_put_in_queue() check is redundant with the
    # early return above unless the predicate is stateful — confirm intent.
    if decide_to_put_in_queue(filef):
        logging.info(f"QUEUE_PUT: {pfm(filef)}")
        wq.publish(wq.TOPICS.ml_vision_to_score, filef)
        try:
            # Kafka publish is best-effort; failures are logged, not raised.
            future = publish(topic, key=filef, value={"filepath": filef})
        except Exception as e:
            logging.error("KAFKA ERROR " + str(e))
    else:
        logging.info(f"QUEUE_DO_NOT_PUT: {filef}")
def watch_and_decide(paths, queue_max = 5, wait_time = 1):
    """Watch *paths* for new videos and process each once its size is stable.

    A file is considered "done uploading" when its size is unchanged across
    *queue_max* consecutive checks taken *wait_time* seconds apart; it is then
    handed to process_file().  Runs forever.
    """
    observers = [observe_path(path) for path in paths]
    # path -> deque of the last `queue_max` observed file sizes
    all_files = dict()
    while True:
        # Drain filesystem events enqueued (module-global `tasks`) by the
        # watchdog handler threads.
        for _ in range(tasks.qsize()):
            x = tasks.get(block=True)
            if decide_to_put_in_queue(x.src_path) and x.event_type == 'created':
                logging.info(f"MONITORING_SIZE: {x.src_path}")
                all_files[x.src_path] = deque(maxlen=queue_max)
        # Snapshot keys: the dict is mutated while iterating.
        to_iter_over = list(all_files)
        for path in to_iter_over:
            if not os.path.exists(path):
                # File vanished (moved/deleted) — stop tracking it.
                all_files.pop(path,'')
            if path in all_files:
                all_files[path].append(os.path.getsize(path))
        for cf in list(all_files):
            c_list = list(all_files[cf])
            c_set = set(c_list)
            # One distinct size over a full window => upload finished.
            if len(c_set) == 1 and len(c_list) == queue_max:
                all_files.pop(cf)
                process_file(cf)
            else:
                pass
        time.sleep(wait_time)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="Watch and Fix Permissions And Push to Kafka Queue"
)
parser.add_argument("paths", nargs="*", help="Paths to monitor", default=())
parser.add_argument("-q", default=5, type=int, help="Number of successive file sizes that are same")
parser.add_argument("-t", default=1, type=float, help="Number of seconds to wait between each file size check")
args, _ = parser.parse_known_args()
paths = args.paths
queue_max = args.q
wait_time = args.t
if len(paths) == 0:
paths = default_dir_watch
observers = [observe_path(path) for path in paths]
try:
while True:
time.sleep(1)
except Exception as e:
logging.error(str(e))
for observer in observers:
observer.stop()
observer.join()
watch_and_decide(paths = paths, queue_max = queue_max, wait_time = wait_time)

View File

@@ -0,0 +1,162 @@
import os
import subprocess as sp
import time
import psutil
import stat
import sys
import logging
from watchdog.observers.polling import PollingObserver
from watchdog.events import FileSystemEventHandler
from threading import Thread
from CommonCode import wq
from CommonCode.settings import get_logger, LogColorize
import argparse
# Colorizer for this script's log lines.
pfm = LogColorize.watch_and_fix_permissions
# When run interactively (e.g. an IPython cell), __file__ is absent — pin it
# so the parent-dir import below still works.
if not ('__file__' in vars() or '__file__' in globals()):
    __file__ = '/home/thebears/Source/pipelines/00_watch_for_new_videos/watch_and_fix_permissions.py'
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(parent_dir)
from common import settings
default_dir_watch = settings.dir_watch
# %%
logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False, level = logging.INFO)
logger.info(pfm(f"Starting watch_and_fix_permissions.py"))
# Kafka wiring is optional at import time: failures are logged so the watcher
# still starts.
# NOTE(review): if this import fails, `publish` and `topic` are never bound and
# logic_execute.run() will later raise NameError — confirm that is acceptable.
try:
    from CommonCode import kwq
    publish = kwq.producer.send
    topic = kwq.TOPICS.exit_00_videos_to_score_detection
except Exception as e:
    logger.error("ERROR WHEN IMPORTING KAFKA:"+ str(e))
def decide_to_put_in_queue(file_name):
    """Return True when *file_name* is an .mp4 that should be queued.

    Names containing "_reduced" or "_trimmed" are derived artifacts
    produced by later pipeline stages and are never re-queued.
    """
    is_derived = "_reduced" in file_name or "_trimmed" in file_name
    return file_name.endswith(".mp4") and not is_derived
class logic_execute(Thread):
    """Worker thread handling one newly created file.

    Waits (up to ~120s) for the FTP server to release the file, fixes
    permissions on it and its directory, then publishes it to the wq and
    Kafka work queues.
    """

    def __init__(self, event):
        # `event` is either a watchdog event (has .src_path) or a plain path string.
        logging.debug(f"EVENT: {event}")
        Thread.__init__(self)
        self.event = event

    def run(self):
        event = self.event
        # Accept both raw path strings and watchdog event objects.
        if isinstance(event, str):
            src_path = event
        else:
            src_path = event.src_path
        filef = os.path.abspath(os.path.join(os.getcwd(), src_path))
        logging.debug(f"GOT_FILE: {pfm(filef)}")
        broke_out = False
        # Poll once per second, up to 120 times, until no "ftp" process
        # still has the file open.
        for i in range(120):
            files = list()
            for proc in psutil.process_iter():
                try:
                    if "ftp" in proc.name():
                        files.append(proc.open_files())
                except Exception:
                    # Processes can vanish or deny access mid-iteration; skip them.
                    pass
            f = list()
            for x in files:
                f.extend(x)
            if any([filef == e.path for e in f]):
                # Still held open by an ftp process — keep waiting.
                pass
            else:
                if decide_to_put_in_queue(filef):
                    # Make the containing directory world-writable before queueing.
                    dirname = os.path.dirname(filef)
                    foldmode = stat.S_IMODE(os.stat(dirname).st_mode)
                    if oct(foldmode) != "0o777":
                        os.system(f"sudo chmod 777 {dirname}")
                        logging.info(f"SETTING_PERMISSIONS: {dirname}")
                broke_out = True
                break
            time.sleep(1)
        if not broke_out:
            # File stayed open for the whole window; proceed anyway but log it.
            logging.info(f"TIMED_OUT: {filef}")
        # NOTE(review): `chand` is built but never used — candidate for removal.
        chand = {"root": os.path.dirname(filef), "name": os.path.basename(filef)}
        try:
            os.chmod(filef, 0o777)
        except Exception as e:
            logging.error(e)
        if decide_to_put_in_queue(filef):
            logging.info(f"QUEUE_PUT: {pfm(filef)}")
            wq.publish(wq.TOPICS.ml_vision_to_score, filef)
            try:
                # Kafka publish is best-effort; failures are logged, not raised.
                future = publish(topic, key=filef, value={"filepath": filef})
            except Exception as e:
                logging.error("KAFKA ERROR " + str(e))
        else:
            logging.info(f"QUEUE_DO_NOT_PUT: {filef}")
class Handler(FileSystemEventHandler):
    """Watchdog handler that spawns one worker thread per created file."""

    def on_created(self, event):
        # Never let a handler exception kill the observer's dispatch loop.
        try:
            self.handle_it(event)
        except Exception as exc:
            logging.error(exc)

    def handle_it(self, event):
        # Directory events carry nothing to process.
        if event.is_directory:
            return
        worker = logic_execute(event)
        worker.start()
def observe_path(path):
    """Start a recursive PollingObserver watching *path* and return it."""
    watcher = PollingObserver()
    handler = Handler()
    logger.info(f"Monitoring {path}")
    watcher.schedule(handler, path, recursive=True)
    watcher.start()
    # Liveness poke; return value intentionally unused.
    watcher.is_alive()
    return watcher
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="Watch and Fix Permissions And Push to Kafka Queue"
    )
    parser.add_argument("paths", nargs="*", help="Paths to monitor", default=())
    args, _ = parser.parse_known_args()
    paths = args.paths
    if len(paths) == 0:
        # No CLI paths given — fall back to the configured watch directories.
        paths = default_dir_watch
    observers = [observe_path(path) for path in paths]
    try:
        # Main thread just idles; the observers and worker threads do the work.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        # BUG FIX: KeyboardInterrupt derives from BaseException, so the old
        # `except Exception` never caught Ctrl-C and the observers were left
        # running, un-stopped and un-joined, on interrupt.
        pass
    except Exception as e:
        logging.error(str(e))
    finally:
        # Always stop and join observer threads on the way out.
        for observer in observers:
            observer.stop()
            observer.join()

View File

@@ -5,5 +5,14 @@ dir_watch = [
"/srv/ftp/sidefeeder",
"/srv/ftp_tcc/leopards1",
"/srv/ftp_tcc/leopards2",
"/srv/ftp/wrenwatch",
"/srv/ftp/backyard",
"/srv/ftp/driveway"
]
# Subset of the watched directories re-scanned by the "load missed" script
# (which reads settings.dir_watch_missed) — presumably cameras whose uploads
# are most often missed; confirm with the pipeline owner.
dir_watch_missed = [
    "/srv/ftp_tcc/leopards1",
    "/srv/ftp_tcc/leopards2",
]