Kafka Processing Pipeline
This pattern consumes events from one Kafka topic, processes them, and produces the results to another Kafka topic.
Use Cases
Data transformation and enrichment
Filtering and routing events
Stream processing and aggregation
Event-driven microservices
Architecture
Kafka Input Topic
        │
        ▼
┌───────────────┐
│  KafkaSource  │
└───────────────┘
        │
        ▼
┌───────────────┐
│  Processors   │  Transform, filter, enrich
└───────────────┘
        │
        ▼
┌───────────────┐
│   KafkaSink   │
└───────────────┘
        │
        ▼
Kafka Output Topic
Jupyter Implementation
from bspump.jupyter import *
import bspump.kafka


@register_connection
def kafka_connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")


auto_pipeline(
    source=lambda app, pipeline: bspump.kafka.KafkaSource(
        app, pipeline, connection="KafkaConnection"
    ),
    sink=lambda app, pipeline: bspump.kafka.KafkaSink(
        app, pipeline, connection="KafkaConnection"
    ),
    name="ProcessingPipeline",
)
Process events in notebook cells:
import json
from datetime import datetime

# Parse JSON event
event = json.loads(event.decode("utf-8"))

# Transform the event
event["processed"] = True
event["processed_at"] = datetime.now().isoformat()

# Filter: drop events missing required fields
if "user_id" not in event:
    event = None

# Serialize back to JSON bytes (skipped when the event was filtered out)
if event is not None:
    event = json.dumps(event).encode("utf-8")
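The cell logic above can be checked without a running pipeline by wrapping the same steps in a plain function. The following is a minimal sketch for offline testing: `process_cell` is a hypothetical helper name and is not part of bspump, and it assumes every event is a JSON object.

```python
import json
from datetime import datetime


def process_cell(raw: bytes):
    """Apply the parse, transform, filter, and serialize steps to one event.

    Hypothetical helper for offline testing. Returns None when the
    filter drops the event, mirroring `event = None` in the cells.
    """
    # Parse JSON event
    event = json.loads(raw.decode("utf-8"))

    # Transform the event
    event["processed"] = True
    event["processed_at"] = datetime.now().isoformat()

    # Filter: drop events missing required fields
    if "user_id" not in event:
        return None

    # Serialize back to JSON bytes
    return json.dumps(event).encode("utf-8")


print(process_cell(b'{"user_id": 42}'))
print(process_cell(b'{"type": "click"}'))  # dropped by the filter
```

Keeping the logic in one function makes it easy to run against sample payloads before pasting the steps back into notebook cells.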
Standalone Application
import bspump
import bspump.kafka
import json
from datetime import datetime


class JsonParser(bspump.Processor):
    def process(self, context, event):
        # Decode raw Kafka bytes; returning None drops the event
        try:
            return json.loads(event.decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            return None


class TransformProcessor(bspump.Processor):
    def process(self, context, event):
        # Mark the event and record when it was processed
        event["processed"] = True
        event["processed_at"] = datetime.now().isoformat()
        return event


class FilterProcessor(bspump.Processor):
    def process(self, context, event):
        # Drop events missing required fields
        if "user_id" not in event:
            return None
        return event


class JsonSerializer(bspump.Processor):
    def process(self, context, event):
        # Serialize back to JSON bytes for the sink
        return json.dumps(event).encode("utf-8")


class KafkaProcessingPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, connection="KafkaConnection"),
            JsonParser(app, self),
            TransformProcessor(app, self),
            FilterProcessor(app, self),
            JsonSerializer(app, self),
            bspump.kafka.KafkaSink(app, self, connection="KafkaConnection"),
        )


if __name__ == "__main__":
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    # Register the connection before the pipeline that refers to it by name
    svc.add_connection(
        bspump.kafka.KafkaConnection(app, "KafkaConnection")
    )
    svc.add_pipeline(KafkaProcessingPipeline(app, "KafkaProcessingPipeline"))
    app.run()
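In the pipeline above, a processor that returns `None` drops the event, so later processors never see it. The sketch below mimics that behaviour with plain Python objects so the processing logic can be unit-tested without Kafka or bspump installed. `run_chain` and the `Stub*` classes are hypothetical stand-ins written for this example; they copy only the `process(context, event)` signature and are not bspump classes.

```python
import json


class StubParser:
    def process(self, context, event):
        # Same logic as JsonParser: invalid input is dropped
        try:
            return json.loads(event.decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            return None


class StubFilter:
    def process(self, context, event):
        # Same logic as FilterProcessor
        return event if "user_id" in event else None


class StubSerializer:
    def process(self, context, event):
        return json.dumps(event).encode("utf-8")


def run_chain(processors, event, context=None):
    """Pass one event through the processors, stopping at the first None."""
    context = {} if context is None else context
    for processor in processors:
        event = processor.process(context, event)
        if event is None:
            return None
    return event


chain = [StubParser(), StubFilter(), StubSerializer()]
print(run_chain(chain, b'{"user_id": 1}'))  # b'{"user_id": 1}'
print(run_chain(chain, b'not json'))        # None
```

Because each stage is an ordinary object, the same test can be pointed at one stage at a time to find which processor drops a given payload.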
Configuration
[connection:KafkaConnection]
bootstrap_servers=kafka:9092
group_id=my-consumer-group

[pipeline:KafkaProcessingPipeline:KafkaSource]
topic=input-topic
auto_offset_reset=earliest

[pipeline:KafkaProcessingPipeline:KafkaSink]
topic=output-topic
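The section names in this configuration have to match the connection and pipeline identifiers used in code, so a typo in a section name may go unnoticed until events fail to flow. One way to catch this early is a small pre-flight check. The sketch below parses the configuration with the standard-library `configparser`; the `check_config` helper and its table of required options are assumptions made for this example and are not a bspump feature.

```python
import configparser

CONFIG_TEXT = """
[connection:KafkaConnection]
bootstrap_servers=kafka:9092
group_id=my-consumer-group

[pipeline:KafkaProcessingPipeline:KafkaSource]
topic=input-topic
auto_offset_reset=earliest

[pipeline:KafkaProcessingPipeline:KafkaSink]
topic=output-topic
"""

# Options this example treats as mandatory (an assumption, not a bspump rule).
REQUIRED = {
    "connection:KafkaConnection": ["bootstrap_servers", "group_id"],
    "pipeline:KafkaProcessingPipeline:KafkaSource": ["topic"],
    "pipeline:KafkaProcessingPipeline:KafkaSink": ["topic"],
}


def check_config(text, required=REQUIRED):
    """Return a list of problems found; an empty list means the check passed."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    problems = []
    for section, options in required.items():
        if not parser.has_section(section):
            problems.append(f"missing section [{section}]")
            continue
        for option in options:
            if not parser.get(section, option, fallback="").strip():
                problems.append(f"[{section}] {option} is not set")
    return problems


print(check_config(CONFIG_TEXT))  # []
```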
Multiple Output Topics
Route events to different topics based on content:
class RoutingProcessor(bspump.Processor):
    def process(self, context, event):
        # Set the output topic based on event type
        event_type = event.get("type", "default")
        context["kafka_topic"] = f"events-{event_type}"
        return event
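Deriving a topic name from event content means that untrusted values end up in topic names. Kafka topic names may contain only ASCII letters, digits, `.`, `_` and `-`, and are limited to 249 characters, so it can help to normalise the type before building the name. The sketch below shows one way to do that; `topic_for_event`, the replacement of illegal characters with `_`, and the fallback to `default` are illustrative assumptions.

```python
import re

# Characters outside the set Kafka accepts in topic names
_ILLEGAL = re.compile(r"[^a-zA-Z0-9._-]")
MAX_TOPIC_LENGTH = 249


def topic_for_event(event, prefix="events-"):
    """Build a legal topic name from the event's "type" field."""
    # A missing, empty, or None type falls back to "default"
    raw_type = str(event.get("type") or "default")
    safe_type = _ILLEGAL.sub("_", raw_type)
    return (prefix + safe_type)[:MAX_TOPIC_LENGTH]


print(topic_for_event({"type": "order.created"}))  # events-order.created
print(topic_for_event({"type": "user signup!"}))   # events-user_signup_
print(topic_for_event({}))                         # events-default
```

Inside a routing processor, the result would be assigned to `context["kafka_topic"]` in place of the f-string.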
Consumer Groups
Multiple instances of the same pipeline share a consumer group for horizontal scaling:
[connection:KafkaConnection]
group_id=processing-group
Kafka automatically distributes partitions among group members.
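To see what this distribution can look like, the sketch below simulates a range-style assignment for a single topic: partitions are split into contiguous blocks, and the first few members receive one extra partition when the count does not divide evenly. It is a simplified illustration only. The real assignment is negotiated by the Kafka client's configured assignor during a rebalance, and the function and member names here are made up for the example.

```python
def assign_partitions(num_partitions, members):
    """Split partitions 0..num_partitions-1 into contiguous blocks per member."""
    ordered = sorted(members)
    if not ordered:
        return {}
    base, extra = divmod(num_partitions, len(ordered))
    assignment = {}
    start = 0
    for index, member in enumerate(ordered):
        # The first `extra` members take one additional partition
        count = base + (1 if index < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


print(assign_partitions(6, ["pump-a", "pump-b"]))
# {'pump-a': [0, 1, 2], 'pump-b': [3, 4, 5]}
print(assign_partitions(6, ["pump-a", "pump-b", "pump-c", "pump-d"]))
# {'pump-a': [0, 1], 'pump-b': [2, 3], 'pump-c': [4], 'pump-d': [5]}
```

With more members than partitions, some members receive an empty list, which is why running more instances than the topic has partitions does not add throughput.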
Best Practices
Use consumer groups: Run several pipeline instances in one group to scale horizontally
Commit offsets after processing: Ensures at-least-once delivery
Handle poison messages: Send unprocessable events to a dead-letter queue instead of dropping them silently
Monitor lag: Track consumer lag to detect when processing falls behind the input rate
Design idempotent processors: At-least-once delivery can repeat events, so handle duplicates gracefully
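As an illustration of the last point, one common way to tolerate duplicate deliveries is to remember recently seen event identifiers and skip repeats. The sketch below keeps a bounded in-memory record of identifiers. The `event_id` field and the `Deduplicator` class are hypothetical, and the class is written as a plain object that only copies the `process(context, event)` signature. In-memory state is lost on restart, so a real deployment would likely need an external store or naturally idempotent writes.

```python
from collections import OrderedDict


class Deduplicator:
    """Drop events whose "event_id" was seen among the last max_size ids."""

    def __init__(self, max_size=10_000):
        self._seen = OrderedDict()
        self._max_size = max_size

    def process(self, context, event):
        event_id = event.get("event_id")
        if event_id is None:
            # Nothing to deduplicate on; pass the event through
            return event
        if event_id in self._seen:
            # Duplicate: refresh its position and drop the event
            self._seen.move_to_end(event_id)
            return None
        self._seen[event_id] = True
        if len(self._seen) > self._max_size:
            # Evict the oldest identifier to keep memory bounded
            self._seen.popitem(last=False)
        return event


dedup = Deduplicator(max_size=2)
print(dedup.process({}, {"event_id": "a"}))  # {'event_id': 'a'}
print(dedup.process({}, {"event_id": "a"}))  # None
```

The bound trades memory for coverage: a duplicate that arrives after its identifier has been evicted is processed again, so downstream writes should still be safe to repeat.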