import copy
import datetime as dt
import functools
import json
import logging
import os
import pickle
import time
from hashlib import md5
from multiprocessing import Pool

import lttb
import numpy as np
import redis
import requests
from pqdm.processes import pqdm

from CommonCode import kwq
from CommonCode.settings import get_logger
from CommonCode.video_meta import FTPVideo
|
# Redis connection used as a lightweight event/progress log by the functions
# below (messages are RPUSHed as JSON onto a list key, default "compute_log").
# NOTE(review): db=15 appears to be dedicated to these compute events — confirm.
r = redis.Redis(host="localhost", port=6379, db=15)

# Module-wide logger writing to the vector-search log directory and stdout.
logger = get_logger(
    __name__,
    "/var/log/vector_search_logs/util_embed_scores",
    stdout=True,
    systemd=False,
    level=logging.INFO,
)
|
def get_embed_cache_file_search_path(file_path):
    """Return the path of the cached OpenCLIP embedding file for *file_path*.

    The cache lives next to the video, with the original extension replaced
    by ``.oclip_embeds.npz``.
    """
    base, _ext = os.path.splitext(file_path)
    return f"{base}.oclip_embeds.npz"
|
def get_embeddings_for_a_file(file_path, frame_interval=1):
    """Return cached per-frame embeddings for *file_path*, triggering
    computation when the cache is missing or stale.

    Looks for a ``.oclip_embeds.npz`` cache next to the video.  If the cache
    is missing, or was scored at a different ``frame_interval`` (checked via
    the stride of the stored ``frame_numbers``), a scoring job is published
    to the embed-priority queue and we poll for up to ~120 s for the cache
    file to appear.

    Returns the loaded ``NpzFile`` on success, ``None`` on timeout, or a
    ``{"Error": ...}`` dict when the video file itself does not exist.

    Fixes: informational message was logged at ERROR level; poll-loop
    status used bare ``print`` instead of the module logger; unused loop
    variable.
    """
    if not os.path.exists(file_path):
        return {"Error": f"No file exists: {file_path}"}

    file_search_path = get_embed_cache_file_search_path(file_path)
    force_score = False
    llvec = None

    logger.info(f"GETTING EMBEDDINGS FOR A FILE {file_path}")
    if os.path.exists(file_search_path):
        llvec = np.load(file_search_path)
        frs = llvec["frame_numbers"]
        # A cache scored at a different frame interval is stale: discard it
        # and request a forced re-score below.
        if set(np.unique(np.diff(frs))) != {frame_interval}:
            force_score = True
            llvec = None

    if not os.path.exists(file_search_path) or force_score:
        kwq.publish(
            kwq.TOPICS.enter_60_videos_embed_priority,
            file_path,
            {
                "push_to_db": False,
                "frame_interval": frame_interval,
                "force_score": force_score,
            },
        )

    if llvec is None:
        # Poll for the worker to drop the cache file; give up after ~120 s.
        for _ in range(120):
            if os.path.exists(file_search_path):
                logger.info("Found embedding path!")
                llvec = np.load(file_search_path)
                break
            logger.debug("waiting")
            time.sleep(1)

    return llvec
|
def get_matching_file_for_tstamp(target_tstamp, folder_scores):
    """Locate the video in *folder_scores* whose time span contains *target_tstamp*.

    When several spans contain the timestamp the last one wins.  Returns a
    dict with the full path, a web-relative path under ``media/`` and the
    offset (seconds) of the timestamp into that file; the offset is -1 and
    the name is "None Found" when no video covers the timestamp.
    """
    matching_file = None
    for entry in folder_scores["videos"]:
        if entry["start_time"] < target_tstamp < entry["end_time"]:
            matching_file = entry

    if matching_file is None:
        fname, offset = "None Found", -1
    else:
        fname = matching_file["file_name"]
        offset = target_tstamp - matching_file["start_time"]

    web_name = "media/" + os.path.basename(fname)
    return dict(full_path=fname, path=web_name, timeoffset=offset)
