from CommonCode.video_meta import FTPVideo
from CommonCode.settings import get_logger
import logging
import json
import datetime as dt
import functools
import requests
import numpy as np
from multiprocessing import Pool
import os
import pickle
import redis
from hashlib import md5

r = redis.Redis(host='localhost', port=6379, db=15)
logger = get_logger(__name__, '/var/log/vector_search_logs/util_embed_scores',
                    stdout=True, systemd=False, level=logging.INFO)


def get_matching_file_for_tstamp(target_tstamp, folder_scores):
    # Find the video whose [start_time, end_time] window contains the timestamp.
    matching_file = None
    for video_file in folder_scores['videos']:
        start_time = video_file['start_time']
        end_time = video_file['end_time']
        if start_time < target_tstamp < end_time:
            matching_file = video_file
    if matching_file is not None:
        fname = matching_file['file_name']
        offset = target_tstamp - matching_file['start_time']
    else:
        fname = 'None Found'
        offset = -1
    web_name = 'media/' + os.path.basename(fname)
    return dict(full_path=fname, path=web_name, timeoffset=offset)


def get_vec_rep_file_loc(c_dir):
    vec_rep_file = os.path.join(c_dir, 'vec_rep.npz')
    return vec_rep_file


def get_vector_representation(c_dir, force_compute=False, redis_key='compute_log'):
    message = {'task': 'VECTOR_CALC_IN_FOLDER_START', 'when': str(c_dir),
               'time': dt.datetime.now().timestamp()}
    r.rpush(redis_key, json.dumps(message))
    vec_rep_file = get_vec_rep_file_loc(c_dir)
    if os.path.exists(vec_rep_file) and not force_compute:
        try:
            result = dict(np.load(vec_rep_file))
            message = {'task': 'VECTOR_CALC_IN_FOLDER_DONE', 'when': str(c_dir),
                       'time': dt.datetime.now().timestamp(), 'precomputed': True}
            r.rpush(redis_key, json.dumps(message))
            return result
        except Exception:
            # Cached file is unreadable; delete it and recompute below.
            os.remove(vec_rep_file)
    ff = list()
    for root, dirs, files in os.walk(c_dir):
        for f in files:
            if f.endswith('.mp4') and '_reduced' not in f:
                ff.append(os.path.join(root, f))
    videos = [FTPVideo(x) for x in ff]
    sorted_videos = sorted(videos)
    all_cat = list()
    all_idx = list()
    all_source = list()
    all_tstamps = list()
    enu = 0
    for idx, x in enumerate(sorted_videos):
        try:
            hh = x.embeddings
        except Exception:
            hh = None
        if hh is not None:
            n_emb = FTPVideo.vec_norm(hh['embeds'])
            all_cat.append(n_emb)
            all_idx.append(enu * np.ones(n_emb.shape[0], dtype=np.int64))
            all_source.append(x.real_path)
            all_tstamps.append([t.timestamp() for t in hh['frame_time']])
            enu += 1
        message = {'task': 'VECTOR_CALC_IN_FOLDER_BUMP', 'when': c_dir,
                   'progress': idx + 1, 'how_many': len(sorted_videos),
                   'time': dt.datetime.now().timestamp()}
        r.rpush(redis_key, json.dumps(message))
    if len(all_cat) == 0:
        # No videos in this folder had embeddings; callers must handle this.
        return []
    all_embeds = np.vstack(all_cat)
    all_embeds = FTPVideo.vec_norm(all_embeds)
    all_idces = np.hstack(all_idx)
    all_times = np.hstack(all_tstamps)
    np.savez(vec_rep_file, embeds=all_embeds, idces=all_idces,
             timestamps=all_times, source_files=all_source)
    message = {'task': 'VECTOR_CALC_IN_FOLDER_DONE', 'when': str(c_dir),
               'time': dt.datetime.now().timestamp()}
    r.rpush(redis_key, json.dumps(message))
    return dict(embeds=all_embeds, idces=all_idces,
                timestamps=all_times, source_files=all_source)


def get_scores_embedding_c_dir(c_dir, query_vector, redis_key='compute_log'):
    vec_rep = get_vector_representation(c_dir, redis_key=redis_key)
    if not isinstance(vec_rep, dict):
        # The folder yielded no embeddings; nothing to score.
        return vec_rep, None
    query_scores = (query_vector @ vec_rep['embeds'].T).squeeze()
    return vec_rep, query_scores
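# Illustrative usage sketch (the folder path below is an assumption, not real data):
#   vec_rep = get_vector_representation('/srv/ftp_tcc/leopards1/2025/08/27')
#   vec_rep['embeds']        -> (n_frames, emb_dim) L2-normalised frame embeddings
#   vec_rep['idces']         -> (n_frames,) index into 'source_files' for each frame
#   vec_rep['timestamps']    -> (n_frames,) POSIX timestamp of each frame
#   vec_rep['source_files']  -> one video path per unique value in 'idces'
# Since both the query vector and the frame embeddings are L2-normalised,
# query_vector @ embeds.T is the cosine similarity of the query to every frame.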
@functools.lru_cache
def get_query_vector(query):
    # Ask the encoder service for a text embedding, then L2-normalise it.
    vec_form = requests.get('http://192.168.1.242:53004/encode',
                            params={'query': query}).json()['vector'][0]
    vec_search = np.asarray(vec_form)
    query_vector = FTPVideo.vec_norm(vec_search[None, :])
    return query_vector


def calculate_embedding_score_in_folders(c_dirs, threshold, query=None, query_vector=None, redis_key='compute_log'):
    result_list = list()
    if query_vector is None:
        query_vector = get_query_vector(query)
    # Pass the query vector down so each worker process does not re-query the encoder.
    args = [(x, threshold, query, query_vector, logger, redis_key) for x in c_dirs]
    # logger.info(f"CALCULATING FOR {args}")
    with Pool(processes=8) as pool:
        out = pool.starmap(calculate_embedding_score_in_folder, args)
    # logger.info(f"DONE CALCULATING FOR {args}")
    cache_files = list()
    for x, cache_file_loc in out:
        try:
            result_list.extend(x['videos'])
            if cache_file_loc is not None:
                cache_files.append(cache_file_loc)
        except Exception as e:
            print(e, x)
    return {'videos': result_list, 'cache_file_locs': cache_files}


def collapse_scores_to_maxmin_avg(folder_scores):
    result = list()
    for c_data in folder_scores['videos']:
        new_d = c_data.copy()
        # Copy the nested dict as well so the caller's folder_scores is not mutated.
        new_d['embed_scores'] = c_data['embed_scores'].copy()
        scores = new_d['embed_scores']['score']
        max_score = max(scores)
        min_score = min(scores)
        max_score_idx = scores.index(max_score)
        min_score_idx = scores.index(min_score)
        max_score_time = new_d['embed_scores']['time'][max_score_idx]
        min_score_time = new_d['embed_scores']['time'][min_score_idx]
        new_d['embed_scores']['score'] = [min_score, max_score, max_score_time, min_score_time]
        new_d['embed_scores']['time'] = max(new_d['embed_scores']['time'])
        result.append(new_d)
    return result
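# Illustrative example of the collapse (toy numbers, not real output):
#   entry = {'file_name': 'a.mp4', 'start_time': 0.0, 'end_time': 10.0,
#            'embed_scores': {'time': [0.0, 5.0, 10.0], 'score': [0.1, 0.4, 0.2]}}
#   collapse_scores_to_maxmin_avg({'videos': [entry]})[0]['embed_scores']
#   -> {'time': 10.0, 'score': [0.1, 0.4, 5.0, 0.0]}
# i.e. 'score' becomes [min, max, time-of-max, time-of-min] and 'time' the last offset.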
def calculate_embedding_score_in_folder(og_dir, threshold, query=None, query_vector=None, logger=logger, redis_key='compute_log'):
    message = {'task': 'SCORE_CALC_IN_FOLDER_START', 'when': str(og_dir),
               'time': dt.datetime.now().timestamp()}
    r.rpush(redis_key, json.dumps(message))
    if query_vector is None:
        query_vector = get_query_vector(query)
    # The folder may live on either the FTP share or the archive disk.
    candidate_dirs = list()
    candidate_dirs.append(og_dir)
    candidate_dirs.append(og_dir.replace('/srv/ftp_tcc', '/mnt/hdd_24tb_1/videos/ftp'))
    candidate_dirs.append(og_dir.replace('/srv/ftp', '/mnt/hdd_24tb_1/videos/ftp'))
    c_dir = None
    for candidate in candidate_dirs:
        if os.path.exists(candidate):
            c_dir = candidate
            break
    if c_dir is None:
        # Shaped like the normal return value so the caller can unpack it.
        return ({'videos': [], 'source_files': []}, None)
    vec_cache_str = md5(query_vector).hexdigest()
    cache_file_loc = os.path.join(c_dir, 'embedding_scores@' + str(threshold) + '@' + vec_cache_str + '.pkl')
    if os.path.exists(cache_file_loc):
        logger.info(f"TRYING TO LOAD CACHE {cache_file_loc}")
        try:
            with open(cache_file_loc, 'rb') as f:
                video_json_info = pickle.load(f)
            # The cache is only valid if it covers exactly the embedding files on disk.
            files_in_cache = {os.path.splitext(os.path.basename(x))[0]
                              for x in video_json_info.get('source_files', [])}
            lsd_dir = os.listdir(c_dir)
            files_on_disk = {x.split('.')[0] for x in lsd_dir if x.endswith('oclip_embeds.npz')}
            if files_on_disk == files_in_cache:
                logger.info(f"LOADED EMBEDDING SCORE FROM CACHE {cache_file_loc}")
                message = {'task': 'SCORE_CALC_IN_FOLDER_DONE', 'when': str(c_dir),
                           'time': dt.datetime.now().timestamp(), 'precomputed': True}
                r.rpush(redis_key, json.dumps(message))
                return (video_json_info, cache_file_loc)
            else:
                logger.info(f"CACHE FILE IS OLD, DELETING VEC REP FILE AND RECREATING {cache_file_loc}")
                os.remove(get_vec_rep_file_loc(c_dir))
        except Exception as e:
            logger.info(f"CACHE FILE IS CORRUPT, RECREATING {cache_file_loc} {e}")
            os.remove(cache_file_loc)
    vec_rep, query_scores = get_scores_embedding_c_dir(c_dir, tuple(query_vector.tolist()[0]), redis_key=redis_key)
    if query_scores is None:
        return ({'videos': [], 'source_files': []}, None)
    video_json_info = list()
    idces_keep = np.where(query_scores > threshold)[0]
    video_id = vec_rep['idces'][idces_keep]
    videos_that_match = np.unique(video_id)
    # Split the frames of the matching videos into per-video runs: a new run
    # starts wherever the video index changes.
    id_extract_video_level = np.where(np.isin(vec_rep['idces'], videos_that_match))[0]
    idces_split = np.where(np.diff(vec_rep['idces'][id_extract_video_level]) != 0)[0] + 1
    subset_timestampsF = np.split(vec_rep['timestamps'][id_extract_video_level], idces_split)
    for subset_t in subset_timestampsF:
        if len(subset_t) == 0:
            continue
        min_t = min(subset_t)
        max_t = max(subset_t)
        # Pull in every video overlapping this time window, then split those
        # frames back into per-video runs of (idces, timestamps, scores).
        idces_curr = np.where(np.logical_and(vec_rep['timestamps'] > min_t, vec_rep['timestamps'] < max_t))[0]
        if len(idces_curr) == 0:
            continue
        unq_vids = np.unique(vec_rep['idces'][idces_curr])
        subset_idx = np.where(np.isin(vec_rep['idces'], unq_vids))[0]
        subset_idces = vec_rep['idces'][subset_idx]
        subset_timestamps = vec_rep['timestamps'][subset_idx]
        subset_scores = query_scores[subset_idx]
        idx_split = np.where(np.diff(vec_rep['idces'][subset_idx]) != 0)[0] + 1
        split_idces = np.split(subset_idces, idx_split)
        split_timestamps = np.split(subset_timestamps, idx_split)
        split_scores = np.split(subset_scores, idx_split)
        split_files = [vec_rep['source_files'][x[0]] for x in split_idces]
        for s_file, s_scores, s_tstamps, s_idces in zip(split_files, split_scores, split_timestamps, split_idces):
            start_time = float(min(s_tstamps))
            end_time = float(max(s_tstamps))
            frame_time = (s_tstamps - start_time).tolist()
            embed_scores = s_scores.tolist()
            c_data = {'file_name': str(s_file), 'start_time': start_time, 'end_time': end_time,
                      'embed_scores': {'time': frame_time, 'score': embed_scores}}
            video_json_info.append(c_data)
    message = {'task': 'SCORE_CALC_IN_FOLDER_DONE', 'when': str(c_dir),
               'time': dt.datetime.now().timestamp()}
    r.rpush(redis_key, json.dumps(message))
    to_write = {'source_files': vec_rep['source_files'], 'videos': video_json_info}
    with open(cache_file_loc, 'wb') as f:
        logger.info(f"WRITING EMBEDDING SCORE TO CACHE {cache_file_loc}")
        pickle.dump(to_write, f)
    logger.info(f"SAVED EMBEDDING SCORE TO CACHE {cache_file_loc}")
    return (to_write, cache_file_loc)


def get_matching_file_given_filename(web_name, folder_scores):
    file_name = None
    for x in folder_scores['videos']:
        if x['file_name'].endswith(web_name):
            file_name = x['file_name']
    if file_name is None:
        return None
    # The file may live on either the FTP share or the archive disk.
    candidate_files = list()
    candidate_files.append(file_name)
    candidate_files.append(file_name.replace('/srv/ftp_tcc', '/mnt/hdd_24tb_1/videos/ftp'))
    candidate_files.append(file_name.replace('/srv/ftp', '/mnt/hdd_24tb_1/videos/ftp'))
    file_name = None
    for candidate in candidate_files:
        if os.path.exists(candidate):
            file_name = candidate
            break
    return file_name

# c_dirs = ['/mnt/hdd_24tb_1/videos/ftp/leopards2/2025/08/26', '/srv/ftp_tcc/leopards1/2025/08/27',
#           '/srv/ftp_tcc/leopards1/2025/08/28', '/srv/ftp_tcc/leopards1/2025/08/29']
# op = calculate_embedding_score_in_folders(tuple(c_dirs), 0.10, query='A cat and human')
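# Illustrative follow-up to the example above (the timestamp is made up):
#   hit = get_matching_file_for_tstamp(1756300000.0, op)
#   hit['path']       -> 'media/<clip basename>' served to the web UI
#   hit['timeoffset'] -> seconds into that clip, or -1 if no clip matched
#   get_matching_file_given_filename(os.path.basename(hit['full_path']), op)
#   -> the clip's on-disk path, checked against the /srv and /mnt mirrors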
def add_breaks_between_videos(op, threshold_to_split_seconds=30 * 60):  # 30 minutes
    # Assumes op['videos'] is already ordered by start_time; returns the
    # (end, start) gaps between consecutive clips longer than the threshold.
    ranges = list()
    for vids in op['videos']:
        ranges.append((vids['start_time'], vids['end_time']))
    breaks = list()
    for idx in range(len(ranges) - 1):
        current_range = ranges[idx]
        next_range = ranges[idx + 1]
        end_now = current_range[1]
        start_next = next_range[0]
        if (start_next - end_now) > threshold_to_split_seconds:
            breaks.append((end_now, start_next))
    return breaks
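# Illustrative usage (toy numbers): two clips separated by a two-hour gap
# produce one break with the default 30-minute threshold.
#   toy = {'videos': [{'start_time': 0.0, 'end_time': 600.0},
#                     {'start_time': 7800.0, 'end_time': 8400.0}]}
#   add_breaks_between_videos(toy)  # -> [(600.0, 7800.0)]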