bspump.kafka

Apache Kafka integration for BSPump.

class bspump.kafka.KafkaConnection(app, id=None, config=None)[source]

Bases: Connection

KafkaConnection connects a BSPump application to an instance of the Apache Kafka messaging system. Processors can then use it to consume or produce user-defined messages.

import bspump
import bspump.kafka

config = {"bootstrap_servers": "localhost:9092"}
app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(
    bspump.kafka.KafkaConnection(app, "KafkaConnection", config)
)

Standard Kafka configuration options, as specified by the librdkafka library, can be used. The options are simply passed on to the library. For the full list, see:

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

ConfigDefaults: dict = {}
__init__(app, id=None, config=None)[source]

Description:

Parameters

app : Application

Specification of an Application.

id : default None

config : JSON or other compatible format, default None

Contains the information and data needed to create a connection.

class bspump.kafka.KafkaSource(app, pipeline, connection, id=None, config=None)[source]

Bases: Source

KafkaSource object consumes messages from an Apache Kafka system, which is configured in the KafkaConnection object. It then passes them to other processors in the pipeline.

class KafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )

To ensure that the pump continues receiving messages where it left off after a restart, group_id has to
be provided in the configuration.

When group_id is set, a consumer group is created and the Kafka server then operates
in producer-consumer mode: every consumer with the same group_id is assigned
a unique set of partitions, so the messages are divided among the consumers and each message is delivered to only one of them.
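
The partition split described above can be pictured with a small, self-contained sketch. It is only an illustration: the round-robin rule and the consumer names are assumptions made for the example, not the assignment strategy Kafka actually applies.

```python
from collections import defaultdict


def assign_partitions(partitions, consumers):
    """Spread partitions over consumers round-robin.

    Illustrative only: each partition ends up with exactly one owner,
    which is the property a consumer group guarantees.
    """
    ordered_consumers = sorted(consumers)
    assignment = defaultdict(list)
    for index, partition in enumerate(sorted(partitions)):
        owner = ordered_consumers[index % len(ordered_consumers)]
        assignment[owner].append(partition)
    return dict(assignment)


# Two pump instances (hypothetical names) sharing one group_id
# on a topic with four partitions.
assignment = assign_partitions(range(4), ["pump-a", "pump-b"])
```

No partition appears under two consumers, which is why messages are not processed twice within one group.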

Long-running synchronous operations should be avoided. If they cannot be avoided, place them inside an OOBGenerator
and run them asynchronously, or on a thread using the ASAB Proactor service (see the bspump-oob-proactor.py example in the "examples" folder).
Otherwise, session_timeout_ms should be raised to prevent Kafka from disconnecting the consumer
from the partition and thereby causing a rebalance.
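
As a rough illustration of moving blocking work off the event loop, the sketch below uses only the standard asyncio thread pool. It is a stand-in for the idea, not the ASAB Proactor service or the OOBGenerator API; `slow_enrichment` and `handle` are hypothetical names.

```python
import asyncio
import time


def slow_enrichment(event):
    """Stand-in for a blocking call, e.g. a synchronous database lookup."""
    time.sleep(0.05)
    return {**event, "enriched": True}


async def handle(event):
    loop = asyncio.get_running_loop()
    # Run the blocking function on the default thread pool so the
    # event loop stays free while the work is in progress.
    return await loop.run_in_executor(None, slow_enrichment, event)


result = asyncio.run(handle({"id": 1}))
```

While the worker thread sleeps, the event loop is free to run other tasks, which is the point of the recommendation above.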

Standard Kafka configuration options, as specified by the librdkafka library, can be used. The options are simply passed on to the library. For the full list, see:

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

ConfigDefaults: dict = {'auto.commit.interval.ms': '1000', 'auto.offset.reset': 'smallest', 'enable.auto.commit': 'true', 'group.id': 'bspump', 'refresh_topics': 0, 'topic': 'unconfigured'}
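
A minimal sketch of how the defaults above might combine with a user-supplied config. The precedence shown (user values override defaults) is an assumption made for illustration, and `effective_config` is a hypothetical helper, not part of BSPump.

```python
# Defaults copied from the ConfigDefaults listed above.
CONFIG_DEFAULTS = {
    "auto.commit.interval.ms": "1000",
    "auto.offset.reset": "smallest",
    "enable.auto.commit": "true",
    "group.id": "bspump",
    "refresh_topics": 0,
    "topic": "unconfigured",
}


def effective_config(defaults, overrides=None):
    """Return a new dict where user-supplied values replace the defaults."""
    merged = dict(defaults)
    merged.update(overrides or {})
    return merged


# Only the topic and the group are overridden; the rest keeps its default.
config = effective_config(CONFIG_DEFAULTS, {"topic": "messages", "group.id": "my-pump"})
```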
__init__(app, pipeline, connection, id=None, config=None)[source]

Initializes parameters.

Parameters

app : Application

The Application object.

pipeline : Pipeline

The Pipeline this source belongs to.

connection : Connection

The connection to use, for example the ID of a KafkaConnection such as "KafkaConnection".

id : default = None

config : default = None

async main()[source]

Can be implemented for additional features; otherwise it raises NotImplementedError and _main is called instead.

class bspump.kafka.KafkaSink(app, pipeline, connection, id=None, config=None)[source]

Bases: Sink

Description: KafkaSink is a sink processor that forwards events to the Apache Kafka instance specified by a KafkaConnection object.

KafkaSink expects bytes as input. If the input is a string or a dictionary, it is automatically transformed to bytes using the encoding charset specified in the configuration.
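
The conversion can be pictured roughly as follows. This is a sketch under stated assumptions, not the sink's actual code: `to_bytes` is a hypothetical helper, and serializing dictionaries as JSON is an assumption, since the text only says they are transformed to bytes.

```python
import json


def to_bytes(event, encoding="utf-8"):
    """Convert an event to bytes the way the description above suggests."""
    if isinstance(event, bytes):
        return event
    if isinstance(event, str):
        return event.encode(encoding)
    if isinstance(event, dict):
        # Assumption: dictionaries are serialized as JSON before encoding.
        return json.dumps(event).encode(encoding)
    raise TypeError("Unsupported event type: {}".format(type(event).__name__))


payload = to_bytes({"weight": 12})
```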

class KafkaPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )

There are two ways to use KafkaSink:
- Specify a single topic in the KafkaSink config - topic, to be used for all the events in the pipeline.
- Specify the topic separately for each event in the event context - context['kafka_topic'].
  The topic from the configuration is then used as the default topic.
  To provide business logic for event distribution, you can create a topic selector processor.

Processor example:

class KafkaTopicSelector(bspump.Processor):

    def process(self, context, event):
        # Treat a missing weight as 0 so the comparison never sees None.
        if event.get("weight", 0) > 10:
            context["kafka_topic"] = "heavy"
        else:
            context["kafka_topic"] = "light"

        return event

Every Kafka message can be a key:value pair. The key is read from the event context - context['kafka_key'].
If kafka_key is not provided, the key defaults to None.
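
The lookup rules for topic and key can be summarized in a short sketch. `resolve_target` is a hypothetical helper written for illustration; it mirrors the behaviour described above rather than the sink's real implementation.

```python
def resolve_target(context, default_topic):
    """Pick the topic and key for one event from its context."""
    # The per-event topic wins; the configured topic is the fallback.
    topic = context.get("kafka_topic", default_topic)
    # A missing key simply means the message is sent without a key.
    key = context.get("kafka_key")
    return topic, key


with_overrides = resolve_target({"kafka_topic": "heavy", "kafka_key": b"user-1"}, "messages2")
with_defaults = resolve_target({}, "messages2")
```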

Standard Kafka configuration options, as specified by the librdkafka library, can be used. The options are simply passed on to the library. For the full list, see:

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

ConfigDefaults: dict = {'batch.num.messages': '100000', 'batch.size': '1000000', 'linger.ms': '500', 'poll.timeout': '0.2', 'topic': 'unconfigured', 'watermark.high': '90000', 'watermark.low': '40000'}
__init__(app, pipeline, connection, id=None, config=None)[source]

Initializes the parameters.

Parameters

app : object

Application object.

pipeline : Pipeline

The Pipeline this sink belongs to.

id : str, default=None

ID of the class of config.

config : JSON or other compatible format, default=None

Configuration file.

process(context, event)[source]

Can be implemented to return event based on a given logic.

Parameters

context :

Additional information passed to the method.

event : Data with a time stamp stored in any data type, usually JSON.

You can specify an event that is passed to the method.

Parameters:

event (bytes)

class bspump.kafka.KafkaKeyFilter(app, pipeline, keys, id=None, config=None)[source]

Bases: Processor

KafkaKeyFilter reduces the incoming event stream from Kafka based on a key provided in each event.

Every Kafka message has a key. KafkaKeyFilter selects only those events whose key matches one of the provided ‘keys’; other events are discarded.

Set filtering keys as a parameter (in bytes) in the KafkaKeyFilter constructor.

KafkaKeyFilter is meant to be inserted after KafkaSource in a Pipeline.
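
The filtering rule can be sketched without BSPump as a plain class. Several details here are assumptions for illustration: that the key is read from context['kafka_key'], that a single bytes value or several values may be given as keys, and that returning None stands for discarding the event.

```python
class KeyFilterSketch:
    """Keeps events whose Kafka key is one of the allowed keys."""

    def __init__(self, keys):
        # A single bytes value is wrapped so it is not split into integers.
        if isinstance(keys, bytes):
            keys = [keys]
        self.keys = frozenset(keys)

    def process(self, context, event):
        if context.get("kafka_key") in self.keys:
            return event
        # Returning None stands for "discard this event" in this sketch.
        return None


key_filter = KeyFilterSketch([b"orders", b"payments"])
kept = key_filter.process({"kafka_key": b"orders"}, {"id": 1})
dropped = key_filter.process({"kafka_key": b"audit"}, {"id": 2})
```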

__init__(app, pipeline, keys, id=None, config=None)[source]

Initializes variables.

Parameters

app : Application

The Application object (https://asab.readthedocs.io/en/latest/asab/application.html).

pipeline : Pipeline

The Pipeline this processor belongs to.

keys : bytes

Keys used to select events from the event stream; events with other keys are discarded.

id : default = None

config : JSON, default = None

Configuration file in JSON.

process(context, event)[source]

Can be implemented to return event based on a given logic.

Parameters

context :

Additional information passed to the method.

event : Data with a time stamp stored in any data type, usually JSON.

You can specify an event that is passed to the method.

class bspump.kafka.KafkaTopicInitializer(app, connection, id=None, config=None)[source]

Bases: Configurable

KafkaTopicInitializer reads topic configs from a file or from Kafka sink/source configs, checks if the topics exist and creates them if they don’t.

KafkaAdminClient requires blocking connection, which is why this class doesn’t use the connection module from BSPump.

Usage:

topic_initializer = KafkaTopicInitializer(app, "KafkaConnection")
topic_initializer.include_topics(pipeline=MyPipeline)
topic_initializer.initialize_topics()
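
The check-then-create behaviour can be sketched as a pure function. Everything here is illustrative: `plan_missing_topics` is a hypothetical helper, the per-topic settings keys are assumptions, and no Kafka admin client is involved. Only the two default values are taken from ConfigDefaults.

```python
def plan_missing_topics(required, existing,
                        num_partitions_default=1,
                        replication_factor_default=1):
    """List the topics that would need to be created.

    required: dict mapping topic name to its (possibly empty) settings.
    existing: collection of topic names already present in Kafka.
    """
    plan = []
    for name, settings in sorted(required.items()):
        if name in existing:
            continue  # Topic already exists, nothing to do.
        plan.append({
            "name": name,
            "num_partitions": settings.get("num_partitions", num_partitions_default),
            "replication_factor": settings.get("replication_factor", replication_factor_default),
        })
    return plan


plan = plan_missing_topics(
    {"messages": {}, "messages2": {"num_partitions": 3}},
    existing={"messages"},
)
```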

ConfigDefaults: dict = {'client_id': 'bspump-topic-initializer', 'num_partitions_default': 1, 'replication_factor_default': 1, 'topics_file': ''}
__init__(app, connection, id=None, config=None)[source]

Initializes the parameters passed to the class.

Parameters

app : Application

The Application object.

connection : Connection

Information needed to create a connection.

id : typing.Optional[str], default = None

config : dict, default = None

Configuration containing important information.

include_topics(*, topic_config=None, kafka_component=None, pipeline=None, config_file=None)[source]

Includes topics from a config file or a dict object. It can also scan a Pipeline and get topics from its Source or Sink.

Parameters

topic_config : default = None

Topic config file.

kafka_component : default = None

pipeline : default = None

Name of the Pipeline.

config_file : default = None

Configuration file.

include_topics_from_file(topics_file)[source]

Includes topics from a topic file.

Parameters

topics_file : str

Name of the topic file to include.

Parameters:

topics_file (str)

include_topics_from_config(config_object)[source]

Includes topics using a config.

Parameters

config_object : JSON

Config object containing information about which topics to include.

fetch_existing_topics()[source]
check_and_initialize()[source]

Initializes new topics and logs a warning.

initialize_topics()[source]

Initializes topics.

KafkaConnection

Shared connection to a Kafka cluster.

import bspump.kafka

connection = bspump.kafka.KafkaConnection(app, "KafkaConnection")

Configuration:

[connection:KafkaConnection]
bootstrap_servers=kafka:9092
security_protocol=PLAINTEXT
group_id=my-consumer-group

Options:

  • bootstrap_servers - Comma-separated list of brokers

  • security_protocol - PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL

  • group_id - Consumer group ID

  • ssl_cafile - Path to CA certificate

  • ssl_certfile - Path to client certificate

  • ssl_keyfile - Path to client key

  • sasl_mechanism - PLAIN, SCRAM-SHA-256, SCRAM-SHA-512

  • sasl_plain_username - SASL username

  • sasl_plain_password - SASL password
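
To see how such a section reads once parsed, here is a small sketch using the standard configparser module. It only illustrates the file format: the broker host names are made up, and splitting bootstrap_servers into a list is done here purely for demonstration.

```python
import configparser

# Example configuration text; the broker addresses are placeholders.
CONFIG_TEXT = """
[connection:KafkaConnection]
bootstrap_servers=kafka-1:9092,kafka-2:9092
security_protocol=PLAINTEXT
group_id=my-consumer-group
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)
section = parser["connection:KafkaConnection"]

# bootstrap_servers is a comma-separated list of brokers.
brokers = [item.strip() for item in section["bootstrap_servers"].split(",")]
group_id = section["group_id"]
```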

KafkaSource

Consumes messages from Kafka topics.

source = bspump.kafka.KafkaSource(
    app, pipeline,
    connection="KafkaConnection"
)

Configuration:

[pipeline:MyPipeline:KafkaSource]
topic=input-topic
auto_offset_reset=earliest
max_poll_records=500

Options:

  • topic - Single topic to consume

  • topics - Comma-separated list of topics

  • auto_offset_reset - earliest or latest

  • max_poll_records - Maximum records per poll

  • max_poll_interval_ms - Maximum time between polls

Context Keys:

The source adds these to the event context:

  • kafka_topic - Source topic

  • kafka_partition - Partition number

  • kafka_offset - Message offset

  • kafka_key - Message key

  • kafka_timestamp - Message timestamp
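
A short sketch of how a downstream step might use these context keys, for example to build a log-friendly description of where an event came from. `describe_origin` is a hypothetical helper and the context below is hand-built for the example.

```python
def describe_origin(context):
    """Format topic, partition and offset as a single string."""
    return "{}[{}]@{}".format(
        context["kafka_topic"],
        context["kafka_partition"],
        context["kafka_offset"],
    )


# A context shaped like the one described above (values are made up).
example_context = {
    "kafka_topic": "input-topic",
    "kafka_partition": 2,
    "kafka_offset": 1500,
    "kafka_key": b"user-42",
}
origin = describe_origin(example_context)
```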

KafkaSink

Produces messages to Kafka topics.

sink = bspump.kafka.KafkaSink(
    app, pipeline,
    connection="KafkaConnection"
)

Configuration:

[pipeline:MyPipeline:KafkaSink]
topic=output-topic
acks=all

Options:

  • topic - Target topic

  • acks - Acknowledgment level (0, 1, all)

  • batch_size - Batch size in bytes

  • linger_ms - Time to wait for batch

Dynamic Topic:

Set the topic dynamically in a processor:

def process(self, context, event):
    context["kafka_topic"] = f"events-{event['type']}"
    return event

Message Key:

Set the message key for partitioning:

def process(self, context, event):
    context["kafka_key"] = event["user_id"].encode()
    return event
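
Why a key matters for partitioning can be shown with a toy hash-based partitioner. This is an illustration of the principle only: the CRC32-modulo rule and the partition count are assumptions for the example, and the partitioner actually used depends on the Kafka client configuration.

```python
import zlib


def partition_for(key, num_partitions):
    """Map a message key to a partition with a stable hash."""
    return zlib.crc32(key) % num_partitions


# The same key always maps to the same partition, so all events of one
# user keep their relative order.
first = partition_for(b"user-42", 6)
second = partition_for(b"user-42", 6)
```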

Example Pipeline

import bspump
import bspump.kafka

class MyProcessor(bspump.Processor):
    # Placeholder processor; replace the body with your own logic.
    def process(self, context, event):
        return event

class KafkaProcessingPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, connection="KafkaConnection"),
            MyProcessor(app, self),
            bspump.kafka.KafkaSink(app, self, connection="KafkaConnection"),
        )

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(bspump.kafka.KafkaConnection(app, "KafkaConnection"))
svc.add_pipeline(KafkaProcessingPipeline(app, "Pipeline"))
app.run()