|
def get_vec_rep_file_loc(c_dir):
    """Return the path of the folder-level vector-representation cache."""
    return os.path.join(c_dir, "vec_rep.npz")
|
def get_vector_representation(c_dir, force_compute=False, redis_key="compute_log"):
    """Build (or load from cache) the concatenated embedding representation
    of every ``.mp4`` under *c_dir*.

    Progress and completion events are RPUSHed as JSON onto the Redis list
    *redis_key*.  The result is cached as ``vec_rep.npz`` inside *c_dir*.

    Returns a dict with keys:
      - ``embeds``: dict mapping position -> per-video normalised embedding array
      - ``idces``: video index for every embedding row
      - ``timestamps``: epoch seconds for every embedding row
      - ``source_files``: real path of each contributing video
    or ``None`` when no embeddings could be gathered.

    Fixes: bare ``except:`` on cache load (also caught KeyboardInterrupt);
    each video was constructed twice (`cvid` unused); removed dead
    ``ts_e``/``id_e`` accumulators.
    """
    message = {
        "task": "VECTOR_CALC_IN_FOLDER_START",
        "when": str(c_dir),
        "time": dt.datetime.now().timestamp(),
    }
    r.rpush(redis_key, json.dumps(message))

    vec_rep_file = get_vec_rep_file_loc(c_dir)
    if os.path.exists(vec_rep_file) and not force_compute:
        try:
            result = dict(np.load(vec_rep_file, allow_pickle=True))
            # 'embeds' is stored as a 0-d object array wrapping a dict;
            # tolist() unwraps it back to the dict.
            if result["embeds"].ndim == 0:
                result["embeds"] = result["embeds"].tolist()

            message = {
                "task": "VECTOR_CALC_IN_FOLDER_DONE",
                "when": str(c_dir),
                "time": dt.datetime.now().timestamp(),
                "precomputed": True,
            }
            r.rpush(redis_key, json.dumps(message))
            return result
        except Exception:
            # Corrupt/unreadable cache: drop it and rebuild below.
            os.remove(vec_rep_file)

    # Collect candidate videos, skipping derived "_reduced" renders.
    ff = list()
    for root, dirs, files in os.walk(c_dir):
        for f in files:
            if f.endswith(".mp4") and "_reduced" not in f:
                ff.append(os.path.join(root, f))

    # Construct each FTPVideo exactly once (previously built twice per file).
    sorted_videos = sorted(FTPVideo(x) for x in ff)

    all_cat = list()      # per-video normalised embedding arrays
    all_idx = list()      # per-row index of the owning video
    all_source = list()   # real path of each video that yielded embeddings
    all_tstamps = list()  # per-row frame timestamps (epoch seconds)
    enu = 0               # count of videos with usable embedding data
    for idx, x in enumerate(sorted_videos):
        try:
            hh = x.embeddings
        except Exception:
            hh = None

        if hh is not None:
            all_source.append(x.real_path)
            enu += 1

            if hh.get("embeds", None) is not None:
                n_emb = FTPVideo.vec_norm(hh["embeds"])
                all_cat.append(n_emb)
                # Every row of this video's embeddings maps back to source
                # index enu-1 (position within all_source).
                all_idx.append(
                    (enu - 1) * np.ones(n_emb.shape[-2], dtype=np.int64)
                )
                all_tstamps.append([t.timestamp() for t in hh["frame_time"]])

        # One progress bump per video so the UI can track the folder scan.
        message = {
            "task": "VECTOR_CALC_IN_FOLDER_BUMP",
            "when": c_dir,
            "progress": idx + 1,
            "how_many": len(sorted_videos),
            "time": dt.datetime.now().timestamp(),
        }
        r.rpush(redis_key, json.dumps(message))

    if len(all_cat) == 0:
        return None

    all_embeds = {idx: x for idx, x in enumerate(all_cat)}
    all_idces = np.hstack(all_idx)
    all_times = np.hstack(all_tstamps)

    np.savez(
        vec_rep_file,
        embeds=all_embeds,
        idces=all_idces,
        timestamps=all_times,
        source_files=all_source,
    )
    message = {
        "task": "VECTOR_CALC_IN_FOLDER_DONE",
        "when": str(c_dir),
        "time": dt.datetime.now().timestamp(),
    }
    r.rpush(redis_key, json.dumps(message))
    return dict(
        embeds=all_embeds,
        idces=all_idces,
        timestamps=all_times,
        source_files=all_source,
    )
