Source code for bspump.kafka.topic_initializer

import json
import yaml
import logging
import typing

import confluent_kafka.admin

import bspump.asab as asab
import bspump
import bspump.abc.sink
import bspump.abc.source


#

L = logging.getLogger(__name__)

#

_TOPIC_CONFIG_OPTIONS = {
    "compression.type",
    "leader.replication.throttled.replicas",
    "message.downconversion.enable",
    "min.insync.replicas",
    "segment.jitter.ms",
    "cleanup.policy",
    "flush.ms",
    "follower.replication.throttled.replicas",
    "segment.bytes",
    "retention.ms",
    "flush.messages",
    "message.format.version",
    "max.compaction.lag.ms",
    "file.delete.delay.ms",
    "max.message.bytes",
    "min.compaction.lag.ms",
    "message.timestamp.type",
    "preallocate",
    "min.cleanable.dirty.ratio",
    "index.interval.bytes",
    "unclean.leader.election.enable",
    "retention.bytes",
    "delete.retention.ms",
    "segment.ms",
    "message.timestamp.difference.max.ms",
    "segment.index.bytes",
}


def _is_kafka_component(component):
    if isinstance(component, bspump.kafka.KafkaSource) or isinstance(
        component, bspump.kafka.KafkaSink
    ):
        return True
    return False


