diff --git a/__pycache__/kwq.cpython-312.pyc b/__pycache__/kwq.cpython-312.pyc index 9cb26c5..af85804 100644 Binary files a/__pycache__/kwq.cpython-312.pyc and b/__pycache__/kwq.cpython-312.pyc differ diff --git a/__pycache__/kwq.cpython-313.pyc b/__pycache__/kwq.cpython-313.pyc index 9f2d65f..13536e1 100644 Binary files a/__pycache__/kwq.cpython-313.pyc and b/__pycache__/kwq.cpython-313.pyc differ diff --git a/__pycache__/settings.cpython-312.pyc b/__pycache__/settings.cpython-312.pyc index 8a68dc1..6992ba8 100644 Binary files a/__pycache__/settings.cpython-312.pyc and b/__pycache__/settings.cpython-312.pyc differ diff --git a/__pycache__/settings.cpython-313.pyc b/__pycache__/settings.cpython-313.pyc index 1be1650..4d7fe5c 100644 Binary files a/__pycache__/settings.cpython-313.pyc and b/__pycache__/settings.cpython-313.pyc differ diff --git a/__pycache__/util.cpython-312.pyc b/__pycache__/util.cpython-312.pyc index 349b8d7..20cfc69 100644 Binary files a/__pycache__/util.cpython-312.pyc and b/__pycache__/util.cpython-312.pyc differ diff --git a/__pycache__/video_meta.cpython-312.pyc b/__pycache__/video_meta.cpython-312.pyc index 2faf677..36a7af6 100644 Binary files a/__pycache__/video_meta.cpython-312.pyc and b/__pycache__/video_meta.cpython-312.pyc differ diff --git a/__pycache__/video_meta.cpython-313.pyc b/__pycache__/video_meta.cpython-313.pyc index c128990..5c1957e 100644 Binary files a/__pycache__/video_meta.cpython-313.pyc and b/__pycache__/video_meta.cpython-313.pyc differ diff --git a/__pycache__/wq.cpython-312.pyc b/__pycache__/wq.cpython-312.pyc index cb909be..907ac4f 100644 Binary files a/__pycache__/wq.cpython-312.pyc and b/__pycache__/wq.cpython-312.pyc differ diff --git a/cache_checker.py b/cache_checker.py new file mode 100644 index 0000000..0db1761 --- /dev/null +++ b/cache_checker.py @@ -0,0 +1,11 @@ +import ring + +@ring.lru() +def hello(a,b,c): + print(a,b,c) + return str(a)+str(b)+str(c) + +# %% + +hello(1,2,3) + diff --git a/kwq.py b/kwq.py 
index b748a2a..e6388b7 100644 --- a/kwq.py +++ b/kwq.py @@ -2,53 +2,81 @@ from enum import Enum import json import sys -from kafka import KafkaProducer, KafkaConsumer +from confluent_kafka import Producer, Consumer -bootstrap_server = ["192.168.1.242:19092"] +bootstrap_server = "192.168.1.242:19092" class TOPICS(): - videos_to_score_detection = "videos_to_score_detection" - videos_scored_detection="videos_scored_detection" - videos_with_nuggets="videos_with_nuggets" + exit_00_videos_to_score_detection = "00_videos_to_score_detection" + + exit_10_videos_scored_detection="10_videos_scored_detection" + + exit_40_videos_with_nuggets="40_videos_with_nuggets" + exit_40_videos_without_nuggets="40_videos_without_nuggets" + + exit_50_videos_modified="50_videos_modified" + + + enter_60_videos_embed_priority="60_videos_embed_priority" # Higher prioriy queue + exit_60_videos_invalid="60_videos_invalid_file" + exit_60_embedding_failed="exit_60_embedding_failed" + exit_60_videos_embedded="60_videos_scored_embedding" + videos_no_json="videos_no_json" - videos_without_nuggets="videos_without_nuggets" videos_embedding_in_db="videos_embed_in_db" videos_embedding_in_db_fail = "videos_embed_in_db_fail" -serializer = lambda v: json.dumps(v).encode("utf-8") -deserializer = json.loads +class KafkaClient: + def __init__(self): + self.producer = Producer({ + 'bootstrap.servers': bootstrap_server, + 'security.protocol': 'SASL_PLAINTEXT', # change to SASL_SSL if TLS is enabled + 'sasl.mechanism': 'SCRAM-SHA-256', + 'sasl.username': 'superuser', + 'sasl.password': 'marybear', + 'request.timeout.ms': 15000, + 'retries': 3, + 'retry.backoff.ms': 100, + 'linger.ms': 5, + 'metadata.max.age.ms': 300000, + }) -producer = KafkaProducer( - bootstrap_servers=bootstrap_server, - key_serializer=serializer, - value_serializer=serializer, - request_timeout_ms=15000, # 15s (keep small) - max_block_ms=10000, # 10s max blocking - metadata_max_age_ms=300000, - retry_backoff_ms=100, - linger_ms=5, - retries=3, 
- security_protocol='SASL_PLAINTEXT', # change to SASL_SSL if TLS is enabled - sasl_mechanism='SCRAM-SHA-256', - sasl_plain_username='superuser', - sasl_plain_password='marybear' -) + def send(self, topic, key=None, value=None): + def delivery_report(err, msg): + if err is not None: + print(f'Kafka delivery failed: {err}') + else: + print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}') + + key_bytes = json.dumps(key).encode("utf-8") if key is not None else None + value_bytes = json.dumps(value).encode("utf-8") if value is not None else None + self.producer.produce(topic, key=key_bytes, value=value_bytes, callback=delivery_report) + self.producer.flush() + + def create_consumer(self, group_id=None, client_id=None, auto_offset_reset = 'latest'): + conf = { + 'bootstrap.servers': bootstrap_server, + 'group.id': group_id if group_id else 'default_group', + 'auto.offset.reset': auto_offset_reset, + 'enable.auto.commit': False, + 'security.protocol': 'SASL_PLAINTEXT', + 'sasl.mechanism': 'SCRAM-SHA-256', + 'sasl.username': 'superuser', + 'sasl.password': 'marybear', + 'client.id': client_id if client_id else 'default_client', + } + consumer = Consumer(conf) + return consumer -def create_consumer(group_id = None, client_id = None): - return KafkaConsumer( - bootstrap_servers=bootstrap_server, - key_deserializer=deserializer, - value_deserializer=deserializer, - enable_auto_commit = False, - group_id=group_id, - client_id = client_id, - auto_offset_reset = 'earliest', - security_protocol='SASL_PLAINTEXT', # change to SASL_SSL if TLS is enabled - sasl_mechanism='SCRAM-SHA-256', - sasl_plain_username='superuser', - sasl_plain_password='marybear' - ) +# Instantiate a global client for compatibility +kafka_client = KafkaClient() +producer = kafka_client +publish = kafka_client.send + + +def create_consumer(group_id=None, client_id=None, auto_offset_reset = 'latest'): + return kafka_client.create_consumer(group_id, client_id, 
auto_offset_reset) diff --git a/util.py b/util.py index 6f6f364..b403f7d 100644 --- a/util.py +++ b/util.py @@ -44,6 +44,7 @@ def append_json_no_file_read(obj_append, jpath = '/home/thebears/Videos/Winslow/ def get_cset_match(input_in): + if not os.path.exists(input_in): return {} @@ -90,6 +91,7 @@ def get_cset_match_dir(rtdir): except: pass + mp4_tstamps = sorted(summary) other_tstamps = sorted(other_files) idx_other = 0 diff --git a/video_meta.py b/video_meta.py index 24a0dbb..9abba6d 100644 --- a/video_meta.py +++ b/video_meta.py @@ -6,6 +6,7 @@ import pickle from CommonCode.settings import get_logger, LogColorize import logging import numpy +import time pfm = LogColorize.video_meta if not ('__file__' in vars() or '__file__' in globals()): @@ -37,9 +38,14 @@ def get_cache_loc(cpath): class FTPVideo: - def __init__(self, cpath): + def __init__(self, cpath, ignore_filename = False): self.cpath = cpath - self.file_info = get_info_from_ftp_filename(cpath) + self.ignore_filename = ignore_filename + if self.ignore_filename: + self.file_info = {'name':'None','index':0, 'timestamp': dt.datetime.now(), 'path': os.path.abspath(cpath)} + else: + self.file_info = get_info_from_ftp_filename(cpath) + self._real_path = None self._frame_info = None self._embeds = None @@ -75,11 +81,18 @@ class FTPVideo: self._embeds = npz_contents npz_contents = self._embeds + + ret_dict = {}; ret_dict['embeds'] = npz_contents['embeds'] ret_dict['frame_numbers'] = [int(x) for x in npz_contents['frame_numbers']] + + if max(ret_dict['frame_numbers']) >= len(self.frames_info): + self.invalidate_timestamp_cache() + self.frames_info_rewrite() + ret_dict['frame_offsets'] = [self.frames_info[x]['offset'] for x in ret_dict['frame_numbers']] ret_dict['frame_time'] = [self.frames_info[x]['time'] for x in ret_dict['frame_numbers']] e_scores = self.embed_scores @@ -114,6 +127,16 @@ class FTPVideo: except Exception as e: logger.warn(pfm(f'WRITE TO CACHE FAILED: {e} while writing {cache_loc}')) + def 
invalidate_timestamp_cache(self): + cache_loc = get_cache_loc(self.real_path) + os.remove(cache_loc) + + def frames_info_rewrite(self): + self._frame_info = None + + new_info = self.get_frames_info() + self.try_cache_write(self._frame_info) + self._frame_info = self.try_cache_read() @property def frames_info(self): @@ -142,7 +165,7 @@ class FTPVideo: logger.warn(pfm(f'RUNNING FFPROBE FAILED FOR {e} on {fpath}')) logger.info(pfm(f'RAN FFPROBE SUCCESSFULLY FOR {fpath}')) - + timestamps = list() for line in out.decode("UTF-8").split("\n"): if "," not in line: diff --git a/wq.py b/wq.py index 56a9d24..136f960 100644 --- a/wq.py +++ b/wq.py @@ -4,23 +4,23 @@ from enum import Enum import json class TOPICS(Enum): - ml_vision_to_score = 'ml_vision_to_score' - ml_vision_to_score_ext = 'ml_vision_to_score_ext' - ml_vision_objdet_failed = 'ml_vision_objdet_failed' - ml_vision_objdet_success = 'ml_vision_objdet_success' - ml_vision_objdet_skipped = 'ml_vision_objdet_skipped' - ml_vision_objdet_results_db_success = 'ml_vision_objdet_db_upload_success' - ml_vision_objdet_results_db_failed = 'ml_vision_objdet_db_upload_failed' - ml_vision_objdet_results_pg_success = 'ml_vision_objdet_pg_upload_success' - ml_vision_objdet_results_pg_failed = 'ml_vision_objdet_pg_upload_failed' - ml_vision_objdet_results_purge_success = 'ml_vision_objdet_purge_success' - ml_vision_objdet_results_purge_failed = 'ml_vision_objdet_purge_failed' - ml_vision_objdet_results_purge_skipped = 'ml_vision_objdet_purge_skipped' - ml_vision_videos_modify_success = 'ml_vision_videos_modify_success' - ml_vision_videos_modify_failed = 'ml_vision_videos_modify_failed' - ml_vision_embedding_success = 'ml_vision_embedding_success' - ml_vision_embedding_fail = 'ml_vision_embedding_fail' - ml_vision_embedding_skipped = 'ml_vision_embedding_skipped' + ml_vision_to_score = 'vision:ml_vision_to_score' + ml_vision_to_score_ext = 'vision:ml_vision_to_score_ext' + ml_vision_objdet_failed = 'vision:ml_vision_objdet_failed' + 
ml_vision_objdet_success = 'vision:ml_vision_objdet_success' + ml_vision_objdet_skipped = 'vision:ml_vision_objdet_skipped' + ml_vision_objdet_results_db_success = 'vision:ml_vision_objdet_db_upload_success' + ml_vision_objdet_results_db_failed = 'vision:ml_vision_objdet_db_upload_failed' + ml_vision_objdet_results_pg_success = 'vision:ml_vision_objdet_pg_upload_success' + ml_vision_objdet_results_pg_failed = 'vision:ml_vision_objdet_pg_upload_failed' + ml_vision_objdet_results_purge_success = 'vision:ml_vision_objdet_purge_success' + ml_vision_objdet_results_purge_failed = 'vision:ml_vision_objdet_purge_failed' + ml_vision_objdet_results_purge_skipped = 'vision:ml_vision_objdet_purge_skipped' + ml_vision_videos_modify_success = 'vision:ml_vision_videos_modify_success' + ml_vision_videos_modify_failed = 'vision:ml_vision_videos_modify_failed' + ml_vision_embedding_success = 'vision:ml_vision_embedding_success' + ml_vision_embedding_fail = 'vision:ml_vision_embedding_fail' + ml_vision_embedding_skipped = 'vision:ml_vision_embedding_skipped' r = redis.StrictRedis()