bspump.jupyter
Jupyter notebook integration for interactive pipeline development.
- bspump.jupyter.init_bitswan_jupyter(config_path=None)
Initializes Bitswan in a Jupyter environment. It must be called before any other Bitswan function, but it can be skipped if no config file is needed.
- Parameters:
config_path (str, optional) – Path to the config file. Defaults to None.
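- bspump.jupyter.register_app_post_init(func)
Registers func as a callback that is called with the application instance after the app has been initialized.
- Ex:
@register_app_post_init
def post_init(app):
    app.PubSub.subscribe("Application.tick!", app.tick)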
- async bspump.jupyter.retrieve_sample_events(limit=10)
Gets sample events from the source registered to the current pipeline and registers them for testing. Because it is a coroutine, it must be awaited in the Jupyter environment.
- Ex:
await retrieve_sample_events(100)
- Parameters:
limit (int, optional) – Number of events to retrieve. Defaults to 10.
- Returns:
None
- Return type:
None
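To make the `limit` and await semantics concrete, here is a stdlib-only toy model. `fake_source`, `SAMPLE_EVENTS` and `retrieve_sample_events_model` are hypothetical names invented for this sketch; the real function reads from the source registered to the current pipeline, and how it stores the events internally is not documented here.

```python
import asyncio

# Hypothetical stand-in for wherever the notebook keeps registered test events.
SAMPLE_EVENTS = []


async def fake_source():
    """Hypothetical async source producing an endless stream of events."""
    n = 0
    while True:
        await asyncio.sleep(0)  # hand control back to the loop, like real I/O
        yield {"id": n}
        n += 1


async def retrieve_sample_events_model(source, limit=10):
    """Collect at most `limit` events and register them; returns None."""
    if limit <= 0:
        return None
    collected = 0
    async for event in source:
        SAMPLE_EVENTS.append(event)
        collected += 1
        if collected >= limit:
            break  # stop pulling once the limit is reached
    return None


# In a notebook cell you would simply write:
#     await retrieve_sample_events_model(fake_source(), limit=3)
# A plain script has no running event loop, so it uses asyncio.run() instead.
asyncio.run(retrieve_sample_events_model(fake_source(), limit=3))
print(SAMPLE_EVENTS)  # [{'id': 0}, {'id': 1}, {'id': 2}]
```

Because the function returns None, its effect is only observable through the registered events, which is why the sketch inspects `SAMPLE_EVENTS` afterwards.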
- bspump.jupyter.new_pipeline(name)
Creates and registers a new pipeline
- Parameters:
name (str) – Name of the pipeline
- bspump.jupyter.end_pipeline()
Ends the current pipeline and appends it to the list of pipelines
- bspump.jupyter.register_source(func, test_events=None)
Registers func as the factory for the current pipeline's source.
- Ex:
import bspump.socket
@register_source
def source(app, pipeline):
    return bspump.socket.TCPStreamSource(app, pipeline)
- bspump.jupyter.register_sink(func)
Registers func as the factory for the current pipeline's sink.
- Ex:
import bspump.socket
@register_sink
def sink(app, pipeline):
    return bspump.socket.TCPStreamSink(app, pipeline)
- class bspump.jupyter.App(*args, **kwargs)
Bases: BSPumpApplication
- exception bspump.jupyter.SkipEvent
Bases: Exception
Raise this exception to drop the current event without processing it further.
The event is not sent to the sink, which makes this useful for filtering out events early in the processing pipeline.
Example
if event.get("type") == "spam":
    raise SkipEvent()
# This code only runs for non-spam events
event["processed"] = True
- exception bspump.jupyter.FinalizeEvent(event)
Bases: Exception
Raise this exception to send an event to the sink immediately, skipping any remaining processing cells.
This is useful for early exits where you want to output a result without running the remaining processing steps.
Example
if event.get("cached"):
    # Send the cached result directly to the sink
    raise FinalizeEvent(event)
# This code only runs for non-cached events
event["result"] = expensive_computation(event)
- Parameters:
event – The event to send to the sink
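SkipEvent and FinalizeEvent differ only in where the event ends up. The sketch below models a hypothetical cell runner (`run_cells`) that treats each processing cell as a function mutating the event in place. The exception classes are redefined locally so the example runs without bspump, and the runner illustrates the documented behaviour; it is not bspump's actual implementation.

```python
# Local stand-ins mirroring the documented exceptions (not imported from bspump).
class SkipEvent(Exception):
    """Drop the current event: nothing reaches the sink."""


class FinalizeEvent(Exception):
    """Send `event` to the sink now, skipping the remaining cells."""

    def __init__(self, event):
        super().__init__()
        self.event = event


def run_cells(event, cells, sink):
    """Hypothetical runner: apply each cell in order, honouring both exceptions."""
    try:
        for cell in cells:
            cell(event)
    except SkipEvent:
        return  # event is dropped, sink untouched
    except FinalizeEvent as exc:
        sink.append(exc.event)  # early exit with the given event
        return
    sink.append(event)  # normal path: every cell ran


def drop_spam(event):
    if event.get("type") == "spam":
        raise SkipEvent()


def use_cache(event):
    if event.get("cached"):
        raise FinalizeEvent(event)


def mark_processed(event):
    event["processed"] = True


sink = []
for e in [{"type": "spam"}, {"type": "ham", "cached": True}, {"type": "ham"}]:
    run_cells(e, [drop_spam, use_cache, mark_processed], sink)

print(sink)
# [{'type': 'ham', 'cached': True}, {'type': 'ham', 'processed': True}]
```

The spam event never reaches the sink, the cached event arrives without passing through `mark_processed`, and only the third event runs every cell.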
Core Functions
auto_pipeline
Create a pipeline with automatic cell-based processing.
from bspump.jupyter import auto_pipeline
auto_pipeline(
    source=lambda app, pipeline: MySource(app, pipeline),
    sink=lambda app, pipeline: MySink(app, pipeline),
    name="MyPipeline",
    processors=[
        lambda app, pipeline: MyProcessor(app, pipeline)
    ]
)
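Parameters:
- source – Callable (typically a lambda) taking (app, pipeline) and returning a Source instance
- sink – Callable (typically a lambda) taking (app, pipeline) and returning a Sink instance
- name – Pipeline name
- processors – Optional list of callables, each taking (app, pipeline) and returning a Processor instance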
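Each argument is a factory rather than a ready-made object, because the components can only be constructed once the application and pipeline exist. The sketch below models that deferred construction with a hypothetical `build_pipeline_model` helper and a dummy `Component` class. It deliberately leaves out the notebook-cell processing stage, whose position in the real pipeline is not described here.

```python
class Component:
    """Dummy stand-in for a Source, Processor or Sink."""

    def __init__(self, app, pipeline, kind):
        self.app, self.pipeline, self.kind = app, pipeline, kind


def build_pipeline_model(app, name, source, sink, processors=()):
    """Hypothetical helper: call each factory with (app, pipeline) in flow order."""
    pipeline = {"name": name}
    components = [source(app, pipeline)]
    components += [factory(app, pipeline) for factory in processors]
    components.append(sink(app, pipeline))
    pipeline["components"] = components
    return pipeline


p = build_pipeline_model(
    app="app",
    name="MyPipeline",
    source=lambda app, pipeline: Component(app, pipeline, "source"),
    sink=lambda app, pipeline: Component(app, pipeline, "sink"),
    processors=[lambda app, pipeline: Component(app, pipeline, "processor")],
)
print([c.kind for c in p["components"]])  # ['source', 'processor', 'sink']
```

Because every factory receives the same pipeline object, each component can hold a reference to the pipeline it belongs to, which would be impossible if the components were built before the pipeline.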
Decorators
register_connection
Register a connection factory.
from bspump.jupyter import register_connection
import bspump.kafka
@register_connection
def kafka_connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")
register_lookup
Register a lookup factory.
from bspump.jupyter import register_lookup
import bspump
@register_lookup
def user_lookup(app):
    return bspump.DictionaryLookup(app, "UserLookup", {})
register_app_post_init
Register a callback for after app initialization.
from bspump.jupyter import register_app_post_init
@register_app_post_init
def setup(app):
    # Custom initialization
    pass
Pipeline Building
new_pipeline
Start building a pipeline.
from bspump.jupyter import new_pipeline
new_pipeline("MyPipeline")
register_source
Register the pipeline source.
from bspump.jupyter import register_source
@register_source
def source(app, pipeline):
    return MySource(app, pipeline)
register_processor
Register a processor.
from bspump.jupyter import register_processor
@register_processor
def processor(app, pipeline):
    return MyProcessor(app, pipeline)
register_sink
Register the pipeline sink.
from bspump.jupyter import register_sink
@register_sink
def sink(app, pipeline):
    return MySink(app, pipeline)
end_pipeline
Finalize pipeline building.
from bspump.jupyter import end_pipeline
end_pipeline()
Testing
bitswan_test_mode
Enable/disable test mode.
from bspump.jupyter import bitswan_test_mode
bitswan_test_mode(enabled=True)
add_test_probe
Add a test probe at a specific point.
from bspump.jupyter import add_test_probe
add_test_probe("after_transform")
bitswan_test_probes
Dictionary of test probe results.
from bspump.jupyter import bitswan_test_probes
results = bitswan_test_probes.get("after_transform", [])
bitswan_tested_pipelines
Dictionary of tested pipeline results.
from bspump.jupyter import bitswan_tested_pipelines
results = bitswan_tested_pipelines.get("MyPipeline")
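A probe acts as a named capture point: every event passing it is recorded under the probe's name, which is consistent with reading results via `bitswan_test_probes.get(name, [])`. The sketch below models that with a plain dictionary. Unlike the real `add_test_probe`, which takes only a name, the hypothetical `probe` helper receives the event explicitly so the example stays self-contained. Taking a deep copy is a design choice of the sketch, so that later mutations do not rewrite what the probe saw.

```python
import copy

# Hypothetical probe store: probe name -> list of captured event snapshots.
test_probes = {}


def probe(name, event):
    """Record a snapshot of `event` under `name` and pass the event through."""
    test_probes.setdefault(name, []).append(copy.deepcopy(event))
    return event


event = {"value": 1}
event["value"] *= 10           # the "transform" step
probe("after_transform", event)
event["value"] += 1            # later change is not reflected in the probe
print(test_probes.get("after_transform", []))  # [{'value': 10}]
```

Without the copy, the probe would hold a reference to the live event and report `{'value': 11}`, which would hide what the event looked like at the probe point.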
Sampling
sample_events
Configure sample event collection.
from bspump.jupyter import sample_events
sample_events(count=10)
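retrieve_sample_events
Retrieve sample events from the current pipeline's source and register them for testing. The function is a coroutine that returns None, so await it instead of assigning its result.
from bspump.jupyter import retrieve_sample_events
await retrieve_sample_events(10)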
Deployment
deploy
Deploy the notebook as a standalone automation.
from bspump.jupyter import deploy
deploy()
Application Access
App
Access to the BSPump application instance.
from bspump.jupyter import App
app = App()  # ASAB applications are singletons, so this returns the running instance
svc = app.get_service("bspump.PumpService")
lookup = svc.locate_lookup("MyLookup")
Cell Processing
step
Process a single step (for explicit control).
from bspump.jupyter import step
@step
def process_step(event):
    event["processed"] = True
    return event
async_step
Async version of step.
from bspump.jupyter import async_step
@async_step
async def async_process_step(event):
    result = await fetch_data(event["id"])
    event["data"] = result
    return event
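Sync and async steps can be mixed in one chain as long as the runner awaits whatever an async step returns. The sketch below models such a chain with a hypothetical `run_steps` helper and a stub `fetch_data` (the example above leaves `fetch_data` undefined). It does not use the real `@step`/`@async_step` decorators, whose internals are not documented here.

```python
import asyncio
import inspect


async def fetch_data(event_id):
    """Hypothetical stand-in for an I/O call such as an HTTP or database lookup."""
    await asyncio.sleep(0)
    return {"id": event_id, "source": "stub"}


def process_step(event):
    event["processed"] = True
    return event


async def async_process_step(event):
    event["data"] = await fetch_data(event["id"])
    return event


async def run_steps(event, steps):
    """Hypothetical runner: call each step, awaiting its result if it is awaitable."""
    for step_fn in steps:
        result = step_fn(event)
        if inspect.isawaitable(result):
            result = await result
        event = result  # each step returns the (possibly new) event
    return event


out = asyncio.run(run_steps({"id": 7}, [process_step, async_process_step]))
print(out)  # {'id': 7, 'processed': True, 'data': {'id': 7, 'source': 'stub'}}
```

Checking `inspect.isawaitable` on the result, rather than on the function, also handles plain functions that happen to return a coroutine.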