116 lines
3.4 KiB
Python
116 lines
3.4 KiB
Python
import glob
|
|
from pymilvus import MilvusClient
|
|
from pymilvus.client.types import LoadState
|
|
|
|
import argparse
|
|
import os
|
|
import json
|
|
from CommonCode.util import exit_if_not_ipython, get_cset_for_file_matching, is_ipython
|
|
from CommonCode import kwq
|
|
from CommonCode.settings import get_logger, LogColorize
|
|
from kafka import TopicPartition
|
|
from kafka.structs import OffsetAndMetadata
|
|
from pprint import pprint
|
|
import numpy as np
|
|
# Colourising helper applied to file paths in this script's log messages.
pfm = LogColorize.embeds_in_db
logger = get_logger(
    __name__,
    '/var/log/ml_vision_logs/03_put_into_vectordb',
    stdout=True,
    systemd=False,
)

# Topic this worker consumes from.
input_topic = kwq.TOPICS.videos_with_nuggets

# The consumer group shares the client's identifier.
client_id = 'embedding_place_in_db_1'
group_id = client_id

# Create the consumer, then explicitly assign it partition 0 of the input topic.
consumer = kwq.create_consumer(
    input_topic,
    group_id=group_id,
    client_id=client_id,
)
c_part = TopicPartition(input_topic, 0)
consumer.assign([c_part])

# Shared producer used to publish the success / failure notifications.
producer = kwq.producer

# NOTE(review): not referenced anywhere in the visible part of this file.
model_type = 'ViT-L-16-SigLIP2-512'
|
|
|
|
def get_db_embed_done_path(vpath):
    """Return the marker-file path that records a video's DB upload.

    The final extension of ``vpath`` (if any) is replaced by
    ``.oclip.orin.indb``.
    """
    stem, _ext = os.path.splitext(vpath)
    return f'{stem}.oclip.orin.indb'
|
|
|
|
|
|
#video_file_to_upload='/srv/ftp/ptz/2025/04/14/PTZBackRight_00_20250414063817.mp4'
|
|
|
|
|
|
def get_vec_path(vpath):
    """Return the path of the embedding file that sits next to a video.

    The final extension of ``vpath`` (if any) is replaced by ``.oclip.orin``.
    """
    stem, _ext = os.path.splitext(vpath)
    return f'{stem}.oclip.orin'
|
|
|
|
def get_date(vpath):
    """Concatenate the three directory names directly above the file name.

    For a path laid out as ``.../2025/04/14/<file>`` this yields
    ``'20250414'``. Shorter paths simply contribute fewer components.
    """
    stem, _ext = os.path.splitext(vpath)
    components = stem.split('/')
    # Last component is the file stem; the three before it are joined as-is.
    date_parts = components[-4:-1]
    return ''.join(date_parts)
|
|
|
|
def get_camera_name(vpath):
    """Return the path component that immediately follows ``ftp``.

    Raises:
        ValueError: if no path component equals ``'ftp'``.
        IndexError: if ``'ftp'`` is the last component.
    """
    stem, _ext = os.path.splitext(vpath)
    components = stem.split('/')
    ftp_pos = components.index('ftp')
    return components[ftp_pos + 1]
|
|
|
|
def _frame_primary_id(file_token, frame_num):
    """Build the integer primary key ``<file_token><frame, zero-padded to 5>``."""
    # The 'd' presentation type never falls back to exponent notation, whereas
    # 'g' produces strings such as '1e+06' that int() cannot parse.
    return int(f'{file_token}{int(frame_num):05d}')


def upload_vector_file(video_file_to_upload):
    """Insert the per-frame embeddings stored next to a video into Milvus.

    The embedding file located by ``get_vec_path`` is read as JSON, one row is
    built for every entry of its ``"scores"`` list, and the rows are inserted
    into the collection ``nuggets_<camera>_so400m_siglip2``. After a
    successful insert an empty marker file is written at the path given by
    ``get_db_embed_done_path``.

    Args:
        video_file_to_upload: Path of the video whose embeddings should be
            uploaded. The path must contain an ``ftp`` component followed by
            the camera directory (see ``get_camera_name``).

    Raises:
        FileNotFoundError: if the embedding file does not exist.
        KeyError: if the JSON lacks ``"scores"`` or an entry lacks
            ``"score"`` / ``"frame"``.
        ValueError: if the path has no ``ftp`` component or the primary key
            cannot be formed from the file name.
        Exception: anything raised by the Milvus client is propagated.
    """
    db_done_path = get_db_embed_done_path(video_file_to_upload)
    if os.path.exists(db_done_path):
        print('Already exists in DB, skipping upload')
        # NOTE(review): the early return here was disabled in the original,
        # so the file is inserted again despite the message above. Behaviour
        # is left unchanged; confirm whether re-insertion is intended.

    vec_path = get_vec_path(video_file_to_upload)
    with open(vec_path, 'r') as jj:
        vecs = json.load(jj)

    # Last '_'-separated token of the file stem; it prefixes every primary
    # key (presumably the recording timestamp -- confirm with the naming
    # scheme).
    fname_root = vec_path.rsplit('/', 1)[-1].split('.')[0]
    fc = fname_root.split('_')[-1]

    # Storage-root-relative path without extension, stored with each row.
    # NOTE(review): the two prefixes are stripped inconsistently (the second
    # has no trailing slash, so such paths keep a leading '/'); kept as-is so
    # newly inserted rows match rows already in the collection.
    filepath = (
        vec_path
        .replace('/srv/ftp/', '')
        .replace('/mergedfs/ftp', '')
        .split('.')[0]
    )
    date = str(get_date(filepath))

    data_v2 = [
        dict(
            primary_id=_frame_primary_id(fc, entry['frame']),
            filepath=filepath,
            frame_number=int(entry['frame']),
            so400m=np.asarray(entry['score'], dtype=np.float16),
            date=date,
        )
        for entry in vecs['scores']
    ]

    cam_name = get_camera_name(vec_path)

    # Connect only once the payload is ready, and always release the
    # connection, even when the insert fails.
    client = MilvusClient(uri="http://localhost:19530")
    try:
        client.insert(
            collection_name=f'nuggets_{cam_name}_so400m_siglip2',
            data=data_v2,
        )
    finally:
        client.close()

    # Empty marker file: its existence alone records the completed upload.
    with open(db_done_path, 'w') as ff:
        ff.write("")

    print(f'Inserting into DB, {vec_path}')
|
|
|
|
|
|
|
|
# Main loop: upload each announced video's embeddings, then forward the
# original message to the success or failure topic.
for msg in consumer:
    payload = msg.value
    file_path = payload['filepath']

    uploaded = False
    try:
        upload_vector_file(file_path)
        uploaded = True
        logger.info(f"SUCCESS_UPLOADING :{pfm(file_path)}")
    except Exception as err:
        # Any failure is logged and reported downstream; the loop keeps running.
        logger.info(f"ERROR_UPLOADING :{pfm(file_path)} + {err}")

    send_topic = (
        kwq.TOPICS.videos_embedding_in_db
        if uploaded
        else kwq.TOPICS.videos_embedding_in_db_fail
    )

    # Re-publish the incoming key/value unchanged on the chosen topic.
    producer.send(send_topic, value=msg.value, key=msg.key)
|