2025-10-03 14:28:55 -04:00
parent 4cbdb00e14
commit 4c847f7753
13 changed files with 121 additions and 57 deletions

kwq.py (102 changed lines)

@@ -2,53 +2,81 @@
from enum import Enum
import json
import sys
-from kafka import KafkaProducer, KafkaConsumer
+from confluent_kafka import Producer, Consumer
-bootstrap_server = ["192.168.1.242:19092"]
+bootstrap_server = "192.168.1.242:19092"
class TOPICS():
-    videos_to_score_detection = "videos_to_score_detection"
-    videos_scored_detection = "videos_scored_detection"
-    videos_with_nuggets = "videos_with_nuggets"
+    exit_00_videos_to_score_detection = "00_videos_to_score_detection"
+    exit_10_videos_scored_detection = "10_videos_scored_detection"
+    exit_40_videos_with_nuggets = "40_videos_with_nuggets"
+    exit_40_videos_without_nuggets = "40_videos_without_nuggets"
+    exit_50_videos_modified = "50_videos_modified"
+    enter_60_videos_embed_priority = "60_videos_embed_priority"  # Higher priority queue
+    exit_60_videos_invalid = "60_videos_invalid_file"
+    exit_60_embedding_failed = "exit_60_embedding_failed"
+    exit_60_videos_embedded = "60_videos_scored_embedding"
    videos_no_json = "videos_no_json"
    videos_without_nuggets = "videos_without_nuggets"
    videos_embedding_in_db = "videos_embed_in_db"
    videos_embedding_in_db_fail = "videos_embed_in_db_fail"
serializer = lambda v: json.dumps(v).encode("utf-8")
deserializer = json.loads
+class KafkaClient:
+    def __init__(self):
+        self.producer = Producer({
+            'bootstrap.servers': bootstrap_server,
+            'security.protocol': 'SASL_PLAINTEXT',  # change to SASL_SSL if TLS is enabled
+            'sasl.mechanism': 'SCRAM-SHA-256',
+            'sasl.username': 'superuser',
+            'sasl.password': 'marybear',
+            'request.timeout.ms': 15000,
+            'retries': 3,
+            'retry.backoff.ms': 100,
+            'linger.ms': 5,
+            'metadata.max.age.ms': 300000,
+        })
-producer = KafkaProducer(
-    bootstrap_servers=bootstrap_server,
-    key_serializer=serializer,
-    value_serializer=serializer,
-    request_timeout_ms=15000,  # 15s (keep small)
-    max_block_ms=10000,  # 10s max blocking
-    metadata_max_age_ms=300000,
-    retry_backoff_ms=100,
-    linger_ms=5,
-    retries=3,
-    security_protocol='SASL_PLAINTEXT',  # change to SASL_SSL if TLS is enabled
-    sasl_mechanism='SCRAM-SHA-256',
-    sasl_plain_username='superuser',
-    sasl_plain_password='marybear'
-)
+    def send(self, topic, key=None, value=None):
+        def delivery_report(err, msg):
+            if err is not None:
+                print(f'Kafka delivery failed: {err}')
+            else:
+                print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}')
+        key_bytes = json.dumps(key).encode("utf-8") if key is not None else None
+        value_bytes = json.dumps(value).encode("utf-8") if value is not None else None
+        self.producer.produce(topic, key=key_bytes, value=value_bytes, callback=delivery_report)
+        self.producer.flush()
+    def create_consumer(self, group_id=None, client_id=None, auto_offset_reset='latest'):
+        conf = {
+            'bootstrap.servers': bootstrap_server,
+            'group.id': group_id if group_id else 'default_group',
+            'auto.offset.reset': auto_offset_reset,
+            'enable.auto.commit': False,
+            'security.protocol': 'SASL_PLAINTEXT',
+            'sasl.mechanism': 'SCRAM-SHA-256',
+            'sasl.username': 'superuser',
+            'sasl.password': 'marybear',
+            'client.id': client_id if client_id else 'default_client',
+        }
+        consumer = Consumer(conf)
+        return consumer
-def create_consumer(group_id=None, client_id=None):
-    return KafkaConsumer(
-        bootstrap_servers=bootstrap_server,
-        key_deserializer=deserializer,
-        value_deserializer=deserializer,
-        enable_auto_commit=False,
-        group_id=group_id,
-        client_id=client_id,
-        auto_offset_reset='earliest',
-        security_protocol='SASL_PLAINTEXT',  # change to SASL_SSL if TLS is enabled
-        sasl_mechanism='SCRAM-SHA-256',
-        sasl_plain_username='superuser',
-        sasl_plain_password='marybear'
-    )
+# Instantiate a global client for compatibility
+kafka_client = KafkaClient()
+producer = kafka_client
+publish = kafka_client.send
+def create_consumer(group_id=None, client_id=None, auto_offset_reset='latest'):
+    return kafka_client.create_consumer(group_id, client_id, auto_offset_reset)
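
Note on the new produce path: send() JSON-encodes both key and value and calls flush() after every produce, so each publish blocks until the delivery callback fires. A minimal usage sketch of the compatibility shim (the topic choice and payload fields are illustrative, not part of this commit):

from kwq import publish, TOPICS

# Hypothetical payload; the real message schema lives elsewhere in the pipeline.
publish(
    TOPICS.exit_00_videos_to_score_detection,
    key="video-123",
    value={"path": "/data/video-123.mp4"},
)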
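Consumers change more: kafka-python deserialized keys and values via the configured deserializers, while confluent_kafka returns raw bytes, so json.loads now falls to the caller. The new config also sets 'enable.auto.commit': False, and the default auto_offset_reset moves from 'earliest' to 'latest'. A minimal consume loop under those assumptions (group id and topic are illustrative):

import json
from kwq import create_consumer, TOPICS

consumer = create_consumer(group_id="detection_worker", auto_offset_reset="earliest")
consumer.subscribe([TOPICS.exit_00_videos_to_score_detection])
try:
    while True:
        msg = consumer.poll(1.0)  # returns None on timeout
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        value = json.loads(msg.value())  # send() JSON-encodes values
        # ... process value ...
        consumer.commit(message=msg, asynchronous=False)  # manual commit after processing
finally:
    consumer.close()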
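Both client configs keep the comment about switching to SASL_SSL. If TLS is enabled on the brokers, the switch should be config-only; a sketch, assuming a standard CA bundle path (placeholder, not from this commit):

conf = {
    'bootstrap.servers': bootstrap_server,
    'security.protocol': 'SASL_SSL',  # was SASL_PLAINTEXT
    'ssl.ca.location': '/etc/ssl/certs/ca-certificates.crt',  # placeholder CA bundle
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'superuser',
    'sasl.password': 'marybear',
}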