Source code for bspump.kafka.source

import asyncio
import logging

import confluent_kafka

from ..abc.source import Source

#

L = logging.getLogger(__name__)

#


class KafkaSource(Source):
    """
    Consume messages from Apache Kafka and pass them down the pipeline.

    The Kafka cluster is configured in the KafkaConnection object that this
    source locates through its pipeline.

    .. code:: python

        class KafkaPipeline(bspump.Pipeline):

            def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                    bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
                    bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
                )

    To ensure that after restart, pump will continue receiving messages where
    it left off, group_id has to be provided in the configuration.

    When the group_id is set, the consumer group is created and the Kafka
    server will then operate in the producer-consumer mode. It means that
    every consumer with the same group_id will be assigned unique set of
    partitions, hence all messages will be divided among them and thus unique.

    Long-running synchronous operations should be avoided or placed inside
    the OOBGenerator in the asynchronous way or on thread using ASAB Proactor
    service (see bspump-oob-proactor.py example in "examples" folder).
    Otherwise, the session_timeout_ms should be raised to prevent Kafka from
    disconnecting the consumer from the partition, thus causing rebalance.

    Standard Kafka configuration options can be used, as specified in
    librdkafka library, where the options are simply passed to:
    https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

    Underscores in option names are translated to dots before the options
    are handed to the consumer (e.g. ``session_timeout_ms`` becomes
    ``session.timeout.ms``), except for the keys listed in ``SpecialKeys``.
    """

    ConfigDefaults = {
        "topic": "unconfigured",
        "refresh_topics": 0,
        "enable.auto.commit": "true",
        "auto.commit.interval.ms": "1000",
        "auto.offset.reset": "smallest",
        "group.id": "bspump",
    }

    def __init__(self, app, pipeline, connection, id=None, config=None):
        """
        Initializes parameters.

        **Parameters**

        app : Application
            The `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_
            object; its ``time()`` is used to schedule topic refreshes.

        pipeline : Pipeline
            The Pipeline this source belongs to.

        connection : Connection
            Passed to ``Pipeline.locate_connection`` to obtain the Kafka
            connection whose ``Config`` seeds the consumer configuration.

        id : , default = None
            Passed to the parent ``Source`` constructor.

        config : , default = None
            Passed to the parent ``Source`` constructor.
        """
        super().__init__(app, pipeline, id=id, config=config)

        self.App = app
        self.Connection = self.Pipeline.locate_connection(app, connection)

        # Pause (in seconds) taken when a poll yields no message or an error
        self.Sleep = 100 / 1000.0

        self.ConsumerConfig = {}
        # Keys handed to the consumer verbatim (no "_" -> "." translation)
        self.SpecialKeys = frozenset(["oauth_cb"])

        # Connection options first, so that the source's own options
        # (copied second) take precedence on conflicting keys.
        self._copy_consumer_options(self.Connection.Config)
        # "topic" and "refresh_topics" configure this source, not the consumer
        self._copy_consumer_options(self.Config, skip=("topic", "refresh_topics"))

        # Create subscription list.
        # split() without an argument ignores repeated/leading/trailing
        # whitespace, so no empty topic names end up in the subscription.
        self.Subscribe = []
        for topic in self.Config["topic"].split():
            if ":" not in topic:
                self.Subscribe.append(topic)
            else:
                # NOTE(review): "topic:partition" entries are stored as
                # [topic, partition] lists - confirm Consumer.subscribe()
                # accepts this form.
                self.Subscribe.append(topic.rsplit(":", 1))

        self.Running = True

        # For refreshing of topics/subscription (0 disables the refresh)
        self.RefreshTopics = int(self.Config["refresh_topics"])
        self.LastRefreshTopicsTime = self.App.time()

    def _copy_consumer_options(self, options, skip=()):
        """Copy ``options`` into ``ConsumerConfig``, translating "_" to "." in key names."""
        for key, value in options.items():
            if key in skip:
                continue
            if key in self.SpecialKeys:
                self.ConsumerConfig[key] = value
            else:
                self.ConsumerConfig[key.replace("_", ".")] = value

    async def main(self):
        """
        Run the consume loop until the source is cancelled.

        A new consumer is created on every pass of the outer loop, i.e. at
        start, after a topic refresh and after a processing error. The
        consumer is always closed before the next pass or before leaving,
        so no consumer is left open behind.
        """
        while self.Running:
            try:
                consumer = confluent_kafka.Consumer(self.ConsumerConfig, logger=L)
            except Exception as e:
                L.exception("Error when connecting to Kafka")
                self.Pipeline.set_error(None, None, e)
                return

            try:
                # A failing subscribe() still propagates to the caller,
                # but the consumer is closed by the `finally` below.
                consumer.subscribe(self.Subscribe)
                try:
                    await self._consume(consumer)
                except asyncio.CancelledError:
                    # Cancellation is the regular way of stopping the source
                    self.Running = False
                except Exception as e:
                    L.exception("Error when processing Kafka message")
                    self.Pipeline.set_error(None, None, e)
            finally:
                self._close_consumer(consumer)

    async def _consume(self, consumer):
        """Poll ``consumer`` and process its messages; return when topics are due for a refresh."""
        while True:
            await self.Pipeline.ready()

            current_time = self.App.time()
            if (
                self.RefreshTopics > 0
                and current_time > self.LastRefreshTopicsTime + self.RefreshTopics
            ):
                L.info("Topics refreshed in '%s'.", self.Id)
                consumer.unsubscribe()
                self.LastRefreshTopicsTime = current_time
                # The caller closes this consumer and creates a fresh one
                return

            # NOTE: poll() is a synchronous call; it may hold the event loop
            # for up to the given timeout (in seconds).
            message = consumer.poll(0.2)
            if message is None:
                await asyncio.sleep(self.Sleep)
                continue

            error = message.error()
            if error:
                L.error(
                    "The following error occurred while polling for messages: '%s'.",
                    error,
                )
                await asyncio.sleep(self.Sleep)
                continue

            await self.process(
                message.value(),
                context={
                    "kafka_key": message.key(),
                    "kafka_headers": message.headers(),
                    "_kafka_topic": message.topic(),
                    "_kafka_partition": message.partition(),
                    "_kafka_offset": message.offset(),
                },
            )

    def _close_consumer(self, consumer):
        """Close ``consumer``; a failure is logged so it cannot mask an exception in flight."""
        try:
            consumer.close()
        except Exception:
            L.exception("Error when closing Kafka consumer")