Testing and Development
BSPump notebooks support interactive testing during development.
Interactive Testing
Define test events before auto_pipeline to test your processing logic:
# Imports and connections
from bspump.jupyter import *
import bspump.kafka
import json

@register_connection
def connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")

# TEST EVENT - Define sample data for development
event = json.dumps({
    "id": "test-123",
    "sender": "+1234567890",
    "recipient": "+0987654321",
    "subject": "Test Fax",
}).encode("utf8")

auto_pipeline(
    source=lambda app, pipeline: bspump.kafka.KafkaSource(
        app, pipeline, connection="KafkaConnection"
    ),
    sink=lambda app, pipeline: bspump.kafka.KafkaSink(
        app, pipeline, connection="KafkaConnection"
    ),
    name="TestPipeline",
)

# Processing - test with the sample event above
data = json.loads(event.decode("utf8"))
print(f"Processing: {data}")
data["processed"] = True
event = json.dumps(data).encode("utf8")
Now you can run the processing cells to test with your sample data.
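Because the processing steps are ordinary Python, they can also be checked outside Jupyter with plain assertions. The following is a minimal sketch that assumes you copy the processing cell's body into a function. The name process is hypothetical and is not part of BSPump:

```python
import json


def process(event: bytes) -> bytes:
    """Same steps as the processing cell above, wrapped in a function (hypothetical)."""
    data = json.loads(event.decode("utf8"))
    data["processed"] = True
    return json.dumps(data).encode("utf8")


# Reuse the sample event from the notebook
sample = json.dumps({
    "id": "test-123",
    "sender": "+1234567890",
    "recipient": "+0987654321",
    "subject": "Test Fax",
}).encode("utf8")

result = json.loads(process(sample).decode("utf8"))
assert result["processed"] is True
assert result["id"] == "test-123"
```

Keeping the logic in a function like this lets the notebook cell and a test script share one implementation.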
Ignore Cells
Cells that should not be deployed can be marked with #ignore:
#ignore
import os
os.chdir("/home/coder/workspace/my-pipeline")
#ignore
# Install development dependencies
import sys
!{sys.executable} -m pip install some-package
#ignore
# Load secrets for local development
from dotenv import load_dotenv
load_dotenv("../secrets/dev")
These cells run in Jupyter but are excluded from the deployed automation.
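To show what the marker means in practice, the sketch below removes #ignore cells from a notebook that has already been loaded as a dict from an .ipynb file. It illustrates the idea only and is not BSPump's own implementation. It also assumes that #ignore is the first line of a code cell, as in the examples above. The function name strip_ignored_cells is hypothetical:

```python
def strip_ignored_cells(notebook: dict) -> dict:
    """Return a copy of an .ipynb notebook dict without code cells marked #ignore."""
    kept = []
    for cell in notebook.get("cells", []):
        source = cell.get("source", "")
        # The notebook format stores source either as one string or as a list of lines
        if isinstance(source, list):
            source = "".join(source)
        lines = source.lstrip().splitlines()
        first_line = lines[0].strip() if lines else ""
        if cell.get("cell_type") == "code" and first_line == "#ignore":
            continue  # development-only cell: leave it out
        kept.append(cell)
    # Copy the top-level keys (nbformat, metadata, ...) and swap in the filtered cells
    return {**notebook, "cells": kept}


nb = {
    "cells": [
        {"cell_type": "code", "source": ["#ignore\n", "import os\n"]},
        {"cell_type": "code", "source": "print('deployed')"},
    ],
    "nbformat": 4,
}
cleaned = strip_ignored_cells(nb)
```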
Loading Secrets Locally
For local development, load secrets manually:
#ignore
from dotenv import load_dotenv
import os

# Load multiple secret groups
groups = ["kafka", "api", "discord"]
for group in groups:
    secrets_path = os.path.join(
        os.environ.get("BITSWAN_GITOPS_DIR", ".."),
        "secrets",
        group,
    )
    if os.path.exists(secrets_path):
        load_dotenv(secrets_path)
In production, secrets are loaded automatically based on automation.toml.
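If python-dotenv is not installed in your development environment, the following stdlib-only sketch covers the simplest case. It handles plain KEY=VALUE lines, blank lines, and # comments. It also leaves already-set variables untouched, which mirrors load_dotenv's default override=False. The names parse_env_lines and load_secret_file are hypothetical. python-dotenv itself supports more syntax, such as export prefixes and variable expansion:

```python
import os


def parse_env_lines(lines):
    """Parse simple KEY=VALUE lines into a dict, skipping blanks and # comments."""
    parsed = {}
    for raw in lines:
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # blank line, comment, or malformed entry
        key, value = line.split("=", 1)
        key, value = key.strip(), value.strip()
        # Remove one pair of matching surrounding quotes, if present
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        parsed[key] = value
    return parsed


def load_secret_file(path):
    """Export a secrets file into os.environ without overwriting existing variables."""
    with open(path, encoding="utf8") as f:
        parsed = parse_env_lines(f)
    for key, value in parsed.items():
        os.environ.setdefault(key, value)  # mirrors load_dotenv(override=False)
    return parsed
```

In the loop above, load_secret_file(secrets_path) could take the place of load_dotenv(secrets_path).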
Debugging Output
Use print() for debugging. Its output appears both in Jupyter and in the logs:
data = json.loads(event.decode("utf8"))
print(f"Received event: {data['id']}")
print(f"Sender: {data['sender']}")
print(f"Recipient: {data['recipient']}")
# Processing...
data["processed"] = True
print("Processed successfully")
event = json.dumps(data).encode("utf8")
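The print() calls above raise KeyError when a field is absent. That stops processing of the very event you wanted to inspect. The sketch below shows a hypothetical helper, describe_event, that reads fields with dict.get instead:

```python
def describe_event(data: dict, fields=("id", "sender", "recipient")) -> str:
    """Build a one-line summary of selected fields; absent ones show as <missing>."""
    parts = [f"{name}={data.get(name, '<missing>')}" for name in fields]
    return ", ".join(parts)


print(describe_event({"id": "test-123", "sender": "+1234567890"}))
# id=test-123, sender=+1234567890, recipient=<missing>
```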
Using PPrintSink for Development
For scheduled tasks or debugging, use PPrintSink to see output:
import bspump.common  # provides PPrintSink

auto_pipeline(
    source=lambda app, pipeline: ScheduledSource(app, pipeline).on(
        CronTrigger(app, "*/5 * * * *")
    ),
    sink=lambda app, pipeline: bspump.common.PPrintSink(app, pipeline),
    name="DebugPipeline",
)
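The trigger string "*/5 * * * *" is a standard five-field cron expression (minute, hour, day of month, month, day of week), so this pipeline fires every five minutes. The sketch below expands a single cron field to show which values a step expression selects. It handles only *, single numbers, a-b ranges, /N steps, and comma lists. It is an illustration of cron syntax, not how BSPump's CronTrigger is implemented:

```python
def expand_cron_field(field: str, low: int, high: int) -> list:
    """Expand one cron field (e.g. the minute field) into the sorted values it matches."""
    values = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/", 1)
            step = int(step_text)
        if part == "*":
            start, end = low, high  # whole allowed range
        elif "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)  # single value
        values.update(range(start, end + 1, step))
    return sorted(values)


minute_field = "*/5 * * * *".split()[0]
print(expand_cron_field(minute_field, 0, 59))
# [0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55]
```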
Controlling Event Flow
BSPump provides two special exceptions for controlling event flow in your
processing cells. They are a cleaner alternative to setting event = None
and guarding later code with conditionals. The examples in this section treat event as a dict.
SkipEvent - Drop an event without processing it further:
from bspump.jupyter import SkipEvent

# Filter out unwanted events early
if event.get("type") == "spam":
    raise SkipEvent()

# This code only runs for non-spam events
event["processed"] = True
FinalizeEvent - Send an event to the sink immediately, skipping remaining cells:
from bspump.jupyter import FinalizeEvent

# Early exit for cached results
if event.get("cached"):
    raise FinalizeEvent(event)

# This code only runs for non-cached events
# (expensive_computation is a placeholder for your own function)
event["result"] = expensive_computation(event)
These exceptions are especially useful when you have multiple processing cells and want to exit early without complex conditional logic in every cell.
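To make the control flow concrete, the sketch below models processing cells as functions that take and return the event. In a real notebook, cells modify the event variable instead. The sketch defines its own SkipEvent and FinalizeEvent classes as hypothetical stand-ins; it is not the bspump.jupyter implementation. Like the examples above, it assumes FinalizeEvent carries the event to emit:

```python
class SkipEvent(Exception):
    """Drop the current event; nothing reaches the sink."""


class FinalizeEvent(Exception):
    """Stop processing and send the carried event to the sink."""

    def __init__(self, event):
        super().__init__()
        self.event = event


def run_cells(event, cells):
    """Run each cell in order; return the event for the sink, or None if dropped."""
    try:
        for cell in cells:
            event = cell(event)
    except SkipEvent:
        return None
    except FinalizeEvent as done:
        return done.event
    return event


def filter_spam(event):
    if event.get("type") == "spam":
        raise SkipEvent()
    return event


def use_cache(event):
    if event.get("cached"):
        raise FinalizeEvent(event)
    return event


def compute(event):
    event["result"] = len(event.get("text", ""))  # stand-in for real work
    return event


cells = [filter_spam, use_cache, compute]
print(run_cells({"type": "spam"}, cells))  # None
print(run_cells({"cached": True}, cells))  # {'cached': True}
print(run_cells({"text": "hi"}, cells))    # {'text': 'hi', 'result': 2}
```

Because the runner catches the exceptions in one place, no individual cell has to check whether an earlier cell already dropped or finished the event.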
Example: Filtering and Early Exit
# Cell 1: Validate and filter
if not event.get("required_field"):
    print("Missing required field, dropping event")
    raise SkipEvent()

# Cell 2: Check cache
# (lookup_cache and expensive_api_call are placeholders for your own helpers)
cached_result = lookup_cache(event["id"])
if cached_result:
    event["result"] = cached_result
    raise FinalizeEvent(event)

# Cell 3: Expensive processing (only runs if not cached)
event["result"] = expensive_api_call(event)
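The example above does not define lookup_cache or expensive_api_call. One possible shape for the cache side is a small in-memory store with time-based expiry. The sketch below is an assumption. store_cache, CACHE_TTL_SECONDS, and the one-hour lifetime are all hypothetical. An in-process dict also lasts only as long as the running automation:

```python
import time

_CACHE = {}
CACHE_TTL_SECONDS = 3600  # assumption: results stay valid for one hour


def store_cache(key, value, now=None):
    """Remember a result together with the time it was stored."""
    _CACHE[key] = (value, time.monotonic() if now is None else now)


def lookup_cache(key, now=None):
    """Return a cached result, or None if it is missing or older than the TTL."""
    entry = _CACHE.get(key)
    if entry is None:
        return None
    value, stored_at = entry
    now = time.monotonic() if now is None else now
    if now - stored_at > CACHE_TTL_SECONDS:
        del _CACHE[key]  # expired: forget it so the next call recomputes
        return None
    return value
```

After Cell 3, a call such as store_cache(event["id"], event["result"]) would fill the cache for later events. The optional now argument exists only to make expiry testable.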
Error Handling
Handle errors gracefully in your processing:
import json

try:
    data = json.loads(event.decode("utf8"))
except (UnicodeDecodeError, json.JSONDecodeError) as e:
    print(f"Invalid event: {e}")
    event = None  # Drop invalid events

# Continue only if the event wasn't dropped
if event is not None:
    # Process valid event
    data["validated"] = True
    event = json.dumps(data).encode("utf8")
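Both ways a raw event can fail here raise subclasses of ValueError: bytes that are not valid UTF-8 raise UnicodeDecodeError, and text that is not valid JSON raises json.JSONDecodeError. The sketch below shows a hypothetical helper, parse_event, that catches both with one except clause. It also rejects JSON that is not an object, because the later code indexes into data as a dict:

```python
import json


def parse_event(event: bytes):
    """Decode a raw event into a dict, or return None if it is unusable."""
    try:
        data = json.loads(event.decode("utf8"))
    except ValueError as e:  # covers UnicodeDecodeError and json.JSONDecodeError
        print(f"Invalid event: {e}")
        return None
    if not isinstance(data, dict):
        print(f"Expected a JSON object, got {type(data).__name__}")
        return None
    return data


print(parse_event(b'{"id": "test-123"}'))  # {'id': 'test-123'}
```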
For critical errors that should alert operators:
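import os
import requests

def send_alert(message):
    webhook_url = os.getenv("DISCORD_WEBHOOK_URL")
    if not webhook_url:
        print(f"DISCORD_WEBHOOK_URL is not set; alert not sent: {message}")
        return
    # A timeout keeps a slow webhook from stalling the pipeline
    requests.post(webhook_url, json={"content": message}, timeout=10)

try:
    result = risky_operation(data)  # risky_operation is a placeholder
except Exception as e:
    send_alert(f"Pipeline error: {e}")
    event = None  # Drop event after alerting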
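When a failure repeats for every event, sending one alert per event can flood the channel. The sketch below shows a hypothetical throttle, AlertThrottle, that sends a given message at most once per cooldown window. The send callable would be send_alert in the notebook. Here it is replaced by a list so the logic can be exercised without a webhook. The 300-second default is an arbitrary example:

```python
import time


class AlertThrottle:
    """Suppress repeats of the same alert message within a cooldown window."""

    def __init__(self, send, cooldown_seconds=300.0, clock=time.monotonic):
        self.send = send
        self.cooldown = cooldown_seconds
        self.clock = clock
        self._last_sent = {}  # message -> time it was last sent

    def alert(self, message):
        """Send the message unless it was sent less than cooldown seconds ago."""
        now = self.clock()
        last = self._last_sent.get(message)
        if last is not None and now - last < self.cooldown:
            return False
        self._last_sent[message] = now
        self.send(message)
        return True


sent = []
fake_now = [0.0]
throttle = AlertThrottle(sent.append, cooldown_seconds=60, clock=lambda: fake_now[0])
throttle.alert("Pipeline error: timeout")
throttle.alert("Pipeline error: timeout")  # suppressed
fake_now[0] = 61.0
throttle.alert("Pipeline error: timeout")
print(len(sent))  # 2
```

In the notebook this would be throttle = AlertThrottle(send_alert) followed by throttle.alert(f"Pipeline error: {e}"). Because the throttle keys on the full message text, alerts that embed varying details such as event IDs are not deduplicated.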
Testing Async Operations
Async operations work directly in notebook cells:
import aiohttp

async with aiohttp.ClientSession() as session:
    async with session.get("https://api.example.com/status") as response:
        status = await response.json()

print(f"API Status: {status}")
event["api_status"] = status
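A slow external API can hold up the event being processed. The sketch below uses asyncio.wait_for to bound the wait and falls back to a default value on timeout. with_timeout is a hypothetical helper, and fake_status is a stand-in for the aiohttp request above that only simulates latency:

```python
import asyncio


async def with_timeout(coro, limit: float, fallback=None):
    """Await coro, but give up after limit seconds and return fallback instead."""
    try:
        return await asyncio.wait_for(coro, timeout=limit)
    except asyncio.TimeoutError:
        return fallback


async def fake_status(delay: float) -> dict:
    """Hypothetical stand-in for the aiohttp request; sleeps to simulate latency."""
    await asyncio.sleep(delay)
    return {"status": "ok"}
```

In a notebook cell the helper is awaited directly, for example event["api_status"] = await with_timeout(fake_status(0.1), limit=5.0, fallback={"status": "unknown"}). In a plain script, wrap the call in asyncio.run instead.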
You can also use await directly:
import asyncio

# Wait before processing (e.g., for rate limiting)
if data.get("requeue"):
    await asyncio.sleep(60 * 5)  # Wait 5 minutes

# Continue processing
event = json.dumps(data).encode("utf8")
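asyncio.sleep is also the building block for retrying a flaky call with exponential backoff. In the sketch below, call_with_retries, make_flaky, the attempt count, and the delays are illustrative assumptions. The sketch catches every Exception only to stay short; a real cell should catch the specific errors it expects. make_call must be a zero-argument function that returns a new coroutine on each call, because a coroutine object can be awaited only once:

```python
import asyncio


async def call_with_retries(make_call, attempts: int = 4, base_delay: float = 1.0):
    """Await make_call(); on failure wait base_delay, 2*base_delay, ... and retry."""
    for attempt in range(attempts):
        try:
            return await make_call()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries: let the caller see the last error
            await asyncio.sleep(base_delay * 2 ** attempt)


def make_flaky(failures: int):
    """Hypothetical test double: fails `failures` times, then returns "ok"."""
    state = {"calls": 0}

    async def call():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("temporary failure")
        return "ok"

    return call, state
```

In a notebook cell this might look like result = await call_with_retries(lambda: some_request(session)), where some_request stands for your own coroutine function.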