Jupyter Integration
BSPump provides first-class Jupyter notebook integration for building data pipelines interactively. Notebooks are compiled into production-ready automations.
Overview
BSPump notebooks have a specific structure:
- Before auto_pipeline: Imports, connections, lookups, helper functions, and test data
- The auto_pipeline cell: Defines the source and sink (always in its own cell)
- After auto_pipeline: Processing logic that runs for each event
```text
┌─────────────────────────────────────┐
│ BEFORE auto_pipeline                │
│ - Imports                           │
│ - @register_connection              │
│ - Helper functions                  │
│ - Test event data                   │
└─────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────┐
│ auto_pipeline(...)                  │
│ - Defines source and sink           │
│ - Always in its own cell            │
└─────────────────────────────────────┘
                   │
                   ▼
┌─────────────────────────────────────┐
│ AFTER auto_pipeline                 │
│ - Event processing logic            │
│ - Becomes an async processor        │
│ - Can use await                     │
└─────────────────────────────────────┘
```
When deployed, the cells after auto_pipeline become a single async
processor in the pipeline. This allows you to:
- Quickly create single-step pipelines
- Interactively test computations with sample data
- Use regular Python without writing processor classes
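The sketch below illustrates what "a single async processor" means. It runs in plain Python with no BSPump required. It stitches cell sources into the body of one `async def` and returns whatever `event` holds at the end. The helper name `cells_to_processor` and the "return the final `event`" convention are illustrative assumptions, not BSPump's actual compiler.

```python
import asyncio
import json
import textwrap


def cells_to_processor(cells, namespace=None):
    """Hypothetical sketch: wrap notebook cell sources in one async function.

    The cells become the body of `async def processor(event)`. Whatever
    `event` holds at the end is returned (an assumed convention for how
    the result is handed to the next pipeline stage).
    """
    body = "\n".join(cells) + "\nreturn event\n"
    source = "async def processor(event):\n" + textwrap.indent(body, "    ")
    # Globals visible to the cells, e.g. modules imported before auto_pipeline
    namespace = dict(namespace or {})
    exec(source, namespace)
    return namespace["processor"]


cells = [
    'data = json.loads(event)\ndata["processed"] = True',
    "await asyncio.sleep(0)  # valid: the cell code now runs inside an async def",
    'event = json.dumps(data).encode("utf8")',
]
processor = cells_to_processor(cells, {"json": json, "asyncio": asyncio})
print(asyncio.run(processor(b'{"id": "test-123"}')))
# b'{"id": "test-123", "processed": true}'
```

Because the cell code ends up inside an async function, an `await` in a processing cell is legal. A real implementation would also have to deal with IPython magics and multi-line string literals, which this naive re-indentation ignores.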
Quick Start
Here’s a minimal webhook-to-Kafka pipeline:
```python
# Cell 1: Imports and connection
from bspump.jupyter import *  # provides register_connection and auto_pipeline
import bspump.http.web.source
import bspump.kafka
import json

# Registered under the id "KafkaConnection", which the sink refers to below
@register_connection
def connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")

# Cell 2: auto_pipeline (always in its own cell)
auto_pipeline(
    source=lambda app, pipeline: bspump.http.web.source.WebHookSource(
        app, pipeline,
        config={"port": 8080, "path": "/webhook"}
    ),
    sink=lambda app, pipeline: bspump.kafka.KafkaSink(
        app, pipeline, connection="KafkaConnection"
    ),
    name="Webhook2KafkaPipeline",
)

# Cell 3: Processing logic (runs for each event)
# `event` holds the incoming webhook payload as bytes
print("Received webhook")
data = json.loads(event)
data["processed"] = True
# Reassign `event` so the modified payload continues on to the sink
event = json.dumps(data).encode("utf8")
```
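Once the pipeline is running, you can exercise it by sending a payload to the port and path configured on the WebHookSource. The sketch below uses only the standard library. It assumes the source accepts an HTTP POST with a JSON body at `http://localhost:8080/webhook`. The helper names `build_webhook_request` and `send_webhook` are hypothetical.

```python
import json
import urllib.request


def build_webhook_request(payload, host="localhost", port=8080, path="/webhook"):
    """Build a POST request carrying `payload` as JSON (assumed request shape)."""
    body = json.dumps(payload).encode("utf8")
    return urllib.request.Request(
        f"http://{host}:{port}{path}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_webhook(payload, **kwargs):
    """Send the request; requires the pipeline to be running and listening."""
    request = build_webhook_request(payload, **kwargs)
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status
```

For example, `send_webhook({"id": "test-123"})` returns the HTTP status code on success. Note that `urlopen` raises `HTTPError` for error responses.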
Interactive Development
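During development, define a test event in a cell before auto_pipeline so you can run
your processing logic against it:
```python
# Test event for interactive development (uses json imported in Cell 1)
event = {
    "id": "test-123",
    "sender": "+1234567890",
    "recipient": "+0987654321",
}
# Encode to bytes, the same form the processing cells decode with json.loads
event = json.dumps(event).encode("utf8")
```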
Now you can run the cells after auto_pipeline to test your processing
logic with this sample data.
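The same logic can also be checked outside Jupyter. Wrap the Quick Start's processing cell in an ordinary function and feed it the test event. This is only a sketch for quick regression checks. `process` is a hypothetical name, and the deployed pipeline runs the cell code itself, not this function.

```python
import json


def process(event: bytes) -> bytes:
    # Same logic as Cell 3 of the Quick Start, wrapped in a function for testing
    data = json.loads(event)
    data["processed"] = True
    return json.dumps(data).encode("utf8")


# Test event built the same way as in the notebook
event = json.dumps({
    "id": "test-123",
    "sender": "+1234567890",
    "recipient": "+0987654321",
}).encode("utf8")

result = json.loads(process(event))
print(result["processed"])  # True
```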