|
def get_scores_embedding_c_dir(c_dir, query_vector, redis_key="compute_log"):
    """Score every embedding row in *c_dir* against *query_vector*.

    *query_vector* is a 1-D normalised vector (tuple or array).  Returns
    ``(vec_rep, query_scores)`` where ``query_scores`` is a 1-D array
    aligned with ``vec_rep['idces']``; ``(None, None)`` when the folder has
    no vector representation.

    Fix: the ``vec_rep is None`` guard previously ran AFTER subscripting
    ``vec_rep['embeds']``, so a missing representation raised TypeError
    instead of returning ``(None, None)``.
    """
    vec_rep = get_vector_representation(c_dir, redis_key=redis_key)
    if vec_rep is None:
        return None, None

    # Normalise the stored dict-of-arrays form to a plain list of arrays.
    if isinstance(vec_rep["embeds"], dict):
        vec_rep["embeds"] = [y for x, y in vec_rep["embeds"].items()]

    if isinstance(vec_rep["embeds"], list):
        q_scores = list()
        for emb in vec_rep["embeds"]:
            d = len(query_vector)
            # Dot product against every row; embeddings may carry an extra
            # leading dimension (e.g. multiple crops per frame) which is
            # reduced with a max over that axis.
            similarity = (query_vector @ emb.reshape(-1, d).T).reshape(emb.shape[:-1])

            if similarity.ndim == 2:
                scores = np.max(similarity, axis=0)
            else:
                scores = similarity
            q_scores.append(scores)
        query_scores = np.hstack(q_scores)
    else:
        query_scores = (query_vector @ vec_rep["embeds"].T).squeeze()
    return vec_rep, query_scores
|
@functools.lru_cache
def get_clip_scores(video_path: str, prompt: str):
    """Score each frame of *video_path* against the text *prompt*.

    Ensures the embedding cache exists (triggering computation if needed),
    then returns a (2, n_frames) array: row 0 holds frame offsets, row 1
    the similarity of each frame embedding with the prompt embedding.
    Results are memoised per (video_path, prompt).
    """
    if not os.path.exists(get_embed_cache_file_search_path(video_path)):
        get_embeddings_for_a_file(video_path)

    video = FTPVideo(video_path, ignore_filename=True)
    embed_data = video.embeddings

    frame_embeds = FTPVideo.vec_norm(embed_data["embeds"])
    prompt_embed = FTPVideo.vec_norm(get_query_vector(prompt))
    scores = (prompt_embed @ frame_embeds.T).squeeze().tolist()

    return np.asarray([embed_data["frame_offsets"], scores])
|
@functools.lru_cache
def get_query_vector(query):
    """Encode *query* via the local text-encoder service and normalise it.

    Returns a (1, d) normalised numpy array; responses are memoised per
    query string so repeated searches skip the HTTP round-trip.
    """
    response = requests.get(
        "http://192.168.1.242:53004/encode", params={"query": query}
    )
    raw_vector = response.json()["vector"][0]
    embedded = np.asarray(raw_vector)[None, :]
    return FTPVideo.vec_norm(embedded)
