Testing and Development

BSPump notebooks support interactive testing during development.

Interactive Testing

Define a test event in a cell before the auto_pipeline call so that the processing cells below it have sample data to run against:

# Imports and connections
from bspump.jupyter import *
import bspump.kafka
import json

@register_connection
def connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")

# TEST EVENT - Define sample data for development
event = json.dumps({
    "id": "test-123",
    "sender": "+1234567890",
    "recipient": "+0987654321",
    "subject": "Test Fax"
}).encode("utf8")

auto_pipeline(
    source=lambda app, pipeline: bspump.kafka.KafkaSource(
        app, pipeline, connection="KafkaConnection"
    ),
    sink=lambda app, pipeline: bspump.kafka.KafkaSink(
        app, pipeline, connection="KafkaConnection"
    ),
    name="TestPipeline",
)

# Processing - test with the sample event above
data = json.loads(event.decode("utf8"))
print(f"Processing: {data}")

data["processed"] = True
event = json.dumps(data).encode("utf8")

Now you can run the processing cells to test with your sample data.
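
One way to make the sample-event workflow repeatable is to keep the processing steps in a small function and assert on its output. The sketch below is an illustration only: `process` is a hypothetical helper name, not part of BSPump, and it simply mirrors the processing cell above.

```python
import json


def process(event: bytes) -> bytes:
    """Hypothetical helper mirroring the processing cell: decode, mark, re-encode."""
    data = json.loads(event.decode("utf8"))
    data["processed"] = True
    return json.dumps(data).encode("utf8")


# Same sample event as in the notebook above
sample = json.dumps({
    "id": "test-123",
    "sender": "+1234567890",
    "recipient": "+0987654321",
    "subject": "Test Fax",
}).encode("utf8")

result = json.loads(process(sample).decode("utf8"))
assert result["processed"] is True
assert result["id"] == "test-123"
```

Because the function takes and returns bytes, the same checks can run in a notebook cell or in a plain test file.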

Ignore Cells

Cells that should not be deployed can be marked with #ignore:

#ignore
import os
os.chdir("/home/coder/workspace/my-pipeline")

#ignore
# Install development dependencies
import sys
!{sys.executable} -m pip install some-package

#ignore
# Load secrets for local development
from dotenv import load_dotenv
load_dotenv("../secrets/dev")

These cells run in Jupyter but are excluded from the deployed automation.

Loading Secrets Locally

For local development, load secrets manually:

#ignore
from dotenv import load_dotenv
import os

# Load multiple secret groups
groups = ["kafka", "api", "discord"]
for group in groups:
    secrets_path = os.path.join(
        os.environ.get("BITSWAN_GITOPS_DIR", ".."),
        "secrets",
        group
    )
    if os.path.exists(secrets_path):
        load_dotenv(secrets_path)

In production, secrets are loaded automatically based on automation.toml.
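
After loading secrets locally, it can help to confirm that the variables you expect are actually set before running the pipeline. A minimal sketch, assuming hypothetical variable names: `KAFKA_BOOTSTRAP_SERVERS` is an illustration only, and `missing_secrets` is a helper invented for this example.

```python
#ignore
import os


def missing_secrets(required, env=None):
    """Return the names in `required` that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


# Hypothetical variable names; replace with the ones your secret groups define
required = ["KAFKA_BOOTSTRAP_SERVERS", "DISCORD_WEBHOOK_URL"]
missing = missing_secrets(required)
if missing:
    print(f"Missing secrets: {', '.join(missing)}")
```

Passing a plain dict as `env` makes the check easy to try without touching the real environment.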

Debugging Output

Use print() for debugging. The output appears in the Jupyter cell output and in the logs:

data = json.loads(event.decode("utf8"))
print(f"Received event: {data['id']}")
print(f"Sender: {data['sender']}")
print(f"Recipient: {data['recipient']}")

# Processing...
data["processed"] = True
print("Processed successfully")

event = json.dumps(data).encode("utf8")

Using PPrintSink for Development

For scheduled tasks or debugging, use PPrintSink to pretty-print each event that reaches the sink:

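import bspump.common  # provides PPrintSink

# ScheduledSource and CronTrigger must already be imported in an earlier cell
auto_pipeline(
    source=lambda app, pipeline: ScheduledSource(app, pipeline).on(
        CronTrigger(app, "*/5 * * * *")  # every 5 minutes
    ),
    sink=lambda app, pipeline: bspump.common.PPrintSink(app, pipeline),
    name="DebugPipeline",
)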

Controlling Event Flow

BSPump provides two special exceptions for controlling event flow in your processing cells. These provide a cleaner alternative to event = None and conditional logic.

SkipEvent - Drop an event without processing it further:

from bspump.jupyter import SkipEvent

# Filter out unwanted events early (assumes event is already a decoded dict)
if event.get("type") == "spam":
    raise SkipEvent()

# This code only runs for non-spam events
event["processed"] = True

FinalizeEvent - Send an event to the sink immediately, skipping remaining cells:

from bspump.jupyter import FinalizeEvent

# Early exit for cached results
if event.get("cached"):
    raise FinalizeEvent(event)

# This code only runs for non-cached events
event["result"] = expensive_computation(event)

These exceptions are especially useful when you have multiple processing cells and want to exit early without complex conditional logic in every cell.

Example: Filtering and Early Exit

# Cell 1: Validate and filter
if not event.get("required_field"):
    print("Missing required field, dropping event")
    raise SkipEvent()

# Cell 2: Check cache
cached_result = lookup_cache(event["id"])
if cached_result:
    event["result"] = cached_result
    raise FinalizeEvent(event)

# Cell 3: Expensive processing (only runs if not cached)
event["result"] = expensive_api_call(event)
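
To reason about this flow without a running pipeline, the three cells can be modelled as plain functions. The sketch below is not how BSPump implements these exceptions: `SkipEvent` and `FinalizeEvent` are local stand-ins that only reproduce the behavior described above, and `run_cells`, `CACHE`, and the cell functions are hypothetical names.

```python
class SkipEvent(Exception):
    """Stand-in for bspump.jupyter.SkipEvent (local testing only)."""


class FinalizeEvent(Exception):
    """Stand-in for bspump.jupyter.FinalizeEvent; carries the event to emit."""

    def __init__(self, event):
        super().__init__()
        self.event = event


def run_cells(cells, event):
    """Run cell functions in order; return the event for the sink, or None if dropped."""
    try:
        for cell in cells:
            event = cell(event)
    except SkipEvent:
        return None
    except FinalizeEvent as done:
        return done.event
    return event


CACHE = {"a1": "cached-result"}  # hypothetical cache contents


def validate(event):
    if not event.get("required_field"):
        raise SkipEvent()
    return event


def check_cache(event):
    cached = CACHE.get(event["id"])
    if cached:
        event["result"] = cached
        raise FinalizeEvent(event)
    return event


def compute(event):
    event["result"] = "computed"  # placeholder for the expensive call
    return event


cells = [validate, check_cache, compute]
print(run_cells(cells, {"id": "a1"}))                          # dropped by validate
print(run_cells(cells, {"id": "a1", "required_field": True}))  # served from cache
print(run_cells(cells, {"id": "b2", "required_field": True}))  # fully processed
```

The point of the model is that each cell stays flat: no cell needs to check whether an earlier one already dropped or completed the event.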

Error Handling

Handle errors gracefully in your processing:

import json

try:
    data = json.loads(event.decode("utf8"))
except (UnicodeDecodeError, json.JSONDecodeError) as e:
    print(f"Invalid event payload: {e}")
    event = None  # Drop invalid events

# Continue only if the event wasn't dropped
if event is not None:
    # Process valid event
    data["validated"] = True
    event = json.dumps(data).encode("utf8")
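
The decoding step is easy to check against malformed input before deploying. A minimal sketch, assuming events are expected to be UTF-8 encoded JSON objects; `parse_event` is a hypothetical helper name, not part of BSPump.

```python
import json


def parse_event(raw: bytes):
    """Return the decoded dict, or None if `raw` is not a UTF-8 encoded JSON object."""
    try:
        data = json.loads(raw.decode("utf8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        print(f"Invalid event payload: {e}")
        return None
    if not isinstance(data, dict):
        print(f"Expected a JSON object, got {type(data).__name__}")
        return None
    return data


assert parse_event(b'{"id": "test-123"}') == {"id": "test-123"}
assert parse_event(b"not json") is None
assert parse_event(b"\xff\xfe") is None   # invalid UTF-8
assert parse_event(b"[1, 2, 3]") is None  # valid JSON, but not an object
```

Catching `UnicodeDecodeError` as well as `json.JSONDecodeError` matters because a payload can fail at the decode step before JSON parsing is ever attempted.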

For critical errors that should alert operators:

import requests
import os

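def send_alert(message):
    webhook_url = os.getenv("DISCORD_WEBHOOK_URL")
    if not webhook_url:
        print(f"DISCORD_WEBHOOK_URL not set, alert not sent: {message}")
        return
    # A timeout keeps a slow webhook from stalling the pipeline
    requests.post(webhook_url, json={"content": message}, timeout=10)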

try:
    result = risky_operation(data)
except Exception as e:
    send_alert(f"Pipeline error: {e}")
    event = None  # Drop event after alerting
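
Exception messages can be long, and Discord limits message content to 2000 characters, so an oversized alert may be rejected. A minimal sketch of building the webhook body with truncation; `build_alert_payload` is a hypothetical helper, not part of BSPump or the Discord API.

```python
DISCORD_CONTENT_LIMIT = 2000  # Discord's maximum message content length


def build_alert_payload(message: str, limit: int = DISCORD_CONTENT_LIMIT) -> dict:
    """Build the webhook JSON body, truncating messages that exceed `limit`."""
    if len(message) > limit:
        message = message[: limit - 3] + "..."
    return {"content": message}


payload = build_alert_payload("Pipeline error: " + "x" * 5000)
print(len(payload["content"]))
```

The returned dict can be passed as the `json` argument of the webhook request.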

Testing Async Operations

Async operations work directly in notebook cells:

import aiohttp

async with aiohttp.ClientSession() as session:
    async with session.get("https://api.example.com/status") as response:
        status = await response.json()
        print(f"API Status: {status}")

event["api_status"] = status

You can also use await directly:

import asyncio

# Wait before processing (e.g., for rate limiting)
if data.get("requeue"):
    await asyncio.sleep(60 * 5)  # Wait 5 minutes

# Continue processing
event = json.dumps(data).encode("utf8")
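
Top-level await works in notebook cells because Jupyter runs them inside an event loop; a plain Python script has no such loop. To exercise the same logic outside a notebook (for example in a unit test), one option is to wrap it in a coroutine and drive it with `asyncio.run`. The helper name `maybe_delay` and the shortened delay are assumptions made for illustration.

```python
import asyncio
import json


async def maybe_delay(data: dict, delay_seconds: float) -> bytes:
    """Hypothetical helper: wait if the event asks to be requeued, then encode it."""
    if data.get("requeue"):
        await asyncio.sleep(delay_seconds)
    return json.dumps(data).encode("utf8")


# Use a tiny delay in tests instead of the 5 minutes used in the pipeline
event = asyncio.run(maybe_delay({"id": "test-123", "requeue": True}, delay_seconds=0.01))
print(event)
```

Inside a notebook, call `await maybe_delay(...)` instead, since `asyncio.run` raises a RuntimeError when an event loop is already running.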