# SearchInterface/VectorService/util/embed_scores.py
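"""Helpers for scoring video frame embeddings against a text query.

Builds and caches per-folder stacks of frame embeddings, scores them against an
encoded query vector, groups above-threshold frames into per-file time ranges,
and reports progress through a Redis list.
"""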

from CommonCode.video_meta import FTPVideo
from CommonCode.settings import get_logger
import logging
import json
import datetime as dt
import functools
import requests
import numpy as np
from pqdm.processes import pqdm
from multiprocessing import Pool
import os
import lttb
import pickle
import redis
from hashlib import md5

r = redis.Redis(host='localhost', port=6379, db=15)
logger = get_logger(__name__, '/var/log/vector_search_logs/util_embed_scores', stdout=True, systemd=False, level=logging.INFO)

def get_matching_file_for_tstamp(target_tstamp, folder_scores):
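    """Return the video whose [start_time, end_time] range contains target_tstamp,
    as a dict with the full path, a web-facing 'media/' path, and the offset into the file."""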
    matching_file = None
    for video_file in folder_scores['videos']:
        start_time = video_file['start_time']
        end_time = video_file['end_time']
        if target_tstamp > start_time and target_tstamp < end_time:
            matching_file = video_file
    if matching_file is not None:
        fname = matching_file['file_name']
        offset = target_tstamp - matching_file['start_time']
    else:
        fname = 'None Found'
        offset = -1
    web_name = 'media/' + os.path.basename(fname)
    return dict(full_path=fname, path=web_name, timeoffset=offset)

def get_vec_rep_file_loc(c_dir):
    vec_rep_file = os.path.join(c_dir, 'vec_rep.npz')
    return vec_rep_file

def get_vector_representation(c_dir, force_compute=False, redis_key='compute_log'):
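    """Load or build the stacked frame-embedding representation (vec_rep.npz) for every .mp4 under c_dir.

    Progress and completion are reported as JSON messages pushed onto the Redis list `redis_key`.
    """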
    message = {'task': 'VECTOR_CALC_IN_FOLDER_START', 'when': str(c_dir), 'time': dt.datetime.now().timestamp()}
    r.rpush(redis_key, json.dumps(message))
    vec_rep_file = get_vec_rep_file_loc(c_dir)
    if os.path.exists(vec_rep_file) and not force_compute:
        try:
            result = dict(np.load(vec_rep_file))
            message = {'task': 'VECTOR_CALC_IN_FOLDER_DONE', 'when': str(c_dir), 'time': dt.datetime.now().timestamp(), 'precomputed': True}
            r.rpush(redis_key, json.dumps(message))
            return result
        except Exception:
            # Cached representation is unreadable; drop it and rebuild below.
            os.remove(vec_rep_file)
    ff = list()
    for root, dirs, files in os.walk(c_dir):
        for f in files:
            if f.endswith('.mp4') and '_reduced' not in f:
                ff.append(os.path.join(root, f))
    videos = list()
    for x in ff:
        cvid = FTPVideo(x)
        videos.append(cvid)
    sorted_videos = sorted(videos)
    all_cat = list()
    all_idx = list()
    all_source = list()
    all_tstamps = list()
    enu = 0
    for idx, x in enumerate(sorted_videos):
        try:
            hh = x.embeddings
        except Exception:
            hh = None
        if hh is not None:
            n_emb = FTPVideo.vec_norm(hh['embeds'])
            all_cat.append(n_emb)
            all_idx.append(enu * np.ones(n_emb.shape[0], dtype=np.int64))
            all_source.append(x.real_path)
            all_tstamps.append([t.timestamp() for t in hh['frame_time']])
            enu += 1
        message = {'task': 'VECTOR_CALC_IN_FOLDER_BUMP', 'when': c_dir, 'progress': idx + 1, 'how_many': len(sorted_videos), 'time': dt.datetime.now().timestamp()}
        r.rpush(redis_key, json.dumps(message))
    if len(all_cat) == 0:
        # No embeddings were found for any video in this folder.
        return []
    all_embeds = np.vstack(all_cat)
    all_embeds = FTPVideo.vec_norm(all_embeds)
    all_idces = np.hstack(all_idx)
    all_times = np.hstack(all_tstamps)
    np.savez(vec_rep_file, embeds=all_embeds, idces=all_idces, timestamps=all_times, source_files=all_source)
    message = {'task': 'VECTOR_CALC_IN_FOLDER_DONE', 'when': str(c_dir), 'time': dt.datetime.now().timestamp()}
    r.rpush(redis_key, json.dumps(message))
    return dict(embeds=all_embeds, idces=all_idces, timestamps=all_times, source_files=all_source)

def get_scores_embedding_c_dir(c_dir, query_vector, redis_key = 'compute_log'):
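    """Return the folder's stacked vector representation and its similarity scores against query_vector."""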
    vec_rep = get_vector_representation(c_dir, redis_key=redis_key)
    query_scores = (query_vector @ vec_rep['embeds'].T).squeeze()
    return vec_rep, query_scores

@functools.lru_cache
def get_query_vector(query):
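    """Encode the text query via the embedding HTTP service and return a unit-normalised (1, D) vector (cached per query)."""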
    vec_form = requests.get('http://192.168.1.242:53004/encode', params={'query': query}).json()['vector'][0]
    vec_search = np.asarray(vec_form)
    query_vector = FTPVideo.vec_norm(vec_search[None, :])
    return query_vector

def calculate_embedding_score_in_folders(c_dirs, threshold, query=None, query_vector=None, redis_key='compute_log'):
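    """Score every folder in c_dirs against the query in parallel and merge the per-folder video lists."""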
    result_list = list()
    if query_vector is None:
        query_vector = get_query_vector(query)
    # kwargs = [{'c_dir':x, 'threshold':threshold, 'query': query} for x in c_dirs]
    # Forward the encoded query vector so each worker process does not have to re-encode it.
    args = [(x, threshold, query, query_vector, logger, redis_key) for x in c_dirs]
    logger.info(f"CALCULATING FOR {c_dirs}")
    with Pool(processes=8) as pool:
        out = pool.starmap(calculate_embedding_score_in_folder, args)
    logger.info(f"DONE CALCULATING FOR {c_dirs}")
    for x in out:
        try:
            result_list.extend(x['videos'])
        except Exception as e:
            logger.info(f"SKIPPING FOLDER RESULT {x}: {e}")
    return {'videos': result_list}

def collapse_scores_to_maxmin_avg(folder_scores):
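    """Collapse each video's per-frame score trace: 'score' becomes [min, max, time-of-max, time-of-min]
    and 'time' becomes the last frame time."""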
    result = list()
    for c_data in folder_scores['videos']:
        new_d = c_data.copy()
        # Copy the nested dict as well so the caller's embed_scores are not mutated in place.
        new_d['embed_scores'] = dict(c_data['embed_scores'])
        scores = new_d['embed_scores']['score']
        max_score = max(scores)
        min_score = min(scores)
        max_score_idx = scores.index(max_score)
        min_score_idx = scores.index(min_score)
        max_score_time = new_d['embed_scores']['time'][max_score_idx]
        min_score_time = new_d['embed_scores']['time'][min_score_idx]
        new_d['embed_scores']['score'] = [min_score, max_score, max_score_time, min_score_time]
        new_d['embed_scores']['time'] = max(new_d['embed_scores']['time'])
        result.append(new_d)
    return result

def calculate_embedding_score_in_folder(og_dir, threshold, query=None, query_vector=None, logger=logger, redis_key='compute_log'):
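    """Score all frame embeddings under og_dir against the query and group above-threshold frames into per-file segments.

    Results are cached to a pickle in the folder (keyed by threshold and query-vector hash) and
    re-used while the set of *oclip_embeds.npz files on disk is unchanged.
    """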
    message = {'task': 'SCORE_CALC_IN_FOLDER_START', 'when': str(og_dir), 'time': dt.datetime.now().timestamp()}
    r.rpush(redis_key, json.dumps(message))
    if query_vector is None:
        query_vector = get_query_vector(query)
    candidate_dirs = list()
    candidate_dirs.append(og_dir)
    candidate_dirs.append(og_dir.replace('/srv/ftp_tcc', '/mnt/hdd_24tb_1/videos/ftp'))
    candidate_dirs.append(og_dir.replace('/srv/ftp', '/mnt/hdd_24tb_1/videos/ftp'))
    c_dir = None
    for candidate in candidate_dirs:
        if os.path.exists(candidate):
            c_dir = candidate
            break
    if c_dir is None:
        return []
    vec_cache_str = md5(query_vector).hexdigest()
    cache_file_loc = os.path.join(c_dir, 'embedding_scores@' + str(threshold) + '@' + vec_cache_str + '.pkl')
    if os.path.exists(cache_file_loc):
        logger.info(f"TRYING TO LOAD CACHE {cache_file_loc}")
        try:
            with open(cache_file_loc, 'rb') as f:
                video_json_info = pickle.load(f)
            files_in_cache = {os.path.splitext(os.path.basename(x))[0] for x in video_json_info.get('source_files', [])}
            lsd_dir = os.listdir(c_dir)
            files_on_disk = {x.split(".")[0] for x in lsd_dir if x.endswith('oclip_embeds.npz')}
            if files_on_disk == files_in_cache:
                logger.info(f"LOADED EMBEDDING SCORE FROM CACHE {cache_file_loc}")
                message = {'task': 'SCORE_CALC_IN_FOLDER_DONE', 'when': str(c_dir), 'time': dt.datetime.now().timestamp(), 'precomputed': True}
                r.rpush(redis_key, json.dumps(message))
                return video_json_info
            else:
                logger.info(f"CACHE FILE IS OLD, DELETING VEC REP FILE AND RECREATING {cache_file_loc}")
                os.remove(get_vec_rep_file_loc(c_dir))
        except Exception as e:
            logger.info(f"CACHE FILE IS CORRUPT, RECREATING {cache_file_loc} {e}")
            os.remove(cache_file_loc)
    # vec_rep = get_vector_representation(c_dir, redis_key=redis_key)
    vec_rep, query_scores = get_scores_embedding_c_dir(c_dir, tuple(query_vector.tolist()[0]), redis_key=redis_key)
    video_json_info = list()
    idces_keep = np.where(query_scores > threshold)[0]
    video_id = vec_rep['idces'][idces_keep]
    videos_that_match = np.unique(video_id)
    # subset_timestampsF = list()
    # for s in videos_that_match:
    #     idces_entry = np.where(vec_rep['idces'] == s)[0]
    #     min_idces = idces_entry[0]
    #     max_idces = idces_entry[-1]
    #     subset_timestampsF.append([vec_rep['timestamps'][min_idces], vec_rep['timestamps'][max_idces]])
    id_extract_video_level = np.where(np.isin(vec_rep['idces'], videos_that_match))[0]
    idces_split = np.where(np.diff(vec_rep['idces'][id_extract_video_level]) != 0)[0] + 1
    subset_timestampsF = np.split(vec_rep['timestamps'][id_extract_video_level], idces_split)
    for subset_t in subset_timestampsF:
        if len(subset_t) == 0:
            continue
        min_t = min(subset_t)
        max_t = max(subset_t)
        idces_curr = np.where(np.logical_and(vec_rep['timestamps'] > min_t, vec_rep['timestamps'] < max_t))[0]
        if len(idces_curr) == 0:
            continue
        unq_vids = np.unique(vec_rep['idces'][idces_curr])
        subset_idx = np.where(np.isin(vec_rep['idces'], unq_vids))[0]
        subset_idces = vec_rep['idces'][subset_idx]
        subset_timestamps = vec_rep['timestamps'][subset_idx]
        subset_scores = query_scores[subset_idx]
        idx_split = np.where(np.diff(vec_rep['idces'][subset_idx]) != 0)[0] + 1
        split_idces = np.split(subset_idces, idx_split)
        split_timestamps = np.split(subset_timestamps, idx_split)
        split_scores = np.split(subset_scores, idx_split)
        split_files = [vec_rep['source_files'][x[0]] for x in split_idces]
        for s_file, s_scores, s_tstamps, s_idces in zip(split_files, split_scores, split_timestamps, split_idces):
            start_time = float(min(s_tstamps))
            end_time = float(max(s_tstamps))
            frame_time = (s_tstamps - start_time).tolist()
            embed_scores = s_scores.tolist()
            c_data = {'file_name': str(s_file), 'start_time': start_time, 'end_time': end_time, 'embed_scores': {'time': frame_time, 'score': embed_scores}}
            video_json_info.append(c_data)
    message = {'task': 'SCORE_CALC_IN_FOLDER_DONE', 'when': str(c_dir), 'time': dt.datetime.now().timestamp()}
    r.rpush(redis_key, json.dumps(message))
    to_write = {'source_files': vec_rep['source_files'], 'videos': video_json_info}
    with open(cache_file_loc, 'wb') as f:
        logger.info(f"WRITING EMBEDDING SCORE TO CACHE {cache_file_loc}")
        pickle.dump(to_write, f)
    return to_write

def get_matching_file_given_filename(web_name, folder_scores):
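    """Resolve a web-facing file name back to an existing on-disk path from folder_scores, trying the alternate mount prefixes."""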
    file_name = None
    for x in folder_scores['videos']:
        if x['file_name'].endswith(web_name):
            file_name = x['file_name']
    if file_name is None:
        # No entry in folder_scores matches the requested name.
        return None
    candidate_files = list()
    candidate_files.append(file_name)
    candidate_files.append(file_name.replace('/srv/ftp_tcc', '/mnt/hdd_24tb_1/videos/ftp'))
    candidate_files.append(file_name.replace('/srv/ftp', '/mnt/hdd_24tb_1/videos/ftp'))
    file_name = None
    for candidate in candidate_files:
        if os.path.exists(candidate):
            file_name = candidate
            break
    return file_name

#c_dirs = ['/mnt/hdd_24tb_1/videos/ftp/leopards2/2025/08/26','/srv/ftp_tcc/leopards1/2025/08/27','/srv/ftp_tcc/leopards1/2025/08/28','/srv/ftp_tcc/leopards1/2025/08/29']
#op = calculate_embedding_score_in_folders( tuple(c_dirs), 0.10, query = 'A cat and human')
def add_breaks_between_videos(op, threshold_to_split_seconds=30 * 60):  # default gap threshold: 30 minutes
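    """Return (end, start) gaps between consecutive videos that are longer than threshold_to_split_seconds."""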
    ranges = list()
    for vids in op['videos']:
        ranges.append((vids['start_time'], vids['end_time']))
    breaks = list()
    for idx in range(len(ranges) - 1):
        current_range = ranges[idx]
        next_range = ranges[idx + 1]
        end_now = current_range[1]
        start_next = next_range[0]
        if (start_next - end_now) > threshold_to_split_seconds:
            breaks.append((end_now, start_next))
    return breaks
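

# Minimal usage sketch: chains the helpers above to score a set of folders for a text query,
# collapse the per-frame score traces, and find long gaps between matching videos.
# It assumes the example folders exist, the local Redis instance and the encoder service at
# 192.168.1.242:53004 are reachable, and mirrors the threshold/query from the commented example above.
if __name__ == '__main__':
    example_dirs = ('/srv/ftp_tcc/leopards1/2025/08/27', '/srv/ftp_tcc/leopards1/2025/08/28')  # illustrative paths
    scores = calculate_embedding_score_in_folders(example_dirs, 0.10, query='A cat and human')
    collapsed = collapse_scores_to_maxmin_avg(scores)
    gaps = add_breaks_between_videos(scores)
    logger.info(f"{len(scores['videos'])} matching segments, {len(collapsed)} collapsed entries, {len(gaps)} long gaps")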