Kafka Integration

BSPump integrates with Apache Kafka through the bspump.kafka module, which provides a shared connection, a source and a sink.

Installation

pip install aiokafka

Components

  • KafkaConnection: Shared connection to Kafka cluster

  • KafkaSource: Consumes messages from Kafka topics

  • KafkaSink: Produces messages to Kafka topics

KafkaConnection

import bspump.kafka

connection = bspump.kafka.KafkaConnection(app, "KafkaConnection")

Configuration:

[connection:KafkaConnection]
bootstrap_servers=kafka1:9092,kafka2:9092,kafka3:9092
security_protocol=PLAINTEXT
group_id=my-consumer-group

# SSL configuration (optional)
# security_protocol=SSL
# ssl_cafile=/path/to/ca.pem
# ssl_certfile=/path/to/cert.pem
# ssl_keyfile=/path/to/key.pem

# SASL configuration (optional)
# security_protocol=SASL_SSL
# sasl_mechanism=PLAIN
# sasl_plain_username=user
# sasl_plain_password=${KAFKA_PASSWORD}
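To show how these keys fit together, the following stdlib-only sketch parses a [connection:KafkaConnection] section with configparser, splits bootstrap_servers into a broker list, and rejects security_protocol values other than the four listed in the configuration reference. The helper name parse_kafka_connection and the fallback defaults (copied from the Connection Options table) are illustrative assumptions. BSPump's own configuration loading may differ, and placeholders such as ${KAFKA_PASSWORD} are deliberately left unexpanded here.

```python
import configparser

# Allowed values and defaults as listed in the Connection Options reference.
SECURITY_PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}
DEFAULTS = {
    "bootstrap_servers": "localhost:9092",
    "security_protocol": "PLAINTEXT",
    "group_id": "bspump",
}


def parse_kafka_connection(text, section="connection:KafkaConnection"):
    """Hypothetical helper: return the connection options as a dict."""
    # interpolation=None keeps ${...} placeholders as literal text
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    options = dict(DEFAULTS)
    if parser.has_section(section):
        options.update(parser[section])
    # "kafka1:9092,kafka2:9092" -> ["kafka1:9092", "kafka2:9092"]
    options["bootstrap_servers"] = [
        server.strip()
        for server in options["bootstrap_servers"].split(",")
        if server.strip()
    ]
    if options["security_protocol"] not in SECURITY_PROTOCOLS:
        raise ValueError(
            f"unsupported security_protocol: {options['security_protocol']}"
        )
    return options


example = """
[connection:KafkaConnection]
bootstrap_servers=kafka1:9092,kafka2:9092,kafka3:9092
security_protocol=PLAINTEXT
group_id=my-consumer-group
"""

print(parse_kafka_connection(example)["bootstrap_servers"])
# ['kafka1:9092', 'kafka2:9092', 'kafka3:9092']
```

Keys missing from the section fall back to the documented defaults, so an empty file yields a single localhost:9092 broker.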

KafkaSource

Consumes messages from one or more Kafka topics.

import bspump.kafka

source = bspump.kafka.KafkaSource(
    app, pipeline,
    connection="KafkaConnection"
)

Configuration:

[pipeline:MyPipeline:KafkaSource]
topic=my-topic
# Or multiple topics
# topics=topic1,topic2,topic3

# Consumer settings
auto_offset_reset=earliest
# Options: earliest, latest

# Polling settings
max_poll_records=500
# Max time (ms) between polls before the group reassigns this consumer's partitions
max_poll_interval_ms=300000
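auto_offset_reset only takes effect when the consumer group has no usable committed offset for a partition: for example, a brand-new group, or a committed offset that retention has already deleted. Otherwise the consumer resumes from its committed position. The sketch below models that decision with plain integers. starting_offset is a hypothetical helper for illustration, not part of bspump.kafka or aiokafka.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset="latest"):
    """Return the offset a consumer would start reading from in one partition.

    committed: last offset committed by the group, or None if there is none.
    log_start: first offset still retained in the partition.
    log_end: offset the next produced message will receive.
    """
    # A committed offset that still exists always wins; the reset policy is ignored.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # No usable committed offset: fall back to the reset policy.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    raise ValueError(f"unknown auto_offset_reset: {auto_offset_reset}")


# A partition currently retaining offsets 100..199
print(starting_offset(None, 100, 200, "earliest"))  # 100: replay retained history
print(starting_offset(None, 100, 200, "latest"))    # 200: only new messages
print(starting_offset(150, 100, 200, "earliest"))   # 150: resume where the group stopped
```

This is why changing auto_offset_reset on an existing consumer group usually has no visible effect: the group already has committed offsets.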

KafkaSink

Produces messages to a Kafka topic.

import bspump.kafka

sink = bspump.kafka.KafkaSink(
    app, pipeline,
    connection="KafkaConnection"
)

Configuration:

[pipeline:MyPipeline:KafkaSink]
topic=output-topic

# Producer settings
acks=all
# Options: 0, 1, all

# Batching (batch_size in bytes, linger_ms in milliseconds)
batch_size=16384
linger_ms=0
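batch_size and linger_ms trade latency for throughput. The producer groups records into batches and sends a batch once it is full or once it has waited linger_ms. With linger_ms=0 it sends as soon as it can. The class below is a deliberately simplified, single-partition model of that rule, with timestamps passed in explicitly and record size taken as len(value). BatchAccumulator is a hypothetical name, and this is not how aiokafka implements batching internally.

```python
class BatchAccumulator:
    """Simplified model: send a batch when it reaches batch_size bytes
    or when its first record has waited linger_ms milliseconds."""

    def __init__(self, batch_size=16384, linger_ms=0):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self._records = []
        self._bytes = 0
        self._opened_at = None  # time (ms) the current batch got its first record

    def append(self, value, now_ms):
        """Add a record; return the batch if it should be sent now, else None."""
        if not self._records:
            self._opened_at = now_ms
        self._records.append(value)
        self._bytes += len(value)
        return self.poll(now_ms)

    def poll(self, now_ms):
        """Return the pending batch if it is full or has lingered long enough."""
        if not self._records:
            return None
        full = self._bytes >= self.batch_size
        expired = now_ms - self._opened_at >= self.linger_ms
        if full or expired:
            batch, self._records, self._bytes = self._records, [], 0
            return batch
        return None


eager = BatchAccumulator(linger_ms=0)
print(eager.append(b"a", now_ms=0))     # [b'a']: sent immediately

lazy = BatchAccumulator(batch_size=10, linger_ms=5)
print(lazy.append(b"12345", now_ms=0))  # None: 5 bytes, waited 0 ms
print(lazy.append(b"678", now_ms=2))    # None: 8 bytes, waited 2 ms
print(lazy.poll(now_ms=5))              # [b'12345', b'678']: linger expired
```

Raising linger_ms above 0 lets several small messages share one request, at the cost of up to linger_ms of extra latency per message.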

Dynamic Topic Routing

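To choose the destination topic per event, set context["kafka_topic"] in a processor placed before the sink:

import bspump

class RouterProcessor(bspump.Processor):
    def process(self, context, event):
        # Route each event to a topic derived from its type, e.g. "events-login"
        event_type = event.get("type", "default")
        context["kafka_topic"] = f"events-{event_type}"
        return event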
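Because the topic name is built from event data, an unexpected type value can produce a name Kafka rejects. Kafka topic names may contain only ASCII letters, digits, ".", "_" and "-", must be at most 249 characters long, and cannot be "." or "..". The sketch below sanitizes the derived name before it is stored in the context. route_topic and its prefix and fallback parameters are hypothetical, chosen to match the "events-" naming used above.

```python
import re

# Anything outside Kafka's legal topic-name characters
_ILLEGAL = re.compile(r"[^a-zA-Z0-9._-]")
MAX_TOPIC_LENGTH = 249


def route_topic(event, prefix="events-", fallback="events-default"):
    """Hypothetical helper: build a Kafka-safe topic name from event["type"]."""
    event_type = str(event.get("type", "default"))
    # Replace characters Kafka would reject with "_"
    topic = prefix + _ILLEGAL.sub("_", event_type)
    if topic in (".", "..") or len(topic) > MAX_TOPIC_LENGTH:
        return fallback
    return topic


print(route_topic({"type": "user/created v2"}))  # events-user_created_v2
```

Inside RouterProcessor.process this would replace the f-string: context["kafka_topic"] = route_topic(event).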

Message Keys

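Set context["kafka_key"] in a processor to control partitioning. Kafka writes all messages with the same key to the same partition, so their relative order is preserved:

import bspump

class KeyProcessor(bspump.Processor):
    def process(self, context, event):
        user_id = event.get("user_id")
        # Events with the same key go to the same partition.
        # str() handles numeric IDs; events without a user_id get no key (None)
        # instead of a shared empty key that would pile them onto one partition.
        context["kafka_key"] = str(user_id).encode("utf-8") if user_id is not None else None
        return event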
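The key-to-partition mapping is a hash of the key bytes modulo the partition count. The sketch below reproduces the 32-bit murmur2 scheme used by the Java client's default partitioner, which aiokafka's default partitioner follows as far as we know. Other clients (librdkafka, for example) may default to a different hash, so the same key can land on a different partition there. The function names murmur2 and partition_for_key are illustrative.

```python
def murmur2(data: bytes) -> int:
    """32-bit murmur2 (seed 0x9747b28c) as in the Java client, unsigned result."""
    m, mask = 0x5BD1E995, 0xFFFFFFFF
    length = len(data)
    h = (0x9747B28C ^ length) & mask
    body = length - length % 4
    # Mix four bytes at a time, read as little-endian 32-bit words
    for i in range(0, body, 4):
        k = int.from_bytes(data[i:i + 4], "little")
        k = (k * m) & mask
        k ^= k >> 24
        k = (k * m) & mask
        h = ((h * m) & mask) ^ k
    # Mix the remaining 0-3 bytes
    tail = data[body:]
    if len(tail) >= 3:
        h ^= tail[2] << 16
    if len(tail) >= 2:
        h ^= tail[1] << 8
    if len(tail) >= 1:
        h ^= tail[0]
        h = (h * m) & mask
    # Final avalanche
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key: bytes, num_partitions: int) -> int:
    # Clear the sign bit (Java's toPositive), then map onto the partition range
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


print(partition_for_key(b"user-42", 6))  # same value on every call for this key
```

Since the result depends on num_partitions, adding partitions to a topic changes where existing keys land, and per-key ordering only holds within each partition layout.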

Complete Example

from bspump.jupyter import *
import bspump.kafka
import json

@register_connection
def kafka_connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")

auto_pipeline(
    source=lambda app, pipeline: bspump.kafka.KafkaSource(
        app, pipeline, connection="KafkaConnection"
    ),
    sink=lambda app, pipeline: bspump.kafka.KafkaSink(
        app, pipeline, connection="KafkaConnection"
    ),
    name="KafkaPipeline",
)

# Processing step: `event` holds the raw Kafka message value (bytes) supplied by the pipeline
event = json.loads(event.decode("utf-8"))
event["processed"] = True
event = json.dumps(event).encode("utf-8")
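Keeping the processing step as a plain function makes it easy to unit-test without a broker. This variant of the same decode, flag, and re-encode logic also guards against messages that are not UTF-8 JSON objects. The name mark_processed is hypothetical. Treating a None return value as "drop this event" is an assumption to verify against your BSPump version before relying on it.

```python
import json


def mark_processed(raw):
    """Decode a JSON message, flag it as processed, and re-encode it.

    Returns None for messages that are not UTF-8 JSON objects
    (assumption: the pipeline drops events when None is returned).
    """
    try:
        event = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(event, dict):
        return None
    event["processed"] = True
    return json.dumps(event).encode("utf-8")


print(mark_processed(b'{"type": "login"}'))  # b'{"type": "login", "processed": true}'
print(mark_processed(b"not json"))           # None
```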

Configuration Reference

Connection Options

Option               Default          Description
bootstrap_servers    localhost:9092   Comma-separated list of brokers
security_protocol    PLAINTEXT        PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
group_id             bspump           Consumer group ID

Source Options

Option               Default      Description
topic                (required)   Topic to consume from; use topics for a comma-separated list
auto_offset_reset    latest       Where to start when the group has no committed offset: earliest or latest
max_poll_records     500          Max records per poll

Sink Options

Option    Default      Description
topic     (required)   Topic to produce to
acks      1            Acknowledgment level: 0, 1, or all
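The source and sink tables can be read as a small validation spec. The sketch below applies their defaults, requires a topic (accepting topics as the source alternative, following the KafkaSource example), and checks the documented allowed values. validate_options is a hypothetical helper, and it assumes option values arrive as strings, as they do from an INI-style file.

```python
# Defaults from the Source and Sink Options tables; topic has no default.
SOURCE_DEFAULTS = {"auto_offset_reset": "latest", "max_poll_records": "500"}
SINK_DEFAULTS = {"acks": "1"}


def validate_options(kind, options):
    """Hypothetical check of source/sink options against the reference tables."""
    if kind == "source":
        merged = {**SOURCE_DEFAULTS, **options}
    elif kind == "sink":
        merged = {**SINK_DEFAULTS, **options}
    else:
        raise ValueError(f"unknown kind: {kind}")

    # The source may list several topics instead of a single topic
    has_topic = merged.get("topic") or (kind == "source" and merged.get("topics"))
    if not has_topic:
        raise ValueError(f"{kind}: 'topic' is required")

    if kind == "source":
        if merged["auto_offset_reset"] not in ("earliest", "latest"):
            raise ValueError("auto_offset_reset must be 'earliest' or 'latest'")
        if int(merged["max_poll_records"]) < 1:
            raise ValueError("max_poll_records must be a positive integer")
    elif merged["acks"] not in ("0", "1", "all"):
        raise ValueError("acks must be 0, 1 or all")
    return merged


print(validate_options("sink", {"topic": "output-topic"}))
# {'acks': '1', 'topic': 'output-topic'}
```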