bspump.jupyter

Jupyter notebook integration for interactive pipeline development.

bspump.jupyter.deploy()
bspump.jupyter.init_bitswan_jupyter(config_path=None)

Initialize Bitswan in a Jupyter environment. When used, it must be called before any other Bitswan function; it can be skipped if no config file is needed.

Parameters:

config_path (str, optional) – Path to the config file. Defaults to None.

bspump.jupyter.sample_events(*args, **kwargs)
bspump.jupyter.register_app_post_init(func)

Ex:

@register_app_post_init
def post_init(app):
    app.PubSub.subscribe("Application.tick!", app.tick)

bspump.jupyter.register_connection(*args, **kwargs)
async bspump.jupyter.retrieve_sample_events(limit=10)

Get sample events from the source registered to the current pipeline and register them for testing. This is a coroutine, so it has to be awaited in the Jupyter environment.

Ex:

await retrieve_sample_events(100)

Parameters:

limit (int, optional) – Number of events to retrieve. Defaults to 10.

Returns:

None

Return type:

None
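Because retrieve_sample_events is a coroutine function, calling it without await only creates a coroutine object and retrieves nothing. Jupyter allows await at the top level of a cell, but a plain Python script needs an event loop instead. The sketch below shows both points using a hypothetical stand-in, fake_retrieve_sample_events. It only mimics the documented behaviour (register events, return None) and does not talk to any real source.

```python
import asyncio
import inspect

registered = []  # hypothetical store of registered test events


async def fake_retrieve_sample_events(limit=10):
    """Hypothetical stand-in for retrieve_sample_events: registers `limit` events."""
    await asyncio.sleep(0)  # simulate waiting on the pipeline's source
    registered.extend({"id": i} for i in range(limit))
    return None  # like the documented function, nothing is returned


# Calling without await only builds a coroutine object; the body does not run.
coro = fake_retrieve_sample_events(3)
print(inspect.iscoroutine(coro), len(registered))  # True 0
coro.close()  # discard the unused coroutine without a "never awaited" warning

# In a notebook cell you would write: await fake_retrieve_sample_events(3)
# In a plain script (no running loop), drive the coroutine with asyncio.run:
asyncio.run(fake_retrieve_sample_events(3))
print(len(registered))  # 3
```

Note that asyncio.run raises an error inside Jupyter, where an event loop is already running. That is why the notebook form is a bare await.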

bspump.jupyter.register_lookup(*args, **kwargs)
bspump.jupyter.new_pipeline(name)

Creates and registers a new pipeline

Parameters:

name (str) – Name of the pipeline

bspump.jupyter.end_pipeline()

Ends the current pipeline and appends it to the list of pipelines

bspump.jupyter.register_source(func, test_events=None)

Ex:

@register_source
def source(app, pipeline):
    return bspump.socket.TCPStreamSource(app, pipeline)

bspump.jupyter.register_processor(*args, **kwargs)
bspump.jupyter.register_generator(*args, **kwargs)
bspump.jupyter.register_sink(func)

Ex:

@register_sink
def sink(app, pipeline):
    return bspump.socket.TCPStreamSink(app, pipeline)

bspump.jupyter.step(*args, **kwargs)
bspump.jupyter.async_step(func)
class bspump.jupyter.App(*args, **kwargs)

Bases: BSPumpApplication

init_componets()
bspump.jupyter.auto_pipeline(source=None, sink=None, name=None)
bspump.jupyter.add_test_probe(name)
exception bspump.jupyter.SkipEvent

Bases: Exception

Raise this exception to drop the current event without processing it further.

The event will not be sent to the sink. This is useful for filtering events early in the processing pipeline.

Example

if event.get("type") == "spam":
    raise SkipEvent()

# This code only runs for non-spam events
event["processed"] = True
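The following is a minimal sketch of the control flow that SkipEvent implies. It assumes a runner that passes each event through the processing cells in order and drops the event when SkipEvent is raised. The local SkipEvent class and the run_cells runner are hypothetical stand-ins for illustration, not the bspump.jupyter implementation.

```python
class SkipEvent(Exception):
    """Hypothetical stand-in for bspump.jupyter.SkipEvent."""


def drop_spam(event):
    # Same filtering logic as the example above
    if event.get("type") == "spam":
        raise SkipEvent()
    # Only reached for non-spam events
    event["processed"] = True
    return event


def run_cells(events, cells):
    """Hypothetical runner: pass each event through the cells in order.

    An event that raises SkipEvent is dropped and never reaches the
    returned "sink" list.
    """
    sink = []
    for event in events:
        try:
            for cell in cells:
                event = cell(event)
        except SkipEvent:
            continue  # drop this event and move on to the next one
        sink.append(event)
    return sink


out = run_cells([{"type": "spam"}, {"type": "ham"}], [drop_spam])
print(out)  # [{'type': 'ham', 'processed': True}]
```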

exception bspump.jupyter.FinalizeEvent(event)

Bases: Exception

Raise this exception to send an event to the sink immediately, skipping any remaining processing cells.

This is useful for early exit scenarios where you want to output a result without running through all processing steps.

Example

if event.get("cached"):
    # Send cached result directly to sink
    raise FinalizeEvent(event)

# This code only runs for non-cached events
event["result"] = expensive_computation(event)

Parameters:

event – The event to send to the sink

__init__(event)
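Here is a matching sketch for FinalizeEvent, again assuming a simple in-order runner. When a cell raises FinalizeEvent, the event carried by the exception goes straight to the sink and the remaining cells are skipped. FinalizeEvent here is a local stand-in that stores the event in an attribute named event (an assumption). The compute() function replaces the placeholder expensive_computation from the example above.

```python
class FinalizeEvent(Exception):
    """Hypothetical stand-in for bspump.jupyter.FinalizeEvent."""

    def __init__(self, event):
        super().__init__()
        self.event = event  # the event to hand directly to the sink


def use_cache(event):
    if event.get("cached"):
        # Skip the remaining cells and emit this event as-is
        raise FinalizeEvent(event)
    return event


def compute(event):
    # Hypothetical stand-in for expensive_computation(event)
    event["result"] = event["value"] * 2
    return event


def run_cells(events, cells):
    """Hypothetical runner: FinalizeEvent short-circuits the remaining cells."""
    sink = []
    for event in events:
        try:
            for cell in cells:
                event = cell(event)
        except FinalizeEvent as exc:
            event = exc.event  # send the carried event straight to the sink
        sink.append(event)
    return sink


out = run_cells(
    [{"cached": True, "result": 10}, {"value": 3}],
    [use_cache, compute],
)
print(out)  # [{'cached': True, 'result': 10}, {'value': 3, 'result': 6}]
```

Unlike SkipEvent, the finalized event is kept. Only the work after the raising cell is skipped.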

Core Functions

auto_pipeline

Create a pipeline with automatic cell-based processing.

from bspump.jupyter import auto_pipeline

auto_pipeline(
    source=lambda app, pipeline: MySource(app, pipeline),
    sink=lambda app, pipeline: MySink(app, pipeline),
    name="MyPipeline",
)

Parameters:

  • source - Factory (for example a lambda) that takes app and pipeline and returns a Source instance

  • sink - Factory that takes app and pipeline and returns a Sink instance

  • name - Pipeline name

Processing steps are not passed as arguments; they come from the notebook's processing cells.

Decorators

register_connection

Register a connection factory.

import bspump.kafka
from bspump.jupyter import register_connection

@register_connection
def kafka_connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")

register_lookup

Register a lookup factory.

import bspump
from bspump.jupyter import register_lookup

@register_lookup
def user_lookup(app):
    return bspump.DictionaryLookup(app, "UserLookup", {})

register_app_post_init

Register a callback for after app initialization.

from bspump.jupyter import register_app_post_init

@register_app_post_init
def setup(app):
    # Custom initialization
    pass

Pipeline Building

new_pipeline

Start building a pipeline.

from bspump.jupyter import new_pipeline

new_pipeline("MyPipeline")

register_source

Register the pipeline source.

from bspump.jupyter import register_source

@register_source
def source(app, pipeline):
    return MySource(app, pipeline)

register_processor

Register a processor.

from bspump.jupyter import register_processor

@register_processor
def processor(app, pipeline):
    return MyProcessor(app, pipeline)

register_sink

Register the pipeline sink.

from bspump.jupyter import register_sink

@register_sink
def sink(app, pipeline):
    return MySink(app, pipeline)

end_pipeline

Finalize pipeline building.

from bspump.jupyter import end_pipeline

end_pipeline()

Testing

bitswan_test_mode

Enable/disable test mode.

from bspump.jupyter import bitswan_test_mode

bitswan_test_mode(enabled=True)

add_test_probe

Add a test probe at a specific point.

from bspump.jupyter import add_test_probe

add_test_probe("after_transform")

bitswan_test_probes

Dictionary of test probe results.

from bspump.jupyter import bitswan_test_probes

results = bitswan_test_probes.get("after_transform", [])

bitswan_tested_pipelines

Dictionary of tested pipeline results.

from bspump.jupyter import bitswan_tested_pipelines

results = bitswan_tested_pipelines.get("MyPipeline")
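The bitswan_test_probes.get(..., []) call above suggests that probe results are kept as lists keyed by probe name. Below is a hypothetical sketch of that shape, in which a probe is a pass-through step that snapshots each event it sees. probe and PROBES are illustrative names, and how bspump.jupyter actually inserts probes into a pipeline is not modeled.

```python
import copy
from collections import defaultdict

PROBES = defaultdict(list)  # hypothetical: probe name -> events seen there


def probe(name):
    """Hypothetical: return a pass-through step that records events under `name`."""
    def record(event):
        PROBES[name].append(copy.deepcopy(event))  # snapshot, not a live reference
        return event
    return record


def transform(event):
    event["value"] = event["value"] + 1
    return event


after_transform = probe("after_transform")
for event in [{"value": 1}, {"value": 2}]:
    after_transform(transform(event))

results = PROBES.get("after_transform", [])
print(results)  # [{'value': 2}, {'value': 3}]
```

The deep copy matters when later steps mutate the event in place. Without it, the probe would show the final state rather than the state at the probe point.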

Sampling

sample_events

Configure sample event collection.

from bspump.jupyter import sample_events

sample_events(count=10)

retrieve_sample_events

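Retrieve sample events from the source registered to the current pipeline and register them for testing. This is a coroutine and must be awaited; it returns None.

from bspump.jupyter import retrieve_sample_events

await retrieve_sample_events(limit=10)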

Deployment

deploy

Deploy the notebook as a standalone automation.

from bspump.jupyter import deploy

deploy()

Application Access

App

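App is the BSPump application class used by bspump.jupyter (a subclass of BSPumpApplication). get_service is an instance method, so call it on the application instance, for example the app passed to a callback registered with register_app_post_init.

from bspump.jupyter import register_app_post_init

@register_app_post_init
def find_lookup(app):
    svc = app.get_service("bspump.PumpService")
    lookup = svc.locate_lookup("MyLookup")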

Cell Processing

step

Process a single step (for explicit control).

from bspump.jupyter import step

@step
def process_step(event):
    event["processed"] = True
    return event

async_step

Async version of step.

from bspump.jupyter import async_step

@async_step
async def async_process_step(event):
    result = await fetch_data(event["id"])
    event["data"] = result
    return event
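step and async_step differ in whether the decorated function has to be awaited. The sketch below shows how a runner might apply both kinds in order. It assumes that steps are kept in a single ordered list and that coroutine results are awaited. STEPS, run_steps and the decorator bodies are hypothetical stand-ins, and fetch_data is a placeholder that returns a made-up string.

```python
import asyncio
import inspect

STEPS = []  # hypothetical ordered list of registered steps


def step(func):
    """Hypothetical: register a synchronous step."""
    STEPS.append(func)
    return func


def async_step(func):
    """Hypothetical: register a coroutine step."""
    STEPS.append(func)
    return func


async def run_steps(event):
    # Apply each step in registration order, awaiting coroutine steps.
    for func in STEPS:
        result = func(event)
        if inspect.isawaitable(result):
            result = await result
        event = result
    return event


async def fetch_data(key):
    # Hypothetical stand-in for a real asynchronous lookup
    await asyncio.sleep(0)
    return f"data-{key}"


@step
def process_step(event):
    event["processed"] = True
    return event


@async_step
async def async_process_step(event):
    event["data"] = await fetch_data(event["id"])
    return event


result = asyncio.run(run_steps({"id": 7}))
print(result)  # {'id': 7, 'processed': True, 'data': 'data-7'}
```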