Kafka Processing Pipeline

This pattern demonstrates consuming from Kafka, processing events, and producing to another Kafka topic.

Use Cases

  • Data transformation and enrichment

  • Filtering and routing events

  • Stream processing and aggregation

  • Event-driven microservices

Architecture

Kafka Input Topic
      │
      ▼
┌───────────────┐
│  KafkaSource  │
└───────────────┘
      │
      ▼
┌───────────────┐
│  Processors   │ Transform, filter, enrich
└───────────────┘
      │
      ▼
┌───────────────┐
│  KafkaSink    │
└───────────────┘
      │
      ▼
Kafka Output Topic

Jupyter Implementation

from bspump.jupyter import *
import bspump.kafka

@register_connection
def kafka_connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")

auto_pipeline(
    source=lambda app, pipeline: bspump.kafka.KafkaSource(
        app, pipeline, connection="KafkaConnection"
    ),
    sink=lambda app, pipeline: bspump.kafka.KafkaSink(
        app, pipeline, connection="KafkaConnection"
    ),
    name="ProcessingPipeline",
)

Process events in notebook cells:

import json

# Parse JSON event
event = json.loads(event.decode("utf-8"))
# Transform the event
event["processed"] = True
event["processed_at"] = datetime.now().isoformat()

# Filter: drop events missing required fields
if "user_id" not in event:
    event = None
# Serialize back to JSON bytes
if event:
    event = json.dumps(event).encode("utf-8")

Standalone Application

import bspump
import bspump.kafka
import json
from datetime import datetime

class JsonParser(bspump.Processor):
    def process(self, context, event):
        try:
            return json.loads(event.decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            return None

class TransformProcessor(bspump.Processor):
    def process(self, context, event):
        event["processed"] = True
        event["processed_at"] = datetime.now().isoformat()
        return event

class FilterProcessor(bspump.Processor):
    def process(self, context, event):
        if "user_id" not in event:
            return None
        return event

class JsonSerializer(bspump.Processor):
    def process(self, context, event):
        return json.dumps(event).encode("utf-8")

class KafkaProcessingPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, connection="KafkaConnection"),
            JsonParser(app, self),
            TransformProcessor(app, self),
            FilterProcessor(app, self),
            JsonSerializer(app, self),
            bspump.kafka.KafkaSink(app, self, connection="KafkaConnection"),
        )

if __name__ == "__main__":
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")

    svc.add_connection(
        bspump.kafka.KafkaConnection(app, "KafkaConnection")
    )
    svc.add_pipeline(KafkaProcessingPipeline(app, "KafkaProcessingPipeline"))

    app.run()

Configuration

[connection:KafkaConnection]
bootstrap_servers=kafka:9092
group_id=my-consumer-group

[pipeline:KafkaProcessingPipeline:KafkaSource]
topic=input-topic
auto_offset_reset=earliest

[pipeline:KafkaProcessingPipeline:KafkaSink]
topic=output-topic

Multiple Output Topics

Route events to different topics based on content:

class RoutingProcessor(bspump.Processor):
    def process(self, context, event):
        # Set the output topic based on event type
        event_type = event.get("type", "default")
        context["kafka_topic"] = f"events-{event_type}"
        return event

Consumer Groups

Multiple instances of the same pipeline share a consumer group for horizontal scaling:

[connection:KafkaConnection]
group_id=processing-group

Kafka automatically distributes partitions among group members.

Best Practices

  1. Use consumer groups: Enable horizontal scaling

  2. Commit offsets after processing: Ensure at-least-once delivery

  3. Handle poison messages: Use dead-letter queues for unprocessable events

  4. Monitor lag: Track consumer lag for performance monitoring

  5. Design idempotent processors: Handle duplicate deliveries gracefully