[docs] class KafkaTopicInitializer(asab.Configurable): """ KafkaTopicInitializer reads topic configs from file or from Kafka sink/source configs, checks if they exists and creates them if they don't. KafkaAdminClient requires blocking connection, which is why this class doesn't use the connection module from BSPump. Usage: topic_initializer = KafkaTopicInitializer(app, "KafkaConnection") topic_initializer.include_topics(MyPipeline) topic_initializer.initialize_topics() """ ConfigDefaults = { "client_id": "bspump-topic-initializer", "topics_file": "", "num_partitions_default": 1, "replication_factor_default": 1, }
[docs] def __init__( self, app, connection, id: typing.Optional[str] = None, config: dict = None ): """ Initializes the parameters passed to the class. **Parameters** app : Application Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_. connection : Connection Information needed to create a connection. id: typing.Optional[str] = None : config: dict = None : JSON configuration file containing important information. """ _id = id if id is not None else self.__class__.__name__ super().__init__(_id, config) # Keeps the topic objects that need to be checked/initialized self.RequiredTopics = {} # Caches the names of existing topics self.ExistingTopics: typing.Optional[set] = None self.BootstrapServers = None self.ClientId = self.Config.get("client_id") self._get_bootstrap_servers(app, connection) topics_file = self.Config.get("topics_file") if len(topics_file) != 0: self.include_topics_from_file(topics_file)
def _get_bootstrap_servers(self, app, connection): svc = app.get_service("bspump.PumpService") self.BootstrapServers = ( svc.Connections[connection].Config["bootstrap_servers"].strip() )
[docs] def include_topics( self, *, topic_config=None, kafka_component=None, pipeline=None, config_file=None, ): """ Includes topic from config file or dict object. It can also scan Pipeline and get topics from Source or Sink. **Parameters** * : topic_config : , default= None Topic config file. kafka_component : , default= None pipeline : , default= None Name of the Pipeline. config_file : , default= None Configuration file. """ # Include topic from config or dict object if topic_config is not None: L.info("Including topics from dictionary") self.include_topics_from_config(topic_config) # Include topic from config file if config_file is not None: L.info("Including topics from '{}'".format(config_file)) self.include_topics_from_file(config_file) # Get topics from Kafka Source or Sink if kafka_component is not None: L.info("Including topics from {}".format(kafka_component.Id)) self.include_topics_from_config(kafka_component.Config) # Scan the pipeline for KafkaSource(s) or KafkaSink if pipeline is not None: for source in pipeline.Sources: if _is_kafka_component(source): L.info("Including topics from {}".format(source.Id)) self.include_topics_from_config(source.Config) sink = pipeline.Processors[0][-1] if _is_kafka_component(sink): L.info("Including topics from {}".format(sink.Id)) self.include_topics_from_config(sink.Config)
[docs] def include_topics_from_file(self, topics_file: str): """ Includes topics from a topic file. **Parameters** topics_file:str : str Name of a topic file we wanted to include. """ # Support yaml and json input ext = topics_file.strip().split(".")[-1].lower() if ext == "json": with open(topics_file, "r") as f: data = json.load(f) elif ext in ("yml", "yaml"): with open(topics_file, "r") as f: data = yaml.safe_load(f) else: L.warning("Unsupported extension: '{}'".format(ext)) for topic in data: if "topic" not in topic and "name" in topic: # BACK-COMPAT L.warning("Topic attribute 'name' is deprecated; use 'topic' instead.") topic["topic"] = topic.pop("name") if "config" not in topic and "topic_configs" in topic: # BACK-COMPAT L.warning( "Topic attribute 'topic_configs' is deprecated; use 'config' instead." ) topic["config"] = topic.pop("topic_configs") if "num_partitions" not in topic: topic["num_partitions"] = int(self.Config.get("num_partitions_default")) if "replication_factor" not in topic: topic["replication_factor"] = int( self.Config.get("replication_factor_default") ) self.RequiredTopics[topic["topic"]] = confluent_kafka.admin.NewTopic( **topic )
[docs] def include_topics_from_config(self, config_object): """ Includes topics using a config **Parameters** config_object : JSON config object containing information about what topics we want to include. """ # Every kafka topic needs to have: name, num_partitions and replication_factor topic_names = config_object.get("topic").split(",") if "num_partitions" in config_object: num_partitions = int(config_object.pop("num_partitions")) else: num_partitions = int(self.Config.get("num_partitions_default")) if "replication_factor" in config_object: replication_factor = int(config_object.pop("replication_factor")) else: replication_factor = int(self.Config.get("replication_factor_default")) # Additional configs are optional topic_configs = config_object.pop("config", {}) for config_option in set(config_object.keys()): if config_option in _TOPIC_CONFIG_OPTIONS: topic_configs[config_option] = config_object.pop(config_option) # Create topic objects for name in topic_names: self.RequiredTopics[name] = confluent_kafka.admin.NewTopic( topic=name, num_partitions=num_partitions, replication_factor=replication_factor, config=topic_configs, )
[docs] def fetch_existing_topics(self): admin_client = confluent_kafka.admin.AdminClient( {"bootstrap.servers": self.BootstrapServers, "client.id": self.ClientId} ) result = admin_client.list_topics() if result is None: raise RuntimeError("Empty response from list_topics") self.ExistingTopics = set(result.topics.keys())
[docs] def check_and_initialize(self): """ Initializes new topics and logs a warning. """ L.warning( "`check_and_initialize()` is obsoleted, use `initialize_topics()` instead" ) self.initialize_topics()
[docs] def initialize_topics(self): """ Initializes topics ?? """ if len(self.RequiredTopics) == 0: L.info("No Kafka topics were required.") return # Fetch topics if not cached if self.ExistingTopics is None: try: self.fetch_existing_topics() except Exception as e: L.error("Failed to fetch Kafka topics: {}".format(e)) return # Filter out the topics that already exist missing_topics = [ topic for topic in self.RequiredTopics.values() if topic.topic not in self.ExistingTopics ] if len(missing_topics) == 0: L.info("No missing Kafka topics to be initialized.") return try: admin_client = confluent_kafka.admin.AdminClient( {"bootstrap.servers": self.BootstrapServers, "client.id": self.ClientId} ) # Create topics # TODO: update configs of existing topics using `admin_client.alter_configs()` tasks = admin_client.create_topics(missing_topics) for topic, task in tasks.items(): task.result() # Update existing topic cache self.ExistingTopics.update(self.RequiredTopics.keys()) L.log( asab.LOG_NOTICE, "Kafka topics created", struct_data={ "topics": ", ".join(topic.topic for topic in missing_topics) }, ) except Exception as e: L.error("Kafka topic initialization failed: {}".format(e))