bspump.abc¶
Abstract base classes for BSPump components.
Source¶
- class bspump.abc.source.Source(app, pipeline, id=None, config=None)[source]¶
Bases: Configurable
- __init__(app, pipeline, id=None, config=None)[source]¶
Set the initial ID, Pipeline and Task.
Parameters
- app : Application
The Application object (https://asab.readthedocs.io/en/latest/asab/application.html).
- pipeline : Pipeline
The Pipeline that the source belongs to.
- id : str, default None
ID of the source.
- config : compatible config type, default None
Optional configuration.
- async process(event, context=None)[source]¶
This method is used to emit an event into a Pipeline.
Parameters
- event : data with a time stamp, stored in any data type (usually JSON)
Message or information that is passed to the method and emitted into the Pipeline.
- context : default None
Additional information.
If there is an error in the processing of the event, the Pipeline is throttled by setting the error, and the exception is raised.
- Hint:
The source should catch this exception and fail gracefully.
- start(loop)[source]¶
Starts the Pipeline through the _main method; if the main method is implemented, the coroutine is started using main instead.
Parameters
- loop
Event loop that contains the coroutines.
- async stop()[source]¶
Stops the Source using self.Task. If the processes are not done, it cancels them or raises an error.
- restart(loop)[source]¶
Restarts the loop of coroutines and returns the value of result().
Parameters
- loop
Event loop that contains the coroutines.
- async main()[source]¶
Can be implemented for additional features; otherwise it raises NotImplementedError and _main is called instead.
- async stopped()[source]¶
Waits for all asynchronous tasks to be completed. It is a helper that simplifies the implementation of sources.
Example:
async def main(self):
    # … initialize resources here
    await self.stopped()
    # … finalize resources here
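The initialize / wait / finalize pattern above can be tried out without BSPump. The following is a minimal sketch that assumes the source is stopped by cancelling its task and that a stopped()-style helper absorbs that cancellation. SketchSource and its log attribute are hypothetical stand-ins, not the library's implementation.

```python
import asyncio


class SketchSource:
    """Hypothetical stand-in for a source; not the bspump class."""

    def __init__(self):
        self.log = []

    async def stopped(self):
        # Assumption: stopping is delivered as a task cancellation,
        # which this helper absorbs so that main() can clean up.
        try:
            while True:
                await asyncio.sleep(60)
        except asyncio.CancelledError:
            pass

    async def main(self):
        self.log.append("initialized")  # initialize resources here
        await self.stopped()
        self.log.append("finalized")    # finalize resources here


async def demo():
    source = SketchSource()
    task = asyncio.create_task(source.main())
    await asyncio.sleep(0)              # let main() reach stopped()
    before_stop = list(source.log)
    task.cancel()                       # plays the role of stop()
    await task                          # main() finishes normally
    return before_stop, list(source.log)


before_stop, after_stop = asyncio.run(demo())
```

Because the cancellation is caught inside stopped(), the finalization code after the await still runs, which is the point of the helper.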
- locate_address()[source]¶
Locates the address of a Pipeline.
- Returns:
ID of the source and ID of its Pipeline, as a string.
- classmethod construct(app, pipeline, definition)[source]¶
Can create a source based on a specific definition, for example a JSON file.
Parameters
- app : Application
The Application object.
- pipeline : Pipeline
Specification of a Pipeline.
- definition : dict
Definition that is used to create a source.
- Returns:
cls(app, newid, config)
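A definition-driven constructor of this kind can be sketched as follows. The sketch assumes the definition is a dict with optional "id" and "config" keys (the key names are an assumption, not confirmed by this page) and uses the keyword form of the return value documented for ProcessorBase.construct. SketchSource is a hypothetical stand-in class.

```python
class SketchSource:
    """Hypothetical stand-in with the documented constructor signature."""

    def __init__(self, app, pipeline, id=None, config=None):
        self.App = app
        self.Pipeline = pipeline
        # Fall back to the class name when no id is given (assumption).
        self.Id = id if id is not None else type(self).__name__
        self.Config = dict(config or {})

    @classmethod
    def construct(cls, app, pipeline, definition: dict):
        # Assumed layout of the definition: optional "id" and "config".
        newid = definition.get("id")
        config = definition.get("config")
        return cls(app, pipeline, id=newid, config=config)


definition = {"id": "MySource", "config": {"path": "/tmp/in.json"}}
source = SketchSource.construct(app=None, pipeline=None, definition=definition)
default_source = SketchSource.construct(app=None, pipeline=None, definition={})
```

The definition dict could equally be loaded from a JSON file with json.load before being passed in.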
- class bspump.abc.source.TriggerSource(app, pipeline, id=None, config=None)[source]¶
Bases: Source
A specialized source that responds to trigger events.
TriggerSource is designed to work with BitSwan’s trigger system. It waits for trigger events and executes a cycle() method when triggered. This allows for event-driven data processing in pipelines.
Key features:
- Waits for trigger events using asyncio.Event
- Executes cycle() method when triggered
- Manages multiple triggers simultaneously
- Handles trigger lifecycle events
- Usage:
class MyTriggerSource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        pass  # emit events here

# Attach to any trigger type
source = MyTriggerSource(app, pipeline)
source.on(some_trigger)
- __init__(app, pipeline, id=None, config=None)[source]¶
Set the initial ID, Pipeline and Task.
Parameters
- app : Application
The Application object (https://asab.readthedocs.io/en/latest/asab/application.html).
- pipeline : Pipeline
The Pipeline that the source belongs to.
- id : str, default None
ID of the source.
- config : compatible config type, default None
Optional configuration.
- time()[source]¶
Method used for measuring an accurate time.
- Returns:
App.time()
- Hint:
You can find more information about UTC Time in the ASAB documentation.
- on(trigger)[source]¶
Sets a Trigger, which is a method that waits for a given condition.
Parameters
- trigger : keyword of a trigger
The condition that the source waits for.
- Returns:
Trigger.add(trigger)
Source
Base class for continuous event sources.
import bspump

class MySource(bspump.Source):
    async def main(self):
        while True:
            event = await self.get_event()
            await self.Pipeline.ready()
            await self.Pipeline.process(event)
TriggerSource
Base class for trigger-activated sources.
from bspump.abc.source import TriggerSource

class MyTriggerSource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        await self.Pipeline.ready()
        event = self.generate_event()
        await self.Pipeline.process(event)
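The wait-for-trigger, then run cycle() behaviour described for TriggerSource can be illustrated with plain asyncio. This is a simplified sketch with a single asyncio.Event; SketchTriggerSource, fire() and max_cycles are hypothetical names, and the real class manages several triggers and their lifecycle.

```python
import asyncio


class SketchTriggerSource:
    """Hypothetical stand-in: one event, one cycle per firing."""

    def __init__(self):
        self.trigger_event = asyncio.Event()
        self.cycles = 0

    def fire(self):
        # Called by a trigger when its condition is met.
        self.trigger_event.set()

    async def cycle(self):
        self.cycles += 1  # a real cycle() would emit events here

    async def main(self, max_cycles):
        while self.cycles < max_cycles:
            await self.trigger_event.wait()   # sleep until triggered
            self.trigger_event.clear()        # re-arm for the next firing
            await self.cycle()


async def demo():
    source = SketchTriggerSource()
    task = asyncio.create_task(source.main(max_cycles=2))
    await asyncio.sleep(0)
    idle = source.cycles                      # nothing fired yet
    source.fire()
    while source.cycles < 1:                  # wait for the first cycle
        await asyncio.sleep(0)
    after_first = source.cycles
    source.fire()
    await task                                # second cycle ends main()
    return idle, after_first, source.cycles


cycle_counts = asyncio.run(demo())
```

Clearing the event before calling cycle() means a firing that arrives while a cycle is running is not lost; it starts the next cycle.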
Processor¶
- class bspump.abc.processor.ProcessorBase(app, pipeline, id=None, config=None)[source]¶
Bases: Configurable
- classmethod construct(app, pipeline, definition)[source]¶
Can construct a processor based on a specific definition, for example a JSON file.
Parameters
- app : Application
The Application object (https://asab.readthedocs.io/en/latest/asab/application.html).
- pipeline : str
Name of the Pipeline.
- definition : dict
Set of instructions based on which the processor can be constructed.
- Returns:
cls(app, pipeline, id=newid, config=config)
- process(context, event)[source]¶
Can be implemented to return event based on a given logic.
Parameters
- context
Additional information passed to the method.
- event : data with a time stamp, stored in any data type (usually JSON)
The event that is passed to the method.
- class bspump.abc.processor.Processor(app, pipeline, id=None, config=None)[source]¶
Bases: ProcessorBase
Inherits from ProcessorBase.
Processor
Base class for event processors.
import bspump

class MyProcessor(bspump.Processor):
    def process(self, context, event):
        # Transform event
        event["processed"] = True
        return event

# Or async (a separate class, so the two methods do not override each other):
class MyAsyncProcessor(bspump.Processor):
    async def process(self, context, event):
        result = await self.async_operation(event)
        event["result"] = result
        return event
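The process(context, event) contract can be exercised without a pipeline. The sketch below assumes that processors are applied in order and that each one receives the event returned by the previous one. run_chain, add_flag and add_field_count are hypothetical helpers written for this illustration, not BSPump functions.

```python
def add_flag(context, event):
    # Same transformation as MyProcessor above.
    event["processed"] = True
    return event


def add_field_count(context, event):
    # Counts the fields present before this processor adds its own.
    event["field_count"] = len(event)
    return event


def run_chain(processors, context, event):
    # Assumption: each processor's return value feeds the next one.
    for process in processors:
        event = process(context, event)
    return event


result = run_chain([add_flag, add_field_count], context={}, event={"user": "alice"})
```

Swapping the order of the two processors would change field_count, which shows why the position of a processor in the chain matters.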
Sink¶
- class bspump.abc.sink.Sink(app, pipeline, id=None, config=None)[source]¶
Bases: ProcessorBase
Sink is basically a processor. It takes an event and sends it to a database, where it is stored.
Sink
Base class for event sinks.
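import bspump

class MySink(bspump.Sink):
    def process(self, context, event):
        self.output(event)

# Or async (a separate class, so the two methods do not override each other):
class MyAsyncSink(bspump.Sink):
    async def process(self, context, event):
        await self.async_output(event)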
Generator¶
- class bspump.abc.generator.Generator(app, pipeline, id=None, config=None)[source]¶
Bases: ProcessorBase
- __init__(app, pipeline, id=None, config=None)[source]¶
Parameters
- app : Application
The Application object.
- pipeline : Pipeline
The Pipeline.
- id : str, default None
ID
- config : JSON, default None
Configuration containing additional information.
Generator
Base class for 1-to-many event generation.
import bspump

class MyGenerator(bspump.Generator):
    async def generate(self, context, event, depth):
        # Emit one event per item of the incoming event
        for item in event["items"]:
            self.Pipeline.inject(context, item, depth)
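The 1-to-many behaviour can be observed with a recording stand-in for the pipeline. In this sketch SketchPipeline and SketchGenerator are hypothetical classes; only the generate(context, event, depth) and inject(context, event, depth) call shapes are taken from the example above.

```python
import asyncio


class SketchPipeline:
    """Hypothetical stand-in that records what would be injected."""

    def __init__(self):
        self.injected = []

    def inject(self, context, event, depth):
        self.injected.append((depth, event))


class SketchGenerator:
    def __init__(self, pipeline):
        self.Pipeline = pipeline

    async def generate(self, context, event, depth):
        # One incoming event fans out into one event per item.
        for item in event["items"]:
            self.Pipeline.inject(context, item, depth)


pipeline = SketchPipeline()
generator = SketchGenerator(pipeline)
asyncio.run(generator.generate({}, {"items": ["a", "b", "c"]}, depth=1))
```

A single input event with three items results in three injected events, all carrying the same depth value.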
Connection¶
- class bspump.abc.connection.Connection(app, id=None, config=None)[source]¶
Bases: ABC, Configurable
The Connection class is responsible for creating a connection between items or services within the infrastructure of BSPump. Its main use is to create a connection with the main components of BSPump: source, processor and sink.
- __init__(app, id=None, config=None)[source]¶
Parameters
- app : Application
Specification of an Application.
- id : default None
- config : JSON or other compatible format, default None
Contains the information and data needed to create a connection.
- time()[source]¶
Returns the accurate time of the asynchronous process.
- Hint:
More information is available in the ASAB documentation under UTC Time.
- classmethod construct(app, definition)[source]¶
Creates a connection based on a specific definition, for example a JSON file.
Parameters
- app : Application
The Application object (https://asab.readthedocs.io/en/latest/asab/application.html).
- definition : dict
Defines instructions for the method that can be used to create a connection.
- Returns:
cls(app, newid, config)
Connection
Base class for shared connections.
import bspump

class MyConnection(bspump.Connection):
    async def connect(self):
        # create_client() is a placeholder for the client library's factory
        self.client = await create_client()

    async def disconnect(self):
        await self.client.close()
Lookup¶
- class bspump.abc.lookup.Lookup(app, id=None, config=None, lazy=False)[source]¶
Bases: Configurable
- class bspump.abc.lookup.AsyncLookupMixin(app, id=None, config=None, lazy=False)[source]¶
Bases: Lookup
- class bspump.abc.lookup.DictionaryLookup(app, id=None, config=None, lazy=False)[source]¶
Bases: MappingLookup
Lookup
Base class for lookup tables.
class MyLookup(bspump.Lookup):
    def get(self, key):
        return self.data.get(key)

    def set(self, key, value):
        self.data[key] = value
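A dictionary-backed lookup with mapping semantics can be sketched on top of the standard library. This is an illustration only: SketchDictionaryLookup is a hypothetical class built on collections.abc.Mapping, and it does not reproduce how DictionaryLookup or MappingLookup are implemented in BSPump.

```python
from collections.abc import Mapping


class SketchDictionaryLookup(Mapping):
    """Hypothetical read-mostly lookup backed by a plain dict."""

    def __init__(self, data=None):
        self.data = dict(data or {})

    # The three methods below are all that Mapping requires;
    # get(), "in", keys(), items() and values() come for free.
    def __getitem__(self, key):
        return self.data[key]

    def __iter__(self):
        return iter(self.data)

    def __len__(self):
        return len(self.data)

    def set(self, key, value):
        self.data[key] = value


lookup = SketchDictionaryLookup({"10.0.0.1": "gateway"})
lookup.set("10.0.0.2", "printer")
```

Here lookup.get("10.0.0.3") returns None for an unknown key, while lookup["10.0.0.3"] raises KeyError, as with a regular dict.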
Anomaly¶
- class bspump.abc.anomaly.Anomaly[source]¶
Bases: dict
Description: Anomaly is an abstract class to be overridden for a specific anomaly and its type.
Implement: TYPE, on_tick
- TYPE = None¶
Anomaly
Base class for anomaly detection.
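A subclass that implements TYPE and on_tick might look like the sketch below. The base class is re-declared locally as SketchAnomaly so the example runs on its own. The on_tick(current_time) signature, its being a coroutine, and the "last_seen", "age" and "status" keys are all assumptions made for illustration.

```python
import asyncio


class SketchAnomaly(dict):
    """Hypothetical stand-in for the abstract base: a dict with a TYPE."""

    TYPE = None

    async def on_tick(self, current_time):
        # Assumed signature; subclasses are expected to override it.
        raise NotImplementedError()


class StaleHostAnomaly(SketchAnomaly):
    TYPE = "stale_host"

    async def on_tick(self, current_time):
        # The anomaly stores its state in itself, since it is a dict.
        self["age"] = current_time - self["last_seen"]
        self["status"] = "closed" if self["age"] > 60 else "open"


anomaly = StaleHostAnomaly(last_seen=100.0)
asyncio.run(anomaly.on_tick(200.0))
```

Because the class derives from dict, the anomaly can be serialized or passed along as an ordinary event once its fields are updated.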