from CommonCode.video_meta import FTPVideo
from CommonCode.settings import get_logger
import logging
import json
import datetime as dt
import functools
import requests
import numpy as np
from pqdm.processes import pqdm
from multiprocessing import Pool
import os
import lttb
import pickle
import redis
from hashlib import md5


r = redis.Redis(host='localhost', port=6379, db=15)

logger = get_logger(__name__, '/var/log/vector_search_logs/util_embed_scores', stdout=True, systemd=False, level=logging.INFO)

def get_matching_file_for_tstamp(target_tstamp, folder_scores):
    matching_file = None
    for video_file in folder_scores['videos']:
        start_time = video_file['start_time']
        end_time = video_file['end_time']

        if start_time < target_tstamp < end_time:
            matching_file = video_file

    if matching_file is not None:
        fname = matching_file['file_name']
        offset = target_tstamp - matching_file['start_time']
    else:
        fname = 'None Found'
        offset = -1

    web_name = 'media/' + os.path.basename(fname)
    return dict(full_path=fname, path=web_name, timeoffset=offset)

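# A minimal usage sketch (hypothetical folder_scores shape, matching the c_data dicts
# produced by calculate_embedding_score_in_folder below):
# hit = get_matching_file_for_tstamp(1756300000.0, {'videos': [
#     {'file_name': '/srv/ftp_tcc/cam1/clip.mp4', 'start_time': 1756299900.0, 'end_time': 1756300100.0},
# ]})
# -> {'full_path': '/srv/ftp_tcc/cam1/clip.mp4', 'path': 'media/clip.mp4', 'timeoffset': 100.0}
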
def get_vec_rep_file_loc(c_dir):
    return os.path.join(c_dir, 'vec_rep.npz')

def get_vector_representation(c_dir, force_compute=False, redis_key='compute_log'):
    message = {'task': 'VECTOR_CALC_IN_FOLDER_START', 'when': str(c_dir), 'time': dt.datetime.now().timestamp()}
    r.rpush(redis_key, json.dumps(message))

    # Reuse a previously computed representation if one exists (unless forced to recompute).
    vec_rep_file = get_vec_rep_file_loc(c_dir)
    if os.path.exists(vec_rep_file) and not force_compute:
        try:
            result = dict(np.load(vec_rep_file))
            message = {'task': 'VECTOR_CALC_IN_FOLDER_DONE', 'when': str(c_dir), 'time': dt.datetime.now().timestamp(), 'precomputed': True}
            r.rpush(redis_key, json.dumps(message))
            return result
        except Exception:
            # Corrupt or unreadable vec_rep file: delete it and recompute below.
            os.remove(vec_rep_file)

    # Collect the original (non-reduced) mp4 files under the folder.
    ff = list()
    for root, _dirs, files in os.walk(c_dir):
        for f in files:
            if f.endswith('.mp4') and '_reduced' not in f:
                ff.append(os.path.join(root, f))

    sorted_videos = sorted(FTPVideo(x) for x in ff)

    all_cat = list()
    all_idx = list()
    all_source = list()
    all_tstamps = list()
    enu = 0
    for idx, x in enumerate(sorted_videos):
        try:
            hh = x.embeddings
        except Exception:
            hh = None

        if hh is not None:
            n_emb = FTPVideo.vec_norm(hh['embeds'])
            all_cat.append(n_emb)
            all_idx.append(enu * np.ones(n_emb.shape[0], dtype=np.int64))
            all_source.append(x.real_path)
            all_tstamps.append([t.timestamp() for t in hh['frame_time']])
            enu += 1

        message = {'task': 'VECTOR_CALC_IN_FOLDER_BUMP', 'progress': idx + 1, 'how_many': len(sorted_videos), 'time': dt.datetime.now().timestamp()}
        r.rpush(redis_key, json.dumps(message))

    if len(all_cat) == 0:
        return []

    all_embeds = FTPVideo.vec_norm(np.vstack(all_cat))
    all_idces = np.hstack(all_idx)
    all_times = np.hstack(all_tstamps)

    np.savez(vec_rep_file, embeds=all_embeds, idces=all_idces, timestamps=all_times, source_files=all_source)
    message = {'task': 'VECTOR_CALC_IN_FOLDER_DONE', 'when': str(c_dir), 'time': dt.datetime.now().timestamp()}
    r.rpush(redis_key, json.dumps(message))
    return dict(embeds=all_embeds, idces=all_idces, timestamps=all_times, source_files=all_source)

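# A minimal consumption sketch (hypothetical folder path; the returned dict mirrors the
# arrays stored in vec_rep.npz and may be an empty list when no embeddings were found):
# vec_rep = get_vector_representation('/srv/ftp_tcc/leopards1/2025/08/27')
# if vec_rep:
#     print(vec_rep['embeds'].shape, len(vec_rep['source_files']))
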
def get_scores_embedding_c_dir(c_dir, query_vector, redis_key='compute_log'):
    # query_vector may arrive as a tuple (see calculate_embedding_score_in_folder); coerce to an array.
    vec_rep = get_vector_representation(c_dir, redis_key=redis_key)
    query_scores = (np.asarray(query_vector) @ vec_rep['embeds'].T).squeeze()
    return query_scores

@functools.lru_cache
def get_query_vector(query):
    # Query text -> normalised embedding row vector, fetched from the local encoder
    # service and cached per query string.
    vec_form = requests.get('http://192.168.1.242:53004/encode', params={'query': query}).json()['vector'][0]
    vec_search = np.asarray(vec_form)
    query_vector = FTPVideo.vec_norm(vec_search[None, :])
    return query_vector

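# Usage sketch (assumes the encoder service above is reachable; the result is a (1, D)
# row vector suitable for the dot products used throughout this module):
# qv = get_query_vector('A cat and human')
# print(qv.shape)
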
def calculate_embedding_score_in_folders(c_dirs, threshold, query=None, query_vector=None, redis_key='compute_log'):
    result_list = list()
    if query_vector is None:
        query_vector = get_query_vector(query)

    # kwargs = [{'c_dir':x, 'threshold':threshold, 'query': query} for x in c_dirs]
    # Pass the already-computed query vector to the workers so each one does not
    # have to hit the encoder service again.
    args = [(x, threshold, query, query_vector, logger, redis_key) for x in c_dirs]

    logger.info(f"CALCULATING FOR {c_dirs}")
    with Pool(processes=8) as pool:
        out = pool.starmap(calculate_embedding_score_in_folder, args)
    logger.info(f"DONE CALCULATING FOR {c_dirs}")

    for x in out:
        try:
            result_list.extend(x['videos'])
        except Exception as e:
            print(e, x)

    return {'videos': result_list}

def collapse_scores_to_maxmin_avg(folder_scores):
    # Collapse each video's per-frame score trace down to its extremes:
    # 'score' becomes [min_score, max_score, time_of_max, time_of_min] and 'time'
    # becomes the last (maximum) frame offset.
    # Note: .copy() is shallow, so the nested 'embed_scores' dict of the input is
    # modified in place as well.
    result = list()
    for c_data in folder_scores['videos']:
        new_d = c_data.copy()

        scores = new_d['embed_scores']['score']
        max_score = max(scores)
        min_score = min(scores)
        max_score_idx = scores.index(max_score)
        min_score_idx = scores.index(min_score)
        max_score_time = new_d['embed_scores']['time'][max_score_idx]
        min_score_time = new_d['embed_scores']['time'][min_score_idx]
        new_d['embed_scores']['score'] = [min_score, max_score, max_score_time, min_score_time]
        new_d['embed_scores']['time'] = max(new_d['embed_scores']['time'])
        result.append(new_d)

    return result

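# Sketch of the transformation (hypothetical values):
# collapse_scores_to_maxmin_avg({'videos': [{'file_name': 'clip.mp4', 'start_time': 0.0, 'end_time': 10.0,
#                                            'embed_scores': {'time': [0.0, 5.0, 10.0], 'score': [0.1, 0.3, 0.2]}}]})
# -> [{'file_name': 'clip.mp4', ..., 'embed_scores': {'time': 10.0, 'score': [0.1, 0.3, 5.0, 0.0]}}]
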
def calculate_embedding_score_in_folder(og_dir, threshold, query=None, query_vector=None, logger=logger, redis_key='compute_log'):
    message = {'task': 'SCORE_CALC_IN_FOLDER_START', 'when': str(og_dir), 'time': dt.datetime.now().timestamp()}
    r.rpush(redis_key, json.dumps(message))

    if query_vector is None:
        query_vector = get_query_vector(query)

    # The requested folder may live on either the FTP share or the archive disk.
    candidate_dirs = list()
    candidate_dirs.append(og_dir)
    candidate_dirs.append(og_dir.replace('/srv/ftp_tcc', '/mnt/hdd_24tb_1/videos/ftp'))
    candidate_dirs.append(og_dir.replace('/srv/ftp', '/mnt/hdd_24tb_1/videos/ftp'))

    c_dir = None
    for candidate in candidate_dirs:
        if os.path.exists(candidate):
            c_dir = candidate
            break
    if c_dir is None:
        return []

    # Cache key combines the threshold with a digest of the query vector.
    vec_cache_str = md5(query_vector.tobytes()).hexdigest()
    cache_file_loc = os.path.join(c_dir, 'embedding_scores@' + str(threshold) + '@' + vec_cache_str + '.pkl')

    if os.path.exists(cache_file_loc):
        logger.info(f"TRYING TO LOAD CACHE {cache_file_loc}")
        try:
            with open(cache_file_loc, 'rb') as f:
                video_json_info = pickle.load(f)
            # The cache is only valid if it covers exactly the embedding files currently on disk.
            files_in_cache = {os.path.splitext(os.path.basename(x))[0] for x in video_json_info.get('source_files', [])}
            lsd_dir = os.listdir(c_dir)
            files_on_disk = {x.split(".")[0] for x in lsd_dir if x.endswith('oclip_embeds.npz')}

            if files_on_disk == files_in_cache:
                logger.info(f"LOADED EMBEDDING SCORE FROM CACHE {cache_file_loc}")
                message = {'task': 'SCORE_CALC_IN_FOLDER_DONE', 'when': str(c_dir), 'time': dt.datetime.now().timestamp(), 'precomputed': True}
                r.rpush(redis_key, json.dumps(message))
                return video_json_info
            else:
                logger.info(f"CACHE FILE IS OLD, DELETING VEC REP FILE AND RECREATING {cache_file_loc}")
                os.remove(get_vec_rep_file_loc(c_dir))
        except Exception as e:
            logger.info(f"CACHE FILE IS CORRUPT, RECREATING {cache_file_loc} {e}")
            os.remove(cache_file_loc)

    vec_rep = get_vector_representation(c_dir, redis_key=redis_key)
    if not vec_rep:
        # No embeddings were found in this folder.
        return []
    query_scores = get_scores_embedding_c_dir(c_dir, tuple(query_vector.tolist()[0]), redis_key=redis_key)

    video_json_info = list()
    idces_keep = np.where(query_scores > threshold)[0]

    video_id = vec_rep['idces'][idces_keep]
    videos_that_match = np.unique(video_id)

    # subset_timestampsF = list()
    # for s in videos_that_match:
    #     idces_entry = np.where(vec_rep['idces'] == s)[0]
    #     min_idces = idces_entry[0]
    #     max_idces = idces_entry[-1]
    #     subset_timestampsF.append([vec_rep['timestamps'][min_idces], vec_rep['timestamps'][max_idces]])

    # For every matching video, pull out its full run of timestamps.
    id_extract_video_level = np.where(np.isin(vec_rep['idces'], videos_that_match))[0]
    idces_split = np.where(np.diff(vec_rep['idces'][id_extract_video_level]) != 0)[0] + 1
    subset_timestampsF = np.split(vec_rep['timestamps'][id_extract_video_level], idces_split)

    for subset_t in subset_timestampsF:
        if len(subset_t) == 0:
            continue

        min_t = min(subset_t)
        max_t = max(subset_t)
        idces_curr = np.where(np.logical_and(vec_rep['timestamps'] > min_t, vec_rep['timestamps'] < max_t))[0]
        if len(idces_curr) == 0:
            continue

        unq_vids = np.unique(vec_rep['idces'][idces_curr])
        subset_idx = np.where(np.isin(vec_rep['idces'], unq_vids))[0]

        subset_idces = vec_rep['idces'][subset_idx]
        subset_timestamps = vec_rep['timestamps'][subset_idx]
        subset_scores = query_scores[subset_idx]
        idx_split = np.where(np.diff(vec_rep['idces'][subset_idx]) != 0)[0] + 1

        split_idces = np.split(subset_idces, idx_split)
        split_timestamps = np.split(subset_timestamps, idx_split)
        split_scores = np.split(subset_scores, idx_split)
        split_files = [vec_rep['source_files'][x[0]] for x in split_idces]

        for s_file, s_scores, s_tstamps, s_idces in zip(split_files, split_scores, split_timestamps, split_idces):
            start_time = float(min(s_tstamps))
            end_time = float(max(s_tstamps))

            frame_time = (s_tstamps - start_time).tolist()
            embed_scores = s_scores.tolist()

            c_data = {'file_name': str(s_file), 'start_time': start_time, 'end_time': end_time, 'embed_scores': {'time': frame_time, 'score': embed_scores}}
            video_json_info.append(c_data)

    message = {'task': 'SCORE_CALC_IN_FOLDER_DONE', 'when': str(c_dir), 'time': dt.datetime.now().timestamp()}
    r.rpush(redis_key, json.dumps(message))

    to_write = {'source_files': vec_rep['source_files'], 'videos': video_json_info}
    with open(cache_file_loc, 'wb') as f:
        logger.info(f"WRITING EMBEDDING SCORE TO CACHE {cache_file_loc}")
        pickle.dump(to_write, f)

    return to_write

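# End-to-end single-folder sketch (hypothetical folder and query; threshold chosen arbitrarily):
# folder_scores = calculate_embedding_score_in_folder('/srv/ftp_tcc/leopards1/2025/08/27', 0.10, query='A cat and human')
# for vid in folder_scores['videos']:
#     print(vid['file_name'], max(vid['embed_scores']['score']))
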
def get_matching_file_given_filename(web_name, folder_scores):
    file_name = None
    for x in folder_scores['videos']:
        if x['file_name'].endswith(web_name):
            file_name = x['file_name']

    if file_name is None:
        # No entry in folder_scores matches the requested name.
        return None

    # As above, the file may live on the FTP share or on the archive disk.
    candidate_files = list()
    candidate_files.append(file_name)
    candidate_files.append(file_name.replace('/srv/ftp_tcc', '/mnt/hdd_24tb_1/videos/ftp'))
    candidate_files.append(file_name.replace('/srv/ftp', '/mnt/hdd_24tb_1/videos/ftp'))

    file_name = None
    for candidate in candidate_files:
        if os.path.exists(candidate):
            file_name = candidate
            break

    return file_name

# c_dirs = ['/mnt/hdd_24tb_1/videos/ftp/leopards2/2025/08/26', '/srv/ftp_tcc/leopards1/2025/08/27', '/srv/ftp_tcc/leopards1/2025/08/28', '/srv/ftp_tcc/leopards1/2025/08/29']
# op = calculate_embedding_score_in_folders(tuple(c_dirs), 0.10, query='A cat and human')


def add_breaks_between_videos(op, threshold_to_split_seconds=30 * 60):  # 30 minutes
    # Return the (end, start) gaps between consecutive videos that are longer than the threshold.
    ranges = list()
    for vids in op['videos']:
        ranges.append((vids['start_time'], vids['end_time']))

    breaks = list()
    for idx in range(len(ranges) - 1):
        current_range = ranges[idx]
        next_range = ranges[idx + 1]

        end_now = current_range[1]
        start_next = next_range[0]

        if (start_next - end_now) > threshold_to_split_seconds:
            breaks.append((end_now, start_next))

    return breaks

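# Usage sketch (hypothetical, continuing the commented-out example above):
# breaks = add_breaks_between_videos(op)
# for gap_end, gap_start in breaks:
#     print('gap of', gap_start - gap_end, 'seconds')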