bspump.kafka
Apache Kafka integration for BSPump.
- class bspump.kafka.KafkaConnection(app, id=None, config=None)
Bases: Connection
KafkaConnection connects a BSPump application to an instance of the Apache Kafka messaging system. Processors can then use it to consume or produce user-defined messages.
config = {"bootstrap_servers": "localhost:9092"}
app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(
    bspump.kafka.KafkaConnection(app, "KafkaConnection", config)
)
Standard Kafka configuration options can be used as specified by the librdkafka library; the options are passed through to it unchanged: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
- __init__(app, id=None, config=None)
Parameters
- app (Application)
The BSPump application object.
- id (default None)
- config (JSON or other compatible format, default None)
Configuration options used to create the connection.
- class bspump.kafka.KafkaSource(app, pipeline, connection, id=None, config=None)
Bases: Source
KafkaSource consumes messages from an Apache Kafka system, which is configured in the KafkaConnection object, and passes them to the other processors in the pipeline.
class KafkaPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )
To ensure that the pump continues receiving messages where it left off after a restart, group_id has to be provided in the configuration. When group_id is set, a consumer group is created and the Kafka server operates in the producer-consumer mode: every consumer with the same group_id is assigned a unique set of partitions, so the messages are divided among the consumers and each message is delivered to only one of them.
Long-running synchronous operations should be avoided, or placed inside the OOBGenerator and run asynchronously or on a thread using the ASAB Proactor service (see the bspump-oob-proactor.py example in the "examples" folder). Otherwise, session_timeout_ms should be raised to prevent Kafka from disconnecting the consumer from the partition and thereby triggering a rebalance.
Standard Kafka configuration options can be used as specified by the librdkafka library; the options are passed through to it unchanged: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
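The consumer-group behaviour described above can be illustrated with a small stand-alone sketch. It uses neither Kafka nor BSPump: assign_partitions is a hypothetical helper that hands out partitions round-robin, which is a simplification of what a real group coordinator does. It only shows that consumers sharing one group_id end up with disjoint partition sets.

```python
from collections import defaultdict


def assign_partitions(partitions, consumers):
    """Round-robin assignment of partitions to the consumers of one group.

    Hypothetical helper for illustration; real Kafka assignors differ.
    """
    assignment = defaultdict(list)
    for index, partition in enumerate(sorted(partitions)):
        consumer = consumers[index % len(consumers)]
        assignment[consumer].append(partition)
    return dict(assignment)


# Two pumps (names are made up) share six partitions of one topic.
assignment = assign_partitions(range(6), ["pump-a", "pump-b"])
```

Because no partition appears in two lists, no message is processed by both pumps.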
- ConfigDefaults: dict = {'auto.commit.interval.ms': '1000', 'auto.offset.reset': 'smallest', 'enable.auto.commit': 'true', 'group.id': 'bspump', 'refresh_topics': 0, 'topic': 'unconfigured'}
- __init__(app, pipeline, connection, id=None, config=None)
Initializes parameters.
Parameters
- app (Application)
The BSPump application object.
- pipeline (Pipeline)
The pipeline this source belongs to.
- connection (Connection)
The Kafka connection to use; the pipeline examples pass its ID, "KafkaConnection".
- id (default None)
- config (default None)
- class bspump.kafka.KafkaSink(app, pipeline, connection, id=None, config=None)
Bases: Sink
KafkaSink is a sink processor that forwards events to the Apache Kafka instance specified by a KafkaConnection object.
KafkaSink expects bytes as input. If the input is a string or a dictionary, it is automatically transformed to bytes using the encoding charset specified in the configuration.
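The conversion rule can be pictured with the following stand-alone sketch. It is not KafkaSink's code: to_bytes is a hypothetical helper, and both the JSON serialization of dictionaries and the encoding parameter name are assumptions made for illustration.

```python
import json


def to_bytes(event, encoding="utf-8"):
    """Convert a str or dict event to bytes; pass bytes through unchanged.

    Assumption: dictionaries are serialized as JSON before encoding.
    """
    if isinstance(event, bytes):
        return event
    if isinstance(event, dict):
        event = json.dumps(event)
    if isinstance(event, str):
        return event.encode(encoding)
    raise TypeError("Unsupported event type: {}".format(type(event).__name__))
```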
class KafkaPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, "KafkaConnection", config={'topic': 'messages'}),
            bspump.kafka.KafkaSink(app, self, "KafkaConnection", config={'topic': 'messages2'}),
        )
There are two ways to use KafkaSink:
- Specify a single topic in the KafkaSink config (topic), to be used for all events in the pipeline.
- Specify the topic separately for each event in the event context (context['kafka_topic']). The topic from the configuration is then used as the default topic.
To provide business logic for event distribution, you can create a topic selector processor. Processor example:
class KafkaTopicSelector(bspump.Processor):
    def process(self, context, event):
        # Default to 0 so that events without a "weight" go to the "light" topic
        if event.get("weight", 0) > 10:
            context["kafka_topic"] = "heavy"
        else:
            context["kafka_topic"] = "light"
        return event
Every Kafka message can be a key:value pair. The key is read from the event context (context['kafka_key']). If kafka_key is not provided, the key defaults to None.
Standard Kafka configuration options can be used as specified by the librdkafka library; the options are passed through to it unchanged: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
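The fallback rules for the topic and the key can be summarized in a few lines. This is a stand-alone sketch rather than KafkaSink's implementation; resolve_topic_and_key is a hypothetical helper and default_topic stands for the topic value from the sink configuration.

```python
def resolve_topic_and_key(context, default_topic):
    """Return the (topic, key) pair a sink would use for one event.

    Hypothetical helper that mirrors the fallback rules described above.
    """
    topic = context.get("kafka_topic", default_topic)
    key = context.get("kafka_key")  # None when no key was set
    return topic, key
```

An empty context therefore yields the configured topic and a key of None, while a context filled in by a topic selector overrides the topic for that event only.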
- ConfigDefaults: dict = {'batch.num.messages': '100000', 'batch.size': '1000000', 'linger.ms': '500', 'poll.timeout': '0.2', 'topic': 'unconfigured', 'watermark.high': '90000', 'watermark.low': '40000'}
- __init__(app, pipeline, connection, id=None, config=None)
Initializes the parameters.
- process(context, event)
Can be implemented to return an event based on a given logic.
Parameters
- context
Additional information passed to the method.
- event (bytes)
The event passed to the method: data with a timestamp, stored in any data type, usually JSON.
- class bspump.kafka.KafkaKeyFilter(app, pipeline, keys, id=None, config=None)
Bases: Processor
KafkaKeyFilter reduces the incoming event stream from Kafka based on the key provided in each event.
Every Kafka message has a key. KafkaKeyFilter selects only those events whose key matches one of the provided keys; other events are discarded.
Set filtering keys as a parameter (in bytes) in the KafkaKeyFilter constructor.
KafkaKeyFilter is meant to be inserted after KafkaSource in a Pipeline.
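The selection rule can be sketched without BSPump as follows. filter_by_key is a hypothetical stand-in, not the class itself; it assumes the message key is available as context["kafka_key"] and that keys may be either a single bytes value or an iterable of bytes values.

```python
def filter_by_key(events, keys):
    """Yield only (context, event) pairs whose Kafka key is in `keys`.

    Hypothetical stand-in for the filtering rule described above.
    """
    if isinstance(keys, bytes):
        keys = [keys]
    allowed = frozenset(keys)
    for context, event in events:
        if context.get("kafka_key") in allowed:
            yield context, event


# Made-up sample stream: one matching key, one other key, one missing key.
stream = [
    ({"kafka_key": b"user-1"}, b"a"),
    ({"kafka_key": b"user-2"}, b"b"),
    ({"kafka_key": None}, b"c"),
]
kept = [event for _, event in filter_by_key(stream, [b"user-1"])]
```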
- __init__(app, pipeline, keys, id=None, config=None)
Initializes variables.
Parameters
- app (Application)
The Application object (https://asab.readthedocs.io/en/latest/asab/application.html).
- pipeline (Pipeline)
The pipeline this processor belongs to.
- keys (bytes)
Keys used to select events from the event stream; events with other keys are discarded.
- id (default None)
- config (JSON, default None)
Configuration in JSON.
- class bspump.kafka.KafkaTopicInitializer(app, connection, id=None, config=None)
Bases: Configurable
KafkaTopicInitializer reads topic configs from a file or from Kafka sink/source configs, checks whether the topics exist, and creates them if they don't.
KafkaAdminClient requires a blocking connection, which is why this class doesn't use the connection module from BSPump.
Usage:
topic_initializer = KafkaTopicInitializer(app, "KafkaConnection")
# include_topics accepts keyword arguments only
topic_initializer.include_topics(pipeline=MyPipeline)
topic_initializer.initialize_topics()
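The check-then-create behaviour can be sketched with an in-memory stand-in for the admin client. Nothing here is a real Kafka or BSPump API: InMemoryAdmin and ensure_topics are hypothetical names, and the defaults of one partition and a replication factor of one simply mirror num_partitions_default and replication_factor_default.

```python
class InMemoryAdmin:
    """Hypothetical stand-in for a Kafka admin client (not a real API)."""

    def __init__(self, existing):
        self.topics = set(existing)

    def list_topics(self):
        return set(self.topics)

    def create_topic(self, name, num_partitions, replication_factor):
        self.topics.add(name)


def ensure_topics(admin, required, num_partitions=1, replication_factor=1):
    """Create every required topic that is missing; return the new names."""
    existing = admin.list_topics()
    created = []
    for name in sorted(set(required)):
        if name not in existing:
            admin.create_topic(name, num_partitions, replication_factor)
            created.append(name)
    return created


admin = InMemoryAdmin({"messages"})
created = ensure_topics(admin, ["messages", "messages2"])
```

Running ensure_topics a second time creates nothing, which is the property that makes initialization safe to repeat at every start.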
- ConfigDefaults: dict = {'client_id': 'bspump-topic-initializer', 'num_partitions_default': 1, 'replication_factor_default': 1, 'topics_file': ''}
- __init__(app, connection, id=None, config=None)
Initializes the parameters passed to the class.
Parameters
- app (Application)
The BSPump application object.
- connection (Connection)
Information needed to create a connection.
- id (typing.Optional[str], default None)
- config (dict, default None)
Configuration for the topic initializer.
- include_topics(*, topic_config=None, kafka_component=None, pipeline=None, config_file=None)
Includes topics from a config file or a dict object. It can also scan a Pipeline and get topics from its Source or Sink.
Parameters
- topic_config (default None)
Topic config file.
- kafka_component (default None)
- pipeline (default None)
The Pipeline to scan for topics.
- config_file (default None)
Configuration file.
- include_topics_from_file(topics_file)
Includes topics from a topic file.
Parameters
- topics_file (str)
Name of the topic file to include.
KafkaConnection
Shared connection to a Kafka cluster.
import bspump.kafka
connection = bspump.kafka.KafkaConnection(app, "KafkaConnection")
Configuration:
[connection:KafkaConnection]
bootstrap_servers=kafka:9092
security_protocol=PLAINTEXT
group_id=my-consumer-group
Options:
- bootstrap_servers - Comma-separated list of brokers
- security_protocol - PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
- group_id - Consumer group ID
- ssl_cafile - Path to CA certificate
- ssl_certfile - Path to client certificate
- ssl_keyfile - Path to client key
- sasl_mechanism - PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
- sasl_plain_username - SASL username
- sasl_plain_password - SASL password
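To make the comma-separated bootstrap_servers format concrete, the sketch below parses a configuration section with Python's standard configparser and splits the broker list. It only illustrates the file format; the host names kafka-1 and kafka-2 are made up, and how BSPump itself reads the section is not shown here.

```python
import configparser

CONFIG_TEXT = """
[connection:KafkaConnection]
bootstrap_servers=kafka-1:9092, kafka-2:9092
security_protocol=PLAINTEXT
group_id=my-consumer-group
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)
section = parser["connection:KafkaConnection"]

# Split the comma-separated broker list into individual host:port entries
brokers = [item.strip() for item in section["bootstrap_servers"].split(",")]
```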
KafkaSource
Consumes messages from Kafka topics.
source = bspump.kafka.KafkaSource(
app, pipeline,
connection="KafkaConnection"
)
Configuration:
[pipeline:MyPipeline:KafkaSource]
topic=input-topic
auto_offset_reset=earliest
max_poll_records=500
Options:
- topic - Single topic to consume
- topics - Comma-separated list of topics
- auto_offset_reset - earliest or latest
- max_poll_records - Maximum records per poll
- max_poll_interval_ms - Maximum time between polls
Context Keys:
The source adds these to the event context:
- kafka_topic - Source topic
- kafka_partition - Partition number
- kafka_offset - Message offset
- kafka_key - Message key
- kafka_timestamp - Message timestamp
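A downstream processor can read these keys to trace where an event came from. The helper below is a minimal sketch that works on a plain dictionary; describe_message is a hypothetical name and the sample values are made up.

```python
def describe_message(context):
    """Build a 'topic[partition]@offset' label from the Kafka context keys."""
    return "{}[{}]@{}".format(
        context["kafka_topic"],
        context["kafka_partition"],
        context["kafka_offset"],
    )


label = describe_message(
    {"kafka_topic": "input-topic", "kafka_partition": 3, "kafka_offset": 1042}
)
```

Such a label is handy in log messages when a single event needs to be located again in the topic.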
KafkaSink
Produces messages to Kafka topics.
sink = bspump.kafka.KafkaSink(
app, pipeline,
connection="KafkaConnection"
)
Configuration:
[pipeline:MyPipeline:KafkaSink]
topic=output-topic
acks=all
Options:
- topic - Target topic
- acks - Acknowledgment level (0, 1, all)
- batch_size - Batch size in bytes
- linger_ms - Time to wait for batch
Dynamic Topic:
Set the topic dynamically in a processor:
def process(self, context, event):
    context["kafka_topic"] = f"events-{event['type']}"
    return event
Message Key:
Set the message key for partitioning:
def process(self, context, event):
    context["kafka_key"] = event["user_id"].encode()
    return event
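The key matters for partitioning because a hash of the key typically selects the partition, so events with the same key land on the same partition and keep their relative order. The sketch below assumes a CRC32-based mapping purely for illustration; the partitioner actually used depends on the client configuration, and partition_for_key is a hypothetical helper.

```python
import zlib


def partition_for_key(key, num_partitions):
    """Map a bytes key to a partition index with CRC32 (illustrative only)."""
    return zlib.crc32(key) % num_partitions


# The same key always maps to the same partition for a fixed partition count.
first = partition_for_key(b"user-42", 6)
second = partition_for_key(b"user-42", 6)
```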
Example Pipeline
import bspump
import bspump.kafka

class KafkaProcessingPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, connection="KafkaConnection"),
            MyProcessor(app, self),  # user-defined bspump.Processor subclass
            bspump.kafka.KafkaSink(app, self, connection="KafkaConnection"),
        )

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_connection(bspump.kafka.KafkaConnection(app, "KafkaConnection"))
svc.add_pipeline(KafkaProcessingPipeline(app, "Pipeline"))
app.run()