|
def calculate_embedding_score_in_folders(
    c_dirs, threshold, query=None, query_vector=None, redis_key="compute_log"
):
    """Score every folder in *c_dirs* against a text *query* (or a
    pre-computed *query_vector*) using a pool of worker processes.

    Returns ``{"videos": [...], "cache_file_locs": [...]}`` aggregated over
    all folders; workers returning ``None`` or empty results are skipped.

    Fix: ``query_vector = None`` previously clobbered the caller-supplied
    parameter unconditionally, and workers were handed ``None`` so each one
    re-encoded the query over HTTP.  The vector is now computed once (only
    when not supplied) and forwarded to the workers.
    """
    if query_vector is None:
        query_vector = get_query_vector(query)

    args = [(x, threshold, query, query_vector, logger, redis_key) for x in c_dirs]

    with Pool(processes=8) as pool:
        out = pool.starmap(calculate_embedding_score_in_folder, args)

    result_list = list()
    cache_files = list()
    for result in out:
        if result is None:
            continue
        try:
            # Workers return (score_dict, cache_path); an empty list means
            # "no usable directory" and fails unpacking, which we tolerate.
            x, cache_file_loc = result
            result_list.extend(x["videos"])
            cache_files.append(cache_file_loc)
        except Exception as e:
            print(e)

    return {"videos": result_list, "cache_file_locs": cache_files}
|
def collapse_scores_to_maxmin_avg(folder_scores):
    """Collapse each video's per-frame scores into summary statistics.

    For every entry in ``folder_scores["videos"]`` the ``embed_scores``
    payload is replaced by ``score = [min, max, time_of_max, time_of_min]``
    and ``time = max(times)``.

    Returns a new list of entries; the caller's input is left unmodified.

    Fix: the previous implementation used a shallow ``c_data.copy()`` and
    then mutated the shared nested ``embed_scores`` dict in place,
    corrupting the caller's data structure.
    """
    result = list()
    for c_data in folder_scores["videos"]:
        # Deep-copy so the nested embed_scores dict is not shared with the
        # caller's entry.
        new_d = copy.deepcopy(c_data)

        scores = new_d["embed_scores"]["score"]
        times = new_d["embed_scores"]["time"]
        max_score = max(scores)
        min_score = min(scores)
        # index() picks the first occurrence when a score value repeats.
        max_score_time = times[scores.index(max_score)]
        min_score_time = times[scores.index(min_score)]
        new_d["embed_scores"]["score"] = [
            min_score,
            max_score,
            max_score_time,
            min_score_time,
        ]
        new_d["embed_scores"]["time"] = max(times)
        result.append(new_d)

    return result
