commit 4cbdb00e14c71c7c9954e4610ecc4ab7544773ef Author: Ishan S. Patel Date: Thu Sep 4 22:48:15 2025 -0400 YACWC diff --git a/.track.py.swp b/.track.py.swp new file mode 100644 index 0000000..2a45726 Binary files /dev/null and b/.track.py.swp differ diff --git a/__init__.py b/__init__.py new file mode 100755 index 0000000..71d1704 --- /dev/null +++ b/__init__.py @@ -0,0 +1,2 @@ +from .data_structures import CircularDict +#from CommonCode import util diff --git a/__pycache__/__init__.cpython-310.pyc b/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..845686d Binary files /dev/null and b/__pycache__/__init__.cpython-310.pyc differ diff --git a/__pycache__/__init__.cpython-311.pyc b/__pycache__/__init__.cpython-311.pyc new file mode 100644 index 0000000..131f617 Binary files /dev/null and b/__pycache__/__init__.cpython-311.pyc differ diff --git a/__pycache__/__init__.cpython-312.pyc b/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..540fcb2 Binary files /dev/null and b/__pycache__/__init__.cpython-312.pyc differ diff --git a/__pycache__/__init__.cpython-313.pyc b/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..6703add Binary files /dev/null and b/__pycache__/__init__.cpython-313.pyc differ diff --git a/__pycache__/data_structures.cpython-310.pyc b/__pycache__/data_structures.cpython-310.pyc new file mode 100644 index 0000000..c7a7d14 Binary files /dev/null and b/__pycache__/data_structures.cpython-310.pyc differ diff --git a/__pycache__/data_structures.cpython-311.pyc b/__pycache__/data_structures.cpython-311.pyc new file mode 100644 index 0000000..eb0cb22 Binary files /dev/null and b/__pycache__/data_structures.cpython-311.pyc differ diff --git a/__pycache__/data_structures.cpython-312.pyc b/__pycache__/data_structures.cpython-312.pyc new file mode 100644 index 0000000..52e80a2 Binary files /dev/null and b/__pycache__/data_structures.cpython-312.pyc differ diff --git a/__pycache__/data_structures.cpython-313.pyc b/__pycache__/data_structures.cpython-313.pyc new file mode 100644 index 0000000..3b284bf Binary files /dev/null and b/__pycache__/data_structures.cpython-313.pyc differ diff --git a/__pycache__/db_conn.cpython-310.pyc b/__pycache__/db_conn.cpython-310.pyc new file mode 100644 index 0000000..b07631f Binary files /dev/null and b/__pycache__/db_conn.cpython-310.pyc differ diff --git a/__pycache__/db_conn.cpython-311.pyc b/__pycache__/db_conn.cpython-311.pyc new file mode 100644 index 0000000..cd1aa54 Binary files /dev/null and b/__pycache__/db_conn.cpython-311.pyc differ diff --git a/__pycache__/db_conn.cpython-312.pyc b/__pycache__/db_conn.cpython-312.pyc new file mode 100644 index 0000000..494cb90 Binary files /dev/null and b/__pycache__/db_conn.cpython-312.pyc differ diff --git a/__pycache__/db_conn.cpython-313.pyc b/__pycache__/db_conn.cpython-313.pyc new file mode 100644 index 0000000..5a658b1 Binary files /dev/null and b/__pycache__/db_conn.cpython-313.pyc differ diff --git a/__pycache__/kwq.cpython-310.pyc b/__pycache__/kwq.cpython-310.pyc new file mode 100644 index 0000000..5434fb4 Binary files /dev/null and b/__pycache__/kwq.cpython-310.pyc differ diff --git a/__pycache__/kwq.cpython-312.pyc b/__pycache__/kwq.cpython-312.pyc new file mode 100644 index 0000000..9cb26c5 Binary files /dev/null and b/__pycache__/kwq.cpython-312.pyc differ diff --git a/__pycache__/kwq.cpython-313.pyc b/__pycache__/kwq.cpython-313.pyc new file mode 100644 index 0000000..9f2d65f Binary files /dev/null and b/__pycache__/kwq.cpython-313.pyc differ diff --git a/__pycache__/naming.cpython-311.pyc b/__pycache__/naming.cpython-311.pyc new file mode 100644 index 0000000..dc28e6f Binary files /dev/null and b/__pycache__/naming.cpython-311.pyc differ diff --git a/__pycache__/naming.cpython-312.pyc b/__pycache__/naming.cpython-312.pyc new file mode 100644 index 0000000..35b7306 Binary files /dev/null and b/__pycache__/naming.cpython-312.pyc differ diff --git a/__pycache__/settings.cpython-310.pyc b/__pycache__/settings.cpython-310.pyc new file mode 100644 index 0000000..9e81c05 Binary files /dev/null and b/__pycache__/settings.cpython-310.pyc differ diff --git a/__pycache__/settings.cpython-312.pyc b/__pycache__/settings.cpython-312.pyc new file mode 100644 index 0000000..8a68dc1 Binary files /dev/null and b/__pycache__/settings.cpython-312.pyc differ diff --git a/__pycache__/settings.cpython-313.pyc b/__pycache__/settings.cpython-313.pyc new file mode 100644 index 0000000..1be1650 Binary files /dev/null and b/__pycache__/settings.cpython-313.pyc differ diff --git a/__pycache__/util.cpython-310.pyc b/__pycache__/util.cpython-310.pyc new file mode 100644 index 0000000..07205e1 Binary files /dev/null and b/__pycache__/util.cpython-310.pyc differ diff --git a/__pycache__/util.cpython-311.pyc b/__pycache__/util.cpython-311.pyc new file mode 100644 index 0000000..054849b Binary files /dev/null and b/__pycache__/util.cpython-311.pyc differ diff --git a/__pycache__/util.cpython-312.pyc b/__pycache__/util.cpython-312.pyc new file mode 100644 index 0000000..349b8d7 Binary files /dev/null and b/__pycache__/util.cpython-312.pyc differ diff --git a/__pycache__/util.cpython-313.pyc b/__pycache__/util.cpython-313.pyc new file mode 100644 index 0000000..f8ed45b Binary files /dev/null and b/__pycache__/util.cpython-313.pyc differ diff --git a/__pycache__/video_meta.cpython-312.pyc b/__pycache__/video_meta.cpython-312.pyc new file mode 100644 index 0000000..2faf677 Binary files /dev/null and b/__pycache__/video_meta.cpython-312.pyc differ diff --git a/__pycache__/video_meta.cpython-313.pyc b/__pycache__/video_meta.cpython-313.pyc new file mode 100644 index 0000000..c128990 Binary files /dev/null and b/__pycache__/video_meta.cpython-313.pyc differ diff --git a/__pycache__/wq.cpython-310.pyc b/__pycache__/wq.cpython-310.pyc new file mode 100644 index 0000000..5610b1f Binary files /dev/null and b/__pycache__/wq.cpython-310.pyc differ diff --git a/__pycache__/wq.cpython-312.pyc b/__pycache__/wq.cpython-312.pyc new file mode 100644 index 0000000..cb909be Binary files /dev/null and b/__pycache__/wq.cpython-312.pyc differ diff --git a/__pycache__/wq.cpython-313 (SFConflict ispatel@live.com 2025-04-15-10-12-32).pyc b/__pycache__/wq.cpython-313 (SFConflict ispatel@live.com 2025-04-15-10-12-32).pyc new file mode 100644 index 0000000..ceed374 Binary files /dev/null and b/__pycache__/wq.cpython-313 (SFConflict ispatel@live.com 2025-04-15-10-12-32).pyc differ diff --git a/__pycache__/wq.cpython-313.pyc b/__pycache__/wq.cpython-313.pyc new file mode 100644 index 0000000..2023256 Binary files /dev/null and b/__pycache__/wq.cpython-313.pyc differ diff --git a/baseObjects.py b/baseObjects.py new file mode 100755 index 0000000..417ef57 --- /dev/null +++ b/baseObjects.py @@ -0,0 +1,15 @@ +from datetime import datetime, timedelta + +class BaseSensorPost(object): + def __init__(self, sq, sensor_type): + self.last_values = [None,None] + self.last_measurement = None + self.last_insert = datetime.now() + self.sq = sq + self.heartbeat_interval = 15*60 + self.sensor = sensor_type + + def insert_heartbeat(self, time_init, sensor): + ins = self.sq['t']['heartbeats'].insert().values( + when = time_init, what = sensor) + self.sq['s'].execute(ins) diff --git a/data_structures.py b/data_structures.py new file mode 100644 index 0000000..63d8232 --- /dev/null +++ b/data_structures.py @@ -0,0 +1,13 @@ + +from collections import UserDict, deque +class CircularDict(UserDict): + def __init__(self,init_data = None, max_len = 50): + self.data = init_data if init_data is not None else dict() + self.fkkeys = deque(self.data.keys(), max_len) + self.max_len = max_len + def __setitem__(self, k,v): + self.fkkeys.append(k) + self.data[k] = v + to_rem = self.data.keys() - set(self.fkkeys) + [self.data.pop(k) for k in to_rem] + diff --git a/db_conn.py b/db_conn.py new file mode 100755 index 0000000..6fd6c9a --- /dev/null +++ b/db_conn.py @@ -0,0 +1,356 @@ + + +#tables_meta = ['motionlogs', 'floor2temperature', 'horizontalwheel', 'verticalwheel','heartbeats', 'activities','food_weight','weight','daily_counts','hourly_temperature','food_dispenser', 'food_forager', 'touch_surface_grid'] +import inspect +import clickhouse_driver as ch +from functools import partial +import json + + + +# %% + +c_to_ch_dtype_map = {'uint8': 'UInt8', 'uint16':'UInt16', 'float32': 'Float32','datetime':'DateTime', 'point':'Point','str':'String', 'float':'Float32','int':'Int16'} +class db_api(): + def __init__(self): + host = '192.168.1.242' + host = update_host(do_loopback, host) + self.conn = ch.connect('clickhouse://'+host); + self.client = ch.Client(host=host) + self.cursor = self.conn.cursor() + kff = ['altitude','humidity','pressure','temperature','weight','hydroponics_ph','hydroponics_ec','hydroponics_rtd'] + + for kv in kff: + func_name = 'insert_'+kv + func_set = partial(self.generic_when_where, first_key = kv) + setattr(self, func_name, func_set) + + + def get_query_time_filtered(self, table, time_start, time_end): + cquery = f"select * from {table} where when > '{str(time_start)}' and when < '{str(time_end)}' order by when asc" + return cquery + + + def expand_to_list(self, vals): #dict of lists -> list of dicts + max_len = 1 + for k,v in vals.items(): + if isinstance(v,list): + max_len = max(max_len, len(v)) + + output = list() + for i in range(max_len): + output.append(dict()) + + for k,v in vals.items(): + if isinstance(v, list): + for idx, val in enumerate(v): + output[idx][k] = val + else: + for f in output: + f[k] = v + + return output + + def get_insert_statement(self,table_name, keys): + kvars = ', '.join(keys) + return f'INSERT INTO {table_name} ({kvars}) values ' + + def insert(self, dd, table, schema = None): + if schema is not None: + table = schema + '.' + table + + if isinstance(dd, dict): + insert_this = self.expand_to_list( dd) + else: + insert_this = dd + + keys = insert_this[0].keys() + insert_statement = self.get_insert_statement(table,keys) + + self.cursor.executemany(insert_statement, insert_this) + + + def query(self, query, no_results = False, **kwargs): + self.cursor.execute(query, **kwargs) + if no_results: + return None + else: + return self.cursor.fetchall() + def generic_when_where(self, v_in , when = None, where = None, first_key = None): + keys = [first_key,'where','when'] + table = 'nuggets.'+first_key + cb = locals() + cb[first_key] = v_in + dd = {x:cb[x] for x in keys} + + self.insert(dd, table) + + + def get_table_create_statement(self, schema, table_name, dtypes, insert_when = False, nullable = set()): + + entries = list() + if insert_when: + entries.append('`when` DateTime64(3)') + + for field_name, data_type in dtypes.items(): + ch_dtype = c_to_ch_dtype_map[data_type] + if field_name in nullable: + ch_dtype = 'Nullable('+ch_dtype+')' + entries.append( f' `{field_name}` {ch_dtype}') + + + dtype_text = ',\n'.join(entries) + + create_header = f'create table {schema}.{table_name} ' + + + create_footer = f' \nENGINE = ReplacingMergeTree\nORDER BY when\nSETTINGS index_granularity = 8192' + create_statement = create_header + '\n(\n' + dtype_text + '\n)\n' + create_footer + + return create_statement + + def check_if_table_exists(self, schema, table_name): + query = f'''SELECT count(*) as num_tables + FROM information_schema.tables where table_schema == '{schema}' and table_name == '{table_name}' ''' + + return self.query(query)[0][0]== 1 + + + def get_table_contents(self, table, make_map = False, make_inv_map = False): + query = f'select * from {table}' + self.cursor.execute(query); + results = self.cursor.fetchall() + + if make_map: + cm = dict() + + for f in results: + cm[f[0]] = f[1] + return cm + + if make_inv_map: + cm = dict() + for f in results: + cm[f[1]] = f[0] + return cm + + + return results + + + + # def insert_temperature(self, temperature, when = None, where = None): + # keys = ['temperature','who','when'] + # table = 'nuggets.temperature' + # cb = locals() + # dd = {x:cb[x] for x in keys} + # self.insert(dd, table, keys) + + + # def insert_pressure(self, pressure, when = None, where = None): + # keys = ['temperature','who','when'] + # table = 'nuggets.pressure' + # cb = locals() + # dd = {x:cb[x] for x in keys} + # self.insert(dd, table, keys) + + # def insert_weight(self, weight, when=None, where=None): + # keys = ['weight','who','when'] + # table = 'nuggets.weight' + # cb = locals() + # dd = {x:cb[x] for x in keys} + # self.insert(dd, table, keys) +from sqlalchemy.dialects import postgresql +from sqlalchemy.engine import create_engine +from sqlalchemy.schema import MetaData +from sqlalchemy.orm import Session +import os +import json +import socket +if socket.gethostname() == 'tree': + do_loopback = True +else: + do_loopback = False + +def update_host(do_loopback, ip_addr): + if do_loopback and ip_addr=='192.168.1.242': +# print('Updating host to be 127.0.0.1') + return '127.0.0.1' + else: + return ip_addr + +def connect( user='postgres', password='', db='nuggets', host='192.168.1.242', port=5432, echo = False): + + host = update_host(do_loopback, host) + + + from sqlalchemy.engine import create_engine + from sqlalchemy.schema import MetaData + from sqlalchemy.orm import Session + import os + import json + + if False: + + user='postgres' + password='' + db='nuggets' + host='192.168.1.242' + port=5432 + echo = False + + url = 'postgresql://{}:{}@{}:{}/{}' + url = url.format(user, password, host, port, db) + + conn = create_engine(url, connect_args={"application_name":"python_commoncode"}, echo = echo, future=True) + if db == 'nuggets': + schemas =['public','health','videos'] + elif db == 'winslow': + schemas=['body','notes','sensors','video'] + + + + metadata = MetaData() + for schema in schemas: + metadata.reflect(conn, schema,views=True) + + + session = Session(conn) + + tables = {k.split('.')[-1]:v for k,v in metadata.tables.items()} + + from sqlalchemy import text + + def execute_sub(query_in, has_data = None): + + if has_data is not None: + with conn.connect() as cox: + result = cox.execute(query_in, has_data) + try: + cox.commit() + except: + pass + + if hasattr(query_in, 'compile'): + + + compiled = query_in.compile(dialect=postgresql.dialect()) + + with conn.connect() as cox: + result = cox.execute(query_in, compiled.params) + try: + cox.commit() + except: + pass + + + + else: + with conn.connect() as cox: + result = cox.execute(text(query_in)) + try: + cox.commit() + except: + pass + + return result + + setattr(conn, 'execute',execute_sub) + + +# with conn.connect() as ce: + if True: + ce = conn + pid = ce.execute('select pg_backend_pid()').fetchall()[0][0] + if not os.path.exists('/dev/shm/pg/'): + try: + os.mkdir('/dev/shm/pg') + except: + pass +# os.chmod('/dev/shm/pg',0o777) + +# fname = '/dev/shm/pg/'+str(pid) + + details = list() + for x in inspect.stack(): + details.append({'filename':x.filename,'function':x.function,'lineno':x.lineno}) + + # with open(fname,'w') as ff: + # json.dump(details, ff, indent=4, sort_keys = False) +# os.chmod(fname,0o777) +# %% + + return {'s':session, 't':tables, 'c':conn, 'm':metadata} + + + + + +def old_connect( user='postgres', password='', db='nuggets', host='192.168.1.242', port=5432): + + if False: + user = 'postgres' + password='' + db='winslow' + host='192.168.1.242' + port=5432 + import time + + + + from sqlalchemy import and_, func, Table, MetaData, create_engine, inspect + from sqlalchemy.orm import Session, load_only + from sqlalchemy.ext.automap import automap_base + from sqlalchemy.pool import NullPool + + url = 'postgresql://{}:{}@{}:{}/{}' + url = url.format(user, password, host, port, db) + + + conn = create_engine(url, client_encoding='utf8',poolclass=NullPool, future=True) + + + def get_tables_in_schema(schema_name): + + + output = conn.execute("SELECT table_name FROM information_schema.tables WHERE table_schema='"+schema_name+"'").fetchall() + return [x[0] for x in output] + + + schemas = [x.strip() for x in conn.execute('show search_path').fetchall()[0][0].split(',')] + + + tables_meta = list() + for schema in schemas: + tables_meta.extend(get_tables_in_schema(schema)) + + + materialized_tables = [x[0] for x in conn.execute('select matviewname from pg_matviews')] + tables_meta.extend(materialized_tables) + + meta = MetaData(bind=conn)#, reflect = True) + meta.reflect(conn) + + + base = automap_base() + base.prepare(conn, reflect=True) + + session = Session(conn) + tables = dict() + + + + for table in tables_meta: + try: + tables[table] = Table(table, meta, #MetaData(), + autoload=False, autoload_with=conn) + except: + print(table, 'broke') + pass + + + + return {'s':session, 't':tables, 'c':conn, 'm':meta, 'b':base} + + + diff --git a/distanceTracker.py b/distanceTracker.py new file mode 100755 index 0000000..f324d3f --- /dev/null +++ b/distanceTracker.py @@ -0,0 +1,103 @@ +import sys +import serial +import time +import math +import traceback +from subprocess import check_output, call +import syslog +from db_conn import connect +import dateutil.parser +from datetime import timedelta, datetime +sys.path.append('/home/thebears/Nextcloud/Designs/NuggetTracker/CommonCode/') +from track import Temperature, VerticalWheel, ActivityLogger, Weight, BaseSensorPost, HorizontalWheel +import os +from sqlalchemy import func +import syslog +import redis + + + + +do_break = False +while True: + try: + sq = connect() + do_break = True; + except: + syslog.syslog("Failed to connect, waiting 1 second and trying again") + time.sleep(1) + + if do_break: + break + + +curr_time = datetime.now() +last_date = datetime(curr_time.year, curr_time.month, curr_time.day, 18, 0, 0) + +if last_date > curr_time: + curr_time = datetime.now() - timedelta(days=1) + last_date = datetime(curr_time.year, curr_time.month, curr_time.day, 18, 0, 0) + +r = redis.StrictRedis(host='192.168.1.242', port=6379, db=1) + + +os.makedirs('/dev/shm/winslow/', exist_ok=True) + +call(['chmod','777','/dev/shm/winslow/']) + + +res = sq['s'].query(func.count(sq['t']['horizontalwheel'].c.transittime)).filter(sq['t']['horizontalwheel'].c.timestamp > last_date).all() +total_counts = res[0][0] + +r.set('nugget_run_counts', total_counts) + + + + +#1500 turns is 1/4 mile + +threshold_for_next_mealie = 1500 +flipped_stuff = list() + +bins_per_day = 18 + + +last_iter = min([int(math.floor(total_counts / threshold_for_next_mealie)),bins_per_day]) + +while True: + do_rotate = False + next_date = last_date + timedelta(days = 1) + + if datetime.now() > next_date: + if next_date not in flipped_stuff: + do_rotate = True + + if do_rotate is True: + curr_time = datetime.now() + last_date = datetime(curr_time.year, curr_time.month, curr_time.day, 18, 0, 0) + flipped_stuff.append(last_date) + syslog.syslog('Resetting time to '+str(last_date)) + last_iter = 0 + r.set('nugget_run_counts', 0) + + + + +# res = sq['s'].query(func.count(sq['t']['horizontalwheel'].c.transittime)).filter(sq['t']['horizontalwheel'].c.timestamp > last_date).all() +# total_counts = res[0][0] + + total_counts = int(r.get('nugget_run_counts')) + curr_iter = min([int(math.floor(total_counts / threshold_for_next_mealie)),bins_per_day]) + syslog.syslog('Total: '+str(total_counts)+' Current bin: '+str(curr_iter)+ ' Redis Count:' + str(total_counts)) + + if curr_iter != last_iter: + last_iter = curr_iter + open('/dev/shm/winslow/mealies2_open.txt','w').close() + call(['chmod','777','/dev/shm/winslow']) + syslog.syslog("OPENING MEALIES") + + + + + + time.sleep(5) diff --git a/frozen b/frozen new file mode 100644 index 0000000..23f6c5d --- /dev/null +++ b/frozen @@ -0,0 +1,73 @@ +annotated-types==0.7.0 +anyio==4.10.0 +asttokens==3.0.0 +bounded-pool-executor==0.0.3 +certifi==2025.8.3 +charset-normalizer==3.4.3 +click==8.2.1 +colored==2.3.1 +decorator==5.2.1 +dnspython==2.7.0 +email-validator==2.3.0 +executing==2.2.1 +fastapi==0.116.1 +fastapi-cli==0.0.10 +fastapi-cloud-cli==0.1.5 +fastapi-server-session==0.0.1 +h11==0.16.0 +httpcore==1.0.9 +httptools==0.6.4 +httpx==0.28.1 +idna==3.10 +ipython==9.5.0 +ipython_pygments_lexers==1.1.1 +jedi==0.19.2 +Jinja2==3.1.6 +kafka-python==2.2.15 +lttb==0.3.2 +lxml==6.0.1 +markdown-it-py==4.0.0 +MarkupSafe==3.0.2 +matplotlib-inline==0.1.7 +mdurl==0.1.2 +## !! Could not determine repository location +-e /home/thebears/Seafile/Designs/Code/Python +numpy==1.26.4 +parso==0.8.5 +pexpect==4.9.0 +pqdm==0.2.0 +prompt_toolkit==3.0.52 +psutil==7.0.0 +ptyprocess==0.7.0 +pure_eval==0.2.3 +pydantic==2.11.7 +pydantic_core==2.33.2 +Pygments==2.19.2 +pystemd==0.13.4 +python-dotenv==1.1.1 +python-multipart==0.0.20 +python-systemd==0.0.9 +PyYAML==6.0.2 +redis==6.4.0 +redis-cli==1.0.1 +requests==2.32.5 +rich==14.1.0 +rich-toolkit==0.15.0 +rignore==0.6.4 +sentry-sdk==2.35.2 +shellingham==1.5.4 +sniffio==1.3.1 +stack-data==0.6.3 +starlette==0.47.3 +systemd-python @ git+https://github.com/systemd/python-systemd.git@903142423452c4dd18110b7f8a953dabb2031e49 +tqdm==4.67.1 +traitlets==5.14.3 +typer==0.17.3 +typing-inspection==0.4.1 +typing_extensions==4.15.0 +urllib3==2.5.0 +uvicorn==0.35.0 +uvloop==0.21.0 +watchfiles==1.1.0 +wcwidth==0.2.13 +websockets==15.0.1 diff --git a/get_amts.py b/get_amts.py new file mode 100755 index 0000000..0d9d528 --- /dev/null +++ b/get_amts.py @@ -0,0 +1,15 @@ +from db_conn import connect +from sqlalchemy import func +sys.path.append('/home/thebears/Nextcloud/Designs/NuggetTracker/CommonCode/') + +sq = connect() + +curr_time = datetime.now() +last_date = datetime(curr_time.year, curr_time.month, curr_time.day, 18, 0, 0) + + +last_beat = {i:last_date for i in range(4)} +threshold_for_next_mealie = 25 +min_time = 10; + +res = sq['s'].query(func.count(sq['t']['horizontalwheel'].c.transittime)).filter(sq['t']['horizontalwheel'].c.timestamp > last_date).all() diff --git a/kwq.py b/kwq.py new file mode 100644 index 0000000..b748a2a --- /dev/null +++ b/kwq.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +from enum import Enum +import json +import sys +from kafka import KafkaProducer, KafkaConsumer + + +bootstrap_server = ["192.168.1.242:19092"] + + +class TOPICS(): + videos_to_score_detection = "videos_to_score_detection" + videos_scored_detection="videos_scored_detection" + videos_with_nuggets="videos_with_nuggets" + videos_no_json="videos_no_json" + videos_without_nuggets="videos_without_nuggets" + videos_embedding_in_db="videos_embed_in_db" + videos_embedding_in_db_fail = "videos_embed_in_db_fail" + + +serializer = lambda v: json.dumps(v).encode("utf-8") +deserializer = json.loads + +producer = KafkaProducer( + bootstrap_servers=bootstrap_server, + key_serializer=serializer, + value_serializer=serializer, + request_timeout_ms=15000, # 15s (keep small) + max_block_ms=10000, # 10s max blocking + metadata_max_age_ms=300000, + retry_backoff_ms=100, + linger_ms=5, + retries=3, + security_protocol='SASL_PLAINTEXT', # change to SASL_SSL if TLS is enabled + sasl_mechanism='SCRAM-SHA-256', + sasl_plain_username='superuser', + sasl_plain_password='marybear' +) + + +def create_consumer(group_id = None, client_id = None): + return KafkaConsumer( + bootstrap_servers=bootstrap_server, + key_deserializer=deserializer, + value_deserializer=deserializer, + enable_auto_commit = False, + group_id=group_id, + client_id = client_id, + auto_offset_reset = 'earliest', + security_protocol='SASL_PLAINTEXT', # change to SASL_SSL if TLS is enabled + sasl_mechanism='SCRAM-SHA-256', + sasl_plain_username='superuser', + sasl_plain_password='marybear' + ) diff --git a/kwq_test.py b/kwq_test.py new file mode 100644 index 0000000..f482a67 --- /dev/null +++ b/kwq_test.py @@ -0,0 +1,6 @@ +from CommonCode import kwq +topic = 'whatsup' +kwq.producer.send("hello",'world') +for x in range(100): + kwq.producer.send(topic, 'num_'+str(x)) + diff --git a/naming.py b/naming.py new file mode 100755 index 0000000..1b00376 --- /dev/null +++ b/naming.py @@ -0,0 +1,54 @@ + + + +class str_with_attr(str): + def __new__(cls, val): + obj = str.__new__(cls, val) + return obj + + +def get_floor_redis_keys(floor_num): + keys = dict() + + swa = str_with_attr + + keys['vis_status'] = swa( 'nugget_house_lighting_'+str(floor_num)+'_vis_status' ) + keys['vis_status'].default = 'off' + keys['vis_status'].dtype = str + + keys['ir_status'] = swa( 'nugget_house_lighting_'+str(floor_num)+'_ir_status' ) + keys['ir_status'].default = 'on' + keys['ir_status'].dtype = str + + keys['vis_intensity'] = swa( 'nugget_house_lighting_'+str(floor_num)+'_vis_intensity' ) + keys['vis_intensity'].default = 0 + keys['vis_intensity'].dtype = int + + keys['ir_intensity'] = swa( 'nugget_house_lighting_'+str(floor_num)+'_ir_intensity') + keys['ir_intensity'].default = 100 + keys['ir_intensity'].dtype = int + + + keys['has_changed'] = swa('nugget_house_lighting_'+str(floor_num)+'_has_changed') + keys['has_changed'].default = True + keys['has_changed'].dtype = bool + + return keys + + + +def get_redis_values(r, keys): + out = dict() + for k,v in keys.items(): + redis_val = r.get(v) + if redis_val is None: + redis_val = 'None' + else: + redis_val = v.dtype(redis_val.decode('UTF-8')) + + out[k] = redis_val + + return out + + + diff --git a/settings.py b/settings.py new file mode 100644 index 0000000..f718d22 --- /dev/null +++ b/settings.py @@ -0,0 +1,54 @@ +import logging +import sys, os +from systemd.journal import JournalHandler +from colored import stylize, fore, Style, back +from functools import partial +from colored import set_tty_aware + + +log_format = '%(asctime)s|%(levelname)s|%(filename)s⮞%(funcName)s⮞%(lineno)d|%(message)s' + + +def get_logger(_name, file_path = None, stdout=False, systemd=False, level = logging.DEBUG): + logger = logging.getLogger() + logger.handlers.clear() + env_level = os.getenv("LOGLEVEL") + if env_level is not None: + level = env_level + + if level is not None: + logger.setLevel(level) + + formatter = logging.Formatter(log_format) + if file_path is not None: + file_handler = logging.FileHandler(file_path, mode='a') + file_handler.setFormatter(formatter) + file_handler.setLevel(logging.DEBUG) + logger.addHandler(file_handler) + + if stdout: + stdout_handler = logging.StreamHandler(sys.stdout) + stdout_handler.setFormatter(formatter) + stdout_handler.setLevel(logging.INFO) + logger.addHandler(stdout_handler) + + if systemd: + systemd_handler = JournalHandler() + systemd_handler.setFormatter(formatter) + logger.addHandler(systemd_handler) + + return logger + + + +def stylize_interface(text, fore_256=None, back_256=None, style=None): + return fore(fore_256)+back(back_256)+style+text+Style.reset + +set_tty_aware(False) + +class LogColorize(object): + watch_and_fix_permissions = partial(stylize_interface,fore_256='red',back_256='white', style=Style.BOLD) + score_obj_det_orin = partial(stylize_interface, fore_256='red', back_256='green', style=Style.BOLD) + remove_without_nuggets = partial(stylize_interface, fore_256='red', back_256='grey_78', style=Style.BOLD) + embeds_in_db =partial(stylize_interface, fore_256='green', back_256='black', style=Style.BOLD) + video_meta = partial(stylize_interface, fore_256='blue', back_256='black', style=Style.BOLD) diff --git a/takeSnapshot.py b/takeSnapshot.py new file mode 100755 index 0000000..3d064e4 --- /dev/null +++ b/takeSnapshot.py @@ -0,0 +1,37 @@ +import os +from datetime import datetime, timedelta +import cv2 +import time + + +while True: + + + cdate = datetime.now() + try: + base_path = cdate.strftime('/home/thebears/Videos/Winslow/%Y%m%d/snapshots/') + prestr = base_path + cdate.strftime('%Y%m%d_%H%M%S') + + if not os.path.exists(base_path): + os.makedirs(base_path) + + + video_obj = dict() + video_obj[1] = cv2.VideoCapture('http://localhost:6082/frame.mjpg') + video_obj[2] = cv2.VideoCapture('http://localhost:6083/frame.mjpg') + video_obj[3] = cv2.VideoCapture('http://localhost:6084/frame.mjpg') + video_obj[4] = cv2.VideoCapture('http://localhost:6085/frame.mjpg') + video_obj[5] = cv2.VideoCapture('http://localhost:6086/frame.mjpg') + + + for idx, obj in video_obj.items(): + filepath = prestr + '_' + str(idx) +'.jpg' + print(filepath) + (success, image) = obj.read() + cv2.imwrite(filepath, image) + obj.release() + except Exception as E: + print(E) + + + time.sleep(15) diff --git a/test_dbconn.py b/test_dbconn.py new file mode 100644 index 0000000..d2cc374 --- /dev/null +++ b/test_dbconn.py @@ -0,0 +1 @@ +from CommonCode.db_conn import connect, db_api diff --git a/track.py b/track.py new file mode 100755 index 0000000..13e472c --- /dev/null +++ b/track.py @@ -0,0 +1,81 @@ +from datetime import datetime, timedelta +import redis +r = redis.StrictRedis(host='192.168.1.242', port=6379, db=1) +class BaseSensorPost(object): + def __init__(self, sq, sensor_type, table,**kwargs): + self.last_values = [None,None] + self.last_measurement = None + self.last_insert = datetime.utcnow() + self.sq = sq + self.heartbeat_interval = 15*60 + self.sensor = sensor_type + self.table = table; + self.kwargs = kwargs + + def insert_heartbeat(self, time_init): + return True +# ins = self.sq['t'][self.table].insert().values( +# when_timestamp = time_init, what = self.sensor) +# self.sq['s'].execute(ins) + + +class Temperature(BaseSensorPost): + + def insert_temperature(self, time_init, temperature): + if time_init is not None and temperature is not None: + self.sq.executemany('INSERT INTO nuggets.temperature (when, temperature, where) values ',[{'when':time_init, 'temperature':temperature, 'where':self.kwargs['where']}]) + + # ins = self.sq['t'][self.table].insert().values(temperature = temperature, when_timestamp = time_init, where_id=self.kwargs['where']) +# self.sq['s'].execute(ins) + + + + def parse(self, time_init,string): + string = string.strip(','); + string = string.split(':')[1]; + curr_temp = float(string) + self.insert_temperature(time_init, curr_temp) +# self.insert_temperature(time_init,string) + +# if (time_init - self.last_insert).seconds > self.heartbeat_interval: +# self.insert_heartbeat(time_init) +# self.last_insert = time_init + + + + +class Odometer(BaseSensorPost): + + def insert_speeds(self, time_init, transittime = None, heartbeat = False): + + if transittime is None or transittime < 0: + isclockwise = 0 + else: + isclockwise = 1 + if time_init is not None: + self.sq.executemany('INSERT INTO nuggets.odometer (when, who, speed, clockwise) values ',[{'when':time_init, 'who':self.kwargs['who'], 'speed':abs(transittime), 'clockwise':isclockwise}]) + + # ins = self.sq['t'][self.table].insert().values( +# when_timestamp = time_init, speed = abs(transittime), who_id = self.kwargs['who'], +# clockwise = isclockwise ) +# self.sq['s'].execute(ins) + + + def parse(self, time_init,string): + string = string.strip(','); + string = string.split(':')[1]; + string = string.split(',') + for speed, offset in zip(string[0::2],string[1::2]): + speed = int(speed) + if speed != 0: + time_actual = time_init + timedelta(milliseconds = int(offset)) + self.insert_speeds(time_actual, transittime = speed) + r.incr('nugget_run_counts') + + if (time_init - self.last_insert).seconds > self.heartbeat_interval: + self.insert_heartbeat(time_init ) + self.last_insert = time_init + + self.last_measurement = time_init + +# %% diff --git a/trackSerial.py b/trackSerial.py new file mode 100755 index 0000000..0437f24 --- /dev/null +++ b/trackSerial.py @@ -0,0 +1,129 @@ +import sys +import serial +import time +import traceback +import syslog +from db_conn import connect +import dateutil.parser +from datetime import timedelta, datetime, timezone +sys.path.append('/home/thebears/Nextcloud/Designs/NuggetTracker/CommonCode/') +from track import Temperature, VerticalWheel, ActivityLogger, Weight, BaseSensorPost, HorizontalWheel, Paired +import os + +sq = connect() + +commit_interval = 60 +code_heartbeat = 10*60 + +def now(): + return datetime.now(timezone.utc) + +last_commit = now() +last_heartbeat = now() + +func_ref = {'T:':Temperature(sq, 'floor2temperature'), 'V:':VerticalWheel(sq, 'verticalwheel'), 'M:':ActivityLogger(sq,'activities'), + 'W':Weight(sq,'food_weight'), 'S:':HorizontalWheel(sq, 'horizontalwheel'), 'D':ActivityLogger(sq,'food_dispenser'), 'A':Paired(sq,'food_forager')}; + +code_start = BaseSensorPost(sq, 'started') +code_start.insert_heartbeat(now()) + + +heartbeat = BaseSensorPost(sq, 'code') + +ser = serial.Serial('/dev/winslowMONITOR', 9600, timeout=None) +#ser = serial.Serial('/dev/ttyACM0',57600,timeout=None) +while True: + try: + if os.path.exists('/dev/shm/mealies_open.txt'): + os.remove('/dev/shm/mealies_open.txt') + ser.write(b'n') + + if os.path.exists('/dev/shm/mealies_close.txt'): + os.remove('/dev/shm/mealies_close.txt') + ser.write(b'o') + + if os.path.exists('/dev/shm/mealies_next.txt'): + os.remove('/dev/shm/mealies_next.txt') + ser.write(b'p') + + + + + + if os.path.exists('/dev/shm/mealies2_open.txt'): + os.remove('/dev/shm/mealies2_open.txt') + ser.write(b'c') + + if os.path.exists('/dev/shm/mealies2_reset.txt'): + os.remove('/dev/shm/mealies2_reset.txt') + ser.write(b'd') + + if os.path.exists('/dev/shm/mealies2_open_all.txt'): + os.remove('/dev/shm/mealies2_open_all.txt') + ser.write(b'e') + + + + + if os.path.exists('/dev/shm/mealies_reinit.txt'): + os.remove('/dev/shm/mealies_reinit.txt') + ser.write(b'q') + + + if os.path.exists('/dev/shm/i2c_reinit.txt'): + os.remove('/dev/shm/i2c_reinit.txt') + ser.write(b'r') + + if os.path.exists('/dev/shm/winslow/mealies2_reset.txt'): + os.remove('/dev/shm/winslow/mealies2_reset.txt') + ser.write(b'd') + + except: + e = traceback.format_exc() + print(e) + syslog.syslog(syslog.LOG_ERR,e) + + + + + try: + + data = ser.readline() + string = data.decode('UTF-8') + print(string) + syslog.syslog(syslog.LOG_INFO,string) + parse_str = string.strip() + time_init = now() + for startchars, function in func_ref.items(): + if parse_str.startswith(startchars): + print(parse_str, function) + function.parse(time_init,parse_str) + + + sq['s'].commit() + curr_time = now() + if (curr_time - last_heartbeat).seconds > code_heartbeat: + heartbeat.insert_heartbeat(curr_time) + + + + except: + e = traceback.format_exc() + print(e) + syslog.syslog(syslog.LOG_ERR,e) + + + + + + + + + + + + + + + + diff --git a/trackSerial_fullduplex (SFConflict ispatel@live.com 2025-06-22-10-59-32).py b/trackSerial_fullduplex (SFConflict ispatel@live.com 2025-06-22-10-59-32).py new file mode 100755 index 0000000..26917d8 --- /dev/null +++ b/trackSerial_fullduplex (SFConflict ispatel@live.com 2025-06-22-10-59-32).py @@ -0,0 +1,155 @@ +import sys +import serial +import time +import traceback +#from CommonCode.db_conn import connect +import dateutil.parser +from datetime import timedelta, datetime +from pytz import timezone +from CommonCode.track import Temperature, Odometer, BaseSensorPost +import os +import threading +import redis + +#sq = connect() +r = redis.StrictRedis(host='192.168.1.242', port=6379, db=1) + +def now(): + return datetime.now() +# return datetime.now(timezone('US/Eastern')) + +commit_interval = 60 +code_heartbeat = 10*60 + +last_commit = now() +last_heartbeat = now() + +#func_ref = {'T:':Temperature(sq, 'floor2temperature'), 'V:':VerticalWheel(sq, 'verticalwheel'), 'M:':ActivityLogger(sq,'activities'), +# 'W':Weight(sq,'food_weight'), 'S:':HorizontalWheel(sq, 'horizontalwheel_sonic'), 'D':ActivityLogger(sq,'food_dispenser'), 'A':Paired(sq,'food_forager')}; + + +from clickhouse_driver import connect +from datetime import datetime +conn = connect('clickhouse://192.168.1.242'); +sq = conn.cursor(); + + +func_ref = {'T':Temperature(sq, 'floor_3_temperature','temperature',where=1),'S':Odometer(sq,'i2c_odometer','odometer',who=4)} +code_start = BaseSensorPost(sq, 'started','heartbeats') +code_start.insert_heartbeat(now()) + + +heartbeat = BaseSensorPost(sq, 'code','heartbeats') + +ser = serial.Serial('/dev/serial_i2c', 9600, timeout=None) + +def write(): + while True: + time.sleep(0.2) + try: + if os.path.exists('/dev/shm/mealies_open.txt'): + os.remove('/dev/shm/mealies_open.txt') + ser.write(b'n') + + if os.path.exists('/dev/shm/mealies_close.txt'): + os.remove('/dev/shm/mealies_close.txt') + ser.write(b'o') + + if os.path.exists('/dev/shm/mealies_next.txt'): + os.remove('/dev/shm/mealies_next.txt') + ser.write(b'p') + + + + + + if os.path.exists('/dev/shm/winslow/mealies2_open.txt'): + os.remove('/dev/shm/winslow/mealies2_open.txt') + ser.write(b'c') + + if os.path.exists('/dev/shm/mealies2_reset.txt'): + os.remove('/dev/shm/mealies2_reset.txt') + ser.write(b'd') + + if os.path.exists('/dev/shm/mealies2_open_all.txt'): + os.remove('/dev/shm/mealies2_open_all.txt') + ser.write(b'e') + + + + + if os.path.exists('/dev/shm/mealies_reinit.txt'): + os.remove('/dev/shm/mealies_reinit.txt') + ser.write(b'q') + + + if os.path.exists('/dev/shm/i2c_reinit.txt'): + os.remove('/dev/shm/i2c_reinit.txt') + ser.write(b'r') + + if os.path.exists('/dev/shm/winslow/mealies2_reset.txt'): + os.remove('/dev/shm/winslow/mealies2_reset.txt') + ser.write(b'd') + + except: + e = traceback.format_exc() + print(e) + + +#t1 = threading.Thread(target=write, args=()) +#t1.start() + + +while True: + + # try: + if True: + time.sleep(0.2) + data = ser.readline() + string = data.decode('UTF-8') + print(string) + parse_str = string.strip() + time_init = now() + for startchars, function in func_ref.items(): + if parse_str.startswith(startchars): + r.set('i2c_'+startchars, str(time.time()) + ':' + parse_str) + try: + function.parse(time_init,parse_str) + except IndexError: + print('Index error') + except Exception as ff: + raise Exception(ff) + + if (time_init - last_commit).seconds > commit_interval: +# sq['s'].commit() + last_commit = time_init + +# sq['s'].commit() + curr_time = now() + if (curr_time - last_heartbeat).seconds > code_heartbeat: + heartbeat.insert_heartbeat(curr_time) + last_heartbeat = curr_time + + last_heartbeat = curr_time + + + +# except: +# e = traceback.format_exc() +# print(e) + + + + + + + + + + + + + + + + diff --git a/trackSerial_fullduplex.py b/trackSerial_fullduplex.py new file mode 100755 index 0000000..30a56e1 --- /dev/null +++ b/trackSerial_fullduplex.py @@ -0,0 +1,154 @@ +import sys +import serial +import time +import traceback +import syslog +#from CommonCode.db_conn import connect +import dateutil.parser +from datetime import timedelta, datetime +from pytz import timezone +from CommonCode.track import Temperature, Odometer, BaseSensorPost +import os +import threading +import redis + +#sq = connect() +r = redis.StrictRedis(host='192.168.1.242', port=6379, db=1) + +def now(): + return datetime.now() +# return datetime.now(timezone('US/Eastern')) + +commit_interval = 60 +code_heartbeat = 10*60 + +last_commit = now() +last_heartbeat = now() + +#func_ref = {'T:':Temperature(sq, 'floor2temperature'), 'V:':VerticalWheel(sq, 'verticalwheel'), 'M:':ActivityLogger(sq,'activities'), +# 'W':Weight(sq,'food_weight'), 'S:':HorizontalWheel(sq, 'horizontalwheel_sonic'), 'D':ActivityLogger(sq,'food_dispenser'), 'A':Paired(sq,'food_forager')}; + + +from clickhouse_driver import connect +from datetime import datetime +conn = connect('clickhouse://192.168.1.242'); +sq = conn.cursor(); + + +func_ref = {'T':Temperature(sq, 'floor_3_temperature','temperature',where=1),'S':Odometer(sq,'i2c_odometer','odometer',who=4)} +code_start = BaseSensorPost(sq, 'started','heartbeats') +code_start.insert_heartbeat(now()) + + +heartbeat = BaseSensorPost(sq, 'code','heartbeats') + +ser = serial.Serial('/dev/serial_i2c', 9600, timeout=None) + +def write(): + while True: + time.sleep(0.2) + try: + if os.path.exists('/dev/shm/mealies_open.txt'): + os.remove('/dev/shm/mealies_open.txt') + ser.write(b'n') + + if os.path.exists('/dev/shm/mealies_close.txt'): + os.remove('/dev/shm/mealies_close.txt') + ser.write(b'o') + + if os.path.exists('/dev/shm/mealies_next.txt'): + os.remove('/dev/shm/mealies_next.txt') + ser.write(b'p') + + + + + + if os.path.exists('/dev/shm/winslow/mealies2_open.txt'): + os.remove('/dev/shm/winslow/mealies2_open.txt') + ser.write(b'c') + + if os.path.exists('/dev/shm/mealies2_reset.txt'): + os.remove('/dev/shm/mealies2_reset.txt') + ser.write(b'd') + + if os.path.exists('/dev/shm/mealies2_open_all.txt'): + os.remove('/dev/shm/mealies2_open_all.txt') + ser.write(b'e') + + + + + if os.path.exists('/dev/shm/mealies_reinit.txt'): + os.remove('/dev/shm/mealies_reinit.txt') + ser.write(b'q') + + + if os.path.exists('/dev/shm/i2c_reinit.txt'): + os.remove('/dev/shm/i2c_reinit.txt') + ser.write(b'r') + + if os.path.exists('/dev/shm/winslow/mealies2_reset.txt'): + os.remove('/dev/shm/winslow/mealies2_reset.txt') + ser.write(b'd') + + except: + e = traceback.format_exc() + print(e) + syslog.syslog(syslog.LOG_ERR,e) + + +t1 = threading.Thread(target=write, args=()) +t1.start() + + +while True: + + try: +# if True: + time.sleep(0.2) + data = ser.readline() + string = data.decode('UTF-8') + print(string) + syslog.syslog(syslog.LOG_INFO,string) + parse_str = string.strip() + time_init = now() + for startchars, function in func_ref.items(): + if parse_str.startswith(startchars): + r.set('i2c_'+startchars, str(time.time()) + ':' + parse_str) + function.parse(time_init,parse_str) + + if (time_init - last_commit).seconds > commit_interval: +# sq['s'].commit() + last_commit = time_init + +# sq['s'].commit() + curr_time = now() + if (curr_time - last_heartbeat).seconds > code_heartbeat: + heartbeat.insert_heartbeat(curr_time) + last_heartbeat = curr_time + + last_heartbeat = curr_time + + + + except: + e = traceback.format_exc() + print(e) + syslog.syslog(syslog.LOG_ERR,e) + + + + + + + + + + + + + + + + diff --git a/util.py b/util.py new file mode 100644 index 0000000..6f6f364 --- /dev/null +++ b/util.py @@ -0,0 +1,112 @@ + +import glob +import json +import textwrap +import sys +import os +from collections import defaultdict +def json_per_row_to_json(fpath = '/home/thebears/Videos/Winslow/saved/marks'): + fid = open(fpath,'r') + d = list() + for x in fid: + d.append(json.loads(x)) + + json.dump(d,open(fpath+'.json','w'),indent=4) + + +def is_ipython(): + import __main__ as main_ff + if hasattr(main_ff,'__file__'): + return False + else: + return True + +def exit_if_not_ipython(): + import __main__ as main_ff + if hasattr(main_ff,'__file__'): + print('Running in non-interactive mode, exiting...') + sys.exit() + else: + print('Running in interactive mode, not exiting...') + + + +def append_json_no_file_read(obj_append, jpath = '/home/thebears/Videos/Winslow/saved/marks.json'): + fid = open(jpath,'r+') + fid.seek(0,2) + position = fid.tell() -2 + fid.seek(position) + fid.write(",\n") + fid.write(textwrap.indent(json.dumps(obj_append, indent=4),' ')) + fid.write('\n]') + fid.close() + + + +def get_cset_match(input_in): + if not os.path.exists(input_in): + return {} + + if os.path.isdir(input_in): + return get_cset_match_dir(input_in) + else: + dir_list = os.path.dirname(input_in) + summ = get_cset_match_dir(dir_list) + name, ext = os.path.splitext(input_in) + tstamp = int(name.replace('_trimmed','').split('_')[-1]) + return summ[tstamp] + +def get_cset_for_file_matching(file_in): + name, ext = os.path.splitext(file_in) + cset = dict() + files = glob.glob(name+"*") + for f in files: + [f_name, f_ext ] =f.split(".",1) + cset['.'+f_ext] = f + + return cset + + +def get_cset_match_dir(rtdir): + summary = dict() + + other_files = dict() + + for f in os.walk(rtdir): + c_rt = f[0] + c_files = f[2] + + for c_file in c_files: + name, ext = os.path.splitext(c_file) + try: + tstamp = int(name.replace('_trimmed','').split('_')[-1]) + if ext in {'.mp4'}: + summary[tstamp] = dict() + summary[tstamp][ext] = os.path.join(c_rt, c_file) + else: + if tstamp not in other_files: + other_files[tstamp] = dict() + other_files[tstamp][ext] = os.path.join(c_rt, c_file) + except: + pass + + mp4_tstamps = sorted(summary) + other_tstamps = sorted(other_files) + idx_other = 0 + + merge_map = defaultdict(list) + merged = 0 + for idx_mp4, m_t in enumerate(mp4_tstamps): +# for idx_o_test in range(len(other_tstamps))) + for idx_o_test, o_t in enumerate(other_tstamps): + if abs(m_t - o_t) < 25: + merge_map[m_t].append(o_t) + + + + for k, v in merge_map.items(): + for v_join in v: + summary[k].update(other_files[v_join]) + + + return summary diff --git a/video_meta.py b/video_meta.py new file mode 100644 index 0000000..24a0dbb --- /dev/null +++ b/video_meta.py @@ -0,0 +1,199 @@ +import datetime as dt +import os +import numpy as np +import subprocess +import pickle +from CommonCode.settings import get_logger, LogColorize +import logging +import numpy +pfm = LogColorize.video_meta + +if not ('__file__' in vars() or '__file__' in globals()): + __file__ = '/home/thebears/Seafile/Designs/Code/Python/VideoProcessing/VideoMeta.py' + +logger = get_logger(__name__, stdout=True, systemd=False) + +orig_prefices = ("/srv/ftp", "/srv/ftp_tcc") +new_prefices = "/mnt/hdd_24tb_1/videos/ftp" + + +def get_info_from_ftp_filename(cpath): + date_str_format = "%Y%m%d%H%M%S" + bname = os.path.basename(cpath) + froot, ext = os.path.splitext(bname) + cam_name, index, time_str = froot.split("_") + time_stamp = dt.datetime.strptime(time_str, date_str_format) + file_info = { + "name": cam_name, + "index": index, + "timestamp": time_stamp, + "path": os.path.abspath(cpath), + } + return file_info + +def get_cache_loc(cpath): + fpathroot, ext = os.path.splitext(cpath) + return fpathroot + '.timestamp_cache' + + +class FTPVideo: + def __init__(self, cpath): + self.cpath = cpath + self.file_info = get_info_from_ftp_filename(cpath) + self._real_path = None + self._frame_info = None + self._embeds = None + self._embed_scores = None + + def __lt__(self, other): + comp_val = other; + if hasattr(other, 'timestamp'): + comp_val = other.timestamp + + return self.timestamp < comp_val + + @staticmethod + def vec_norm(vec_in): + return vec_in / np.linalg.norm(vec_in, axis=1)[:,None] + + @property + def embed_scores(self): + return self._embed_scores + + def attach_embedding_score(self, scores): + self._embed_scores = scores + + + @property + def embeddings(self): + embeds_path = os.path.splitext(self.real_path)[0] + '.oclip_embeds.npz' + if not os.path.exists(embeds_path): + return None + + if self._embeds is None: + npz_contents = np.load(embeds_path) + self._embeds = npz_contents + + npz_contents = self._embeds + + ret_dict = {}; + ret_dict['embeds'] = npz_contents['embeds'] + + ret_dict['frame_numbers'] = [int(x) for x in npz_contents['frame_numbers']] + ret_dict['frame_offsets'] = [self.frames_info[x]['offset'] for x in ret_dict['frame_numbers']] + ret_dict['frame_time'] = [self.frames_info[x]['time'] for x in ret_dict['frame_numbers']] + e_scores = self.embed_scores + if self.embed_scores is not None: + ret_dict['embed_scores'] = e_scores + + + return ret_dict + + def try_cache_read(self): + cache_loc = get_cache_loc(self.real_path) + if os.path.exists(cache_loc): + logger.info(pfm(f'ATTEMPT READING FROM CACHE: {cache_loc}')) + try: + with open(cache_loc,'rb') as ff: + data = pickle.load(ff) + logger.info(pfm(f'READ FROM CACHE: {cache_loc}')) + return data + except Exception as e: + logger.warn(pfm(f'READ FROM CACHE FAILED: {e} while reading {cache_loc}')) + + + return None + + def try_cache_write(self, data): + cache_loc = get_cache_loc(self.real_path) + logger.info(pfm(f'ATTEMPTING WRITING TO CACHE: {cache_loc}')) + try: + with open(cache_loc, 'wb') as ff: + pickle.dump(data, ff) + logger.info(pfm(f'WROTE TO CACHE: {cache_loc}')) + except Exception as e: + logger.warn(pfm(f'WRITE TO CACHE FAILED: {e} while writing {cache_loc}')) + + + @property + def frames_info(self): + if self._frame_info is None: + self._frame_info = self.try_cache_read(); + + if self._frame_info is not None: + return self._frame_info + + self._frame_info = self.get_frames_info() + self.try_cache_write(self._frame_info) + + return self._frame_info + + def get_frames_info(self): + if self._frame_info is not None: + return self._frame_info + + fpath = self.real_path + cmd = f"ffprobe -select_streams v:0 -show_entries packet=pts_time,flags -of csv {fpath}" + logger.info(pfm(f'RUNNING FFPROBE FOR {fpath}')) + + try: + out = subprocess.check_output(cmd.split(), stderr=subprocess.DEVNULL) + except Exception as e: + logger.warn(pfm(f'RUNNING FFPROBE FAILED FOR {e} on {fpath}')) + + logger.info(pfm(f'RAN FFPROBE SUCCESSFULLY FOR {fpath}')) + + timestamps = list() + for line in out.decode("UTF-8").split("\n"): + if "," not in line: + continue + + _, fr_s, fr_type = line.split(",") + fr_s = float(fr_s) + + timestamps.append( + { + "offset": fr_s, + "type": fr_type, + "time": dt.timedelta(seconds=fr_s) + self.timestamp, + } + ) + + self._frame_info = timestamps + return timestamps + + @property + def timestamp(self): + return self.file_info["timestamp"] + + @property + def camera_name(self): + return self.file_info["name"] + + @property + def path(self): + return self.cpath + + @property + def real_path(self): + if self._real_path is not None: + logger.debug(pfm(f'FOUND REAL PATH AS {self._real_path}')) + return self._real_path + + + + cand_path = self.path + if os.path.exists(self.path): + self._real_path = cand_path + return cand_path + for og_p in orig_prefices: + for new_p in new_prefices: + new_cand_path = cand_path.replace(og_p, new_p) + logger.debug(pfm(f'TRYING PATH AS {new_cand_path}')) + if os.path.exists(new_cand_path): + self._real_path = new_cand_path + logger.debug(pfm(f'FOUND REAL PATH AS {new_cand_path}')) + return new_cand_path + + + diff --git a/wq.py b/wq.py new file mode 100644 index 0000000..56a9d24 --- /dev/null +++ b/wq.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python +import redis, pickle +from enum import Enum +import json + +class TOPICS(Enum): + ml_vision_to_score = 'ml_vision_to_score' + ml_vision_to_score_ext = 'ml_vision_to_score_ext' + ml_vision_objdet_failed = 'ml_vision_objdet_failed' + ml_vision_objdet_success = 'ml_vision_objdet_success' + ml_vision_objdet_skipped = 'ml_vision_objdet_skipped' + ml_vision_objdet_results_db_success = 'ml_vision_objdet_db_upload_success' + ml_vision_objdet_results_db_failed = 'ml_vision_objdet_db_upload_failed' + ml_vision_objdet_results_pg_success = 'ml_vision_objdet_pg_upload_success' + ml_vision_objdet_results_pg_failed = 'ml_vision_objdet_pg_upload_failed' + ml_vision_objdet_results_purge_success = 'ml_vision_objdet_purge_success' + ml_vision_objdet_results_purge_failed = 'ml_vision_objdet_purge_failed' + ml_vision_objdet_results_purge_skipped = 'ml_vision_objdet_purge_skipped' + ml_vision_videos_modify_success = 'ml_vision_videos_modify_success' + ml_vision_videos_modify_failed = 'ml_vision_videos_modify_failed' + ml_vision_embedding_success = 'ml_vision_embedding_success' + ml_vision_embedding_fail = 'ml_vision_embedding_fail' + ml_vision_embedding_skipped = 'ml_vision_embedding_skipped' + + +r = redis.StrictRedis() + +def publish(topic, message): + if isinstance(topic, TOPICS): + topic = topic.value + + + if isinstance(message, str): + r.rpush(topic, message) + else: + r.rpush(topic, json.dumps(message)) + + +def parse_message(msg): + try: + return json.loads(msg) + except: + return msg.decode() + +def consume(topic): + if isinstance(topic, TOPICS): + topic = topic.value + + msg = r.rpop(topic) + if msg is None: + return None + else: + return parse_message(msg) + +def peek(topic): + if isinstance(topic, TOPICS): + topic = topic.value + + vals = r.lrange(topic,0, 0) + if len(vals) == 0: + return None + else: + return parse_message(vals[0]) + +# from kafka import KafkaProducer, KafkaConsumer +# import pickle + +# producer = KafkaProducer(bootstrap_servers=["192.168.1.242:9092"]) +# consumers = dict() +# # %% + +# def publish(topic, message): +# producer.send(topic, value=pickle.dumps(message)) +# producer.flush() + + +# def get_consumer_by_topic(topic): +# if topic in consumers: +# return consumers[topic] + +# csumer = KafkaConsumer( +# bootstrap_servers="localhost:9092", +# group_id="wq", +# auto_offset_reset="earliest", +# enable_auto_commit=True, +# consumer_timeout_ms=1000, +# ) +# csumer.subscribe(topic) +# consumers[topic] = csumer + + +# def get_a_message(topic): +# csumer = get_consumer_by_topic(topic) +# return csumer + +