bspump
The main BSPump module containing the core application framework.
- class bspump.BSPumpApplication(*args, **kwargs)
Bases: Application
Description: BSPumpApplication is a class used for …
- __init__(args=None)
Initializes the Application with the provided arguments and modules.
- Parameters:
args – sequence of arguments to be parsed by the Application.parse_arguments() call.
modules – list of ASAB modules to be added by the Application.add_module() call.
Examples:
```python
import asab
import asab.web
import asab.zookeeper

class MyApplication(asab.Application):
    def __init__(self):
        super().__init__(modules=[asab.web.Module, asab.zookeeper.Module])
```
- class bspump.BSPumpService(app, service_name='bspump.PumpService')
Bases: Service
Service registry based on the ASAB Service object. Read more in the ASAB documentation: Service <https://asab.readthedocs.io/en/latest/asab/service.html>.
- __init__(app, service_name='bspump.PumpService')
Initializes the parameters passed to the Service class.
Parameters
- app : Application
The application instance.
- service_name : str, default 'bspump.PumpService'
Name under which the service is registered.
- locate(address)
Locates a pipeline, source, or processor based on the address parameter.
Parameters
- address : str, ID
Address of a pipeline component:
1. To locate a Pipeline, use the pipeline ID.
2. To locate a Processor or a Sink, use the pipeline ID and the component ID separated by a dot, like 'pipelineId.componentId'.
3. To locate a Source, add '*' before the ID of the source, like 'pipelineId.*SourceId'.
Returns
Pipeline, Source, or Processor object if it is found in the components list, otherwise None.
Usage
>>> pipeline = service.locate("SuperCoolPipeline")
>>> pipeline.Id
'SuperCoolPipeline'
>>> processor = service.locate("SuperCoolPipeline.PPrintProcessor")
>>> processor.Id
'PPrintProcessor'
>>> source = service.locate("SuperCoolPipeline.*MySource")
>>> source.Id
'MySource'
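The three address forms above can be illustrated with a small parser. This is a hypothetical helper (parse_address is not part of BSPump) and only a sketch: it assumes that pipeline IDs contain no dot, and it merely splits the string without looking any component up.

```python
def parse_address(address: str):
    """Hypothetical helper: split a locate()-style address into its parts.

    Returns a tuple (pipeline_id, kind, component_id), where kind is
    "pipeline", "source" or "processor".
    """
    # Split on the first dot only; assumes pipeline IDs contain no dot
    pipeline_id, sep, rest = address.partition(".")
    if not sep:
        # 1. A bare pipeline ID
        return pipeline_id, "pipeline", None
    if rest.startswith("*"):
        # 3. 'pipelineId.*SourceId' addresses a source
        return pipeline_id, "source", rest[1:]
    # 2. 'pipelineId.componentId' addresses a processor or a sink
    return pipeline_id, "processor", rest


print(parse_address("SuperCoolPipeline"))
print(parse_address("SuperCoolPipeline.PPrintProcessor"))
print(parse_address("SuperCoolPipeline.*MySource"))
```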
- add_pipeline(pipeline)
Adds a pipeline to the BSPump service.
Parameters
- pipeline : Pipeline
The pipeline instance to add.
- add_pipelines(*pipelines)
Adds several pipelines to the BSPump service.
Parameters
- *pipelines : Pipeline
Pipelines to be added to the BSPump service.
- del_pipeline(pipeline)
Deletes a pipeline from the list of pipelines.
Parameters
- pipeline : str, ID
ID of a pipeline.
- add_connection(connection)
Adds a connection to the connection dictionary.
Parameters
- connection : Connection
The connection object to add; it is stored under its ID.
- Returns:
The added connection.
- add_connections(*connections)
Adds several connections to the connection dictionary.
Parameters
- *connections : Connection
Connection objects to add.
- locate_connection(connection_id)
Locates a connection based on its ID.
Parameters
- connection_id : ID
Connection ID.
- add_lookup(lookup)
Registers a lookup with the service.
Parameters
- lookup : Lookup
The lookup to register.
- Returns:
The registered lookup.
- add_lookups(*lookups)
Adds several lookups to the service.
Parameters
- *lookups : Lookup
Lookups to add.
- locate_lookup(lookup_id, context=None)
Locates a lookup based on its ID.
Parameters
- lookup_id : ID
ID of a lookup.
- context : default None
Additional information.
- Returns:
The lookup from the lookup service or from the internal dictionary.
- add_lookup_factory(lookup_factory)
Adds a lookup factory.
Parameters
- lookup_factory
The lookup factory to add.
- add_matrix(matrix)
Adds a matrix to the service.
Parameters
- matrix : Matrix
The matrix to add.
- Returns:
The added matrix.
- add_matrixes(*matrixes)
Adds several matrices to the service.
Parameters
- *matrixes : Matrix
Matrices to add.
- locate_matrix(matrix_id)
Locates a matrix based on its ID.
Parameters
- matrix_id : str, ID
ID of a matrix.
- async initialize(app)
Initializes the service; called by the ASAB application during startup.
Parameters
- app : Application
The application instance.
- async finalize(app)
Stops all the pipelines.
Parameters
- app : Application
The application instance.
- class bspump.Pipeline(app, id=None, config=None)
Bases: ABC, Configurable
Description: Pipeline is …
An example of the Pipeline construction:
```python
import bspump
import bspump.common

class MyPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        # First the source (or a list of sources), then processors, then the sink
        self.build(
            MySource(app, self),
            MyProcessor(app, self),
            MyProcessor2(app, self),
            bspump.common.NullSink(app, self),
        )
```
- ConfigDefaults: dict = {'async_concurency_limit': 1000, 'reset_profiler': True, 'stop_on_errors': True}
- __init__(app, id=None, config=None)
Initializes basic variables used in the other Pipeline methods. You can also add more information using the parameters.
Parameters
- app : Application
The ASAB application instance.
- id : str, default None
ID of the pipeline. If omitted, the name of the current class is used (obtained through the __class__ attribute).
- config : default None
Configuration with additional settings; if omitted, the default configuration is used.
- time()
Returns the current time as provided by the application.
- Returns:
App.time()
- Hint:
For more information, see UTC Time in the ASAB documentation.
- get_throttles()
Returns the throttles registered on the Pipeline, i.e. the components that requested throttling.
- Returns:
self._throttles, the list of throttles.
- is_error()
Returns True when the pipeline has an error set, otherwise False.
- Returns:
self._error is not None.
- set_error(context, event, exc)
When called with exc set to None, it resets the error (recovery).
When called with an exception, it records the error on the pipeline; handle_error() decides whether it is treated as a soft error.
Parameters
- context
Context of the event that caused the error.
- event : any
The event being processed, typically JSON-like data with a timestamp.
- exc : Exception or None
The exception that occurred, or None to reset the error.
- handle_error(exception, context, event)
Decides how processing errors are handled. Override it to evaluate processing errors.
Parameters
- exception : Exception
The exception raised during processing.
- context
Additional information about the event.
- event : any
The event being processed, typically JSON-like data with a timestamp.
- Returns:
False for hard errors (processing of the Pipeline stops). True for soft errors, which are ignored.
Example:
```python
import json

import bspump
import bspump.common

class SampleInternalPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.common.InternalSource(app, self),
            bspump.common.JSONParserProcessor(app, self),
            bspump.common.PPrintSink(app, self),
        )

    def handle_error(self, exception, context, event):
        # Malformed JSON is a soft error; anything else stops the pipeline
        if isinstance(exception, json.decoder.JSONDecodeError):
            return True
        return False
```
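The interplay of set_error(), handle_error() and is_error() can be sketched with a plain-Python stand-in. MiniPipeline is hypothetical, not BSPump's code, and it assumes that set_error() consults handle_error() in the way the return values above describe: soft errors are ignored, hard errors are stored, and None clears the stored error.

```python
class MiniPipeline:
    """Hypothetical stand-in mimicking the documented error contract."""

    def __init__(self):
        self._error = None

    def handle_error(self, exception, context, event):
        # Soft error for ValueError (True), hard error otherwise (False)
        return isinstance(exception, ValueError)

    def set_error(self, context, event, exc):
        if exc is None:
            # Recovery: clear any stored error
            self._error = None
            return
        if self.handle_error(exc, context, event):
            # Soft error: ignored, the pipeline keeps running
            return
        # Hard error: remember it, which blocks further processing
        self._error = (context, event, exc)

    def is_error(self):
        return self._error is not None


p = MiniPipeline()
p.set_error({}, "bad", ValueError("soft"))
print(p.is_error())  # False
p.set_error({}, "worse", KeyError("hard"))
print(p.is_error())  # True
p.set_error(None, None, None)
print(p.is_error())  # False
```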
- link(ancestral_pipeline)
Links this Pipeline with an ancestral Pipeline. This is needed, e.g., to propagate throttling from child Pipelines back to their ancestors. If a child Pipeline uses InternalSource, it may become throttled because the internal queue is full. In that case, the throttling is propagated to the ancestral Pipeline, so that its source can block incoming events until the internal queue is empty again.
Parameters
- ancestral_pipeline : str
ID of the Pipeline that will be linked.
- unlink(ancestral_pipeline)
Unlinks an ancestral Pipeline from this Pipeline.
Parameters
- ancestral_pipeline : str
ID of the ancestral Pipeline that will be unlinked.
- throttle(who, enable=True)
Enables (or, with enable=False, disables) throttling of this pipeline and, if needed, of its ancestral pipelines.
Parameters
- who
The component that requests or releases the throttling.
- enable : bool, default True
True to throttle, False to release the throttling.
- async ready()
Waits until the Pipeline is ready. The method can be used in a source: await self.Pipeline.ready().
- is_ready()
Checks whether the Pipeline is ready, i.e. whether its internal _ready event is set.
- Returns:
_ready.is_set().
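The relationship between throttle(), is_ready() and ready() can be sketched with asyncio.Event. ThrottleState is a hypothetical stand-in, not BSPump's implementation; it assumes that the pipeline is ready exactly when no component holds a throttle.

```python
import asyncio


class ThrottleState:
    """Hypothetical sketch of throttle()/is_ready()/ready() semantics."""

    def __init__(self):
        self._throttles = set()        # components currently throttling
        self._ready = asyncio.Event()
        self._ready.set()              # ready while nobody throttles

    def throttle(self, who, enable=True):
        if enable:
            self._throttles.add(who)
        else:
            self._throttles.discard(who)
        # Ready only when no component holds a throttle
        if self._throttles:
            self._ready.clear()
        else:
            self._ready.set()

    def is_ready(self):
        return self._ready.is_set()

    async def ready(self):
        # Returns immediately if ready, otherwise waits for the event
        await self._ready.wait()


async def demo():
    state = ThrottleState()
    state.throttle("internal-queue")
    print(state.is_ready())  # False
    # Release the throttle shortly, while a "source" awaits readiness
    asyncio.get_running_loop().call_later(0.01, state.throttle, "internal-queue", False)
    await state.ready()
    print(state.is_ready())  # True


asyncio.run(demo())
```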
- inject(context, event, depth)
Injects an event into the Pipeline at the depth given by the depth argument. Every depth is associated with a Generator object.
Parameters
- context : string
Information propagated through the Pipeline.
- event : any
The event to inject, typically JSON-like data with a timestamp.
- depth : int
Depth level at which the event is injected.
- Note:
For normal operation, it is highly recommended to use the process() method instead.
- async process(event, context=None)
Injects an event into depth 0 of the Pipeline and increments the event metric.
Parameters
- event : any
The event to process, typically JSON-like data with a timestamp.
- context : str, default None
Additional information needed for working with the event stream.
- Hint:
This is the recommended way of inserting events into a Pipeline.
- create_eps_counter()
Creates an EPS counter for the Pipeline that tracks eps (events per second), warnings, and errors.
- Returns:
The EPS counter, created through self.MetricsService.
- Note:
The EPS counter can be created either with this method or directly with the MetricsService.
- ensure_future(coro)
Schedules a future task that will be executed in the context of the Pipeline. The Pipeline also manages the whole lifecycle of the future/task: it collects and discards the result and, most importantly, captures any exception, which then blocks the Pipeline via set_error().
Parameters
- coro : coroutine
The coroutine to schedule.
- Hint:
If the number of futures exceeds the configured limit (async_concurency_limit in ConfigDefaults), the Pipeline is throttled.
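The future lifecycle described for ensure_future() can be sketched with plain asyncio. FutureManager is hypothetical, not BSPump's code: the limit stands in for async_concurency_limit, the throttled flag stands in for pipeline throttling, and storing the exception in error stands in for set_error().

```python
import asyncio


class FutureManager:
    """Hypothetical sketch of ensure_future() lifecycle management."""

    def __init__(self, limit):
        self.limit = limit             # stands in for async_concurency_limit
        self.futures = set()
        self.throttled = False
        self.error = None

    def ensure_future(self, coro):
        task = asyncio.ensure_future(coro)
        self.futures.add(task)
        task.add_done_callback(self._on_done)
        if len(self.futures) > self.limit:
            # Too many in-flight futures: throttle
            self.throttled = True

    def _on_done(self, task):
        self.futures.discard(task)
        if len(self.futures) <= self.limit:
            self.throttled = False
        try:
            task.result()              # collect (and discard) the result
        except Exception as exc:
            self.error = exc           # stands in for set_error(...)


async def demo():
    manager = FutureManager(limit=1)

    async def work(x):
        await asyncio.sleep(0)
        return x * 2

    async def fail():
        raise ValueError("boom")

    manager.ensure_future(work(1))
    manager.ensure_future(fail())
    print(manager.throttled)                  # True: 2 futures exceed the limit of 1
    await asyncio.sleep(0.01)                 # let both tasks finish
    print(manager.throttled, manager.error)   # False boom


asyncio.run(demo())
```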
- set_source(source)
Sets a source or a list of sources for the Pipeline.
Parameters
- source : str or list, optional
ID of a source. If a list of sources is passed, the entire list of sources is added to the Pipeline.
- append_processor(processor)
Appends a Processor to the current Pipeline.
Parameters
- processor : str
ID of a processor.
- Hint:
A Generator can also be added with this method. It requires a depth parameter.
- remove_processor(processor_id)
Removes a specific processor from the Pipeline.
Parameters
- processor_id : str
ID of a processor.
- Raises:
An error when the processor is not found.
- insert_before(id, processor)
Inserts the Processor into the Pipeline in front of another processor specified by ID.
Parameters
- id : str
ID of the existing processor in front of which the new processor is inserted.
- processor
The new processor to insert.
- Returns:
True on success. False if the ID was not found.
- insert_after(id, processor)
Inserts the Processor into the Pipeline behind another processor specified by ID.
Parameters
- id : str
ID of the existing processor behind which the new processor is inserted.
- processor
The new processor to insert.
- Returns:
True on success. False if the ID was not found.
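The positioning rules of insert_before() and insert_after() can be shown on a plain list. Proc, insert_before and insert_after below are hypothetical module-level helpers operating on a list, not the Pipeline methods themselves; they only mirror the documented ID lookup and the True/False return values.

```python
class Proc:
    """Hypothetical minimal processor: only an Id is needed here."""

    def __init__(self, id):
        self.Id = id


def insert_before(processors, id, processor):
    """Insert `processor` in front of the processor whose Id is `id`."""
    for index, existing in enumerate(processors):
        if existing.Id == id:
            processors.insert(index, processor)
            return True
    return False  # ID not found


def insert_after(processors, id, processor):
    """Insert `processor` behind the processor whose Id is `id`."""
    for index, existing in enumerate(processors):
        if existing.Id == id:
            processors.insert(index + 1, processor)
            return True
    return False  # ID not found


chain = [Proc("Parser"), Proc("Sink")]
insert_before(chain, "Sink", Proc("Enricher"))
insert_after(chain, "Parser", Proc("Filter"))
print([p.Id for p in chain])                      # ['Parser', 'Filter', 'Enricher', 'Sink']
print(insert_after(chain, "Missing", Proc("X")))  # False
```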
- build(source, *processors)
Adds the sources, Processors, and sink that make up the structure of the Pipeline.
Parameters
- source : Source or list
The source of the Pipeline, or a list of sources.
- *processors
The processors of the Pipeline in processing order; the last one is the sink.
- iter_processors()
Iterates through all Processors in the Pipeline using a Python generator.
- Yields:
A Processor from the list in the Pipeline.
- locate_source(address)
Locates a source based on its ID.
Parameters
- address : str
ID of the source.
- locate_connection(app, connection_id)
Finds a connection by ID.
Parameters
- app : Application
The application instance.
- connection_id : str
ID of the connection to locate.
- Returns:
The connection.
- class bspump.PumpBuilder(definition)
Bases: object
PumpBuilder provides an alternative way of creating pipelines together with their connections, processors, and sources.
definition is a path to the JSON file containing a description of the pump. Example of such a file:
```json
{
  "pipelines": [
    {
      "id": "MyPipeline0",
      "args": {},
      "config": {},
      "sources": [
        {
          "id": "FileCSVSource",
          "module": "bspump.file",
          "class": "FileCSVSource",
          "args": {},
          "config": {"path": "etc/test.csv", "post": "noop"},
          "trigger": {
            "module": "bspump.trigger",
            "class": "OpportunisticTrigger",
            "id": "",
            "args": {}
          }
        }
      ],
      "processors": [
        {
          "module": "bspump-pumpbuilder",
          "class": "Processor00",
          "args": {},
          "config": {}
        }
      ],
      "sink": {
        "module": "bspump.common",
        "class": "PPrintSink",
        "args": {},
        "config": {}
      }
    }
  ]
}
```
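The "module" and "class" entries in such a definition name a Python module and a class inside it. A sketch of how such an entry can be resolved with importlib follows; resolve is a hypothetical helper, not PumpBuilder's code, collections.OrderedDict stands in for a bspump class so the snippet runs without bspump, and passing "args" as keyword arguments is an assumption.

```python
import importlib
import json


def resolve(spec):
    """Hypothetical helper: import spec["module"] and return spec["class"]."""
    module = importlib.import_module(spec["module"])
    return getattr(module, spec["class"])


definition = json.loads("""
{
  "sink": {"module": "collections", "class": "OrderedDict", "args": {}}
}
""")

sink_spec = definition["sink"]
sink_class = resolve(sink_spec)
# Instantiate with the keyword arguments from "args" (assumed convention)
sink = sink_class(**sink_spec["args"])
print(type(sink).__name__)  # OrderedDict
```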
- class bspump.Source(app, pipeline, id=None, config=None)
Bases: Configurable
- __init__(app, pipeline, id=None, config=None)
Sets the initial ID, Pipeline, and Task.
Parameters
- app : Application
The application instance; see Application <https://asab.readthedocs.io/en/latest/asab/application.html#>.
- pipeline : Pipeline
The Pipeline that the source belongs to.
- id : str, default None
ID of the source.
- config : compatible config type, default None
Optional configuration.
- async process(event, context=None)
Emits an event into the Pipeline.
Parameters
- event : any
Message or information, typically JSON-like data with a timestamp, that is emitted into the Pipeline.
- context : default None
Additional information.
If an error occurs while processing the event, the Pipeline is throttled by setting the error, and the exception is raised.
- Hint:
The source should catch this exception and fail gracefully.
- start(loop)
Starts the Source through the _main method; if the main method is implemented, the coroutine is started using main instead.
Parameters
- loop : event loop
The event loop on which the coroutine runs.
- async stop()
Stops the Source using self.Task. If the task is not done, it is cancelled; otherwise an error may be raised.
- restart(loop)
Restarts the source's coroutine on the given loop, retrieving the result() of the previous task first.
Parameters
- loop : event loop
The event loop on which the coroutine runs.
- async main()
Can be implemented to provide the source's behavior; otherwise it raises NotImplementedError and _main is called instead.
- async stopped()
Waits for all asynchronous tasks to be completed. It is a helper that simplifies the implementation of sources.
Example:
```python
async def main(self):
    # … initialize resources here
    await self.stopped()
    # … finalize resources here
```
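The initialize, await stopped(), finalize pattern can be run end to end with plain asyncio. MiniSource is hypothetical: it assumes stopped() simply blocks until stop() is requested, modelled here with an asyncio.Event, which is not necessarily how BSPump implements it.

```python
import asyncio


class MiniSource:
    """Hypothetical stand-in for the main()/stopped() source pattern."""

    def __init__(self):
        self._stop_event = asyncio.Event()
        self.log = []

    async def stopped(self):
        # Block until stop() is requested
        await self._stop_event.wait()

    def stop(self):
        self._stop_event.set()

    async def main(self):
        self.log.append("initialize resources")
        await self.stopped()
        self.log.append("finalize resources")


async def demo():
    source = MiniSource()
    task = asyncio.ensure_future(source.main())
    await asyncio.sleep(0)      # let main() reach stopped()
    source.stop()
    await task
    print(source.log)  # ['initialize resources', 'finalize resources']


asyncio.run(demo())
```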
- locate_address()
Locates the address of the source.
- Returns:
The ID of the Pipeline and the ID of the source, as a string.
- classmethod construct(app, pipeline, definition)
Creates a source based on a specific definition, for example one loaded from a JSON file.
Parameters
- app : Application
The application instance.
- pipeline : Pipeline
Specification of a Pipeline.
- definition : dict
Definition that is used to create the source.
- Returns:
cls(app, newid, config)
- class bspump.TriggerSource(app, pipeline, id=None, config=None)
Bases: Source
A specialized source that responds to trigger events.
TriggerSource is designed to work with BitSwan’s trigger system. It waits for trigger events and executes a cycle() method when triggered. This allows for event-driven data processing in pipelines.
Key features:
- Waits for trigger events using asyncio.Event
- Executes the cycle() method when triggered
- Manages multiple triggers simultaneously
- Handles trigger lifecycle events
Usage:
```python
from bspump import TriggerSource

class MyTriggerSource(TriggerSource):
    async def cycle(self):
        ...  # executed each time an attached trigger fires

# Attach to any trigger type (app, pipeline, some_trigger are placeholders)
source = MyTriggerSource(app, pipeline)
source.on(some_trigger)
```
- __init__(app, pipeline, id=None, config=None)
Sets the initial ID, Pipeline, and Task.
Parameters
- app : Application
The application instance; see Application <https://asab.readthedocs.io/en/latest/asab/application.html#>.
- pipeline : Pipeline
The Pipeline that the source belongs to.
- id : str, default None
ID of the source.
- config : compatible config type, default None
Optional configuration.
- time()
Returns the current time as provided by the application.
- Returns:
App.time()
- Hint:
You can find more information about UTC Time in the ASAB documentation.
- on(trigger)
Attaches a Trigger to this source. A trigger fires when a given condition is met.
Parameters
- trigger
The trigger to attach.
- Returns:
Trigger.add(trigger)
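The event-driven loop described for TriggerSource (wait for a trigger, then run cycle()) can be sketched with asyncio.Event. MiniTriggerSource, fire() and the max_cycles argument are hypothetical and exist only to make the sketch terminate; a real trigger would fire based on its own condition.

```python
import asyncio


class MiniTriggerSource:
    """Hypothetical sketch: run cycle() each time a trigger fires."""

    def __init__(self):
        self._trigger_event = asyncio.Event()
        self.cycles = 0

    def fire(self):
        # What a trigger would do when its condition is met
        self._trigger_event.set()

    async def cycle(self):
        self.cycles += 1

    async def main(self, max_cycles):
        while self.cycles < max_cycles:
            await self._trigger_event.wait()
            self._trigger_event.clear()   # re-arm for the next trigger
            await self.cycle()


async def demo():
    source = MiniTriggerSource()
    task = asyncio.ensure_future(source.main(max_cycles=2))
    for _ in range(2):
        source.fire()
        await asyncio.sleep(0.01)   # give main() a chance to run cycle()
    await task
    print(source.cycles)  # 2


asyncio.run(demo())
```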
- class bspump.Sink(app, pipeline, id=None, config=None)
Bases: ProcessorBase
A Sink is essentially a processor at the end of the Pipeline: it takes an event and sends it to its destination, for example a database where it is stored.
- class bspump.Processor(app, pipeline, id=None, config=None)
Bases: ProcessorBase
Inherits from ProcessorBase.
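How events flow through processors into a sink can be sketched in plain Python. All classes and run_chain below are hypothetical; the sketch assumes a process(context, event) method that returns the (possibly modified) event, with None meaning the event is dropped, and uses a list instead of a database.

```python
class UppercaseProcessor:
    """Hypothetical processor: transforms the event and passes it on."""

    def process(self, context, event):
        return {**event, "name": event["name"].upper()}


class DropEmptyProcessor:
    """Hypothetical processor: returning None drops the event."""

    def process(self, context, event):
        return event if event.get("name") else None


class ListSink:
    """Hypothetical sink: stores events in a list instead of a database."""

    def __init__(self):
        self.stored = []

    def process(self, context, event):
        self.stored.append(event)


def run_chain(processors, sink, context, event):
    for processor in processors:
        event = processor.process(context, event)
        if event is None:
            return  # dropped, the sink never sees it
    sink.process(context, event)


sink = ListSink()
chain = [DropEmptyProcessor(), UppercaseProcessor()]
run_chain(chain, sink, {}, {"name": "alice"})
run_chain(chain, sink, {}, {"name": ""})
print(sink.stored)  # [{'name': 'ALICE'}]
```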
- class bspump.Generator(app, pipeline, id=None, config=None)
Bases: ProcessorBase
- __init__(app, pipeline, id=None, config=None)
Parameters
- app : Application
The application instance.
- pipeline : Pipeline
The Pipeline that the generator belongs to.
- id : str, default None
ID of the generator.
- config : JSON, default None
Configuration containing additional information.
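Generators connect depth levels of a Pipeline: one incoming event can produce several events that are injected at the next depth through inject(context, event, depth). The sketch below is hypothetical and synchronous for simplicity; the class names and the generate() method shape are assumptions, and BSPump's own generator API may be asynchronous.

```python
class MiniPipeline:
    """Hypothetical pipeline: collects the events that reach depth 1."""

    def __init__(self):
        self.received = []

    def inject(self, context, event, depth):
        if depth == 1:
            self.received.append(event)


class SplitGenerator:
    """Hypothetical generator: one batch event becomes many events."""

    def __init__(self, pipeline, depth):
        self.pipeline = pipeline
        self.depth = depth   # depth level this generator feeds

    def generate(self, context, event):
        for item in event["items"]:
            self.pipeline.inject(context, item, self.depth)


pipeline = MiniPipeline()
generator = SplitGenerator(pipeline, depth=1)
generator.generate({}, {"items": ["a", "b", "c"]})
print(pipeline.received)  # ['a', 'b', 'c']
```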
- class bspump.Connection(app, id=None, config=None)
Bases: ABC, Configurable
The Connection class is responsible for creating connections between items or services within the BSPump infrastructure. Connections are mainly used by the main components of BSPump: sources, processors, and sinks.
- __init__(app, id=None, config=None)
Parameters
- app : Application
Specification of an Application.
- id : default None
ID of the connection.
- config : JSON or other compatible format, default None
Contains the information and data needed to create the connection.
- time()
Returns the current time as provided by the application.
- Hint:
For more information, see UTC Time in the ASAB documentation.
- classmethod construct(app, definition)
Creates a connection based on a specific definition, for example one loaded from a JSON file.
Parameters
- app : Application
The application instance; see Application <https://asab.readthedocs.io/en/latest/asab/application.html#>.
- definition : dict
Instructions that are used to create the connection.
- Returns:
cls(app, newid, config)
- class bspump.Analyzer(app, pipeline, analyze_on_clock=False, id=None, config=None)
Bases: Processor
- __init__(app, pipeline, analyze_on_clock=False, id=None, config=None)
Initializes the analyzer.
- evaluate(context, event)
Records the information from the event into the analyzed object. Specific to each analyzer.
Parameters
- context
- event : any data type
Information with a timestamp.
- predicate(context, event)
Checks whether the event is worth processing; if it is, returns True. Specific to each analyzer; the default implementation always returns True.
Parameters
- context
- event : any data type
Information with a timestamp.
- Returns:
True
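The split between predicate() (is the event worth processing?) and evaluate() (record it into the analyzed object) can be sketched in plain Python. CountingAnalyzer is hypothetical, and the process() wiring that calls evaluate() only when predicate() returns True is an assumption made for the sketch.

```python
class CountingAnalyzer:
    """Hypothetical analyzer: counts events per user."""

    def __init__(self):
        self.counts = {}     # the "analyzed object"

    def predicate(self, context, event):
        # Only events that carry a user are worth analyzing
        return "user" in event

    def evaluate(self, context, event):
        user = event["user"]
        self.counts[user] = self.counts.get(user, 0) + 1

    def process(self, context, event):
        # Assumed wiring: evaluate only accepted events, pass all events on
        if self.predicate(context, event):
            self.evaluate(context, event)
        return event


analyzer = CountingAnalyzer()
for event in [{"user": "a"}, {"user": "b"}, {"user": "a"}, {"other": 1}]:
    analyzer.process({}, event)
print(analyzer.counts)  # {'a': 2, 'b': 1}
```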
- exception bspump.ProcessingError
Bases: RuntimeError
Generic exception that indicates an error in pipeline processing.
- class bspump.Lookup(app, id=None, config=None, lazy=False)
Bases: Configurable
- class bspump.DictionaryLookup(app, id=None, config=None, lazy=False)
Bases: MappingLookup
- class bspump.Matrix(app, dtype='float_', persistent=False, id=None, config=None)
Bases: ABC, Configurable
Generic Matrix object.
The Matrix structure is organized in the following hierarchical order:
Matrix -> Rows -> Columns -> Cells
Cells have a unified data format across the whole matrix. This format is specified by a dtype. It can be a simple integer or float, but also a complex dictionary type with the names and types of the fields.
The types that can be used for the dtype of a cell:

| Name | Definition |
|------|------------|
| 'b' | Byte |
| 'i' | Signed integer |
| 'u' | Unsigned integer |
| 'f' | Floating point |
| 'c' | Complex floating point |
| 'S' | String |
| 'U' | Unicode string |
| 'V' | Raw data |

Example: 'i8' stands for int64.
For more details, see https://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html
Main attributes of the object: Array is a numpy ndarray, the actual data representation of the matrix object. ClosedRows is a set in which some row IDs can be stored before their deletion during the matrix rebuild.
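The dtype codes above are NumPy's, so their effect can be shown with plain NumPy, independently of bspump. The structured cell type below (fields count, mean, label) is a hypothetical example of the "dictionary type with names and types of the fields" mentioned above.

```python
import numpy as np

# 'i8' is an 8-byte signed integer, i.e. int64
assert np.dtype('i8') == np.dtype(np.int64)

# A structured cell type: every cell holds named, typed fields
cell_dtype = np.dtype([('count', 'i8'), ('mean', 'f8'), ('label', 'U10')])

# Three rows with one structured cell each, zero-initialized
cells = np.zeros(3, dtype=cell_dtype)
cells['count'][0] = 5          # field access returns a view into the array
cells['label'][0] = 'first'

print(cells['count'])          # [5 0 0]
```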
- async analyze()
The Matrix itself can run analyze(). It is not recommended to iterate through the matrix row by row (or cell by cell); use numpy functions instead. Examples:
1. You have a vector with n rows and need only the row indices where the cell content is greater than 10. Use np.where(vector > 10).
2. You have a matrix with n rows and m columns and need to find out which rows consist entirely of zeros. Use np.where(np.all(matrix == 0, axis=1)) to get those row indices. Instead of np.all(), you can use np.any() to get all row indices where there is at least one zero.
3. Use np.mean(matrix, axis=1) to get the means of all rows.
4. Other useful numpy functions: np.unique(), np.sum(), np.argmin(), np.argmax().
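The recipes listed above can be checked on small example arrays. This is plain NumPy with made-up data, not a Matrix subclass; inside analyze() the same calls would be applied to the matrix's Array.

```python
import numpy as np

vector = np.array([3, 15, 8, 42])
# 1. Row indices where the cell content is greater than 10
rows_over_10 = np.where(vector > 10)[0]
print(rows_over_10)     # [1 3]

matrix = np.array([[0, 0, 0],
                   [1, 0, 2],
                   [0, 0, 0],
                   [4, 5, 6]])
# 2. Rows consisting entirely of zeros
zero_rows = np.where(np.all(matrix == 0, axis=1))[0]
print(zero_rows)        # [0 2]
# Rows containing at least one zero
rows_with_zero = np.where(np.any(matrix == 0, axis=1))[0]
print(rows_with_zero)   # [0 1 2]

# 3. Mean of each row
row_means = np.mean(matrix, axis=1)
print(row_means)        # [0. 1. 0. 5.]
```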
- class bspump.Model(app, id=None, config=None)
Bases: Configurable
Generic Model object. Loads a trained model and its parameters.
- class bspump.PersistentNamedMatrix(app, dtype='float_', id=None, config=None)
Bases: PersistentMatrix
- class bspump.Anomaly
Bases: dict
Description: Anomaly is an abstract class to be overridden for a specific anomaly and its type.
- Implement:
TYPE, on_tick
- TYPE = None
BSPumpApplication
The main application class that manages the event loop, services, and component lifecycle.
Example:
```python
import bspump

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
# MyPipeline is a user-defined Pipeline subclass
svc.add_pipeline(MyPipeline(app, "MyPipeline"))
app.run()
```
BSPumpService
Service that manages pipelines, connections, and lookups.
Methods:
- add_pipeline(pipeline): Register a pipeline
- add_connection(connection): Register a connection
- add_lookup(lookup): Register a lookup
- locate_connection(connection_id): Get a connection by ID
- locate_lookup(lookup_id): Get a lookup by ID
Pipeline
- class bspump.Pipeline(app, id=None, config=None)[source]¶
Bases:
ABC,ConfigurableDescription: Pipeline is …
An example of The
Pipelineconstruction:class MyPipeline(bspump.Pipeline):
- def __init__(self, app, pipeline_id):
super().__init__(app, pipeline_id) self.build(
[ MySource(app, self), MyProcessor(app, self), MyProcessor2(app, self), ]
bspump.common.NullSink(app, self),
)
- ConfigDefaults: dict = {'async_concurency_limit': 1000, 'reset_profiler': True, 'stop_on_errors': True}¶
- __init__(app, id=None, config=None)[source]¶
Initializes basic variables used in the other
Pipelinemethods. You can also add more information using parameters.Parameters
- appApplication
name of the ASAB Application
- idstr, default None
You can enter ID of the class. Otherwise a name of the current class will used by calling __class__ descriptor object.
- config,default None
You can add a config file with additional settings and configurations, otherwise a default config is used.
- time()[source]¶
Returns correct time.
- Returns:
App.time()
- Hint:
More information in the ASAB documentation in UTC Time.
- get_throttles()[source]¶
Returns components from
Pipelinethat are throttled.- Returns:
self._throttles Return list of throttles.
Parameters: —
- Returns:
xxxx
- is_error()[source]¶
Returns False when there is no error, otherwise it returns True.
- Returns:
self._error is not None.
Parameters: —
- Returns:
xxxx
- set_error(context, event, exc)[source]¶
When called with exc is None, it resets error (aka recovery).
When called with exc, it sets exceptions for soft errors.
Parameters
- contexttype?
Context of an error.
- eventData with time stamp stored in any data type usually is in JSON.
You can specify an event that is passed to the method.
- excException.
Python default exceptions.
- handle_error(exception, context, event)[source]¶
Used for setting up exceptions and conditions for errors. You can implement it to evaluate processing errors.
Parameters
- exceptionException
Used for setting up a custom Exception.
- contextinformation
Additional information can be passed.
- eventData with time stamp stored in any data type, usually it is in JSON.
You can specify an event that is passed to the method.
- Returns:
False for hard errors (stop the
Pipelineprocessing). True for soft errors that will be ignored.
Example:
class SampleInternalPipeline(bspump.Pipeline): def __init__(self, app, pipeline_id): super().__init__(app, pipeline_id) self.build( bspump.common.InternalSource(app, self), bspump.common.JSONParserProcessor(app, self), bspump.common.PPrintSink(app, self) ) def handle_error(self, exception, context, event): if isinstance(exception, json.decoder.JSONDecodeError): return True return False
- link(ancestral_pipeline)[source]¶
Links this
Pipelinewith an ancestralPipeline. This is needed e. g. for a propagation of the throttling from childPipelinesback to their ancestors. If the childPipelineuses InternalSource, it may become throttled because the internal queue is full. If so, the throttling is propagated to the ancestralPipeline, so that its source may block incoming events until the internal queue is empty again.Parameters
- ancestral_pipelinestr
ID of a
Pipelinethat will be linked.
- unlink(ancestral_pipeline)[source]¶
Unlinks an ancestral
Pipelinefrom thisPipeline.Parameters
- ancestral_pipelinestr
ID of a ancestral
Pipelinethat will be unlinked.
- throttle(who, enable=True)[source]¶
Enables throttling method for a chosen
pipelineand its ancestralpipelines,x if needed.Parameters
- async ready()[source]¶
Checks if the
Pipelineis ready. The method can be used in source: await self.Pipeline.ready().
- is_ready()[source]¶
This method is a check up of the event in the Event class.
- Returns:
_ready.is_set().
- inject(context, event, depth)[source]¶
Injects method serves to inject events into the
Pipeline’s depth defined by the depth attribute. Every depth is interconnected with a generator object.Parameters
- contextstring
Information propagated through the
Pipeline.- eventData with time stamp stored in any data type, usually it is in JSON.
You can specify an event that is passed to the method.
- depthint
Level of depth.
- Note:
For normal operations, it is highly recommended to use process method instead.
- async process(event, context=None)[source]¶
Process method serves to inject events into the
Pipeline’s depth 0, while incrementing the event in metric.Parameters
- eventData with time stamp stored in any data type, usually it is in JSON.
You can specify an event that is passed to the method.
- contextstr, default None
You can add additional information needed for work with event streaming.
- Hint:
This is recommended way of inserting events into a
Pipeline.
- create_eps_counter()[source]¶
Creates a dictionary with information about the
Pipeline. It contains eps (events per second), warnings and errors.- Returns:
self.MetricsService Creates eps counter using MetricsService.
- Note:
EPS counter can be created using this method or dicertly by using MatricsService method.
- ensure_future(coro)[source]¶
You can use this method to schedule a future task that will be executed in a context of the
Pipeline. ThePipelinealso manages a whole lifecycle of the future/task, which means, it will collect the future result, trash it, and mainly it will capture any possible exception, which will then block thePipelinevia set_error().Parameters
- coro??
??
- Hint:
If the number of futures exceeds the configured limit, the
Pipelineis throttled.
- set_source(source)[source]¶
Sets a specific source or list of sources to the
Pipeline.Parameters
- sourcestr, list optional
ID of a source.
If a list of sources is passed to the method, it adds the entire list of sources to the
Pipeline.
- append_processor(processor)[source]¶
Adds a
Processorsto the currentPipeline.Parameters
- processorstr
ID of a
processor.
- Hint:
The Generator can be added by using this method. It requires a depth parameter.
- remove_processor(processor_id)[source]¶
Removes a specific
processorfrom thePipeline.Parameters
- processor_idstr
ID of a
processor.
- Raises:
Error when
processoris not found.
- insert_before(id, processor)[source]¶
Inserts the
Processorinto thePipelinein front of anotherprocessorspecified by ID.Parameters
- idstr
ID of a
processorthat we want to insert.- processorstr
Name of the
processorin front of which will be inserted the newprocessor.
- Returns:
True on success. False if ID was not found.
- insert_after(id, processor)[source]¶
Inserts the
Processorinto thePipelinebehind anotherProcessorsspecified by ID.Parameters
- idstr
ID of a processor that we want to insert.
- processorstr
- Returns:
True if successful. False if ID was not found.
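- build(source, *processors)[source]¶
Adds a source, Processors, and a sink to create the structure of the
Pipeline.
Parameters
- source : Source or list
The source (or list of sources) of the Pipeline.
- *processors : Processor
Processors appended in order; the last one is typically the sink.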
- iter_processors()[source]¶
A Python generator that iterates through all
Processors in the Pipeline.
- Yields:
A Processor from a list in the
Pipeline.
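A generator like iter_processors() can be sketched as follows. The nested layout, one inner list of processors per depth with a new depth starting after a Generator, is an assumption about how the processors are stored; the function is a hypothetical stand-in, not the Pipeline method.

```python
def iter_processors(processors):
    """Yield every processor from a list of per-depth processor lists.

    Assumes the hypothetical layout [[p0, p1], [p2]], where each inner list
    holds the processors that run at one depth.
    """
    for depth in processors:
        yield from depth
```

Because it is a generator, processors are produced lazily, one at a time, without building an intermediate flat list.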
- locate_source(address)[source]¶
Locates a source based on its ID.
Parameters
- address : str
ID of the source.
- locate_connection(app, connection_id)[source]¶
Finds a connection by ID.
Parameters
- app : Application
Name of the Application.
- connection_id : str
ID of the connection to locate.
- Returns:
connection
The core pipeline class that chains sources, processors, and sinks.
Example:
```python
import bspump


class MyPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        # MySource, MyProcessor and MySink are your own component classes.
        self.build(
            MySource(app, self),
            MyProcessor(app, self),
            MySink(app, self),
        )
```
PumpBuilder¶
- class bspump.PumpBuilder(definition)[source]¶
Bases:
object
PumpBuilder is meant to create pipelines together with their connections, processors, and sources.
definition is a path to the JSON file containing the description of the pump. Example of such a file:
```json
{
    "pipelines": [
        {
            "id": "MyPipeline0",
            "args": {},
            "config": {},
            "sources": [
                {
                    "id": "FileCSVSource",
                    "module": "bspump.file",
                    "class": "FileCSVSource",
                    "args": {},
                    "config": {"path": "etc/test.csv", "post": "noop"},
                    "trigger": {
                        "module": "bspump.trigger",
                        "class": "OpportunisticTrigger",
                        "id": "",
                        "args": {}
                    }
                }
            ],
            "processors": [
                {
                    "module": "bspump-pumpbuilder",
                    "class": "Processor00",
                    "args": {},
                    "config": {}
                }
            ],
            "sink": {
                "module": "bspump.common",
                "class": "PPrintSink",
                "args": {},
                "config": {}
            }
        }
    ]
}
```
Utility for building pipelines programmatically.
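A definition in the JSON format above names each component by a "module"/"class" pair. The sketch below shows one plausible way to read such a file with the standard library: summarizing the pipelines and resolving a class with importlib. It is not PumpBuilder's implementation; resolve_class() and describe_pump() are hypothetical helpers, and the assumption that classes are resolved via importlib is ours.

```python
import importlib
import json


def resolve_class(spec):
    """Import spec["module"] and return its attribute named spec["class"]."""
    module = importlib.import_module(spec["module"])
    return getattr(module, spec["class"])


def describe_pump(definition_text):
    """Summarize each pipeline in a PumpBuilder-style JSON definition."""
    definition = json.loads(definition_text)
    summary = []
    for pipeline in definition["pipelines"]:
        summary.append({
            "id": pipeline["id"],
            "sources": [s["class"] for s in pipeline.get("sources", [])],
            "processors": [p["class"] for p in pipeline.get("processors", [])],
            "sink": pipeline["sink"]["class"],
        })
    return summary
```

Note that a module name such as "bspump-pumpbuilder" contains a hyphen and cannot be imported with importlib as written, so resolving it would depend on how the builder itself handles that entry.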
Source¶
- class bspump.Source(app, pipeline, id=None, config=None)[source]¶
Bases:
Configurable
- __init__(app, pipeline, id=None, config=None)[source]¶
Sets the initial ID,
Pipeline, and Task.
Parameters
- app : Application
Name of an Application (see https://asab.readthedocs.io/en/latest/asab/application.html).
- pipeline : address of a pipeline
Name of a
Pipeline.
- id : str, default None
ID of the Source.
- config : compatible config type, default None
Option for adding a configuration file.
- async process(event, context=None)[source]¶
This method is used to emit an event into a
Pipeline.
Parameters
- event : any data type, usually JSON
Data with a timestamp: the message or information that is passed to the method and emitted into the
Pipeline.
- context : default None
Additional information.
If there is an error while processing the event, the
Pipeline is throttled by setting the error, and the exception is raised.
- Hint:
The source should catch this exception and fail gracefully.
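The error contract above (the pipeline records the error and re-raises; the source catches it and stops gracefully) can be sketched in plain asyncio. PipelineSketch and emit_all() are hypothetical stand-ins, and ProcessingError here is a local class that only mirrors bspump.ProcessingError (a RuntimeError subclass); the validation rule is made up for the example.

```python
import asyncio


class ProcessingError(RuntimeError):
    """Local stand-in mirroring bspump.ProcessingError."""


class PipelineSketch:
    """Hypothetical pipeline: records the error, then re-raises it."""

    def __init__(self):
        self.Error = None
        self.Processed = []

    async def process(self, event, context=None):
        try:
            if "value" not in event:  # made-up validation rule
                raise ProcessingError("missing 'value'")
            self.Processed.append(event)
        except Exception as exc:
            self.Error = exc  # the pipeline is now blocked/throttled
            raise


async def emit_all(pipeline, events):
    """A source loop that catches processing errors and stops gracefully."""
    for event in events:
        try:
            await pipeline.process(event)
        except Exception:
            return False  # fail gracefully instead of crashing the task
    return True
```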
- start(loop)[source]¶
Starts the
Pipeline through the _main() method; if main() is implemented, the coroutine is started using main() instead.
Parameters
- loop : event loop
The asyncio event loop that runs the coroutines.
- async stop()[source]¶
Stops the Source using self.Task. If the tasks are not done, it cancels them or raises an error.
- restart(loop)[source]¶
Restarts the loop of coroutines and returns result().
Parameters
- loop : event loop
The asyncio event loop that runs the coroutines.
- async main()[source]¶
Can be implemented to provide the source's own behavior; otherwise it raises NotImplementedError and _main() is called instead.
- async stopped()[source]¶
Waits for all asynchronous tasks to be completed. It is a helper that simplifies the implementation of sources.
Example:
```python
async def main(self):
    # … initialize resources here
    await self.stopped()
    # … finalize resources here
```
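The main()/stopped() pattern can be sketched with an asyncio.Event: main() initializes, then blocks in stopped() until a stop is requested, then finalizes. SourceSketch, its stop() method and its log list are hypothetical; the real Source tracks stopping through its own task machinery.

```python
import asyncio


class SourceSketch:
    """Hypothetical source showing the main()/stopped() lifecycle; not bspump code."""

    def __init__(self):
        self._stop_event = asyncio.Event()
        self.log = []

    async def stopped(self):
        # Block until stop() is requested.
        await self._stop_event.wait()

    def stop(self):
        self._stop_event.set()

    async def main(self):
        self.log.append("initialize resources")
        await self.stopped()
        self.log.append("finalize resources")
```

Create the instance inside a running event loop (for example, inside a coroutine started with asyncio.run()) so the Event belongs to that loop on older Python versions.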
- locate_address()[source]¶
Locates the address of a
Pipeline.
- Returns:
The ID of the Pipeline and the ID of the Source, combined into a string.
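BSPumpService.locate() documents addresses of the form 'PipelineId', 'PipelineId.ComponentId', and 'PipelineId.*SourceId'. Assuming locate_address() produces the same source form, building and parsing such addresses can be sketched as follows. Both helpers are hypothetical, and the sketch also assumes pipeline IDs contain no dots.

```python
def source_address(pipeline_id, source_id):
    """Build a source address in the 'PipelineId.*SourceId' form."""
    return "{}.*{}".format(pipeline_id, source_id)


def parse_address(address):
    """Split an address into (kind, pipeline_id, component_id)."""
    if "." not in address:
        return ("pipeline", address, None)
    pipeline_id, component_id = address.split(".", 1)
    if component_id.startswith("*"):
        return ("source", pipeline_id, component_id[1:])
    return ("processor", pipeline_id, component_id)  # processor or sink
```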
- classmethod construct(app, pipeline, definition)[source]¶
Creates a source based on a specific definition, for example one loaded from a JSON file.
Parameters
- app : Application
Name of the Application.
- pipeline : Pipeline
Specification of a
Pipeline.
- definition : dict
Definition that is used to create a source.
- Returns:
cls(app, newid, config)
- Parameters:
definition (dict)
Base class for event sources.
TriggerSource¶
- class bspump.TriggerSource(app, pipeline, id=None, config=None)[source]¶
Bases:
Source
A specialized source that responds to trigger events.
TriggerSource is designed to work with BitSwan’s trigger system. It waits for trigger events and executes a cycle() method when triggered. This allows for event-driven data processing in pipelines.
Key features:
- Waits for trigger events using asyncio.Event
- Executes the cycle() method when triggered
- Manages multiple triggers simultaneously
- Handles trigger lifecycle events
- Usage:
```python
from bspump import TriggerSource

class MyTriggerSource(TriggerSource):
    async def cycle(self):
        ...  # process data each time a trigger fires

# Attach to any trigger type (app, pipeline and some_trigger come from your application)
source = MyTriggerSource(app, pipeline)
source.on(some_trigger)
```
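The wait-then-cycle behavior described above can be sketched with a single asyncio.Event. TriggerSourceSketch, fire(), run() and the max_cycles bound are hypothetical; a real TriggerSource is driven by attached triggers and manages several of them at once.

```python
import asyncio


class TriggerSourceSketch:
    """Hypothetical trigger-driven source: wait for a trigger, then run cycle()."""

    def __init__(self):
        self._triggered = asyncio.Event()
        self.cycles = 0

    def fire(self):
        # What an attached trigger would do when its condition is met.
        self._triggered.set()

    async def cycle(self):
        self.cycles += 1  # process one batch of data here

    async def run(self, max_cycles):
        while self.cycles < max_cycles:
            await self._triggered.wait()
            self._triggered.clear()  # re-arm before doing the work
            await self.cycle()
```

Clearing the event before calling cycle() means a trigger that fires while a cycle is running is remembered and starts the next cycle.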
- __init__(app, pipeline, id=None, config=None)[source]¶
Sets the initial ID,
Pipeline, and Task.
Parameters
- app : Application
Name of an Application (see https://asab.readthedocs.io/en/latest/asab/application.html).
- pipeline : address of a pipeline
Name of a
Pipeline.
- id : str, default None
ID of the Source.
- config : compatible config type, default None
Option for adding a configuration file.
- time()[source]¶
Returns an accurate current time, as provided by the application.
- Returns:
App.time()
- Hint:
You can find more information about UTC Time in the ASAB documentation.
- on(trigger)[source]¶
Sets a Trigger, which waits for a given condition and then activates the source.
Parameters
- trigger : Trigger
The trigger whose condition activates the source.
- Returns:
Trigger.add(trigger)
Base class for trigger-activated sources.
Processor¶
- class bspump.Processor(app, pipeline, id=None, config=None)[source]¶
Bases:
ProcessorBase
Inherits from ProcessorBase.
Base class for event processors.
Generator¶
- class bspump.Generator(app, pipeline, id=None, config=None)[source]¶
Bases:
ProcessorBase
- __init__(app, pipeline, id=None, config=None)[source]¶
Parameters
- app : Application
Name of the Application.
- pipeline : Pipeline
Name of the Pipeline.
- id : str, default None
ID of the Generator.
- config : JSON, default None
Configuration containing additional information.
Base class for generators that produce multiple events.
Sink¶
- class bspump.Sink(app, pipeline, id=None, config=None)[source]¶
Bases:
ProcessorBase
A Sink is essentially a processor: it takes an event and sends it to its destination, for example a database where it is stored.
Base class for event sinks.
Connection¶
- class bspump.Connection(app, id=None, config=None)[source]¶
Bases:
ABC, Configurable
The Connection class is responsible for creating a connection between items or services within the BSPump infrastructure. Connections are mainly used by the main components of BSPump: source,
processor, and sink.
- __init__(app, id=None, config=None)[source]¶
Parameters
- app : Application
Specification of an Application.
- id : default None
ID of the connection.
- config : JSON or other compatible format, default None
Important information and data used to create the connection.
- time()[source]¶
Returns an accurate time for the asynchronous process.
- Hint:
More information can be found in the ASAB documentation under UTC Time.
- classmethod construct(app, definition)[source]¶
Creates a connection based on a specific definition, for example one loaded from a JSON file.
Parameters
- app : Application
ID of the Application (see https://asab.readthedocs.io/en/latest/asab/application.html).
- definition : definition format
Defines instructions for the method that can be used to create a connection.
- Returns:
cls(app, newid, config)
- Parameters:
definition (dict)
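The construct() pattern (pull an ID and a config out of a definition dict, then call cls(app, newid, config)) can be sketched as a plain classmethod. ConstructibleSketch is hypothetical; its attribute names and the fallback to the class name when no ID is given are assumptions made for the example.

```python
class ConstructibleSketch:
    """Hypothetical component mirroring the construct() pattern described above."""

    def __init__(self, app, id=None, config=None):
        self.App = app
        # Assumed fallback: use the class name when no ID is supplied.
        self.Id = id if id is not None else type(self).__name__
        self.Config = dict(config or {})

    @classmethod
    def construct(cls, app, definition: dict):
        # Take the optional ID and config from the definition.
        newid = definition.get("id")
        config = definition.get("config")
        return cls(app, newid, config)
```

Because construct() calls cls(...), subclasses inherit it and automatically build instances of themselves.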
Base class for shared connections.
Lookup¶
- class bspump.Lookup(app, id=None, config=None, lazy=False)[source]¶
Bases:
Configurable
Base class for lookup tables.
DictionaryLookup¶
- class bspump.DictionaryLookup(app, id=None, config=None, lazy=False)[source]¶
Bases:
MappingLookup
Simple dictionary-based lookup.
MappingLookup¶
Mapping-based lookup with set/get operations.
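A mapping-based lookup with set/get operations can be sketched on top of collections.abc.Mapping, which supplies get(), `in`, keys(), and items() once __getitem__, __iter__, and __len__ are defined. MappingLookupSketch is hypothetical and does not reproduce bspump's MappingLookup (no config, loading, or lazy behavior).

```python
from collections.abc import Mapping


class MappingLookupSketch(Mapping):
    """Hypothetical dict-backed lookup with an explicit set() method."""

    def __init__(self, initial=None):
        self._data = dict(initial or {})

    def set(self, key, value):
        self._data[key] = value

    # Mapping protocol; get() and `in` are inherited from Mapping.
    def __getitem__(self, key):
        return self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)
```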
ProcessingError¶
- class bspump.ProcessingError[source]¶
Bases:
RuntimeError
Generic exception that indicates an error in pipeline processing.
Exception raised during event processing.
Analyzer¶
- class bspump.Analyzer(app, pipeline, analyze_on_clock=False, id=None, config=None)[source]¶
Bases:
Processor
- __init__(app, pipeline, analyze_on_clock=False, id=None, config=None)[source]¶
Initializes the analyzer.
- evaluate(context, event)[source]¶
Records information from the event into the analyzed object. The behavior is specific to each analyzer.
Parameters
- context
- event : any data type
Information with a timestamp.
- predicate(context, event)[source]¶
Checks whether the event is worth processing; if it is, it returns True. It is specific to each analyzer; the default implementation always returns True.
Parameters
- context
- event : any data type
Information with a timestamp.
- Returns:
True
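The division of labor between predicate() and evaluate() can be sketched as a small counting analyzer. CountingAnalyzerSketch is hypothetical: the "user" field, the Counts dict standing in for the analyzed object, and passing the event through unchanged are assumptions for illustration.

```python
class CountingAnalyzerSketch:
    """Hypothetical analyzer: predicate() filters, evaluate() records."""

    def __init__(self):
        self.Counts = {}  # the "analyzed object" in this sketch

    def predicate(self, context, event):
        # Only events that carry a "user" field are worth processing here.
        return "user" in event

    def evaluate(self, context, event):
        user = event["user"]
        self.Counts[user] = self.Counts.get(user, 0) + 1

    def process(self, context, event):
        # Record the event if it passes the predicate, then pass it on unchanged.
        if self.predicate(context, event):
            self.evaluate(context, event)
        return event
```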
Base class for analyzers.
Matrix¶
- class bspump.Matrix(app, dtype='float_', persistent=False, id=None, config=None)[source]¶
Bases:
ABC, Configurable
Generic Matrix object.
Matrix structure is organized in the following hierarchical order:
Matrix -> Rows -> Columns -> Cells
Cells have a unified data format across the whole matrix. This format is specified by a dtype. It can be a simple integer or float, but also a complex dictionary type with names and types of the fields.
The following types can be used for the dtype of a cell:
| Name | Definition |
|------|------------|
| 'b' | Byte |
| 'i' | Signed integer |
| 'u' | Unsigned integer |
| 'f' | Floating point |
| 'c' | Complex floating point |
| 'S' | String |
| 'U' | Unicode string |
| 'V' | Raw data |
Example: ‘i8’ stands for int64.
For more details, see https://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html
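Both kinds of cell format mentioned above can be illustrated with numpy: a scalar dtype such as 'i8', and a structured dtype whose cells hold named, typed fields. The field names below are made up for the example.

```python
import numpy as np

# Scalar dtype: every cell is one 64-bit signed integer ('i8' == int64).
counts = np.zeros(3, dtype="i8")

# Structured dtype: every cell holds named fields of fixed types.
cell = np.dtype([("count", "i8"), ("mean", "f8"), ("label", "U10")])
rows = np.zeros(2, dtype=cell)
rows[0] = (5, 2.5, "web")
```

Individual fields can then be read column-wise, for example rows["count"] returns the count of every row as an int64 array.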
Main attributes of the object:
- Array: a numpy ndarray, the actual data representation of the matrix object.
- ClosedRows: a set in which some row IDs can be stored before deletion during the matrix rebuild.
- async analyze()[source]¶
The Matrix itself can run analyze(). Iterating through the matrix row by row (or cell by cell) is not recommended; use numpy functions instead. Examples:
1. You have a vector with n rows and need only the row indices where the cell content is greater than 10. Use np.where(vector > 10).
2. You have a matrix with n rows and m columns and need to find out which rows consist entirely of zeros. Use np.where(np.all(matrix == 0, axis=1)) to get those row indices. Use np.any() instead of np.all() to get all row indices where there is at least one zero.
3. Use np.mean(matrix, axis=1) to get the means of all rows.
4. Other useful numpy functions: np.unique(), np.sum(), np.argmin(), np.argmax().
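The vectorized recipes listed above can be run on small toy arrays. The data is made up; np.where() returns a tuple of index arrays, so [0] selects the row indices.

```python
import numpy as np

vector = np.array([3, 12, 7, 25])
matrix = np.array([
    [0, 0, 0],
    [1, 0, 2],
    [4, 5, 6],
])

# 1. Row indices where the cell value exceeds 10.
over_ten = np.where(vector > 10)[0]
# 2. Rows made entirely of zeros, and rows with at least one zero.
all_zero_rows = np.where(np.all(matrix == 0, axis=1))[0]
any_zero_rows = np.where(np.any(matrix == 0, axis=1))[0]
# 3. Mean of each row.
row_means = np.mean(matrix, axis=1)
```

Here over_ten is [1, 3], all_zero_rows is [0], any_zero_rows is [0, 1], and row_means is [0.0, 1.0, 5.0].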
Matrix data structure for multi-dimensional analysis.