|
def calculate_embedding_score_in_folder(
    og_dir,
    threshold,
    query=None,
    query_vector=None,
    logger=logger,
    redis_key="compute_log",
):
    """Score one folder of videos against a query vector, with disk caching.

    Publishes START/DONE events onto the Redis list *redis_key*, resolves
    *og_dir* across known mount remappings, and caches results as a pickle
    keyed by *threshold* and the md5 of *query_vector*.

    Returns ``(result_dict, cache_file_path)`` where ``result_dict`` has
    ``source_files`` and ``videos`` keys; ``[]`` when no usable directory
    is found; ``None`` when the folder has no vector representation.
    NOTE(review): the three return shapes are inconsistent — callers must
    handle all of them.
    """
    message = {
        "task": "SCORE_CALC_IN_FOLDER_START",
        "when": str(og_dir),
        "time": dt.datetime.now().timestamp(),
    }
    r.rpush(redis_key, json.dumps(message))

    if query_vector is None:
        query_vector = get_query_vector(query)

    # The same folder may live under different mounts; probe each known
    # remapping and keep the first candidate that exists and holds a
    # non-trivial number (>5) of videos.
    candidate_dirs = list()
    candidate_dirs.append(og_dir)
    candidate_dirs.append(og_dir.replace("/srv/ftp_tcc", "/mnt/hdd_24tb_1/videos/ftp"))
    candidate_dirs.append(og_dir.replace("/srv/ftp", "/mnt/hdd_24tb_1/videos/ftp"))

    c_dir = None
    for candidate in candidate_dirs:
        if os.path.exists(candidate):
            if len([x for x in os.listdir(candidate) if x.endswith(".mp4")]) > 5:
                c_dir = candidate
                break
    if c_dir is None:
        return []

    # Cache key: threshold + md5 of the raw query-vector bytes.
    # NOTE(review): assumes query_vector is an ndarray (md5 hashes its
    # buffer) — a tuple here would raise; confirm against callers.
    vec_cache_str = md5(query_vector).hexdigest()
    cache_file_loc = os.path.join(
        c_dir, "embedding_scores@" + str(threshold) + "@" + vec_cache_str + ".pkl"
    )

    if os.path.exists(cache_file_loc):
        logger.info(f"TRYING TO LOAD CACHE {cache_file_loc}")
        try:
            with open(cache_file_loc, "rb") as f:
                video_json_info = pickle.load(f)
            # Validate the cache: the set of videos it was computed from
            # must match the embedding files currently on disk.
            files_in_cache = {
                os.path.splitext(os.path.basename(x))[0]
                for x in video_json_info.get("source_files", [])
            }
            lsd_dir = os.listdir(c_dir)
            # NOTE(review): split(".")[0] vs splitext() — these only agree
            # when video basenames contain no extra dots; confirm.
            files_on_disk = {
                x.split(".")[0] for x in lsd_dir if x.endswith("oclip_embeds.npz")
            }
            if files_on_disk == files_in_cache:
                logger.info(f"LOADED EMBEDDING SCORE FROM CACHE {cache_file_loc}")
                message = {
                    "task": "SCORE_CALC_IN_FOLDER_DONE",
                    "when": str(c_dir),
                    "time": dt.datetime.now().timestamp(),
                    "precomputed": True,
                }
                r.rpush(redis_key, json.dumps(message))
                return (video_json_info, cache_file_loc)
            else:
                # Stale cache: new videos were embedded since it was
                # written; drop the folder-level vec_rep so it is rebuilt.
                logger.info(
                    f"CACHE FILE IS OLD, DELETING VEC REP FILE AND RECREATING {cache_file_loc}"
                )
                os.remove(get_vec_rep_file_loc(c_dir))
        except Exception as e:
            logger.info(f"CACHE FILE IS CORRUPT, RECREATING {cache_file_loc} {e}")
            os.remove(cache_file_loc)

        pass

    # Recompute scores from the (possibly rebuilt) vector representation.
    # The vector is passed as a hashable tuple of floats.
    vec_rep, query_scores = get_scores_embedding_c_dir(
        c_dir, tuple(query_vector.tolist()[0]), redis_key=redis_key
    )
    if vec_rep is None:
        return

    video_json_info = list()
    # Rows above threshold, and the set of videos they belong to.
    idces_keep = np.where(query_scores > threshold)[0]
    video_id = vec_rep["idces"][idces_keep]
    videos_that_match = np.unique(video_id)

    # Group the timestamps of matching videos into contiguous per-video runs
    # (split wherever the video index changes).
    id_extract_video_level = np.where(np.isin(vec_rep["idces"], videos_that_match))[0]
    idces_split = (
        np.where(np.diff(vec_rep["idces"][id_extract_video_level]) != 0)[0] + 1
    )
    subset_timestampsF = np.split(
        vec_rep["timestamps"][id_extract_video_level], idces_split
    )

    for subset_t in subset_timestampsF:
        if len(subset_t) == 0:
            continue

        # Expand each matching video's time window to every video that
        # overlaps it, so the output includes surrounding context clips.
        min_t = min(subset_t)
        max_t = max(subset_t)
        idces_curr = np.where(
            np.logical_and(vec_rep["timestamps"] > min_t, vec_rep["timestamps"] < max_t)
        )[0]
        if len(idces_curr) == 0:
            continue

        unq_vids = np.unique(vec_rep["idces"][idces_curr])
        subset_idx = np.where(np.isin(vec_rep["idces"], unq_vids))[0]

        subset_idces = vec_rep["idces"][subset_idx]
        subset_timestamps = vec_rep["timestamps"][subset_idx]
        subset_scores = query_scores[subset_idx]
        # Split the expanded selection back into one chunk per video.
        idx_split = np.where(np.diff(vec_rep["idces"][subset_idx]) != 0)[0] + 1

        split_idces = np.split(subset_idces, idx_split)
        split_timestamps = np.split(subset_timestamps, idx_split)
        split_scores = np.split(subset_scores, idx_split)
        split_files = [vec_rep["source_files"][x[0]] for x in split_idces]

        for s_file, s_scores, s_tstamps, s_idces in zip(
            split_files, split_scores, split_timestamps, split_idces
        ):
            start_time = float(min(s_tstamps))
            end_time = float(max(s_tstamps))

            # Times are made relative to the clip start for the UI payload.
            frame_time = (s_tstamps - start_time).tolist()
            embed_scores = s_scores.tolist()

            c_data = {
                "file_name": str(s_file),
                "start_time": start_time,
                "end_time": end_time,
                "embed_scores": {"time": frame_time, "score": embed_scores},
            }
            video_json_info.append(c_data)

    message = {
        "task": "SCORE_CALC_IN_FOLDER_DONE",
        "when": str(c_dir),
        "time": dt.datetime.now().timestamp(),
    }
    r.rpush(redis_key, json.dumps(message))
    # Persist alongside the source-file list used for cache validation above.
    to_write = {"source_files": vec_rep["source_files"], "videos": video_json_info}
    with open(cache_file_loc, "wb") as f:
        logger.info(f"WRITING EMBEDDING SCORE TO CACHE {cache_file_loc}")
        pickle.dump(to_write, f)
    logger.info(f"SAVED EMBEDDING SCORE TO CACHE {cache_file_loc}")
    return (to_write, cache_file_loc)
