auto_pipeline Reference

The auto_pipeline function creates a pipeline where notebook cells after the call become the processing logic.

Function Signature

auto_pipeline(
    source,      # Lambda returning a Source
    sink,        # Lambda returning a Sink
    name,        # Pipeline name (string)
    processors=None  # Optional list of explicit processors
)

Parameters

source (required)

A lambda function that receives (app, pipeline) and returns a Source instance.

source=lambda app, pipeline: bspump.kafka.KafkaSource(
    app, pipeline, connection="KafkaConnection"
)
sink (required)

A lambda function that receives (app, pipeline) and returns a Sink instance.

sink=lambda app, pipeline: bspump.kafka.KafkaSink(
    app, pipeline, connection="KafkaConnection"
)
name (required)

A string identifier for the pipeline.

name="MyProcessingPipeline"
processors (optional)

A list of lambdas returning explicit Processor instances. These run before the notebook cell processing.

processors=[
    lambda app, pipeline: MyProcessor(app, pipeline)
]

Common Source Patterns

Kafka Source

auto_pipeline(
    source=lambda app, pipeline: bspump.kafka.KafkaSource(
        app, pipeline, connection="KafkaConnection"
    ),
    ...
)

HTTP Webhook Source

auto_pipeline(
    source=lambda app, pipeline: bspump.http.web.source.WebHookSource(
        app, pipeline,
        config={
            "port": 8080,
            "path": "/webhook",
            "secret_qparam": os.environ.get("WEBHOOK_SECRET"),
        }
    ),
    ...
)

Cron Trigger Source

from bspump.abc.source import TriggerSource
from bspump.trigger import CronTrigger
from datetime import datetime

class ScheduledSource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        await self.Pipeline.ready()
        await self.Pipeline.process({"triggered_at": datetime.now()})

auto_pipeline(
    source=lambda app, pipeline: ScheduledSource(app, pipeline).on(
        CronTrigger(app, "*/15 * * * *")  # Every 15 minutes
    ),
    ...
)

Common Sink Patterns

Kafka Sink

auto_pipeline(
    ...
    sink=lambda app, pipeline: bspump.kafka.KafkaSink(
        app, pipeline, connection="KafkaConnection"
    ),
)

PPrint Sink (for debugging/scheduled tasks)

auto_pipeline(
    ...
    sink=lambda app, pipeline: bspump.common.PPrintSink(app, pipeline),
)

Null Sink (for tasks with side effects only)

auto_pipeline(
    ...
    sink=lambda app, pipeline: bspump.common.NullSink(app, pipeline),
)

Connection Registration

Register connections before auto_pipeline using the @register_connection decorator:

from bspump.jupyter import *
import bspump.kafka

@register_connection
def kafka_connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")

Multiple connections:

@register_connection
def kafka_connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")

@register_connection
def postgres_connection(app):
    return bspump.postgresql.PostgreSQLConnection(app, "PostgreSQLConnection")

Processing After auto_pipeline

Cells after auto_pipeline process each event. The event variable contains the incoming data.

Basic transformation:

import json

# Parse JSON
event = json.loads(event.decode("utf8"))

# Transform
event["processed"] = True

# Serialize back
event = json.dumps(event).encode("utf8")

Filtering (drop events):

data = json.loads(event.decode("utf8"))

if data.get("type") == "spam":
    event = None  # Drop this event
else:
    event = json.dumps(data).encode("utf8")

Async operations:

import aiohttp

data = json.loads(event.decode("utf8"))

async with aiohttp.ClientSession() as session:
    async with session.post(API_URL, json=data) as response:
        result = await response.json()
        data["api_result"] = result

event = json.dumps(data).encode("utf8")

Multiple cells:

Each cell runs in sequence. The event variable persists between cells:

# Cell 1: Parse
event = json.loads(event.decode("utf8"))
# Cell 2: Validate
if "id" not in event:
    raise ValueError("Missing id")
# Cell 3: Transform
event["validated"] = True
# Cell 4: Serialize
event = json.dumps(event).encode("utf8")

Environment Variables

Access secrets via environment variables:

import os

api_token = os.getenv("API_TOKEN")
webhook_url = os.getenv("DISCORD_WEBHOOK_URL")

See Secrets Management for configuring secrets in BitSwan.