# Source code for bspump.jupyter.jupyter

import asyncio
from dataclasses import dataclass
from functools import partial
import bspump
import os
from typing import Any, Callable, List

from .exceptions import SkipEvent, FinalizeEvent

# Define the module-level registries only if they are not defined already.
# `bitswan_auto_pipeline` doubles as the "already initialised" marker, so
# executing this block again in a namespace that holds it keeps the state.
if "bitswan_auto_pipeline" not in globals():
    # --- pipeline currently being assembled ---------------------------------
    __bitswan_current_pipeline = None  # name set by new_pipeline()
    __bitswan_processors = []  # factories collected by the register_* helpers
    __bitswan_autopipeline_count = 1  # suffix for auto-generated pipeline names
    bitswan_auto_pipeline = {}  # auto_pipeline() stores the sink factory here

    # --- application-wide registries ----------------------------------------
    __bitswan_pipelines = {}  # pipeline name -> Pipeline class (end_pipeline)
    __bitswan_connections = []  # connection factories
    __bitswan_lookups = []  # lookup factories
    # Single leading underscore on purpose: this one is read inside a class
    # body (App), where a double-underscore name would be name-mangled.
    _bitswan_app_post_inits = []

    # --- notebook (dev) state -----------------------------------------------
    __bitswan_dev_runtime = None  # DevRuntime, created lazily
    __bs_step_locals = {}  # not referenced in this part of the file

    # --- test mode ----------------------------------------------------------
    bitswan_test_mode = []  # used as a flag: any content means "test mode"
    bitswan_test_probes = {}  # probe name -> (probe callable, expected value)
    bitswan_tested_pipelines = set()  # not referenced in this part of the file


