Source code for bspump.kafka.sink

import logging

import confluent_kafka

import bspump.asab as asab

from ..abc.sink import Sink

#

L = logging.getLogger(__name__)

#


class KafkaSink(Sink):
    """
    Description: KafkaSink is a sink processor that forwards events to an
    Apache Kafka cluster specified by a KafkaConnection object.

    The event is handed to the Kafka producer unchanged as the message value;
    the annotated input type of ``process`` is bytes.

    NOTE(review): earlier documentation stated that string or dictionary
    input is automatically encoded to bytes using a charset from the
    configuration. No such conversion is visible in this class - confirm
    where (or whether) it happens before relying on it.

    .. code:: python

        class KafkaPipeline(bspump.Pipeline):

            def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                    bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
                    bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
                )

    There are two ways to use KafkaSink:

    - Specify a single topic in the KafkaSink config - ``topic`` - to be used
      for all the events in the pipeline.
    - Specify the topic separately for each event in the event context -
      ``context['kafka_topic']``. The topic from the configuration is then
      used as the default topic.

    To provide business logic for event distribution, you can create a topic
    selector processor. Processor example:

    .. code:: python

        class KafkaTopicSelector(bspump.Processor):

            def process(self, context, event):
                if event.get("weight") > 10:
                    context["kafka_topic"] = "heavy"
                else:
                    context["kafka_topic"] = "light"

                return event

    Every Kafka message can be a key:value pair. The key is read from the
    event context - ``context['kafka_key']``. If ``kafka_key`` is not
    provided, the key defaults to None. Message headers are read the same way
    from ``context['kafka_headers']`` and default to None.

    Standard Kafka configuration options can be used, as specified in the
    librdkafka library; the options are simply passed through to the
    producer:
    https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    """

    ConfigDefaults = {
        # Options consumed by the sink itself (never passed to the producer)
        "topic": "unconfigured",
        "watermark.low": "40000",
        "watermark.high": "90000",
        # Options passed to the Kafka producer
        "batch.num.messages": "100000",
        "linger.ms": "500",  # This setting has a significant impact on the throughput
        "batch.size": "1000000",
        # Consumed by the sink: timeout handed to Producer.flush on every tick
        "poll.timeout": "0.2",
        # "compression.type": "snappy",
    }

    def __init__(self, app, pipeline, connection, id=None, config=None):
        """
        Create the Kafka producer and subscribe the sink to application events.

        **Parameters**

        app : Application
            Application object; its PubSub is used to subscribe this sink.

        pipeline : Pipeline
            Pipeline this sink belongs to; also used to locate the connection.

        connection : str
            Identifier handed to ``pipeline.locate_connection`` to obtain the
            connection whose ``Config`` seeds the producer configuration.

        id : str, default = None
            Passed through to the parent Sink constructor.

        config : dict, default = None
            Passed through to the parent Sink constructor.
        """
        super().__init__(app, pipeline, id=id, config=config)

        kafka_connection = pipeline.locate_connection(app, connection)

        # Start from the connection options; underscores in option names are
        # rewritten to dots on the way into the producer configuration.
        producer_config = {
            name.replace("_", "."): value
            for name, value in kafka_connection.Config.items()
        }

        # Layer the sink options on top, leaving out the ones the sink
        # consumes itself (topic, watermarks and poll.timeout).
        producer_config.update(
            (name.replace("_", "."), value)
            for name, value in self.Config.items()
            if name != "topic"
            and not name.startswith("watermark")
            and name != "poll.timeout"
        )

        self.Producer = confluent_kafka.Producer(producer_config, logger=L)

        self.Topic = self.Config["topic"]
        self.LowWatermark = int(self.Config["watermark.low"])
        self.HighWatermark = int(self.Config["watermark.high"])
        self.PollTimeout = float(self.Config["poll.timeout"])

        # True while this sink has asked the pipeline to throttle
        self.IsThrottling = False

        self.ProactorService = self.Pipeline.App.get_service("asab.ProactorService")

        app.PubSub.subscribe_all(self)

    @asab.subscribe("Application.tick!")
    async def _on_tick(self, event_name):
        """
        Release throttling once the producer queue drops below the low
        watermark, then flush the producer.

        The flush is handed to the proactor service together with
        ``PollTimeout`` as its argument.
        """
        if self.IsThrottling:
            if len(self.Producer) < self.LowWatermark:
                self.IsThrottling = False
                self.Pipeline.throttle(self, False)

        await self.ProactorService.execute(self.Producer.flush, self.PollTimeout)

    def process(self, context, event: bytes):
        """
        Hand the event to the Kafka producer.

        Topic, key and headers are taken from ``context`` (``kafka_topic``,
        ``kafka_key``, ``kafka_headers``); the topic falls back to the
        configured one, key and headers fall back to None.

        Any exception raised while producing is logged and swallowed; the
        event is not retried. Afterwards, the pipeline is throttled when the
        producer queue exceeds the high watermark.
        """
        try:
            self.Producer.produce(
                context.get("kafka_topic", self.Topic),
                value=event,
                key=context.get("kafka_key"),
                headers=context.get("kafka_headers"),
            )
        except Exception as e:
            L.exception("Error occurred when sending data to Kafka: '{}'".format(e))

        if not self.IsThrottling:
            if len(self.Producer) > self.HighWatermark:
                self.IsThrottling = True
                self.Pipeline.throttle(self, True)