"""Consume video messages from Kafka and insert their frame embeddings into Milvus.

For every message read from ``kwq.TOPICS.videos_with_nuggets`` the script loads
the embedding file that sits next to the video, inserts one row per frame into
a per-camera Milvus collection, and forwards the message to a success or
failure topic depending on the outcome.
"""
import argparse
import glob
import json
import os
from pprint import pprint

import numpy as np
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
from pymilvus import MilvusClient
from pymilvus.client.types import LoadState

from CommonCode import kwq
from CommonCode.settings import get_logger, LogColorize
from CommonCode.util import exit_if_not_ipython, get_cset_for_file_matching, is_ipython

pfm = LogColorize.embeds_in_db
logger = get_logger(__name__, '/var/log/ml_vision_logs/03_put_into_vectordb', stdout=True, systemd=False)

input_topic = kwq.TOPICS.videos_with_nuggets
client_id = 'embedding_place_in_db_1'
group_id = client_id

consumer = kwq.create_consumer(input_topic, group_id=group_id, client_id=client_id)
# Partition 0 of the input topic is assigned explicitly.
c_part = TopicPartition(input_topic, 0)
consumer.assign([c_part])

producer = kwq.producer
model_type = 'ViT-L-16-SigLIP2-512'


def get_db_embed_done_path(vpath):
    """Return the marker-file path that records ``vpath`` was inserted into the DB.

    The marker is the video path with its extension replaced by
    ``.oclip.orin.indb``.
    """
    return os.path.splitext(vpath)[0] + '.oclip.orin.indb'


def get_vec_path(vpath):
    """Return the embedding-file path for ``vpath``.

    The extension of ``vpath`` is replaced by ``.oclip.orin``; for example
    ``/srv/ftp/ptz/2025/04/14/PTZBackRight_00_20250414063817.mp4`` maps to
    ``/srv/ftp/ptz/2025/04/14/PTZBackRight_00_20250414063817.oclip.orin``.
    """
    return os.path.splitext(vpath)[0] + '.oclip.orin'


def get_date(vpath):
    """Return the three path components preceding the file name, concatenated.

    For ``.../2025/04/14/name.ext`` this yields ``'20250414'``.
    """
    split_entries = os.path.splitext(vpath)[0].split('/')
    return ''.join(split_entries[-4:-1])


def get_camera_name(vpath):
    """Return the path component that directly follows the ``ftp`` directory.

    Raises:
        ValueError: if ``vpath`` has no path component named ``ftp``.
    """
    split_entries = os.path.splitext(vpath)[0].split('/')
    return split_entries[split_entries.index('ftp') + 1]


def _build_rows(vec_path):
    """Load the embedding JSON at ``vec_path`` and return one insert row per frame.

    The file is expected to hold a ``scores`` list whose entries carry a
    ``score`` (the embedding) and a ``frame`` (the frame number).
    """
    with open(vec_path, 'r') as jj:
        vecs = json.load(jj)

    # Last '_'-separated token of the bare file name; it becomes the prefix of
    # every primary id (presumably the recording timestamp -- confirm).
    fname_root = vec_path.rsplit('/', 1)[-1].split('.')[0]
    fc = fname_root.split('_')[-1]

    # Path stored in the DB: storage-root prefix removed and everything from
    # the first '.' onwards dropped.
    filepath = vec_path.replace('/srv/ftp/', '').replace('/mergedfs/ftp', '').split('.')[0]
    date = str(get_date(filepath))

    rows = []
    for entry in vecs['scores']:
        frame_num = entry['frame']
        # The primary id is the file token followed by the frame number,
        # zero-padded to at least five characters.
        fg = '{0:05g}'.format(frame_num)
        rows.append(dict(
            primary_id=int(fc + fg),
            filepath=filepath,
            frame_number=int(frame_num),
            so400m=np.asarray(entry['score'], dtype=np.float16),
            date=date,
        ))
    return rows


def upload_vector_file(video_file_to_upload):
    """Insert the frame embeddings belonging to a video into Milvus.

    Reads the ``.oclip.orin`` file next to ``video_file_to_upload``, inserts
    its rows into the ``nuggets_<camera>_so400m_siglip2`` collection and then
    writes an empty ``.oclip.orin.indb`` marker file.

    Args:
        video_file_to_upload: Path of the video whose embeddings are uploaded.

    Raises:
        Any exception raised while reading the embedding file or talking to
        Milvus propagates to the caller; the Milvus client is closed either way.
    """
    db_done_path = get_db_embed_done_path(video_file_to_upload)
    if os.path.exists(db_done_path):
        print('Already exists in DB, skipping upload')
        # NOTE(review): the early return below is commented out in the
        # original, so the upload still proceeds despite the message above.
        # Confirm whether that is intended before re-enabling it.
        # return

    video_file_to_upload = get_vec_path(video_file_to_upload)
    data_v2 = _build_rows(video_file_to_upload)
    cam_name = get_camera_name(video_file_to_upload)

    # Open the connection only once the rows are ready, and always close it so
    # a failed insert does not leak a connection in the long-running loop.
    client = MilvusClient(uri="http://localhost:19530")
    try:
        client.insert(collection_name=f'nuggets_{cam_name}_so400m_siglip2', data=data_v2)
    finally:
        client.close()

    # The marker is written only after a successful insert.
    with open(db_done_path, 'w') as ff:
        ff.write("")
    print(f'Inserting into DB, {video_file_to_upload}')


for msg in consumer:
    file_path = msg.value['filepath']
    success = False
    try:
        upload_vector_file(file_path)
        success = True
        logger.info(f"SUCCESS_UPLOADING :{pfm(file_path)}")
    except Exception as e:
        # Best effort: a failing file must not stop the consumer; the message
        # is routed to the failure topic instead.
        logger.info(f"ERROR_UPLOADING :{pfm(file_path)} + {e}")

    d_send = {'value': msg.value, 'key': msg.key}
    if success:
        send_topic = kwq.TOPICS.videos_embedding_in_db
    else:
        send_topic = kwq.TOPICS.videos_embedding_in_db_fail
    producer.send(send_topic, **d_send)