|
def get_matching_file_given_filename(web_name, folder_scores):
    """Resolve a web-facing basename back to an existing on-disk video path.

    Scans ``folder_scores["videos"]`` for a file name ending in *web_name*
    (last match wins), then probes the known mount remappings for a path
    that actually exists.

    Returns the existing path, or ``None`` when no entry matches or none of
    the candidate paths exist.

    Fix: an unmatched *web_name* previously left ``file_name`` as ``None``
    and crashed with AttributeError on ``None.replace(...)``.
    """
    file_name = None
    for x in folder_scores["videos"]:
        if x["file_name"].endswith(web_name):
            file_name = x["file_name"]

    if file_name is None:
        return None

    candidate_files = [
        file_name,
        file_name.replace("/srv/ftp_tcc", "/mnt/hdd_24tb_1/videos/ftp"),
        file_name.replace("/srv/ftp", "/mnt/hdd_24tb_1/videos/ftp"),
    ]

    for candidate in candidate_files:
        if os.path.exists(candidate):
            return candidate
    return None
|
# c_dirs = ['/mnt/hdd_24tb_1/videos/ftp/leopards2/2025/08/26','/srv/ftp_tcc/leopards1/2025/08/27','/srv/ftp_tcc/leopards1/2025/08/28','/srv/ftp_tcc/leopards1/2025/08/29']
|
|
# op = calculate_embedding_score_in_folders( tuple(c_dirs), 0.10, query = 'A cat and human')
|
|
|
|
|
|
def add_breaks_between_videos(op, threshold_to_split_seconds=30 * 60):
    """Find gaps longer than *threshold_to_split_seconds* between videos.

    *op* is a ``{"videos": [...]}`` score result whose entries are assumed
    to be in chronological order.  Returns a list of
    ``(gap_start, gap_end)`` timestamp pairs for every gap strictly longer
    than the threshold (default: 30 minutes).
    """
    spans = [(v["start_time"], v["end_time"]) for v in op["videos"]]

    gaps = list()
    for (_, prev_end), (next_start, _) in zip(spans, spans[1:]):
        if next_start - prev_end > threshold_to_split_seconds:
            gaps.append((prev_end, next_start))

    return gaps