#!/usr/bin/env python from enum import Enum import json import sys from confluent_kafka import Producer, Consumer bootstrap_server = "192.168.1.242:19092" class TOPICS(): exit_00_videos_to_score_detection = "00_videos_to_score_detection" exit_10_videos_scored_detection="10_videos_scored_detection" exit_40_videos_with_nuggets="40_videos_with_nuggets" exit_40_videos_without_nuggets="40_videos_without_nuggets" exit_50_videos_modified="50_videos_modified" enter_60_videos_embed_priority="60_videos_embed_priority" # Higher prioriy queue exit_60_videos_invalid="60_videos_invalid_file" exit_60_embedding_failed="60_embedding_failed" exit_60_videos_embedded="60_videos_scored_embedding" videos_no_json="videos_no_json" videos_embedding_in_db="videos_embed_in_db" videos_embedding_in_db_fail = "videos_embed_in_db_fail" class KafkaClient: def __init__(self): self.producer = Producer({ 'bootstrap.servers': bootstrap_server, 'security.protocol': 'SASL_PLAINTEXT', # change to SASL_SSL if TLS is enabled 'sasl.mechanism': 'SCRAM-SHA-256', 'sasl.username': 'superuser', 'sasl.password': 'marybear', 'request.timeout.ms': 15000, 'retries': 3, 'retry.backoff.ms': 100, 'linger.ms': 5, 'metadata.max.age.ms': 300000, }) def send(self, topic, key=None, value=None): def delivery_report(err, msg): if err is not None: print(f'Kafka delivery failed: {err}') else: print(f'Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}') key_bytes = json.dumps(key).encode("utf-8") if key is not None else None value_bytes = json.dumps(value).encode("utf-8") if value is not None else None self.producer.produce(topic, key=key_bytes, value=value_bytes, callback=delivery_report) self.producer.flush() def create_consumer(self, group_id=None, client_id=None, auto_offset_reset = 'latest'): conf = { 'bootstrap.servers': bootstrap_server, 'group.id': group_id if group_id else 'default_group', 'auto.offset.reset': auto_offset_reset, 'enable.auto.commit': False, 'security.protocol': 'SASL_PLAINTEXT', 'sasl.mechanism': 'SCRAM-SHA-256', 'sasl.username': 'superuser', 'sasl.password': 'marybear', 'client.id': client_id if client_id else 'default_client', } consumer = Consumer(conf) return consumer # Instantiate a global client for compatibility kafka_client = KafkaClient() producer = kafka_client publish = kafka_client.send def create_consumer(group_id=None, client_id=None, auto_offset_reset = 'latest'): return kafka_client.create_consumer(group_id, client_id, auto_offset_reset)