Files
CommonCode/video_meta.py
2025-09-04 22:48:15 -04:00

200 lines
5.9 KiB
Python

import datetime as dt
import os
import numpy as np
import subprocess
import pickle
from CommonCode.settings import get_logger, LogColorize
import logging
import numpy
# Colorizer applied to every log message emitted by this module.
pfm = LogColorize.video_meta

# Interactive sessions do not define __file__; fall back to a fixed location
# so code that relies on it keeps working there.
if '__file__' not in vars() and '__file__' not in globals():
    __file__ = '/home/thebears/Seafile/Designs/Code/Python/VideoProcessing/VideoMeta.py'

logger = get_logger(__name__, stdout=True, systemd=False)

# Path fragments that may appear in a recorded video path ...
orig_prefices = ("/srv/ftp", "/srv/ftp_tcc")
# ... and the replacement root tried for them (a single plain string).
new_prefices = "/mnt/hdd_24tb_1/videos/ftp"
def get_info_from_ftp_filename(cpath):
    """Parse camera name, index and timestamp out of a video file name.

    The base name of ``cpath`` (extension removed) must have the form
    ``<camera>_<index>_<YYYYmmddHHMMSS>``.  The fields are split from the
    right, so the camera name may itself contain underscores.

    Args:
        cpath: Path (absolute or relative) to the video file.

    Returns:
        dict with keys ``"name"`` (str), ``"index"`` (str, not converted),
        ``"timestamp"`` (naive ``datetime``) and ``"path"`` (absolute path
        of ``cpath``).

    Raises:
        ValueError: If the base name has fewer than three ``_``-separated
            fields or the last field is not a ``%Y%m%d%H%M%S`` timestamp.
    """
    date_str_format = "%Y%m%d%H%M%S"
    stem = os.path.splitext(os.path.basename(cpath))[0]
    # rsplit keeps everything left of the last two separators as the name.
    cam_name, index, time_str = stem.rsplit("_", 2)
    return {
        "name": cam_name,
        "index": index,
        "timestamp": dt.datetime.strptime(time_str, date_str_format),
        "path": os.path.abspath(cpath),
    }
def get_cache_loc(cpath):
    """Return the sidecar cache path for ``cpath``.

    The result is ``cpath`` with its extension (if any) replaced by
    ``.timestamp_cache``.
    """
    stem = os.path.splitext(cpath)[0]
    return stem + '.timestamp_cache'
class FTPVideo:
def __init__(self, cpath):
self.cpath = cpath
self.file_info = get_info_from_ftp_filename(cpath)
self._real_path = None
self._frame_info = None
self._embeds = None
self._embed_scores = None
def __lt__(self, other):
comp_val = other;
if hasattr(other, 'timestamp'):
comp_val = other.timestamp
return self.timestamp < comp_val
@staticmethod
def vec_norm(vec_in):
return vec_in / np.linalg.norm(vec_in, axis=1)[:,None]
@property
def embed_scores(self):
return self._embed_scores
def attach_embedding_score(self, scores):
self._embed_scores = scores
@property
def embeddings(self):
embeds_path = os.path.splitext(self.real_path)[0] + '.oclip_embeds.npz'
if not os.path.exists(embeds_path):
return None
if self._embeds is None:
npz_contents = np.load(embeds_path)
self._embeds = npz_contents
npz_contents = self._embeds
ret_dict = {};
ret_dict['embeds'] = npz_contents['embeds']
ret_dict['frame_numbers'] = [int(x) for x in npz_contents['frame_numbers']]
ret_dict['frame_offsets'] = [self.frames_info[x]['offset'] for x in ret_dict['frame_numbers']]
ret_dict['frame_time'] = [self.frames_info[x]['time'] for x in ret_dict['frame_numbers']]
e_scores = self.embed_scores
if self.embed_scores is not None:
ret_dict['embed_scores'] = e_scores
return ret_dict
def try_cache_read(self):
cache_loc = get_cache_loc(self.real_path)
if os.path.exists(cache_loc):
logger.info(pfm(f'ATTEMPT READING FROM CACHE: {cache_loc}'))
try:
with open(cache_loc,'rb') as ff:
data = pickle.load(ff)
logger.info(pfm(f'READ FROM CACHE: {cache_loc}'))
return data
except Exception as e:
logger.warn(pfm(f'READ FROM CACHE FAILED: {e} while reading {cache_loc}'))
return None
def try_cache_write(self, data):
cache_loc = get_cache_loc(self.real_path)
logger.info(pfm(f'ATTEMPTING WRITING TO CACHE: {cache_loc}'))
try:
with open(cache_loc, 'wb') as ff:
pickle.dump(data, ff)
logger.info(pfm(f'WROTE TO CACHE: {cache_loc}'))
except Exception as e:
logger.warn(pfm(f'WRITE TO CACHE FAILED: {e} while writing {cache_loc}'))
@property
def frames_info(self):
if self._frame_info is None:
self._frame_info = self.try_cache_read();
if self._frame_info is not None:
return self._frame_info
self._frame_info = self.get_frames_info()
self.try_cache_write(self._frame_info)
return self._frame_info
def get_frames_info(self):
if self._frame_info is not None:
return self._frame_info
fpath = self.real_path
cmd = f"ffprobe -select_streams v:0 -show_entries packet=pts_time,flags -of csv {fpath}"
logger.info(pfm(f'RUNNING FFPROBE FOR {fpath}'))
try:
out = subprocess.check_output(cmd.split(), stderr=subprocess.DEVNULL)
except Exception as e:
logger.warn(pfm(f'RUNNING FFPROBE FAILED FOR {e} on {fpath}'))
logger.info(pfm(f'RAN FFPROBE SUCCESSFULLY FOR {fpath}'))
timestamps = list()
for line in out.decode("UTF-8").split("\n"):
if "," not in line:
continue
_, fr_s, fr_type = line.split(",")
fr_s = float(fr_s)
timestamps.append(
{
"offset": fr_s,
"type": fr_type,
"time": dt.timedelta(seconds=fr_s) + self.timestamp,
}
)
self._frame_info = timestamps
return timestamps
@property
def timestamp(self):
return self.file_info["timestamp"]
@property
def camera_name(self):
return self.file_info["name"]
@property
def path(self):
return self.cpath
@property
def real_path(self):
if self._real_path is not None:
logger.debug(pfm(f'FOUND REAL PATH AS {self._real_path}'))
return self._real_path
cand_path = self.path
if os.path.exists(self.path):
self._real_path = cand_path
return cand_path
for og_p in orig_prefices:
for new_p in new_prefices:
new_cand_path = cand_path.replace(og_p, new_p)
logger.debug(pfm(f'TRYING PATH AS {new_cand_path}'))
if os.path.exists(new_cand_path):
self._real_path = new_cand_path
logger.debug(pfm(f'FOUND REAL PATH AS {new_cand_path}'))
return new_cand_path