bspump.abc

Abstract base classes for BSPump components.

Source

class bspump.abc.source.Source(app, pipeline, id=None, config=None)[source]

Bases: Configurable

Description:

__init__(app, pipeline, id=None, config=None)[source]

Set the initial ID, Pipeline and Task.

Parameters

app : Application

The ASAB Application object (https://asab.readthedocs.io/en/latest/asab/application.html#).

pipeline : Pipeline

The Pipeline this Source belongs to.

id : str, default None

ID of the Source.

config : compatible config type, default None

Optional configuration of the Source.

async process(event, context=None)[source]

Emits an event into the Pipeline.

Parameters

event : data with a timestamp stored in any data type, usually JSON

Message or information that is passed to the method and emitted into the Pipeline.

context : default None

Additional information.

If processing the event fails, the Pipeline is throttled by setting the error, and the exception is raised.

Hint:

The source should catch this exception and fail gracefully.
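The error handling described above can be sketched without BSPump itself. The following is a minimal, self-contained illustration: StubPipeline, its error and delivered attributes, and run_source are hypothetical names invented for this sketch, and the real Pipeline may record and raise errors differently.

```python
import asyncio


class StubPipeline:
    """Hypothetical stand-in for a pipeline; not the BSPump class."""

    def __init__(self):
        self.error = None      # set when processing fails
        self.delivered = []    # events that were processed successfully

    async def process(self, event):
        try:
            if "value" not in event:
                raise ValueError("event has no 'value' field")
            self.delivered.append(event)
        except Exception as exc:
            # Record the error (the pipeline is now "throttled") and re-raise.
            self.error = exc
            raise


async def run_source(pipeline, events):
    """Emit events until the pipeline raises, then stop cleanly."""
    emitted = 0
    for event in events:
        try:
            await pipeline.process(event)
        except ValueError:
            # Fail gracefully: stop emitting instead of crashing the application.
            break
        emitted += 1
    return emitted


def demo():
    pipeline = StubPipeline()
    emitted = asyncio.run(run_source(pipeline, [{"value": 1}, {}, {"value": 3}]))
    return emitted, pipeline.delivered, type(pipeline.error).__name__
```

In this sketch the second event is malformed, so the source stops after one successful event and the third event is never emitted.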

start(loop)[source]

Starts the Source through the _main method; if the main method is implemented, the coroutine is started using main instead.

Parameters

loop : asyncio event loop

Event loop on which the coroutines run.

async stop()[source]

Stops the Source using self.Task. If the processes are not done it cancels them or raises an error.

Returns:

restart(loop)[source]

Restarts the loop of coroutines and returns result() method.

Parameters

loop : asyncio event loop

Event loop on which the coroutines run.

async main()[source]

Can be implemented for additional features; otherwise it raises NotImplementedError and _main is called instead.

async stopped()[source]

Waits for all asynchronous tasks to be completed. It is a helper that simplifies the implementation of sources.

Example:

async def main(self):
    # … initialize resources here
    await self.stopped()
    # … finalize resources here
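To make the initialize / wait / finalize pattern concrete, here is a self-contained sketch of one way such a helper could behave. StubSource is a hypothetical stand-in, and the cancellation-based implementation of stopped() is an assumption made for illustration, not a statement about how BSPump implements it.

```python
import asyncio


class StubSource:
    """Hypothetical stand-in showing the wait-until-stopped pattern."""

    def __init__(self):
        self.log = []

    async def stopped(self):
        # Assumption: sleep until the surrounding task is cancelled,
        # then return normally so the caller can clean up.
        try:
            while True:
                await asyncio.sleep(60)
        except asyncio.CancelledError:
            pass

    async def main(self):
        self.log.append("initialized")   # acquire resources here
        await self.stopped()
        self.log.append("finalized")     # release resources here


async def demo():
    source = StubSource()
    task = asyncio.ensure_future(source.main())
    await asyncio.sleep(0)     # let main() run up to its first await
    before = list(source.log)
    task.cancel()              # plays the role of stop()
    await task                 # main() ends normally after cleaning up
    return before, source.log
```

The point of the pattern is that the finalization code sits in the same coroutine as the initialization code, so resources are released in one obvious place.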

locate_address()[source]

Locates address of a Pipeline.

Returns:

ID of the Pipeline and ID of the Source, as a string.

rest_get()[source]

Returns:

ID and class ID

classmethod construct(app, pipeline, definition)[source]

Creates a source based on a specific definition, for example one loaded from a JSON file.

Parameters

app : Application

The Application object.

pipeline : Pipeline

The Pipeline the source belongs to.

definition : dict

Definition that is used to create a source.

Returns:

cls(app, pipeline, id=newid, config=config)
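A short sketch may help show what constructing a component from a definition looks like. StubSource is a hypothetical stand-in that only mirrors the constructor signature; the "id" and "config" keys and the fallback to the class name are assumptions made for this example.

```python
class StubSource:
    """Hypothetical stand-in; only mirrors the constructor signature."""

    def __init__(self, app, pipeline, id=None, config=None):
        self.App = app
        self.Pipeline = pipeline
        # Assumption: fall back to the class name when no ID is given.
        self.Id = id if id is not None else self.__class__.__name__
        self.Config = dict(config or {})

    @classmethod
    def construct(cls, app, pipeline, definition: dict):
        # Assumption: the definition carries optional "id" and "config" keys.
        newid = definition.get("id")
        config = definition.get("config")
        return cls(app, pipeline, id=newid, config=config)


# The definition would typically be parsed from a JSON document.
definition = {"id": "FileSource", "config": {"path": "/tmp/in"}}
source = StubSource.construct("app", "pipeline", definition)
```

Because construct is a classmethod, subclasses inherit it and are instantiated as themselves, which is what makes a declarative pipeline definition possible.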

class bspump.abc.source.TriggerSource(app, pipeline, id=None, config=None)[source]

Bases: Source

A specialized source that responds to trigger events.

TriggerSource is designed to work with BitSwan’s trigger system. It waits for trigger events and executes a cycle() method when triggered. This allows for event-driven data processing in pipelines.

Key features:

- Waits for trigger events using asyncio.Event
- Executes cycle() method when triggered
- Manages multiple triggers simultaneously
- Handles trigger lifecycle events

Usage:

class MyTriggerSource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        # Your trigger logic here
        await self.Pipeline.process({"data": "triggered"})

# Attach to any trigger type
source = MyTriggerSource(app, pipeline)
source.on(some_trigger)
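The wait-then-cycle behaviour can be illustrated with plain asyncio. This is a simplified sketch under stated assumptions: StubTriggerSource, fire() and the max_cycles argument are hypothetical and exist only so the example terminates; the real class also coordinates with the Pipeline and with several triggers.

```python
import asyncio


class StubTriggerSource:
    """Hypothetical stand-in for a trigger-driven source; not the BSPump class."""

    def __init__(self):
        self.trigger_event = asyncio.Event()
        self.cycles = 0

    def fire(self):
        # A trigger calls this to request one cycle.
        self.trigger_event.set()

    async def cycle(self):
        self.cycles += 1

    async def main(self, max_cycles):
        while self.cycles < max_cycles:
            await self.trigger_event.wait()   # sleep until a trigger fires
            self.trigger_event.clear()        # re-arm for the next trigger
            await self.cycle()


async def demo():
    source = StubTriggerSource()
    task = asyncio.ensure_future(source.main(max_cycles=2))
    for _ in range(2):
        source.fire()
        await asyncio.sleep(0)   # give main() a chance to run one cycle
    await task
    return source.cycles
```

Between triggers the source consumes no CPU, because it is suspended on the event rather than polling.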

__init__(app, pipeline, id=None, config=None)[source]

Set the initial ID, Pipeline and Task.

Parameters

app : Application

The ASAB Application object (https://asab.readthedocs.io/en/latest/asab/application.html#).

pipeline : Pipeline

The Pipeline this Source belongs to.

id : str, default None

ID of the Source.

config : compatible config type, default None

Optional configuration of the Source.

time()[source]

Returns the current time as measured by the Application.

Returns:

App.time()

Hint:

You can find more information about UTC Time in the ASAB documentation.

on(trigger)[source]

Attaches a Trigger, an object that waits for a given condition, to this source.

Parameters

trigger : Trigger

The Trigger whose condition starts a cycle of this source.

Returns:

Trigger.add(trigger)

async main(*args, **kwags)[source]

Waits for the Pipeline to be ready and for a trigger to fire, then runs cycle() and handles any exception it raises.

Parameters

*args : ?

**kwags : ?


async cycle(*args, **kwags)[source]

Not implemented in the base class. Override it in a subclass to define what happens on each trigger.

Parameters

*args : ?

**kwags : ?

rest_get()[source]

Description:

Returns:

Source

Base class for continuous event sources.

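import bspump

class MySource(bspump.Source):
    async def main(self):
        while True:
            # get_event() is a placeholder for your own way of obtaining data
            event = await self.get_event()
            # Wait until the Pipeline is ready to accept events
            await self.Pipeline.ready()
            await self.Pipeline.process(event)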

TriggerSource

Base class for trigger-activated sources.

from bspump.abc.source import TriggerSource

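class MyTriggerSource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        # Wait until the Pipeline is ready to accept events
        await self.Pipeline.ready()
        # generate_event() is a placeholder for your own event creation
        event = self.generate_event()
        await self.Pipeline.process(event)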

Processor

class bspump.abc.processor.ProcessorBase(app, pipeline, id=None, config=None)[source]

Bases: Configurable

Description:


__init__(app, pipeline, id=None, config=None)[source]

Initializes the processor.

Parameters

app : object

Application object.

pipeline : Pipeline

The Pipeline this processor belongs to.

id : str, default None

ID of the processor.

config : JSON or other compatible format, default None

Configuration of the processor.

time()[source]

Accurate representation of a time in the Pipeline.

Returns:

App.time()

classmethod construct(app, pipeline, definition)[source]

Constructs a processor based on a specific definition, for example one loaded from a JSON file.

Parameters

app : Application

The Application object (https://asab.readthedocs.io/en/latest/asab/application.html#).

pipeline : Pipeline

The Pipeline the processor belongs to.

definition : dict

Set of instructions based on which the processor can be constructed.

Returns:

cls(app, pipeline, id=newid, config=config)


process(context, event)[source]

Can be implemented to return an event based on custom logic.

Parameters

context :

Additional information passed to the method.

event : data with a timestamp stored in any data type, usually JSON

The event that is passed to the method.

locate_address()[source]

Returns an ID of a processor and a Pipeline.

Returns:

ID of the Pipeline and self.Id.


rest_get()[source]

Description:

Returns:

class bspump.abc.processor.Processor(app, pipeline, id=None, config=None)[source]

Bases: ProcessorBase

Inherits from ProcessorBase.


Processor

Base class for event processors.

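import bspump

class MyProcessor(bspump.Processor):
    def process(self, context, event):
        # Transform the event and pass it on
        event["processed"] = True
        return event

# Or async, as a separate class (a second process() in the same class
# would silently replace the first one):
class MyAsyncProcessor(bspump.Processor):
    async def process(self, context, event):
        # async_operation() is a placeholder for your own coroutine
        result = await self.async_operation(event)
        event["result"] = result
        return event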
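How several processors cooperate can be sketched as a plain function chain. This is an illustration only: run_chain, add_flag and drop_empty are hypothetical names, and the rule that returning None discards the event is an assumption of this sketch rather than something stated on this page.

```python
def add_flag(context, event):
    # Mark the event as processed and pass it on.
    event["processed"] = True
    return event


def drop_empty(context, event):
    # Assumption for this sketch: returning None discards the event.
    return event if event.get("items") else None


def run_chain(processors, context, event):
    """Pass the event through each processor in order."""
    for process in processors:
        event = process(context, event)
        if event is None:
            return None   # the event was dropped; later processors never see it
    return event


kept = run_chain([drop_empty, add_flag], {}, {"items": [1]})
dropped = run_chain([drop_empty, add_flag], {}, {"items": []})
```

Each processor receives the output of the previous one, so the order in which processors are listed determines the result.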

Sink

class bspump.abc.sink.Sink(app, pipeline, id=None, config=None)[source]

Bases: ProcessorBase

A Sink is essentially a processor. It takes an event and sends it to a destination, such as a database, where it is stored.


handle_error(context, event, exception, timestamp)[source]

Sink

Base class for event sinks.

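import bspump

class MySink(bspump.Sink):
    def process(self, context, event):
        # output() is a placeholder for your own delivery code
        self.output(event)

# Or async, as a separate class:
class MyAsyncSink(bspump.Sink):
    async def process(self, context, event):
        # async_output() is a placeholder for your own coroutine
        await self.async_output(event)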

Generator

class bspump.abc.generator.Generator(app, pipeline, id=None, config=None)[source]

Bases: ProcessorBase

Description:


__init__(app, pipeline, id=None, config=None)[source]

Description:

Parameters

app : Application

The Application object.

pipeline : Pipeline

The Pipeline this generator belongs to.

id : str, default None

ID of the generator.

config : JSON, default None

Configuration containing additional information.

set_depth(depth)[source]

Description:

Parameters

depth : int

process(context, event)[source]

Description:

Parameters

context :

event : any data type

Information of any data type with a timestamp.

async generate(context, event, depth)[source]

Description:

Parameters

context :

event : any data type

Information of any data type with a timestamp.

depth : int

Generator

Base class for 1-to-many event generation.

import bspump

class MyGenerator(bspump.Generator):
    async def generate(self, context, event, depth):
        # Emit one event per item of the incoming event
        for item in event["items"]:
            self.Pipeline.inject(context, item, depth)
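The role of depth is easier to see in a runnable sketch. StubPipeline and StubGenerator below are hypothetical stand-ins, and the reading of depth as "index of the first processor that should see the generated event" is an assumption made for this illustration.

```python
import asyncio


class StubPipeline:
    """Hypothetical stand-in for a pipeline; not the BSPump class."""

    def __init__(self, processors):
        self.processors = processors   # plain callables taking (context, event)
        self.output = []               # events that reached the end of the pipeline

    def inject(self, context, event, depth):
        # Assumption: `depth` is the index of the first processor to apply.
        for process in self.processors[depth:]:
            event = process(context, event)
        self.output.append(event)


class StubGenerator:
    def __init__(self, pipeline):
        self.Pipeline = pipeline

    async def generate(self, context, event, depth):
        # One incoming event becomes one outgoing event per item.
        for item in event["items"]:
            self.Pipeline.inject(context, item, depth)


def demo():
    pipeline = StubPipeline([lambda context, event: event + 1,
                             lambda context, event: event * 10])
    generator = StubGenerator(pipeline)
    # depth=1 skips the first processor, as if the generator sat before position 1.
    asyncio.run(generator.generate({}, {"items": [1, 2, 3]}, depth=1))
    return pipeline.output
```

Passing the depth along lets generated events continue from the generator's position instead of re-entering the pipeline from the start.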

Connection

class bspump.abc.connection.Connection(app, id=None, config=None)[source]

Bases: ABC, Configurable

The Connection class is responsible for creating a connection between items or services within the infrastructure of BSPump. Its main use is to create a connection with the main components of BSPump: source, processor and sink.


__init__(app, id=None, config=None)[source]

Description:

Parameters

app : Application

Specification of an Application.

id : default None

config : JSON or other compatible format, default None

It contains important information and data responsible for creating a connection.

time()[source]

Returns the accurate time of the asynchronous process.

Hint:

More information can be found in the UTC Time section of the ASAB documentation.


classmethod construct(app, definition)[source]

Creates a connection based on a specific definition, for example one loaded from a JSON file.

Parameters

app : Application

The Application object (https://asab.readthedocs.io/en/latest/asab/application.html#).

definition : dict

Defines instructions for the method that can be used to create a connection.

Returns:

cls(app, newid, config)


Connection

Base class for shared connections.

import bspump

class MyConnection(bspump.Connection):
    async def connect(self):
        # create_client() is a placeholder for your own client factory
        self.client = await create_client()

    async def disconnect(self):
        await self.client.close()
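The idea of a shared connection can be sketched with stand-in classes. StubClient and StubConnection are hypothetical and talk to nothing; they only show that several components can reuse one client that is opened and closed in a single place.

```python
import asyncio


class StubClient:
    """Hypothetical client; records what was sent instead of using a network."""

    def __init__(self):
        self.closed = False
        self.sent = []

    async def send(self, item):
        self.sent.append(item)

    async def close(self):
        self.closed = True


class StubConnection:
    """Hypothetical stand-in for a shared connection; not the BSPump class."""

    def __init__(self, id):
        self.Id = id
        self.client = None

    async def connect(self):
        # Create the client only once, however many components ask for it.
        if self.client is None:
            self.client = StubClient()

    async def disconnect(self):
        if self.client is not None:
            await self.client.close()


async def demo():
    connection = StubConnection("SharedConnection")
    await connection.connect()
    await connection.connect()            # second call reuses the same client
    client = connection.client
    await client.send("from source")      # e.g. used by a source
    await client.send("from sink")        # e.g. used by a sink
    await connection.disconnect()
    return client.sent, client.closed
```

Keeping the client inside one connection object means credentials, reconnection and shutdown are handled once rather than in every source and sink.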

Lookup

class bspump.abc.lookup.Lookup(app, id=None, config=None, lazy=False)[source]

Bases: Configurable

Description:


Returns:

ConfigDefaults: dict = {'master_lookup_id': '', 'master_url': 'http://localhost:8080', 'master_url_endpoint': '/bspump/v1/lookup/', 'source_url': ''}

__init__(app, id=None, config=None, lazy=False)[source]

Description:

Provider: LookupProviderABC | None

time()[source]

Description:

Returns:

time


ensure_future_update(loop)[source]

Description:

Returns:


async load()[source]

Description:

Return type:

bool

serialize()[source]

Description:

deserialize(data)[source]

Description:


rest_get()[source]

Description:

Returns:

is_master()[source]

Description:

Returns:

??


class bspump.abc.lookup.MappingLookup(app, id=None, config=None, lazy=False)[source]

Bases: Lookup, Mapping

Description:


class bspump.abc.lookup.AsyncLookupMixin(app, id=None, config=None, lazy=False)[source]

Bases: Lookup

Description:

async get(key)[source]

class bspump.abc.lookup.DictionaryLookup(app, id=None, config=None, lazy=False)[source]

Bases: MappingLookup

Description:

__init__(app, id=None, config=None, lazy=False)[source]

Description:


serialize()[source]

Description:

Returns:

json data


deserialize(data)[source]

Description:


rest_get()[source]

Description:

Returns:

rest


set(dictionary)[source]

Description:


Parameters:

dictionary (dict)

Lookup

Base class for lookup tables.

import bspump

class MyLookup(bspump.Lookup):
    # self.data is a placeholder for the storage your lookup maintains
    def get(self, key):
        return self.data.get(key)

    def set(self, key, value):
        self.data[key] = value
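The combination of a mapping interface with serialize() and deserialize() can be sketched as follows. StubDictionaryLookup is a hypothetical stand-in built on collections.abc.Mapping; the Dictionary attribute name and the choice of JSON bytes as the wire format are assumptions for this example.

```python
import collections.abc
import json


class StubDictionaryLookup(collections.abc.Mapping):
    """Hypothetical stand-in for a dictionary-backed lookup; not the BSPump class."""

    def __init__(self):
        self.Dictionary = {}

    # The three methods below are all that Mapping needs in order to
    # provide get(), items(), `in`, and the rest of the read-only dict API.
    def __getitem__(self, key):
        return self.Dictionary[key]

    def __iter__(self):
        return iter(self.Dictionary)

    def __len__(self):
        return len(self.Dictionary)

    def set(self, dictionary: dict):
        self.Dictionary = dict(dictionary)

    def serialize(self) -> bytes:
        # Assumption: JSON bytes are used as the transfer format.
        return json.dumps(self.Dictionary).encode("utf-8")

    def deserialize(self, data: bytes):
        self.Dictionary = json.loads(data.decode("utf-8"))


master = StubDictionaryLookup()
master.set({"cz": "Czechia", "de": "Germany"})

# A second instance rebuilds its content from the serialized form.
replica = StubDictionaryLookup()
replica.deserialize(master.serialize())
```

A serialize/deserialize pair of this kind is what allows one instance to publish its content and another to load it, while processors keep using ordinary dictionary syntax.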

Anomaly

class bspump.abc.anomaly.Anomaly[source]

Bases: dict

Description: Anomaly is an abstract class to be overridden for a specific anomaly and its type.

Returns:

Implement: TYPE, on_tick


TYPE = None

is_closed()[source]

Description:

Returns:

True if the status of the anomaly is closed.


close(current_time)[source]

Description:

async on_tick(current_time)[source]

Description:

Hint:

Implement to perform operations on the anomaly, e.g. to close it.

Anomaly

Base class for anomaly detection.
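A subclass typically sets TYPE and implements on_tick. The sketch below is self-contained and does not import BSPump: StubAnomaly, TimeoutAnomaly, MAX_AGE and the "status", "opened_at" and "closed_at" keys are all hypothetical names chosen for illustration.

```python
import asyncio


class StubAnomaly(dict):
    """Hypothetical stand-in for the abstract anomaly; not the BSPump class."""

    TYPE = None

    def is_closed(self):
        # Assumption: the state is kept under a "status" key.
        return self.get("status") == "closed"

    def close(self, current_time):
        self["status"] = "closed"
        self["closed_at"] = current_time

    async def on_tick(self, current_time):
        raise NotImplementedError()


class TimeoutAnomaly(StubAnomaly):
    """Closes itself once it has been open for MAX_AGE seconds."""

    TYPE = "timeout"
    MAX_AGE = 60

    async def on_tick(self, current_time):
        if current_time - self["opened_at"] >= self.MAX_AGE:
            self.close(current_time)


def demo():
    anomaly = TimeoutAnomaly(status="open", opened_at=100)
    asyncio.run(anomaly.on_tick(120))    # only 20 seconds old: stays open
    early = anomaly.is_closed()
    asyncio.run(anomaly.on_tick(160))    # 60 seconds old: gets closed
    return early, anomaly.is_closed(), anomaly["closed_at"]
```

Because the anomaly is a dict, its state can be stored or serialized like any other event payload.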