bump
This commit is contained in:
BIN
__pycache__/deepstream_obj_det.cpython-310.pyc
Normal file
BIN
__pycache__/deepstream_obj_det.cpython-310.pyc
Normal file
Binary file not shown.
370
deepstream_obj_det.py
Executable file
370
deepstream_obj_det.py
Executable file
@@ -0,0 +1,370 @@
|
||||
|
||||
import sys
|
||||
sys.path.append('/opt/nvidia/deepstream/deepstream/sources/deepstream_python_apps/apps')
|
||||
import os
|
||||
import gi
|
||||
gi.require_version('Gst', '1.0')
|
||||
from gi.repository import GLib, Gst
|
||||
from common.platform_info import PlatformInfo
|
||||
from common.bus_call import bus_call
|
||||
import numpy as np
|
||||
import ctypes
|
||||
import pyds
|
||||
from functools import partial
|
||||
from CommonCode.settings import get_logger, LogColorize
|
||||
import argparse
|
||||
pfm = LogColorize.watch_and_fix_permissions
|
||||
|
||||
logger = get_logger(__name__,'/var/log/ml_vision_logs/00_watch_and_fix_permissions', stdout=True, systemd=False)
|
||||
|
||||
|
||||
target_width = 1280
|
||||
target_height = 720
|
||||
os.environ.pop("DISPLAY",None)
|
||||
MUXER_BATCH_TIMEOUT_USEC = 1000000
|
||||
def embedder_results_probe(pad,info,u_data, list_add, frame_num = 0):
|
||||
gst_buffer = info.get_buffer()
|
||||
if not gst_buffer:
|
||||
print("Unable to get GstBuffer ")
|
||||
return
|
||||
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
|
||||
l_frame = batch_meta.frame_meta_list
|
||||
|
||||
while l_frame is not None:
|
||||
try:
|
||||
# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
|
||||
# The casting also keeps ownership of the underlying memory
|
||||
# in the C code, so the Python garbage collector will leave
|
||||
# it alone.
|
||||
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
frame_number=frame_meta.frame_num
|
||||
|
||||
|
||||
l_user = frame_meta.frame_user_meta_list
|
||||
while l_user is not None:
|
||||
try:
|
||||
# Note that l_user.data needs a cast to pyds.NvDsUserMeta
|
||||
# The casting also keeps ownership of the underlying memory
|
||||
# in the C code, so the Python garbage collector will leave
|
||||
# it alone.
|
||||
user_meta = pyds.NvDsUserMeta.cast(l_user.data)
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
if (
|
||||
user_meta.base_meta.meta_type
|
||||
!= pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META
|
||||
):
|
||||
continue
|
||||
|
||||
tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)
|
||||
|
||||
# Boxes in the tensor meta should be in network resolution which is
|
||||
# found in tensor_meta.network_info. Use this info to scale boxes to
|
||||
# the input frame resolution.
|
||||
layers_info = []
|
||||
if True:
|
||||
for i in range(tensor_meta.num_output_layers):
|
||||
layer = pyds.get_nvds_LayerInfo(tensor_meta, i)
|
||||
if layer.layerName=='output0':
|
||||
|
||||
ptr = ctypes.cast(pyds.get_ptr(layer.buffer), ctypes.POINTER(ctypes.c_float))
|
||||
num_elements = layer.inferDims.numElements
|
||||
v = list(np.ctypeslib.as_array(ptr, shape=(num_elements,)))
|
||||
v = [float(x) for x in v]
|
||||
|
||||
list_add.append({'frame_number':frame_number, 'vector':v})
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
try:
|
||||
l_user = l_user.next
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
try:
|
||||
# indicate inference is performed on the frame
|
||||
frame_meta.bInferDone = True
|
||||
l_frame = l_frame.next
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
|
||||
return Gst.PadProbeReturn.OK
|
||||
|
||||
|
||||
|
||||
|
||||
def detector_results_probe(pad,info,u_data, list_add, frame_num = 0):
|
||||
frame_number=0
|
||||
num_rects=0
|
||||
got_fps = False
|
||||
|
||||
gst_buffer = info.get_buffer()
|
||||
if not gst_buffer:
|
||||
print("Unable to get GstBuffer ")
|
||||
return
|
||||
|
||||
|
||||
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
|
||||
l_frame = batch_meta.frame_meta_list
|
||||
|
||||
while l_frame is not None:
|
||||
try:
|
||||
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
frame_number=frame_meta.frame_num
|
||||
l_obj=frame_meta.obj_meta_list
|
||||
num_rects = frame_meta.num_obj_meta
|
||||
|
||||
l_user = frame_meta.frame_user_meta_list
|
||||
|
||||
|
||||
|
||||
while l_obj is not None:
|
||||
try:
|
||||
# Casting l_obj.data to pyds.NvDsObjectMeta
|
||||
obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
# param_extract = ['left','top','width','height']
|
||||
# strc = ''
|
||||
# for param in param_extract:
|
||||
# strc+=str(getattr(obj_meta.rect_params, param))
|
||||
# strc+=' '
|
||||
|
||||
# target_width
|
||||
# target_height
|
||||
score = obj_meta.confidence
|
||||
label = obj_meta.obj_label
|
||||
left = obj_meta.rect_params.left
|
||||
top = obj_meta.rect_params.top
|
||||
width = obj_meta.rect_params.width
|
||||
height = obj_meta.rect_params.height
|
||||
frame_number = frame_number
|
||||
class_id = obj_meta.class_id
|
||||
|
||||
d_add = {'score':score, 'label':label, 'left':left, 'top':top, 'width':width, 'height':height, 'frame_number':frame_number, 'class_id': class_id}
|
||||
list_add.append(d_add)
|
||||
|
||||
if frame_number % 100 == 0:
|
||||
str_pr = 'FRAME_PROGRESS: '+pfm(str(frame_number) + '/' + str(frame_num))
|
||||
logger.info(str_pr)
|
||||
|
||||
|
||||
try:
|
||||
l_obj=l_obj.next
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
# Update frame rate through this probe
|
||||
stream_index = "stream{0}".format(frame_meta.pad_index)
|
||||
|
||||
try:
|
||||
l_frame=l_frame.next
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
return Gst.PadProbeReturn.OK
|
||||
|
||||
|
||||
|
||||
def cb_newpad(decodebin, decoder_src_pad,data):
|
||||
print("In cb_newpad\n")
|
||||
caps=decoder_src_pad.get_current_caps()
|
||||
if not caps:
|
||||
caps = decoder_src_pad.query_caps()
|
||||
gststruct=caps.get_structure(0)
|
||||
gstname=gststruct.get_name()
|
||||
source_bin=data
|
||||
features=caps.get_features(0)
|
||||
|
||||
# Need to check if the pad created by the decodebin is for video and not
|
||||
# audio.
|
||||
print("gstname=",gstname)
|
||||
if(gstname.find("video")!=-1):
|
||||
# Link the decodebin pad only if decodebin has picked nvidia
|
||||
# decoder plugin nvdec_*. We do this by checking if the pad caps contain
|
||||
# NVMM memory features.
|
||||
print("features=",features)
|
||||
if features.contains("memory:NVMM"):
|
||||
# Get the source bin ghost pad
|
||||
bin_ghost_pad=source_bin.get_static_pad("src")
|
||||
if not bin_ghost_pad.set_target(decoder_src_pad):
|
||||
sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
|
||||
else:
|
||||
sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
|
||||
|
||||
def decodebin_child_added(child_proxy,Object,name,user_data):
|
||||
print("Decodebin child added:", name, "\n")
|
||||
if(name.find("decodebin") != -1):
|
||||
Object.connect("child-added",decodebin_child_added,user_data)
|
||||
|
||||
if "source" in name:
|
||||
source_element = child_proxy.get_by_name("source")
|
||||
if source_element.find_property('drop-on-latency') != None:
|
||||
Object.set_property("drop-on-latency", True)
|
||||
|
||||
|
||||
def create_source_bin(uri):
|
||||
print("Creating source bin")
|
||||
|
||||
# Create a source GstBin to abstract this bin's content from the rest of the
|
||||
# pipeline
|
||||
bin_name="source-bin-any-format"
|
||||
print(bin_name)
|
||||
nbin=Gst.Bin.new(bin_name)
|
||||
if not nbin:
|
||||
sys.stderr.write(" Unable to create source bin \n")
|
||||
|
||||
# Source element for reading from the uri.
|
||||
# We will use decodebin and let it figure out the container format of the
|
||||
# stream and the codec and plug the appropriate demux and decode plugins.
|
||||
uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
|
||||
if not uri_decode_bin:
|
||||
sys.stderr.write(" Unable to create uri decode bin \n")
|
||||
# We set the input uri to the source element
|
||||
uri_decode_bin.set_property("uri",uri)
|
||||
# Connect to the "pad-added" signal of the decodebin which generates a
|
||||
# callback once a new pad for raw data has beed created by the decodebin
|
||||
uri_decode_bin.connect("pad-added",cb_newpad,nbin)
|
||||
uri_decode_bin.connect("child-added",decodebin_child_added,nbin)
|
||||
|
||||
# We need to create a ghost pad for the source bin which will act as a proxy
|
||||
# for the video decoder src pad. The ghost pad will not have a target right
|
||||
# now. Once the decode bin creates the video decoder and generates the
|
||||
# cb_newpad callback, we will set the ghost pad target to the video decoder
|
||||
# src pad.
|
||||
Gst.Bin.add(nbin,uri_decode_bin)
|
||||
bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
|
||||
if not bin_pad:
|
||||
sys.stderr.write(" Failed to add ghost pad in source bin \n")
|
||||
return None
|
||||
return nbin
|
||||
|
||||
|
||||
def run_inference(file_path):
|
||||
os.environ.pop("DISPLAY",None)
|
||||
if not file_path.startswith('file://'):
|
||||
file_path = 'file://'+file_path
|
||||
|
||||
platform_info = PlatformInfo()
|
||||
Gst.init(None)
|
||||
|
||||
pipeline = Gst.Pipeline()
|
||||
|
||||
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
|
||||
|
||||
nugget_detector = Gst.ElementFactory.make("nvinfer", "primary-inference")
|
||||
nugget_embedder = Gst.ElementFactory.make("nvinfer", "secondary-inference")
|
||||
|
||||
streammux.set_property('width', target_width)
|
||||
streammux.set_property('height', target_height)
|
||||
streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
|
||||
streammux.set_property('enable-padding',1)
|
||||
streammux.set_property('batch-size', 1)
|
||||
|
||||
|
||||
nugget_detector.set_property('config-file-path', "/home/thebears/DeepStream-Yolo/detector.txt")
|
||||
nugget_embedder.set_property('config-file-path', "/home/thebears/DeepStream-Yolo/embedder.txt")
|
||||
|
||||
fakesink1 = Gst.ElementFactory.make("fakesink","fakesink")
|
||||
fakesink1.set_property('enable-last-sample', 0)
|
||||
fakesink1.set_property('sync', 1)
|
||||
pipeline.add(fakesink1)
|
||||
|
||||
fakesink2 = Gst.ElementFactory.make("fakesink","fakesink2")
|
||||
fakesink2.set_property('enable-last-sample', 0)
|
||||
fakesink2.set_property('sync', 1)
|
||||
pipeline.add(fakesink2)
|
||||
|
||||
pipeline.add(streammux)
|
||||
pipeline.add(nugget_detector)
|
||||
pipeline.add(nugget_embedder)
|
||||
|
||||
|
||||
|
||||
# uri_name = 'file:///home/thebears/railing.mp4'
|
||||
# uri_name = 'file:///home/thebears/railing_00_20250213094806.mp4'
|
||||
source_file=create_source_bin(file_path)
|
||||
|
||||
pipeline.add(source_file)
|
||||
|
||||
stream_pad = streammux.request_pad_simple("sink_0")
|
||||
source_pad = source_file.get_static_pad("src")
|
||||
source_pad.link(stream_pad)
|
||||
|
||||
tee=Gst.ElementFactory.make("tee", "nvsink-tee")
|
||||
pipeline.add(tee)
|
||||
|
||||
queue1=Gst.ElementFactory.make("queue", "nvtee-que1")
|
||||
queue2=Gst.ElementFactory.make("queue", "nvtee-que2")
|
||||
pipeline.add(queue1)
|
||||
pipeline.add(queue2)
|
||||
|
||||
streammux.link(tee)
|
||||
|
||||
|
||||
tee.link(queue1)
|
||||
tee.link(queue2)
|
||||
|
||||
queue1.link(nugget_detector)
|
||||
queue2.link(nugget_embedder)
|
||||
|
||||
|
||||
cmd = f'/usr/bin/ffprobe -v error -select_streams v:0 -count_packets -show_entries stream=nb_read_packets -of csv=p=0 {file_path}'#/srv/ftp/railing/2025/02/28/railing_00_20250228115800.mp4
|
||||
try:
|
||||
frames = int(os.popen(cmd).read().strip())
|
||||
except:
|
||||
frames = 0
|
||||
|
||||
logger.info(f"TOTAL_FRAMES: {frames}")
|
||||
|
||||
embedder_list = list()
|
||||
|
||||
|
||||
embedder_results = partial(embedder_results_probe, list_add=embedder_list, frame_num = frames)
|
||||
nugget_embedder.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, embedder_results, 0)
|
||||
|
||||
|
||||
detector_list = list()
|
||||
|
||||
|
||||
detector_results = partial(detector_results_probe, list_add = detector_list, frame_num = frames)
|
||||
nugget_detector.get_static_pad("src").add_probe(Gst.PadProbeType.BUFFER, detector_results, 0)
|
||||
|
||||
|
||||
|
||||
nugget_detector.link(fakesink1)
|
||||
nugget_embedder.link(fakesink2)
|
||||
|
||||
|
||||
# create an event loop and feed gstreamer bus mesages to it
|
||||
loop = GLib.MainLoop()
|
||||
bus = pipeline.get_bus()
|
||||
bus.add_signal_watch()
|
||||
bus.connect ("message", bus_call, loop)
|
||||
|
||||
# start play back and listen to events
|
||||
print("Starting pipeline \n")
|
||||
pipeline.set_state(Gst.State.PLAYING)
|
||||
try:
|
||||
loop.run()
|
||||
except:
|
||||
pass
|
||||
# cleanup
|
||||
pipeline.set_state(Gst.State.NULL)
|
||||
return detector_list, embedder_list
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(run_inference(sys.argv[1]))
|
||||
|
||||
5
test.py
Normal file
5
test.py
Normal file
@@ -0,0 +1,5 @@
|
||||
import time
|
||||
|
||||
while True:
|
||||
print("Hello from Orin")
|
||||
time.sleep(0.25)
|
||||
36
test_kafka.py
Normal file
36
test_kafka.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from kafka import TopicPartition
|
||||
from kafka.structs import OffsetAndMetadata
|
||||
from CommonCode import kwq
|
||||
|
||||
input_topic = kwq.TOPICS.videos_to_score_detection
|
||||
producer = kwq.producer
|
||||
topic_produce = kwq.TOPICS.videos_scored_detection
|
||||
|
||||
|
||||
|
||||
client_id = 'hello_world2'
|
||||
group_id = client_id
|
||||
|
||||
|
||||
consumer = kwq.create_consumer(input_topic, group_id = group_id, client_id = client_id)
|
||||
|
||||
|
||||
c_part = TopicPartition(input_topic, 0)
|
||||
consumer.assign([c_part])
|
||||
|
||||
|
||||
|
||||
c_committed = consumer.committed(c_part)
|
||||
logger.info(f"KAFKA_POSITION_IS: {str(consumer.position(c_part))}")
|
||||
|
||||
if c_committed is None:
|
||||
logger.info(f"KAFKA_POSITION_NOT_COMMITTED")
|
||||
else:
|
||||
logger.info(f"KAFKA_POSITION_COMMITTED_IS: {c_committed}")
|
||||
consumer.seek(c_part, c_committed)
|
||||
logger.info("START POLLING")
|
||||
|
||||
# %%
|
||||
|
||||
for c in consumer:
|
||||
print(c.offset)
|
||||
1048
trimmed.json.orin
Normal file
1048
trimmed.json.orin
Normal file
File diff suppressed because it is too large
Load Diff
BIN
trimmed.mp4
Normal file
BIN
trimmed.mp4
Normal file
Binary file not shown.
5165
trimmed.oclip.orin
Normal file
5165
trimmed.oclip.orin
Normal file
File diff suppressed because it is too large
Load Diff
168
wait_for_new_messages.py
Normal file
168
wait_for_new_messages.py
Normal file
@@ -0,0 +1,168 @@
|
||||
|
||||
from CommonCode import kwq
|
||||
import time
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from CommonCode.settings import get_logger, LogColorize
|
||||
from kafka import TopicPartition
|
||||
from kafka.structs import OffsetAndMetadata
|
||||
pfm = LogColorize.score_obj_det_orin
|
||||
logger = get_logger(__name__,'/var/log/ml_vision_logs/01_score_obj_det_orin', stdout=True, systemd=False)
|
||||
|
||||
os.system("sudo /usr/bin/systemctl restart --now systemd-journal-upload.service")
|
||||
|
||||
logger.info(pfm(f"Starting wait_for_new_messages.py on orin for scoring object detection"))
|
||||
input_topic = kwq.TOPICS.videos_to_score_detection
|
||||
producer = kwq.producer
|
||||
topic_produce = kwq.TOPICS.videos_scored_detection
|
||||
|
||||
client_id = 'obj_detector_orin_3'
|
||||
group_id = client_id
|
||||
# %%
|
||||
|
||||
|
||||
import json
|
||||
|
||||
|
||||
logger.debug("Starting Kafka Consumer")
|
||||
|
||||
from deepstream_obj_det import run_inference, target_width, target_height
|
||||
import os
|
||||
|
||||
os.environ.pop("DISPLAY",None)
|
||||
|
||||
def run_inference_for_file(file_path):
|
||||
start_time = time.time()
|
||||
|
||||
end_time = time.time()
|
||||
|
||||
pre_path, _ = os.path.splitext(file_path)
|
||||
|
||||
det_path = pre_path + '.json.orin'
|
||||
emb_path = pre_path + '.oclip.orin'
|
||||
if os.path.exists(det_path) and os.path.exists(emb_path):
|
||||
return "Already scored"
|
||||
|
||||
if not os.path.exists(file_path):
|
||||
return "Movie does not exist"
|
||||
|
||||
# %%
|
||||
|
||||
cmd = f'/usr/bin/ffprobe -v error -select_streams v:0 -count_packets -show_entries stream=nb_read_packets -of csv=p=0 {file_path}'#/srv/ftp/railing/2025/02/28/railing_00_20250228115800.mp4
|
||||
try:
|
||||
frames = int(os.popen(cmd).read().strip())
|
||||
except:
|
||||
frames = 0
|
||||
|
||||
logger.info(f"TOTAL_FRAMES: {frames}")
|
||||
if frames < 30:
|
||||
logger.info(f"TOTAL_FRAMES_SKIPPING: {file_path}")
|
||||
return "FAILED, NOT ENOUGH FRAMES"
|
||||
|
||||
while True:
|
||||
try:
|
||||
with open(det_path,'w') as ff:
|
||||
out = ff.write(' '*100)
|
||||
except OSError as e:
|
||||
logger.error(f"NO_SPACE :{det_path}:{e}")
|
||||
else:
|
||||
break
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
try:
|
||||
os.remove(det_path)
|
||||
except:
|
||||
pass
|
||||
logger.info(f"ENOUGH SPACE, STARTING INFERENCE")
|
||||
detector_results, embedder_results = run_inference(file_path)
|
||||
obj_det_dict = dict()
|
||||
obj_det_dict['meta'] = {'model_version':'orin_v1'}
|
||||
obj_det_dict['scoring'] = {'start_time':start_time, 'end_time': end_time}
|
||||
obj_det_dict['json'] ={'path':det_path}
|
||||
obj_det_dict['video'] = {'path':file_path, 'target_w': target_width, 'target_h': target_height}
|
||||
|
||||
|
||||
|
||||
|
||||
by_frame_num = dict()
|
||||
for idx, sc in enumerate(detector_results):
|
||||
c_res = dict()
|
||||
c_frame = sc['frame_number']
|
||||
c_res['score'] = sc['score']
|
||||
c_res['L'] = sc['left']
|
||||
c_res['T'] = sc['top']
|
||||
c_res['W'] = sc['width']
|
||||
c_res['H'] = sc['height']
|
||||
c_res['name'] = sc['label']
|
||||
c_res['idx'] = sc['class_id']
|
||||
if c_frame not in by_frame_num:
|
||||
by_frame_num[c_frame] = list()
|
||||
|
||||
by_frame_num[c_frame].append(c_res)
|
||||
|
||||
obj_det_dict['scores'] = [{'frame':key, 'detections':val} for key,val in by_frame_num.items()]
|
||||
|
||||
|
||||
with open(det_path,'w') as ff:
|
||||
json.dump(obj_det_dict, ff, indent=4)
|
||||
|
||||
|
||||
emb_dict = dict()
|
||||
emb_dict['meta'] = {'model_version':'ViT-L-16-SigLIP2-512','host':'orin'}
|
||||
emb_dict['scoring'] = {'start_time':start_time, 'end_time': end_time}
|
||||
emb_dict['json'] ={'path':det_path}
|
||||
emb_dict['video'] = {'path':file_path, 'target_w': target_width, 'target_h': target_height}
|
||||
emb_dict['scores'] = list()
|
||||
|
||||
|
||||
for c_score in embedder_results:
|
||||
fr_num = c_score['frame_number']
|
||||
vect = c_score['vector']
|
||||
emb_dict['scores'].append({'score':vect, 'frame':fr_num})
|
||||
|
||||
with open(emb_path,'w') as ff:
|
||||
json.dump(emb_dict, ff, indent=4)
|
||||
|
||||
return "Success"
|
||||
|
||||
# %%
|
||||
consumer = kwq.create_consumer(input_topic, group_id = group_id, client_id = client_id)
|
||||
#consumer.subscribe(input_topic)
|
||||
|
||||
c_part = TopicPartition(input_topic, 0)
|
||||
consumer.assign([c_part])
|
||||
|
||||
|
||||
c_committed = consumer.committed(c_part)
|
||||
logger.info(f"KAFKA_POSITION_IS: {str(consumer.position(c_part))}")
|
||||
|
||||
if c_committed is None:
|
||||
logger.info(f"KAFKA_POSITION_NOT_COMMITTED")
|
||||
else:
|
||||
logger.info(f"KAFKA_POSITION_COMMITTED_IS: {c_committed}")
|
||||
consumer.seek(c_part, c_committed)
|
||||
logger.info("START POLLING")
|
||||
|
||||
#while True:
|
||||
# out = consumer.poll(timeout_ms=5000 , update_offsets = False)
|
||||
# msgs = list()
|
||||
# logger.info(f"KAFKA_POSITION_COMMITTED_IS: {str(consumer.committed(c_part))}")
|
||||
# logger.info(f"KAFKA_POSITION_IS: {str(consumer.position(c_part))}")
|
||||
#
|
||||
# for k, v in out.items():
|
||||
# msgs.extend(v)
|
||||
# for message in msgs:
|
||||
for message in consumer:
|
||||
logger.info(f"KAFKA_POSITION_COMMITTED_IS: {str(consumer.committed(c_part))}")
|
||||
logger.info(f"KAFKA_POSITION_IS: {str(consumer.position(c_part))}")
|
||||
|
||||
logger.info(f"MSG_RECEIVED :{message}")
|
||||
logger.info(f"INFERENCE_START: {pfm(message.key)}")
|
||||
result = run_inference_for_file(message.key)
|
||||
logger.info(f"INFERENCE_DONE:{pfm(result)} {message.key}")
|
||||
oandm = OffsetAndMetadata(message.offset,'')
|
||||
consumer.commit({c_part:oandm})
|
||||
producer.send(topic_produce, value=message.value, key=message.key)
|
||||
|
||||
Reference in New Issue
Block a user