class DevPipeline(bspump.Pipeline):
    """Pipeline subclass instantiated by the notebook (dev) code paths.

    It adds no behaviour of its own: the constructor forwards every
    positional and keyword argument to ``bspump.Pipeline`` unchanged.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)


class DevApp(bspump.BSPumpApplication):
    """BSPump application created for the notebook development runtime.

    Args:
        args: Command-line style arguments forwarded to
            ``bspump.BSPumpApplication`` as its ``args`` keyword; an empty
            list when omitted.
    """

    def __init__(self, args=[]):
        # NOTE(review): the default list object is shared between calls. It
        # is only forwarded here, never mutated; keep it that way.
        super().__init__(args=args)


class DevRuntime:
    """Step-by-step pipeline emulation used while developing in a notebook.

    ``events`` is an ordered history of ``(step_name, events)`` pairs. Running
    a step feeds it the output of the entry that precedes it in the history
    and discards everything from that step onwards, so re-running a step
    replaces its own result and drops all later ones.
    """

    def __init__(self, args=[]):
        # `args` is only handed to DevApp, so the shared default list is
        # never mutated here.
        self.events: list[tuple[str, list[Any]]] = []
        self.dev_app = DevApp(args)

    def clear(self, name: str, event: list[Any]) -> None:
        """Reset the history to a single entry ``(name, event)``."""
        self.events = [(name, event)]

    def get_prev_events(self, name) -> tuple[list[tuple[str, list[Any]]], list[Any]]:
        """Split the history at the first entry called ``name``.

        Returns:
            A pair ``(history, inputs)``: ``history`` is a new list with the
            entries that come before ``name`` (all entries when ``name`` is
            not present) and ``inputs`` is the event list of the last of
            them, or ``[]`` when there is none.
        """
        cut = next(
            (index for index, entry in enumerate(self.events) if entry[0] == name),
            len(self.events),
        )
        history = self.events[:cut]
        inputs = history[-1][1] if history else []
        return history, inputs

    def step(self, name: str, func) -> None:
        """Run ``func`` on every input event and record the results as ``name``.

        All events are processed before any result is printed.
        """
        history, inputs = self.get_prev_events(name)
        outputs = list(map(func, inputs))
        for output in outputs:
            print(output)
        self.events = [*history, (name, outputs)]

    async def async_step(self, name: str, func) -> None:
        """Run the coroutine function ``func(inject, event)`` on every input.

        ``inject`` is an async callable; each event passed to it is appended
        to the output recorded under ``name``.
        """
        history, inputs = self.get_prev_events(name)
        outputs = []

        async def inject(event):
            outputs.append(event)

        for item in inputs:
            await func(inject, item)
        for output in outputs:
            print(output)
        self.events = [*history, (name, outputs)]


def auto_pipeline(source=None, sink=None, name=None):
    """Start a new pipeline, register its source and remember its sink.

    If the caller has a local variable named ``test_events`` it is passed on
    to ``register_source``.

    Args:
        source: Function that returns a source; handed to ``register_source``.
        sink: Function that returns a sink; stored under
            ``bitswan_auto_pipeline["sink"]``.
        name: Pipeline name. Defaults to ``auto_pipeline_<counter>``.

    Raises:
        ValueError: If ``source`` or ``sink`` is missing.
    """
    global __bitswan_autopipeline_count

    if source is None:
        raise ValueError(
            "When calling auto_pipeline must specify a function that returns a source."
        )
    if sink is None:
        raise ValueError(
            "When calling auto_pipeline must specify a function that returns a sink."
        )

    pipeline_name = (
        f"auto_pipeline_{__bitswan_autopipeline_count}" if name is None else f"{name}"
    )
    new_pipeline(pipeline_name)
    # The counter advances for explicitly named pipelines as well.
    __bitswan_autopipeline_count += 1

    import inspect

    frame = inspect.currentframe()
    try:
        caller_test_events = frame.f_back.f_locals.get("test_events")
    finally:
        # Drop the frame reference promptly to avoid a reference cycle.
        del frame

    register_source(source, test_events=caller_test_events)
    bitswan_auto_pipeline["sink"] = sink
async def test_devruntime():
    """Exercise DevRuntime: stepping, re-running a step, and an async step."""
    test_runtime = DevRuntime()

    def test_func(prev_event):
        return prev_event + 1

    tr = test_runtime
    tr.clear("sample", [1, 2, 3])
    tr.step("step1", test_func)
    tr.step("step2", test_func)
    tr.step("step3", test_func)
    assert tr.events == [
        ("sample", [1, 2, 3]),
        ("step1", [2, 3, 4]),
        ("step2", [3, 4, 5]),
        ("step3", [4, 5, 6]),
    ]

    def test_func1(prev_event):
        return prev_event + 7

    # Re-running step2 replaces its result and drops step3.
    tr.step("step2", test_func1)
    assert tr.events == [
        ("sample", [1, 2, 3]),
        ("step1", [2, 3, 4]),
        ("step2", [9, 10, 11]),
    ]

    async def test_func2(injector, prev_event):
        if prev_event % 2 == 0:
            # The injector is a coroutine function: without `await` the
            # event would never be recorded.
            await injector(prev_event + 1)

    await tr.async_step("step3", test_func2)
    assert tr.events == [
        ("sample", [1, 2, 3]),
        ("step1", [2, 3, 4]),
        ("step2", [9, 10, 11]),
        ("step3", [11]),
    ]


def is_running_in_jupyter():
    """Return True when running under an IPython kernel or VS Code notebooks.

    Any failure along the way (IPython missing, no active IPython shell, ...)
    is treated as "not in Jupyter" and yields False.
    """
    try:
        from IPython import get_ipython

        # Check if the IPython kernel is running which is a strong indication of a Jupyter environment
        if "IPKernelApp" in get_ipython().config:
            return True
        # Check for Visual Studio Code's Jupyter extension
        if "VSCODE_PID" in os.environ:
            return True
        return False
    except Exception:
        # Deliberately broad: detection is best-effort and must never raise.
        return False


__bitswan_dev = is_running_in_jupyter()


def ensure_bitswan_runtime(func):
    """Decorator that creates the global DevRuntime on first use.

    The wrapped function keeps its name and docstring.
    """
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        global __bitswan_dev_runtime
        if not __bitswan_dev_runtime:
            __bitswan_dev_runtime = DevRuntime()
        return func(*args, **kwargs)

    return wrapper
def init_bitswan_jupyter(config_path: str = None):
    """Initialize Bitswan in a Jupyter environment with an optional config file.

    Needs to be called before any other Bitswan function. It does not need to
    be called when no config file is required.

    Args:
        config_path (str, optional): Path to the config file. Defaults to None.
    """
    global __bitswan_dev_runtime

    cli_args = []
    if config_path:
        cli_args = ["-c", config_path]
    # Replaces whatever runtime was there before.
    __bitswan_dev_runtime = DevRuntime(cli_args)
def add_test_probe(name):
    """Check the probe registered under ``name`` against the caller's state.

    Does nothing unless test mode is on and ``bitswan_test_probes`` holds an
    entry ``name -> (probe, expected)``. The probe is called with the
    caller's locals and globals; if its result differs from ``expected`` a
    failure message is printed and the process exits with status 1.

    Args:
        name: Key of the probe in ``bitswan_test_probes``.

    Raises:
        SystemExit: With code 1 when the probed value is not the expected one.
    """
    if not bitswan_test_mode:
        return

    probe, expected = bitswan_test_probes.get(name, (None, None))
    if probe is None:
        return

    import inspect
    import sys

    print(f" │ Probing {name}.")
    frame = inspect.currentframe()
    try:
        probed = probe(frame.f_back.f_locals, frame.f_back.f_globals)
    finally:
        # Drop the frame reference promptly to avoid a reference cycle.
        del frame

    if not probed == expected:
        print(
            " └ \033[91m"
            + f"Probe {name} failed. Got {probed} expected {expected}."
            + "\033[0m"
        )
        # sys.exit rather than the interactive-only `exit` helper.
        sys.exit(1)
@ensure_bitswan_runtime
def sample_events(events: List[Any]):
    """Inject sample events into the current pipeline for testing.

    The dev runtime history is reset to a single ``"__sample"`` entry that
    holds ``events``.

    Args:
        events (List[Any]): List of events to be injected
    """
    runtime = __bitswan_dev_runtime
    runtime.clear("__sample", events)
def register_app_post_init(func):
    """Register a callback that ``App`` runs at the end of its initialisation.

    Ex:
        @register_app_post_init
        def post_init(app):
            app.PubSub.subscribe("Application.tick!", app.tick)

    Args:
        func: Callable taking the application instance.

    Returns:
        ``func`` itself, so the decorated name stays bound to the function.
    """
    _bitswan_app_post_inits.append(func)
    return func
@ensure_bitswan_runtime
def register_connection(func):
    """Register a connection factory.

    Ex:
        @register_connection
        def connection(app):
            return bspump.kafka.KafkaConnection(app, "KafkaConnection")

    The factory is always recorded. In the notebook (dev) environment it is
    also called right away with the dev application, and the resulting
    connection is added to that application's PumpService.

    Args:
        func: Callable ``(app) -> connection``.

    Returns:
        ``func`` itself, so the decorated name stays bound to the function.
    """
    __bitswan_connections.append(func)
    if __bitswan_dev:
        dev_app = __bitswan_dev_runtime.dev_app
        connection = func(dev_app)
        dev_app.PumpService.add_connection(connection)
    return func
async def retrieve_sample_events(limit: int = 10) -> None:
    """Collect sample events from the source registered to the current pipeline.

    A temporary pipeline is built from the processors registered so far and
    started. Once ``limit`` events have been captured (polled every 0.5 s)
    it is stopped and the events become the ``"__sample"`` entry of the dev
    runtime. Has to be awaited in a Jupyter environment.

    Ex:
        await retrieve_sample_events(100)

    Args:
        limit (int, optional): Number of events to retrieve. Defaults to 10.

    Returns:
        None
    """
    # Snapshot the registered factories. The class body below must not refer
    # to double-underscore module globals, because they would be name-mangled.
    current_processors = list(__bitswan_processors)

    class TmpPipeline(bspump.Pipeline):
        def __init__(self, app, pipeline_id):
            super().__init__(app, pipeline_id)
            self.build(*[factory(app, self) for factory in current_processors])
            self.events = []

        def inject(self, context, event, depth):
            # Capture the event instead of passing it on; stop at `limit`.
            if len(self.events) < limit:
                print(event)
                self.events.append(event)

        def get_events(self):
            print(f"Collected {len(self.events)} events")
            return self.events

    pipeline = TmpPipeline(__bitswan_dev_runtime.dev_app, __bitswan_current_pipeline)
    pipeline.start()
    try:
        while len(pipeline.events) < limit:
            await asyncio.sleep(0.5)
        await pipeline.stop()
    except asyncio.CancelledError:
        # Cancelled while waiting: stop the pipeline and keep what we have.
        await pipeline.stop()

    __bitswan_dev_runtime.clear("__sample", pipeline.get_events())
@ensure_bitswan_runtime
def register_lookup(func):
    """Register a lookup factory.

    Ex:
        @register_lookup
        def lookup(app):
            return ENodeBLookup(app, id='ENodeBLookup')

    The factory is always recorded. In the notebook (dev) environment it is
    also called right away with the dev application, and the resulting
    lookup is added to that application's PumpService.

    Args:
        func: Callable ``(app) -> lookup``.

    Returns:
        ``func`` itself, so the decorated name stays bound to the function.
    """
    __bitswan_lookups.append(func)
    if __bitswan_dev:
        dev_app = __bitswan_dev_runtime.dev_app
        lookup = func(dev_app)
        dev_app.PumpService.add_lookup(lookup)
    return func
def new_pipeline(name: str):
    """Begin a new pipeline: make ``name`` current and clear its processors.

    Args:
        name (str): Name of the pipeline
    """
    global __bitswan_processors, __bitswan_current_pipeline

    # A fresh list is bound rather than clearing the old one in place.
    __bitswan_processors, __bitswan_current_pipeline = [], name
def end_pipeline():
    """End the current pipeline and store it in the pipeline registry.

    A ``bspump.Pipeline`` subclass that builds the processors registered so
    far is saved under the current pipeline name. Nothing happens in the
    notebook (dev) environment.
    """
    if not __bitswan_dev:
        # Snapshot the registered factories. The class body below must not
        # refer to double-underscore module globals (name mangling).
        current_processors = list(__bitswan_processors)

        class Pipeline(bspump.Pipeline):
            def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(*[factory(app, self) for factory in current_processors])

        # The class, not an instance, is stored under the pipeline name.
        __bitswan_pipelines[__bitswan_current_pipeline] = Pipeline
def register_source(func, test_events=None):
    """Register the source factory of the current pipeline.

    Ex:
        @register_source
        def source(app, pipeline):
            return bspump.socket.TCPStreamSource(app, pipeline)

    In test mode a ``bspump.test.TestSource`` fed with ``test_events`` is
    registered instead of ``func``.

    Args:
        func: Callable ``(app, pipeline) -> source``.
        test_events: Events for the test source. When None, the caller's
            local variable ``test_events`` is used, or ``{}`` if it has none.

    Returns:
        ``func`` itself, so the decorated name stays bound to the function.
    """
    if test_events is None:
        import inspect

        frame = inspect.currentframe()
        try:
            test_events = frame.f_back.f_locals.get("test_events", {})
        finally:
            # Drop the frame reference promptly to avoid a reference cycle.
            del frame

    if bitswan_test_mode:
        import bspump.test

        def test_source(app, pipeline):
            return bspump.test.TestSource(app, pipeline, test_events=test_events)

        __bitswan_processors.append(test_source)
    else:
        __bitswan_processors.append(func)
    return func
@ensure_bitswan_runtime
def register_processor(func):
    """Register a processor factory for the current pipeline.

    Ex:
        @register_processor
        def processor(app, pipeline):
            return bspump.socket.TCPStreamProcessor(app, pipeline)

    Outside the notebook the factory is only recorded. In the notebook (dev)
    environment the processor is built at once and run as a dev-runtime step
    named after ``func``.

    Args:
        func: Callable ``(app, pipeline) -> processor``.

    Returns:
        ``func`` itself, so the decorated name stays bound to the function.
    """
    if not __bitswan_dev:
        __bitswan_processors.append(func)
        return func

    dev_app = __bitswan_dev_runtime.dev_app
    pipeline = DevPipeline(app=dev_app, id=__bitswan_current_pipeline)
    processor = func(dev_app, pipeline)
    # `processor.process` is already bound, so the pre-filled argument takes
    # the slot after `self` (the `context` of `process(self, context, event)`
    # as used by `step`); the runtime then supplies only the event.
    callable_process = partial(processor.process, processor)
    __bitswan_dev_runtime.step(func.__name__, callable_process)
    return func
@ensure_bitswan_runtime
def register_generator(func):
    """Register a generator factory for the current pipeline.

    Ex:
        @register_generator
        def generator(app, pipeline):
            return MyGeneratorClass(app, pipeline)

    The registration coroutine is run to completion on the event loop before
    this function returns. Outside the notebook it records the factory. In
    the notebook (dev) environment it builds the generator and runs it as an
    async dev-runtime step named after ``func``.

    Returns:
        The registration coroutine function, not ``func``.
    """

    async def _fn():
        if __bitswan_dev:
            runtime = __bitswan_dev_runtime
            dev_pipeline = DevPipeline(runtime.dev_app, __bitswan_current_pipeline)
            generator = func(runtime.dev_app, dev_pipeline)

            async def run_generator(inject, event):
                async def forward(context, event, depth):
                    return await inject(event)

                # Whatever the generator injects into its pipeline is
                # redirected to the dev runtime's collector.
                dev_pipeline.inject = forward
                return await generator.generate(None, event, 0)

            await runtime.async_step(func.__name__, run_generator)
        else:
            # TODO: check this
            __bitswan_processors.append(func)
        return func

    def wrapper(*args, **kwargs):
        import asyncio
        import nest_asyncio

        # nest_asyncio is applied before run_until_complete is called.
        nest_asyncio.apply()
        loop = asyncio.get_event_loop()
        loop.run_until_complete(asyncio.ensure_future(_fn()))

    wrapper()
    return _fn
def register_sink(func):
    """Register the sink factory of the current pipeline.

    Ex:
        @register_sink
        def sink(app, pipeline):
            return bspump.socket.TCPStreamSink(app, pipeline)

    In test mode a ``bspump.test.TestSink`` is registered instead of ``func``.

    Args:
        func: Callable ``(app, pipeline) -> sink``.

    Returns:
        ``func`` itself, so the decorated name stays bound to the function.
    """
    if bitswan_test_mode:
        import bspump.test

        def test_sink(app, pipeline):
            return bspump.test.TestSink(app, pipeline)

        __bitswan_processors.append(test_sink)
    else:
        __bitswan_processors.append(func)
    return func
def snake_to_camel_case(name):
    """Convert snake_case name to CamelCase.

    Every ``_``-separated word is capitalized (first letter upper-cased, the
    rest lower-cased) and the words are concatenated.
    """
    words = name.split("_")
    return "".join(map(str.capitalize, words))
@ensure_bitswan_runtime
def step(func: Callable[[Any], Any]):
    """Decorator that registers a new processor with the given function.

    Ex:
        @step
        def my_processor(event):
            return {"processed": event}

    In the notebook (dev) environment the function is run immediately as a
    dev-runtime step. Otherwise a ``bspump.Processor`` subclass named
    ``<CamelCaseName>Processor`` is created and registered.

    Args:
        func (Callable[[Any], Any]): Function to be registered as a processor

    Returns:
        The original function, unmodified.
    """
    if __bitswan_dev:
        __bitswan_dev_runtime.step(func.__name__, func)
        return func

    def process(self, context, event):
        # The processor context is ignored; only the event reaches `func`.
        return func(event)

    processor_class = type(
        snake_to_camel_case(func.__name__) + "Processor",
        (bspump.Processor,),
        {"process": process},
    )
    __bitswan_processors.append(processor_class)
    return func
def async_step(func):
    """Decorator that registers a coroutine ``func(inject, event)`` as a generator.

    A ``bspump.Generator`` subclass named ``<CamelCaseName>Generator`` is
    created whose ``generate`` awaits ``func``. A factory for it, carrying
    the name of ``func``, is passed to ``register_generator``.

    ``func`` may raise ``SkipEvent`` to drop the event, or ``FinalizeEvent``
    to inject ``e.event`` immediately and stop processing the current event.

    Args:
        func: Coroutine function taking an async ``inject(event)`` callable
            and the event.

    Returns:
        ``func`` itself, so the decorated name stays bound to the function.
    """
    # Convert function name from snake case to CamelCase and create a unique class name
    class_name = snake_to_camel_case(func.__name__) + "Generator"

    async def _generate(self, context, event, depth):
        async def injector(event):
            # TODO: do better
            # In dev, register_generator swaps the pipeline's `inject` for a
            # coroutine function, so it has to be awaited there.
            if __bitswan_dev:
                return await self.Pipeline.inject(context, event, depth)
            else:
                return self.Pipeline.inject(context, event, depth)

        try:
            return await func(injector, event)
        except SkipEvent:
            # Event is dropped - do not call injector
            return
        except FinalizeEvent as e:
            # Send the event to sink immediately, skip remaining processing
            return await injector(e.event)

    # Dynamically create a new Generator class with the custom class name.
    # Built with type() rather than a class statement, so the
    # double-underscore global read in _generate is not name-mangled.
    CustomGenerator = type(
        class_name,
        (bspump.Generator,),
        {"generate": _generate},
    )

    def generator(app, pipeline):
        return CustomGenerator(app, pipeline)

    # The dev runtime names the step after the registered callable.
    generator.__name__ = func.__name__
    register_generator(generator)
    return func
def _init_pipelines(app, service):
    """Instantiate every registered pipeline class and add it to ``service``."""
    for pipeline_id, pipeline_class in __bitswan_pipelines.items():
        instance = pipeline_class(app, pipeline_id)
        service.add_pipeline(instance)


def _init_connections(app, service):
    """Build every registered connection and add it to ``service``."""
    for make_connection in __bitswan_connections:
        service.add_connection(make_connection(app))


def _init_lookups(app, service):
    """Build every registered lookup and add it to ``service``."""
    for make_lookup in __bitswan_lookups:
        service.add_lookup(make_lookup(app))
class App(bspump.BSPumpApplication):
    """BSPump application assembled from the module-level registries."""

    def init_componets(self):
        """Create the registered connections, lookups and pipelines.

        They are added to the ``bspump.PumpService`` in that order, and the
        registered post-init callbacks are then called with the application.

        NOTE(review): the method name is misspelled ("componets"); it is kept
        because callers outside this file may rely on it.
        """
        pump_service = self.get_service("bspump.PumpService")
        for initialise in (_init_connections, _init_lookups, _init_pipelines):
            initialise(self, pump_service)
        for post_init in _bitswan_app_post_inits:
            post_init(self)
[docs] def deploy(): import os import json deploy_secret = os.environ.get("BITSWAN_DEPLOY_SECRET", None) is_vscode = "VSCODE_PID" in os.environ @dataclass class DeployDetails: notebook_json: dict[str, Any] deploy_secret: str deploy_url: str def get_deploy_details() -> DeployDetails | None: """Get the notebook JSON, deploy secret and deploy URL Raises: e: RuntimeError: No VSCode or Colab environment detected. Please run this in a different environment. Returns: DeployDetails | None: DeployDetails object containing the notebook JSON, deploy secret and deploy URL """ if is_vscode: from IPython import get_ipython ip = get_ipython() if not ip: return None notebook_path = os.path.abspath(ip.user_ns.get("__vsc_ipynb_file__", "")) if not notebook_path or not notebook_path.endswith(".ipynb"): return None with open(notebook_path, "r", encoding="utf-8") as f: notebook_json = json.load(f) details = DeployDetails( json.dumps(notebook_json, sort_keys=True, indent=2), os.environ["BITSWAN_DEPLOY_SECRET"], os.environ[ "BITSWAN_DEPLOY_URL" ], # raises KeyError if not set, to match google colab behavior ) return details else: try: from google.colab import _message, userdata except ImportError: raise RuntimeError( "No VSCode or Colab environment detected. Please run this in a different environment." 
) notebook_json_string = _message.blocking_request( "get_ipynb", request="", timeout_sec=None ) details = DeployDetails( json.dumps(notebook_json_string["ipynb"], sort_keys=True, indent=2), userdata.get("BITSWAN_DEPLOY_SECRET"), userdata.get("BITSWAN_DEPLOY_URL"), ) return details deploy_details = get_deploy_details() if not deploy_details: print("Error retrieving notebook contents") return deploy_url = os.path.join( deploy_details.deploy_url, "__jupyter-deploy-pipeline/", "?secret=" + deploy_secret + "&restart=true", ) print("Packing for deployment...") import zipfile with zipfile.ZipFile("main.zip", "w") as myzip: myzip.writestr("main.ipynb", deploy_details.notebook_json) import requests print("Uploading to server..") with open("main.zip", "rb") as f: r = requests.post(deploy_url, data=f) print(json.loads(r.text)["status"])