bspump

The main BSPump module containing the core application framework.

class bspump.BSPumpApplication(*args, **kwargs)[source]

Bases: Application

BSPumpApplication is the main application class. It manages the event loop, services, and component lifecycle.

__init__(args=None)[source]

Initialize the Application provided with arguments and modules.

Parameters:
  • args – sequence of arguments to be parsed by Application.parse_arguments() call.

  • modules – list of ASAB modules to be added by Application.add_module() call.

Examples:

```python
import asab
import asab.web
import asab.zookeeper


class MyApplication(asab.Application):

    def __init__(self):
        super().__init__(modules=[asab.web.Module, asab.zookeeper.Module])
```

create_argument_parser()[source]

Description:

Returns:

parse_arguments(args=None)[source]

Parse the command line arguments and set the default values for the configuration accordingly.

Parameters:

args – The arguments to parse. If not set, sys.argv[1:] will be used.

Returns:

The arguments that were parsed.

async main()[source]

This method is called during the application run-time. It is intended to be overridden by the user.

class bspump.BSPumpService(app, service_name='bspump.PumpService')[source]

Bases: Service

Service registry based on the Service object. Read more in the ASAB documentation: `Service <https://asab.readthedocs.io/en/latest/asab/service.html>`_.

__init__(app, service_name='bspump.PumpService')[source]

Initializes parameters passed to the Service class.

Parameters

app : Application

The ASAB Application object.

service_name : str, default 'bspump.PumpService'

String containing the service name.


locate(address)[source]

Locates a pipeline, source or processor based on the addressed parameter.

Parameters

address : str, ID

Address of a pipeline component:

1. To locate a Pipeline, use the pipeline ID.
2. To locate a Processor or a Sink, use the pipeline ID and the component ID separated by a dot, like 'pipelineId.componentId'.
3. To locate a Source, add '*' before the ID of the source, like 'pipeline.*SourceId'.

Returns

Pipeline, Source or Processor object if it is found in the components list, otherwise None.

Usage

>>> pipeline = service.locate("SuperCoolPipeline")
>>> pipeline.Id
SuperCoolPipeline
>>> processor = service.locate("SuperCoolPipeline.PPrintProcessor")
>>> processor.Id
PPrintProcessor
>>> source = service.locate("SuperCoolPipeline.*MySource")
>>> source.Id
MySource
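
The three address forms can be told apart with a few string operations. The sketch below uses a hypothetical helper, `parse_address`, which is not part of BSPump. It only illustrates the addressing convention described above and makes no claim about how `locate()` is implemented.

```python
def parse_address(address):
    """Split an address into (pipeline_id, kind, component_id).

    Hypothetical helper for illustration only; not a BSPump API.
    """
    # No dot: the address is a bare pipeline ID.
    if "." not in address:
        return (address, "pipeline", None)

    # Split on the first dot: pipeline ID first, component part second.
    pipeline_id, component = address.split(".", 1)

    # A leading '*' marks a Source; anything else is a Processor or a Sink.
    if component.startswith("*"):
        return (pipeline_id, "source", component[1:])
    return (pipeline_id, "processor", component)


print(parse_address("SuperCoolPipeline"))
print(parse_address("SuperCoolPipeline.PPrintProcessor"))
print(parse_address("SuperCoolPipeline.*MySource"))
```
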
add_pipeline(pipeline)[source]

Adds a pipeline to the BSPump.

Parameters

pipeline : Pipeline

The Pipeline object to add.

add_pipelines(*pipelines)[source]

Adds pipelines to the BSPump.

Parameters

*pipelines : list

List of pipelines that are added to the BSPump.

del_pipeline(pipeline)[source]

Deletes a pipeline from a list of Pipelines.

Parameters

pipeline : str, ID

ID of a pipeline.

add_connection(connection)[source]

Adds a connection to the Connection dictionary.

Parameters

connection : str, ID

ID of a connection.

Returns:

connection

add_connections(*connections)[source]

Adds connections to the Connection dictionary.

Parameters

*connections : str, ID

List of connection IDs.

locate_connection(connection_id)[source]

Locates connection based on connection ID.

Parameters

connection_id : ID

Connection ID.

add_lookup(lookup)[source]

Sets a lookup based on Lookup.

Parameters

lookup : Lookup

The Lookup object to add.

Returns:

lookup

add_lookups(*lookups)[source]

Adds a list of lookups to the Pipeline.

Parameters

*lookups : Lookup

List of Lookups.

locate_lookup(lookup_id, context=None)[source]

Locates lookup based on ID.

Parameters

lookup_id : ID

ID of a Lookup.

context : default None

Additional information.

Returns:

The lookup from the lookup service or from the internal dictionary.

add_lookup_factory(lookup_factory)[source]

Adds a lookup factory

Parameters

lookup_factory :

Name of lookup factory.

add_matrix(matrix)[source]

Adds a matrix to the Pipeline.

Parameters

matrix : Matrix

The Matrix object to add.

Returns:

matrix

add_matrixes(*matrixes)[source]

Adds a list of Matrices to the Pipeline.

Parameters

*matrixes : list

List of matrices.

locate_matrix(matrix_id)[source]

Locates a matrix based on matrix ID

Parameters

matrix_id : str, ID

ID of a matrix.

async initialize(app)[source]

Initializes an Application based on ASAB Application

Parameters

app : Application

The ASAB Application object.

async finalize(app)[source]

Stops all the pipelines

Parameters

app : Application

The ASAB Application object.

class bspump.Pipeline(app, id=None, config=None)[source]

Bases: ABC, Configurable

Description: Pipeline is …

An example of Pipeline construction:

class MyPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        # The source comes first, then the processors, with the sink last.
        self.build(
            MySource(app, self),
            MyProcessor(app, self),
            MyProcessor2(app, self),
            bspump.common.NullSink(app, self),
        )

ConfigDefaults: dict = {'async_concurency_limit': 1000, 'reset_profiler': True, 'stop_on_errors': True}
__init__(app, id=None, config=None)[source]

Initializes basic variables used in the other Pipeline methods. You can also add more information using parameters.

Parameters

app : Application

The ASAB Application object.

id : str, default None

ID of the Pipeline. If it is not given, the name of the current class (taken from __class__) is used.

config : default None

Additional settings and configuration. If it is not given, the default configuration is used.

time()[source]

Returns the current time as provided by the Application.

Returns:

App.time()

Hint:

See the UTC Time section of the ASAB documentation for more information.

get_throttles()[source]

Returns components from Pipeline that are throttled.

Returns:

self._throttles, the list of throttles.

is_error()[source]

Returns False when there is no error, otherwise it returns True.

Returns:

True if self._error is not None, otherwise False.

set_error(context, event, exc)[source]

When called with exc set to None, it resets the error (recovery).

When called with an exception in exc, it sets the exception for soft errors.

Parameters

context :

Context of the error.

event : data with a timestamp, stored in any data type, usually JSON

The event that is passed to the method.

exc : Exception

A Python exception, or None to reset the error.

handle_error(exception, context, event)[source]

Used for setting up exceptions and conditions for errors. You can implement it to evaluate processing errors.

Parameters

exception : Exception

The exception that was raised during processing.

context :

Additional information.

event : data with a timestamp, stored in any data type, usually JSON

The event that is passed to the method.

Returns:

False for hard errors (stop the Pipeline processing). True for soft errors that will be ignored.

Example:

import json

import bspump
import bspump.common


class SampleInternalPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)

        self.build(
            bspump.common.InternalSource(app, self),
            bspump.common.JSONParserProcessor(app, self),
            bspump.common.PPrintSink(app, self)
        )

    def handle_error(self, exception, context, event):
        # A JSON decoding failure is a soft error: ignore it and carry on.
        if isinstance(exception, json.decoder.JSONDecodeError):
            return True
        # Anything else is a hard error and stops the Pipeline.
        return False

link(ancestral_pipeline)[source]

Links this Pipeline with an ancestral Pipeline. This is needed, e.g., for propagating throttling from child Pipelines back to their ancestors. If the child Pipeline uses InternalSource, it may become throttled because the internal queue is full. If so, the throttling is propagated to the ancestral Pipeline, so that its source may block incoming events until the internal queue is empty again.

Parameters

ancestral_pipeline : str

ID of a Pipeline that will be linked.

unlink(ancestral_pipeline)[source]

Unlinks an ancestral Pipeline from this Pipeline.

Parameters

ancestral_pipeline : str

ID of an ancestral Pipeline that will be unlinked.

throttle(who, enable=True)[source]

Enables throttling for the Pipeline and, if needed, for its ancestral Pipelines.

Parameters

who : ID of a processor

The processor that we want to throttle.

enable : bool, default True

When True, the content of the argument 'who' is added to the _throttles list.
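
The bookkeeping behind throttling can be pictured as a collection of "who is currently throttling" entries. The class below is a hypothetical stand-in, not BSPump code. It assumes a set instead of the list mentioned above, and it assumes that a pipeline counts as ready only while nobody is throttling it.

```python
class ThrottleTracker:
    """Hypothetical stand-in for a pipeline's throttle bookkeeping."""

    def __init__(self):
        # Assumption: a set is used here; the text above mentions a list.
        self._throttles = set()

    def throttle(self, who, enable=True):
        # enable=True registers `who` as a throttler, enable=False releases it.
        if enable:
            self._throttles.add(who)
        else:
            self._throttles.discard(who)

    def is_ready(self):
        # Assumption: ready only while no component is throttling.
        return len(self._throttles) == 0


tracker = ThrottleTracker()
tracker.throttle("MySink")
print(tracker.is_ready())
tracker.throttle("MySink", enable=False)
print(tracker.is_ready())
```
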

async ready()[source]

Checks if the Pipeline is ready. The method can be used in source: await self.Pipeline.ready().

is_ready()[source]

Checks whether the internal ready event is set.

Returns:

_ready.is_set().

inject(context, event, depth)[source]

Injects an event into the Pipeline at the depth given by the depth parameter. Every depth is interconnected with a generator object.

Parameters

context : string

Information propagated through the Pipeline.

event : data with a timestamp, stored in any data type, usually JSON

The event that is passed to the method.

depth : int

Level of depth.

Note:

For normal operations, it is highly recommended to use the process method instead.

async process(event, context=None)[source]

Injects an event into the Pipeline at depth 0 and increments the event counter in the metrics.

Parameters

event : data with a timestamp, stored in any data type, usually JSON

The event that is passed to the method.

context : str, default None

Additional information needed for work with event streaming.

Hint:

This is the recommended way of inserting events into a Pipeline.

create_eps_counter()[source]

Creates a dictionary with information about the Pipeline. It contains EPS (events per second), warnings and errors.

Returns:

The EPS counter created using self.MetricsService.

Note:

The EPS counter can be created using this method or directly by using a MetricsService method.

ensure_future(coro)[source]

You can use this method to schedule a future task that will be executed in the context of the Pipeline. The Pipeline also manages the whole lifecycle of the future/task: it collects the future's result, discards it, and, most importantly, captures any exception, which then blocks the Pipeline via set_error().

Parameters

coro : coroutine

The coroutine to schedule.

Hint:

If the number of futures exceeds the configured limit, the Pipeline is throttled.
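
A rough picture of this lifecycle management, using only asyncio: the hypothetical `FutureTracker` below is not BSPump code. It assumes a simple "pending count exceeds the limit" rule for throttling and records exceptions in a list where a real Pipeline would call set_error().

```python
import asyncio


class FutureTracker:
    """Hypothetical helper that tracks scheduled tasks against a limit."""

    def __init__(self, limit):
        self.limit = limit
        self.pending = set()
        self.errors = []

    def ensure_future(self, coro):
        # Schedule the coroutine and remember the task until it finishes.
        task = asyncio.ensure_future(coro)
        self.pending.add(task)
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task):
        # Collect the outcome, drop the result, keep any exception.
        self.pending.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            # A real pipeline would call set_error() at this point.
            self.errors.append(exc)

    def is_throttled(self):
        # Assumption: throttled while more tasks are pending than the limit.
        return len(self.pending) > self.limit


async def demo():
    tracker = FutureTracker(limit=1)

    async def ok():
        await asyncio.sleep(0)

    async def boom():
        raise ValueError("failed")

    tracker.ensure_future(ok())
    tracker.ensure_future(boom())
    throttled_while_busy = tracker.is_throttled()
    await asyncio.sleep(0.01)  # let both tasks finish and their callbacks run
    return throttled_while_busy, tracker.is_throttled(), len(tracker.errors)


print(asyncio.run(demo()))
```
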


set_source(source)[source]

Sets a specific source or list of sources to the Pipeline.

Parameters

source : str, list optional

ID of a source.

If a list of sources is passed to the method, it adds the entire list of sources to the Pipeline.

append_processor(processor)[source]

Adds a Processor to the current Pipeline.

Parameters

processor : str

ID of a processor.

Hint:

The Generator can be added by using this method. It requires a depth parameter.

remove_processor(processor_id)[source]

Removes a specific processor from the Pipeline.

Parameters

processor_id : str

ID of a processor.

Raises:

Error when processor is not found.

insert_before(id, processor)[source]

Inserts the Processor into the Pipeline in front of another Processor specified by ID.

Parameters

id : str

ID of the Processor in front of which the new Processor is inserted.

processor : Processor

The Processor to insert.

Returns:

True on success. False if the ID was not found.

insert_after(id, processor)[source]

Inserts the Processor into the Pipeline behind another Processor specified by ID.

Parameters

id : str

ID of the Processor after which the new Processor is inserted.

processor : Processor

The Processor to insert.

Returns:

True if successful. False if the ID was not found.
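
The contract of insert_before and insert_after (find the Processor with the given ID, place the new one next to it, report success) can be sketched on a plain list. `Proc` and both functions are hypothetical and simplified; they ignore the Pipeline depths and are not the BSPump implementation.

```python
class Proc:
    """Hypothetical minimal processor that carries only an Id."""

    def __init__(self, id):
        self.Id = id


def insert_before(processors, id, processor):
    # Place `processor` directly in front of the entry whose Id matches.
    for idx, existing in enumerate(processors):
        if existing.Id == id:
            processors.insert(idx, processor)
            return True
    return False


def insert_after(processors, id, processor):
    # Place `processor` directly behind the entry whose Id matches.
    for idx, existing in enumerate(processors):
        if existing.Id == id:
            processors.insert(idx + 1, processor)
            return True
    return False


chain = [Proc("Parser"), Proc("Sink")]
insert_before(chain, "Sink", Proc("Enricher"))
insert_after(chain, "Parser", Proc("Filter"))
print([p.Id for p in chain])
```
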

build(source, *processors)[source]

Adds the source, Processors, and sink that make up the structure of the Pipeline.

Parameters

source : str

ID of a source.

*processors : str, list optional

ID of a Processor or a list of IDs.

iter_processors()[source]

A Python generator that iterates through all Processors in the Pipeline.

Yields:

A Processor from a list in the Pipeline.

locate_source(address)[source]

Locates a source based on its ID.

Parameters

address : str

ID of the source.

locate_connection(app, connection_id)[source]

Finds a connection by ID.

Parameters

app : Application

The ASAB Application object.

connection_id : str

ID of the connection we want to locate.

Returns:

connection

locate_processor(processor_id)[source]

Finds a Processor by ID.

Parameters

processor_id : str

ID of a Processor.

Returns:

processor


start()[source]

Starts the lifecycle of the Pipeline.

async stop()[source]

Gracefully stops the lifecycle of the Pipeline.

rest_get()[source]

Returns information about the status of the Pipeline.

class bspump.PumpBuilder(definition)[source]

Bases: object

PumpBuilder is meant to create the pipeline together with its connections, processors and sources. definition is a path to a JSON file containing the description of the pump. An example of such a file:

{
        "pipelines" : [
                {
                        "id": "MyPipeline0",
                        "args": {},
                        "config": {},
                        "sources": [
                                {
                                        "id": "FileCSVSource",
                                        "module": "bspump.file",
                                        "class" : "FileCSVSource",
                                        "args": {},
                                        "config": {"path":"etc/test.csv", "post":"noop"},
                                        "trigger": {
                                                "module": "bspump.trigger",
                                                "class": "OpportunisticTrigger",
                                                "id": "",
                                                "args": {}
                                        }
                                }
                        ],
                        "processors": [
                                {
                                        "module":"bspump-pumpbuilder",
                                        "class": "Processor00",
                                        "args": {},
                                        "config": {}
                                }
                        ],
                        "sink": {
                                "module":"bspump.common",
                                "class": "PPrintSink",
                                "args": {},
                                "config": {}
                        }
                }
        ]
}
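
Each component in the definition names a "module" and a "class". One way such a pair could be turned into a Python class is a dynamic import, sketched below. `resolve_class` is a hypothetical helper, not a PumpBuilder method, and the definition uses a standard-library class as a stand-in so the sketch runs without BSPump installed.

```python
import importlib
import json


def resolve_class(entry):
    """Import entry["module"] and return the attribute named entry["class"].

    Hypothetical helper for illustration only; not a PumpBuilder method.
    """
    module = importlib.import_module(entry["module"])
    return getattr(module, entry["class"])


# Stand-in definition: a standard-library class replaces a BSPump component.
definition = json.loads("""
{
    "sink": {"module": "collections", "class": "OrderedDict", "args": {}, "config": {}}
}
""")

sink_cls = resolve_class(definition["sink"])
print(sink_cls.__name__)
```
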

__init__(definition)[source]

Description:


construct_pump(app, svc)[source]

The main method to construct the pump. app is a BSPumpApplication object, svc is the service. Example of use:

app = BSPumpApplication()
svc = app.get_service("bspump.PumpService")
pump_builder = PumpBuilder(definition)
pump_builder.construct_pump(app, svc)
app.run()

construct_connections(app, svc)[source]

Description:


construct_connection(app, svc, connection)[source]

Description:


construct_lookups(app, svc)[source]

Description:


construct_lookup(app, svc, lookup)[source]

Description:


construct_pipelines(app, svc)[source]

Description:


construct_pipeline(app, svc, pipeline_definition)[source]

Description:


construct_sources(app, svc, pipeline, definition)[source]

Description:


construct_source(app, svc, pipeline, definition)[source]

Description:


construct_trigger(app, svc, definition)[source]

Description:


construct_processors(app, svc, pipeline, definition)[source]

Description:


construct_processor(app, svc, pipeline, definition)[source]

Description:


class bspump.Source(app, pipeline, id=None, config=None)[source]

Bases: Configurable

Description:

__init__(app, pipeline, id=None, config=None)[source]

Set the initial ID, Pipeline and Task.

Parameters

app : Application

The `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ object.

pipeline : Pipeline

The Pipeline that the Source belongs to.

id : str, default None

ID of the Source.

config : compatible config type, default None

Option for adding a configuration file.

async process(event, context=None)[source]

This method is used to emit an event into a Pipeline.

Parameters

event : data with a timestamp, stored in any data type, usually JSON

Message or information that is passed to the method and emitted into a Pipeline.

context : default None

Additional information.

If there is an error in the processing of the event, the Pipeline is throttled by setting the error, and the exception is raised.

Hint:

The source should catch this exception and fail gracefully.

start(loop)[source]

Starts the Pipeline through the _main method, but if main method is implemented it starts the coroutine using main method instead.

Parameters

loop : event loop

The event loop that runs the coroutines.

async stop()[source]

Stops the Source using self.Task. If the processes are not done, it cancels them or raises an error.

restart(loop)[source]

Restarts the loop of coroutines and returns result() method.

Parameters

loop : event loop

The event loop that runs the coroutines.

async main()[source]

Can be implemented for additional features; otherwise it raises NotImplementedError and _main is called instead.

async stopped()[source]

Waits for all asynchronous tasks to be completed. It is a helper that simplifies the implementation of sources.

Example:

async def main(self):
    # ... initialize resources here
    await self.stopped()
    # ... finalize resources here
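
The initialize / wait / finalize shape of such a main() can be tried out with plain asyncio. `MiniSource` below is hypothetical and does not derive from bspump.Source. It assumes an asyncio.Event as the stop signal, which is a simplification chosen for this sketch and not a statement about how stopped() works internally.

```python
import asyncio


class MiniSource:
    """Hypothetical source; only the stop/stopped handshake is modelled."""

    def __init__(self):
        # Assumption: an Event stands in for the real stop mechanism.
        self._stop_event = asyncio.Event()
        self.log = []

    async def stopped(self):
        # Block until a stop has been requested.
        await self._stop_event.wait()

    def request_stop(self):
        self._stop_event.set()

    async def main(self):
        self.log.append("initialized")   # ... initialize resources here
        await self.stopped()
        self.log.append("finalized")     # ... finalize resources here


async def demo():
    source = MiniSource()
    task = asyncio.ensure_future(source.main())
    await asyncio.sleep(0)               # let main() run up to stopped()
    snapshot = list(source.log)
    source.request_stop()
    await task
    return snapshot, source.log


print(asyncio.run(demo()))
```
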

locate_address()[source]

Locates address of a Pipeline.

Returns:

The ID of the Source and the ID of its Pipeline, as a string.

rest_get()[source]
Returns:

ID and class ID

classmethod construct(app, pipeline, definition)[source]

Can create a source based on a specific definition. For example, a JSON file.

Parameters

app : Application

The ASAB Application object.

pipeline : Pipeline

Specification of a Pipeline.

definition : dict

Definition that is used to create a source.

Returns:

cls(app, newid, config)

Parameters:

definition (dict)
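
A construct-style classmethod usually just unpacks the definition and forwards it to the constructor. The sketch below shows that pattern on a hypothetical `MiniSource` class that is unrelated to bspump.Source. The "id" and "config" keys are taken from the PumpBuilder definition example; everything else is an assumption made for illustration.

```python
class MiniSource:
    """Hypothetical component; not the BSPump Source class."""

    def __init__(self, app, pipeline, id=None, config=None):
        self.App = app
        self.Pipeline = pipeline
        # Fall back to the class name when no ID is given.
        self.Id = id if id is not None else self.__class__.__name__
        self.Config = config or {}

    @classmethod
    def construct(cls, app, pipeline, definition: dict):
        # Pull the optional ID and config out of the definition.
        newid = definition.get("id")
        config = definition.get("config")
        return cls(app, pipeline, newid, config)


source = MiniSource.construct(
    None, None, {"id": "FileCSVSource", "config": {"path": "etc/test.csv"}}
)
print(source.Id, source.Config)
```
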

class bspump.TriggerSource(app, pipeline, id=None, config=None)[source]

Bases: Source

A specialized source that responds to trigger events.

TriggerSource is designed to work with BitSwan’s trigger system. It waits for trigger events and executes a cycle() method when triggered. This allows for event-driven data processing in pipelines.

Key features:

- Waits for trigger events using asyncio.Event
- Executes the cycle() method when triggered
- Manages multiple triggers simultaneously
- Handles trigger lifecycle events

Usage:

class MyTriggerSource(TriggerSource):

    async def cycle(self, *args, **kwargs):
        # Your trigger logic here
        await self.Pipeline.process({"data": "triggered"})

# Attach to any trigger type
source = MyTriggerSource(app, pipeline)
source.on(some_trigger)

__init__(app, pipeline, id=None, config=None)[source]

Set the initial ID, Pipeline and Task.

Parameters

app : Application

The `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ object.

pipeline : Pipeline

The Pipeline that the Source belongs to.

id : str, default None

ID of the Source.

config : compatible config type, default None

Option for adding a configuration file.

time()[source]

Method used for measuring an accurate time.

Returns:

App.time()

Hint:

You can find more information about UTC Time in the ASAB documentation

on(trigger)[source]

Sets a Trigger, which waits for a given condition.

Parameters

trigger : Trigger

The trigger that represents the given condition.

Returns:

Trigger.add(trigger)

async main(*args, **kwags)[source]

Waits for Pipeline, triggers, and calls exceptions when the source is initiated.

Parameters

*args : ?

**kwags : ?


async cycle(*args, **kwags)[source]

Not implemented.

Parameters

*args : ?

**kwags : ?

rest_get()[source]

Description:

Returns:

class bspump.Sink(app, pipeline, id=None, config=None)[source]

Bases: ProcessorBase

Sink is basically a processor. It takes an event and sends it to a database, where it is stored.


handle_error(context, event, exception, timestamp)[source]

class bspump.Processor(app, pipeline, id=None, config=None)[source]

Bases: ProcessorBase

Inherits from ProcessorBase.


class bspump.Generator(app, pipeline, id=None, config=None)[source]

Bases: ProcessorBase

Description:


__init__(app, pipeline, id=None, config=None)[source]

Description:

Parameters

app : Application

The ASAB Application object.

pipeline : Pipeline

The Pipeline object.

id : str, default None

ID

config : JSON, default None

Configuration containing additional information.

set_depth(depth)[source]

Description:

Parameters

depth : int

process(context, event)[source]

Description:

Parameters

context :

event : any data type

Information of any data type with a timestamp.

async generate(context, event, depth)[source]

Description:

Parameters

context :

event : any data type

Information of any data type with a timestamp.

depth : int

class bspump.Connection(app, id=None, config=None)[source]

Bases: ABC, Configurable

The Connection class is responsible for creating a connection between items or services within the infrastructure of BSPump. Its main use is to create a connection with the main components of BSPump: source, processor and sink.


__init__(app, id=None, config=None)[source]

Description:

Parameters

app : Application

Specification of an Application.

id : default None

config : JSON or other compatible format, default None

It contains important information and data responsible for creating a connection.

time()[source]

Returns accurate time of the asynchronous process.

Hint:

See the UTC Time section of the ASAB documentation for more information.


classmethod construct(app, definition)[source]

Creates a connection based on a specific definition. For example, a JSON file.

Parameters

app : Application

The `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ object.

definition : dict

Defines instructions for the method that can be used to create a connection.

Returns:

cls(app, newid, config)


class bspump.Analyzer(app, pipeline, analyze_on_clock=False, id=None, config=None)[source]

Bases: Processor

Description:

ConfigDefaults: dict = {'analyze_period': 60}
__init__(app, pipeline, analyze_on_clock=False, id=None, config=None)[source]

Initializes the parameters.

Parameters

app : object

Application object.

pipeline : Pipeline

The Pipeline object.

id : str, default None

ID of the class of config.

config : JSON or other compatible formats, default None

Configuration file.

start_timer(event_type)[source]

Description:

analyze()[source]

Description:

evaluate(context, event)[source]

Records the information from the event into the analyzed object. It is specific for each analyzer.

Parameters

context :

event : any data type

Information with a timestamp.

predicate(context, event)[source]

Checks whether the event is worth processing. If it is, the function should return True. It is specific for each analyzer, but the default one always returns True.

Parameters

context :

event : any data type

Information with a timestamp.

Returns:

True

process(context, event)[source]

The event passes through process(context, event) unchanged. Meanwhile, it is evaluated.

Parameters

context :

event : any data type

Information with a timestamp.

Returns:

event
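
The interplay of predicate, evaluate and process can be shown with a small stand-alone class. `CountingAnalyzer` is hypothetical and does not derive from bspump.Analyzer. The "user" field and the per-user counter are made up for the example, and the sketch assumes that process() consults predicate() before calling evaluate().

```python
class CountingAnalyzer:
    """Hypothetical analyzer-like class; not derived from bspump.Analyzer."""

    def __init__(self):
        # The "analyzed object": a per-user event counter.
        self.counts = {}

    def predicate(self, context, event):
        # Only events that carry a "user" field are worth analysing.
        return "user" in event

    def evaluate(self, context, event):
        # Record information from the event into the analyzed object.
        user = event["user"]
        self.counts[user] = self.counts.get(user, 0) + 1

    def process(self, context, event):
        # The event is evaluated on the way and returned unchanged.
        if self.predicate(context, event):
            self.evaluate(context, event)
        return event


analyzer = CountingAnalyzer()
for ev in [{"user": "alice"}, {"other": 1}, {"user": "alice"}, {"user": "bob"}]:
    analyzer.process({}, ev)
print(analyzer.counts)
```
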

async on_clock_tick()[source]

Run analysis every tick.

exception bspump.ProcessingError[source]

Bases: RuntimeError

Generic exception that indicates an error in pipeline processing.

class bspump.Lookup(app, id=None, config=None, lazy=False)[source]

Bases: Configurable

Description:


Returns:

ConfigDefaults: dict = {'master_lookup_id': '', 'master_url': 'http://localhost:8080', 'master_url_endpoint': '/bspump/v1/lookup/', 'source_url': ''}
__init__(app, id=None, config=None, lazy=False)[source]

Description:

time()[source]

Description:

Returns:

time


ensure_future_update(loop)[source]

Description:

Returns:


async load()[source]

Description:

Return type:

bool

serialize()[source]

Description:

deserialize(data)[source]

Description:


rest_get()[source]

Description:

Returns:

is_master()[source]

Description:

Returns:

??


class bspump.MappingLookup(app, id=None, config=None, lazy=False)[source]

Bases: Lookup, Mapping

Description:


class bspump.DictionaryLookup(app, id=None, config=None, lazy=False)[source]

Bases: MappingLookup

Description:

__init__(app, id=None, config=None, lazy=False)[source]

Description:


serialize()[source]

Description:

Returns:

json data


deserialize(data)[source]

Description:


rest_get()[source]

Description:

Returns:

rest


set(dictionary)[source]

Description:


Parameters:

dictionary (dict)

bspump.load_json_file(fname)[source]

class bspump.Matrix(app, dtype='float_', persistent=False, id=None, config=None)[source]

Bases: ABC, Configurable

Generic Matrix object.

Matrix structure is organized in a following hierarchical order:

Matrix -> Rows -> Columns -> Cells

Cells have unified data format across the whole matrix. This format is specified by a dtype. It can be a simple integer or float but also a complex dictionary type with names and types of the fields.

The description of types that can be used for a dtype of a cell:

====  ======================
Name  Definition
====  ======================
'b'   Byte
'i'   Signed integer
'u'   Unsigned integer
'f'   Floating point
'c'   Complex floating point
'S'   String
'U'   Unicode string
'V'   Raw data
====  ======================

Example: 'i8' stands for int64.

For more details, see https://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html

Object main attributes:

- Array is a numpy ndarray, the actual data representation of the matrix object.
- ClosedRows is a set where some row IDs can be stored before deletion during the matrix rebuild.

ConfigDefaults: dict = {'max_closed_rows_capacity': 0.2}
__init__(app, dtype='float_', persistent=False, id=None, config=None)[source]
zeros(rows=1)[source]
flush()[source]

The matrix will be recreated without rows from ClosedRows.

close_rows(row_names, clear=True)[source]
close_row(row_name, clear=True)[source]
add_row()[source]
build_shape(rows=0)[source]

Override this method to have a control over the shape of the matrix.

reshape(shape)[source]
time()[source]
async analyze()[source]

The Matrix itself can run analyze(). It is not recommended to iterate through the matrix row by row (or cell by cell). Instead, use numpy functions. Examples:

1. You have a vector with n rows. You need only those row indices where the cell content is more than 10. Use np.where(vector > 10).
2. You have a matrix with n rows and m columns. You need to find out which rows consist entirely of zeros. Use np.where(np.all(matrix == 0, axis=1)) to get those row indices. Instead of np.all(), you can use np.any() to get all row indices where there is at least one zero.
3. Use np.mean(matrix, axis=1) to get the means of all rows.
4. Useful numpy functions: np.unique(), np.sum(), np.argmin(), np.argmax().
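
The numpy calls mentioned above can be tried on small arrays. The data below is made up for illustration and stands in for a Matrix's Array; only numpy itself is used.

```python
import numpy as np

# Made-up sample data standing in for a matrix's underlying ndarray.
vector = np.array([3, 12, 7, 25])
matrix = np.array([
    [0, 0, 0],
    [1, 0, 2],
    [4, 5, 6],
])

# Indices of cells whose content is more than 10.
above_ten = np.where(vector > 10)[0]

# Rows that consist entirely of zeros.
all_zero_rows = np.where(np.all(matrix == 0, axis=1))[0]

# Rows that contain at least one zero.
any_zero_rows = np.where(np.any(matrix == 0, axis=1))[0]

# Mean of every row.
row_means = np.mean(matrix, axis=1)

print(above_ten, all_zero_rows, any_zero_rows, row_means)
```

np.where() returns a tuple with one index array per dimension, hence the `[0]` to pick the row indices.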

class bspump.PersistentMatrix(app, dtype='float_', id=None, config=None)[source]

Bases: Matrix

ConfigDefaults: dict = {'path': ''}
__init__(app, dtype='float_', id=None, config=None)[source]
create_path()[source]
zeros(rows=1)[source]
flush()[source]

The matrix will be recreated without rows from ClosedRows.

close_row(row_name, clear=True)[source]
class bspump.NamedMatrix(app, dtype='float_', id=None, config=None)[source]

Bases: Matrix

__init__(app, dtype='float_', id=None, config=None)[source]
zeros()[source]
serialize()[source]
deserialize(data)[source]
flush()[source]

The matrix will be recreated without rows from ClosedRows.

add_row(row_name)[source]
Parameters:

row_name (str)

close_row(row_name, clear=True)[source]
close_rows(row_names, clear=True)[source]
get_row_index(row_name)[source]
Parameters:

row_name (str)

get_row_name(row_index)[source]
Parameters:

row_index (int)

class bspump.Model(app, id=None, config=None)[source]

Bases: Configurable

Generic Model object. Loads trained model and parameters.

ConfigDefaults: dict = {'path_model': '', 'path_parameters': ''}
__init__(app, id=None, config=None)[source]
load_model_from_file()[source]

Load model from file.

load_parameters_from_file()[source]

Loads model parameters from json file. Override if needed.

async update()[source]

Updates the model on the fly.

transform(*args)[source]

Method used to transform data for model input.

predict(*args)[source]

Method uses model to predict value from sample.

class bspump.PersistentNamedMatrix(app, dtype='float_', id=None, config=None)[source]

Bases: PersistentMatrix

__init__(app, dtype='float_', id=None, config=None)[source]
zeros()[source]
flush()[source]

The matrix will be recreated without rows from ClosedRows.

add_row(row_name)[source]
Parameters:

row_name (str)

close_row(row_name, clear=True)[source]
close_rows(row_names, clear=True)[source]
get_row_index(row_name)[source]
Parameters:

row_name (str)

get_row_name(row_index)[source]
Parameters:

row_index (int)

class bspump.Anomaly[source]

Bases: dict

Anomaly is an abstract class to be overridden for a specific anomaly and its type.

Implement: TYPE, on_tick


TYPE = None
is_closed()[source]

Description:

Returns:

sets status to closed


close(current_time)[source]

Description:

async on_tick(current_time)[source]

Description:

Hint:

Implement to perform operations on the anomaly, e.g. close.

BSPumpApplication

The main application class that manages the event loop, services, and component lifecycle.

Example:

import bspump

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
svc.add_pipeline(MyPipeline(app, "MyPipeline"))
app.run()

BSPumpService

class bspump.BSPumpService(app, service_name='bspump.PumpService')[source]

Bases: Service

Service registry based on the Service object. Read more in the ASAB documentation: `Service <https://asab.readthedocs.io/en/latest/asab/service.html>`_.

__init__(app, service_name='bspump.PumpService')[source]

Initializes parameters passed to the Service class.

Parameters

app : Application

The ASAB Application object.

service_name : str, default 'bspump.PumpService'

String containing the service name.


locate(address)[source]

Locates a pipeline, source or processor based on the addressed parameter.

Parameters

address : str, ID

Address of a pipeline component:

1. To locate a Pipeline, use the pipeline ID.
2. To locate a Processor or a Sink, use the pipeline ID and the component ID separated by a dot, like 'pipelineId.componentId'.
3. To locate a Source, add '*' before the ID of the source, like 'pipeline.*SourceId'.

Returns

Pipeline, Source or Processor object if it is found in the components list, otherwise None.

Usage

>>> pipeline = service.locate("SuperCoolPipeline")
>>> pipeline.Id
SuperCoolPipeline
>>> processor = service.locate("SuperCoolPipeline.PPrintProcessor")
>>> processor.Id
PPrintProcessor
>>> source = service.locate("SuperCoolPipeline.*MySource")
>>> source.Id
MySource
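
The address convention can be illustrated without BSPump itself. The sketch below only parses an address into its parts; parse_address is a hypothetical helper written for this illustration and is not how locate() is necessarily implemented.

```python
def parse_address(address):
    """Split an address into (pipeline_id, kind, component_id)."""
    pipeline_id, dot, component = address.partition(".")
    if not dot:
        # No dot: the address names a whole pipeline.
        return pipeline_id, "pipeline", None
    if component.startswith("*"):
        # A leading asterisk marks a source.
        return pipeline_id, "source", component[1:]
    # Anything else is a processor or a sink.
    return pipeline_id, "processor", component


print(parse_address("SuperCoolPipeline"))
print(parse_address("SuperCoolPipeline.PPrintProcessor"))
print(parse_address("SuperCoolPipeline.*MySource"))
```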
add_pipeline(pipeline)[source]

Adds a pipeline to the BSPump.

Parameters

pipeline : Pipeline

The Pipeline object to add.

add_pipelines(*pipelines)[source]

Adds multiple pipelines to the BSPump.

Parameters

*pipelines : list

Pipelines to be added to the BSPump.

del_pipeline(pipeline)[source]

Deletes a pipeline from a list of Pipelines.

Parameters

pipeline : str, ID

ID of a pipeline.

add_connection(connection)[source]

Adds a connection to the Connection dictionary.

Parameters

connection : str, ID

ID of a connection.

Returns:

connection

add_connections(*connections)[source]

Adds multiple connections to the Connection dictionary.

Parameters

*connections : str, ID

List of connection IDs.

locate_connection(connection_id)[source]

Locates connection based on connection ID.

Parameters

connection_id : ID

Connection ID.

add_lookup(lookup)[source]

Sets a lookup based on Lookup.

Parameters

lookup : Lookup

The Lookup object.

Returns:

lookup

add_lookups(*lookups)[source]

Adds a list of lookups to the Pipeline.

Parameters

*lookups : Lookup

List of Lookups.

locate_lookup(lookup_id, context=None)[source]

Locates lookup based on ID.

Parameters

lookup_id : ID

ID of a Lookup.

context : default None

Additional information.

Returns:

Lookup from the lookup service or from the internal dictionary.

add_lookup_factory(lookup_factory)[source]

Adds a lookup factory.

Parameters

lookup_factory :

Name of lookup factory.

add_matrix(matrix)[source]

Adds a matrix to the Pipeline.

Parameters

matrix : Matrix

The Matrix object.

Returns:

matrix

add_matrixes(*matrixes)[source]

Adds a list of Matrices to the Pipeline.

Parameters

*matrixes : list

List of matrices.

locate_matrix(matrix_id)[source]

Locates a matrix based on matrix ID.

Parameters

matrix_id : str, ID

ID of a matrix.

async initialize(app)[source]

Initializes an Application based on ASAB Application.

Parameters

app : Application

The Application object.

async finalize(app)[source]

Stops all the pipelines.

Parameters

app : Application

The Application object.

Returns:

Service that manages pipelines, connections, and lookups.

Methods:

  • add_pipeline(pipeline) - Register a pipeline

  • add_connection(connection) - Register a connection

  • add_lookup(lookup) - Register a lookup

  • locate_connection(connection_id) - Get a connection by ID

  • locate_lookup(lookup_id) - Get a lookup by ID

Pipeline

class bspump.Pipeline(app, id=None, config=None)[source]

Bases: ABC, Configurable

Description: Pipeline is …

An example of the Pipeline construction:

class MyPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            MySource(app, self),
            MyProcessor(app, self),
            MyProcessor2(app, self),
            bspump.common.NullSink(app, self),
        )

ConfigDefaults: dict = {'async_concurency_limit': 1000, 'reset_profiler': True, 'stop_on_errors': True}
__init__(app, id=None, config=None)[source]

Initializes basic variables used in the other Pipeline methods. You can also add more information using parameters.

Parameters

app : Application

The ASAB Application object.

id : str, default None

ID of the Pipeline. If not given, the name of the current class (taken from __class__) is used.

config : default None

Additional settings and configuration. If not given, the default config is used.

time()[source]

Returns correct time.

Returns:

App.time()

Hint:

More information in the ASAB documentation in UTC Time.

get_throttles()[source]

Returns components from Pipeline that are throttled.

Returns:

self._throttles, the throttles currently set on the Pipeline.

is_error()[source]

Returns False when there is no error, otherwise it returns True.

Returns:

True if self._error is not None, otherwise False.

set_error(context, event, exc)[source]

When called with exc set to None, it resets the error (recovery).

When called with exc, it sets the exception for soft errors.

Parameters

context :

Context of an error.

event : Data with a timestamp stored in any data type, usually JSON.

You can specify an event that is passed to the method.

exc : Exception

Python default exceptions.

handle_error(exception, context, event)[source]

Used for setting up exceptions and conditions for errors. You can implement it to evaluate processing errors.

Parameters

exception : Exception

The exception raised during processing.

context : information

Additional information can be passed.

event : Data with a timestamp stored in any data type, usually JSON.

You can specify an event that is passed to the method.

Returns:

False for hard errors (stop the Pipeline processing). True for soft errors that will be ignored.

Example:

import json

import bspump
import bspump.common


class SampleInternalPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)

        self.build(
            bspump.common.InternalSource(app, self),
            bspump.common.JSONParserProcessor(app, self),
            bspump.common.PPrintSink(app, self)
        )

    def handle_error(self, exception, context, event):
        # A malformed JSON event is a soft error: ignore it and keep the Pipeline running.
        if isinstance(exception, json.decoder.JSONDecodeError):
            return True
        # Anything else is a hard error and stops the Pipeline.
        return False

link(ancestral_pipeline)[source]

Links this Pipeline with an ancestral Pipeline. This is needed, for example, to propagate throttling from child Pipelines back to their ancestors. If the child Pipeline uses InternalSource, it may become throttled because the internal queue is full. If so, the throttling is propagated to the ancestral Pipeline, so that its source may block incoming events until the internal queue is empty again.

Parameters

ancestral_pipeline : str

ID of a Pipeline that will be linked.

unlink(ancestral_pipeline)[source]

Unlinks an ancestral Pipeline from this Pipeline.

Parameters

ancestral_pipeline : str

ID of an ancestral Pipeline that will be unlinked.

throttle(who, enable=True)[source]

Enables throttling for a chosen pipeline and, if needed, its ancestral pipelines.

Parameters

who : ID of a processor.

Name of a processor that we want to throttle.

enable : bool, default True

When True, the content of the argument ‘who’ is added to the _throttles list.
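
The interaction between link() and throttle() can be sketched with a small stand-in class. ThrottleDemo and its attributes are hypothetical and model only the bookkeeping described above; the real Pipeline does considerably more.

```python
class ThrottleDemo:
    """Hypothetical stand-in for a pipeline; models only throttling."""

    def __init__(self, name):
        self.name = name
        self.throttles = set()
        self.ancestors = []

    def link(self, ancestor):
        self.ancestors.append(ancestor)

    def throttle(self, who, enable=True):
        if enable:
            self.throttles.add(who)
        else:
            self.throttles.discard(who)
        # Propagate the same request to every linked ancestor.
        for ancestor in self.ancestors:
            ancestor.throttle(who, enable)

    def is_throttled(self):
        return len(self.throttles) > 0


parent = ThrottleDemo("parent")
child = ThrottleDemo("child")
child.link(parent)

child.throttle("InternalSource")
print(parent.is_throttled())   # True
child.throttle("InternalSource", enable=False)
print(parent.is_throttled())   # False
```

Keeping the throttles in a set means that several components can throttle the same pipeline independently, and the pipeline resumes only after all of them have released it.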

async ready()[source]

Checks if the Pipeline is ready. The method can be used in source: await self.Pipeline.ready().

is_ready()[source]

Checks whether the ready event of the Pipeline is set.

Returns:

_ready.is_set().

inject(context, event, depth)[source]

The inject method injects events into the Pipeline at the depth given by the depth parameter. Every depth is interconnected with a generator object.

Parameters

context : string

Information propagated through the Pipeline.

event : Data with a timestamp stored in any data type, usually JSON.

You can specify an event that is passed to the method.

depth : int

Level of depth.

Note:

For normal operations, it is highly recommended to use process method instead.

async process(event, context=None)[source]

The process method injects events into the Pipeline at depth 0 and increments the metric that counts incoming events.

Parameters

event : Data with a timestamp stored in any data type, usually JSON.

You can specify an event that is passed to the method.

context : str, default None

Additional information needed for working with event streaming.

Hint:

This is the recommended way of inserting events into a Pipeline.

create_eps_counter()[source]

Creates a dictionary with information about the Pipeline. It contains eps (events per second), warnings and errors.

Returns:

An EPS counter created using self.MetricsService.

Note:

The EPS counter can be created using this method or directly by using a MetricsService method.

ensure_future(coro)[source]

You can use this method to schedule a future task that will be executed in a context of the Pipeline. The Pipeline also manages a whole lifecycle of the future/task, which means, it will collect the future result, trash it, and mainly it will capture any possible exception, which will then block the Pipeline via set_error().

Parameters

coro : coroutine

The coroutine (or future) to be scheduled in the context of the Pipeline.

Hint:

If the number of futures exceeds the configured limit, the Pipeline is throttled.
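
The lifecycle described above (collect the result, capture exceptions, throttle when too many futures are pending) can be sketched with plain asyncio. FutureTracker and its attributes are hypothetical names for this illustration only; the sketch does not reproduce the Pipeline implementation.

```python
import asyncio


class FutureTracker:
    """Hypothetical helper; models only the bookkeeping of scheduled futures."""

    def __init__(self, limit):
        self.limit = limit
        self.pending = set()
        self.error = None
        self.throttled = False

    def ensure_future(self, coro):
        task = asyncio.ensure_future(coro)
        self.pending.add(task)
        task.add_done_callback(self._on_done)
        # Throttle once the number of pending futures exceeds the limit.
        self.throttled = len(self.pending) > self.limit
        return task

    def _on_done(self, task):
        self.pending.discard(task)
        self.throttled = len(self.pending) > self.limit
        # Retrieve the outcome so that an exception is never silently lost.
        if not task.cancelled() and task.exception() is not None:
            self.error = task.exception()


async def ok():
    await asyncio.sleep(0)


async def boom():
    raise ValueError("failed in background")


async def demo():
    tracker = FutureTracker(limit=1)
    tracker.ensure_future(ok())
    tracker.ensure_future(boom())
    throttled_at_peak = tracker.throttled
    await asyncio.sleep(0.01)  # give both tasks time to finish
    return tracker, throttled_at_peak


tracker, throttled_at_peak = asyncio.run(demo())
print(throttled_at_peak, tracker.throttled, repr(tracker.error))
```

In this sketch the captured exception is only stored; in the Pipeline it is handed to set_error(), which blocks further processing.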


set_source(source)[source]

Sets a specific source or list of sources to the Pipeline.

Parameters

source : str, list optional

ID of a source.

If a list of sources is passed to the method, it adds the entire list of sources to the Pipeline.

append_processor(processor)[source]

Adds a Processor to the current Pipeline.

Parameters

processor : str

ID of a processor.

Hint:

The Generator can be added by using this method. It requires a depth parameter.

remove_processor(processor_id)[source]

Removes a specific processor from the Pipeline.

Parameters

processor_id : str

ID of a processor.

Raises:

Error when processor is not found.

insert_before(id, processor)[source]

Inserts the Processor into the Pipeline in front of another processor specified by ID.

Parameters

id : str

ID of the existing processor in front of which the new processor will be inserted.

processor : str

Name of the processor that we want to insert.

Returns:

True on success. False if ID was not found.

insert_after(id, processor)[source]

Inserts the Processor into the Pipeline after another Processor specified by ID.

Parameters

id : str

ID of the existing processor after which the new processor will be inserted.

processor : str

Name of the processor that we want to insert.

Returns:

True if successful. False if ID was not found.
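
The semantics of insert_before() and insert_after() can be sketched on a plain Python list. Proc and the two free functions below are hypothetical stand-ins used only to show the positional behaviour and the True/False return value; they are not the Pipeline methods themselves.

```python
class Proc:
    """Hypothetical stand-in for a processor; it only carries an Id."""

    def __init__(self, id):
        self.Id = id


def insert_before(processors, id, processor):
    for idx, existing in enumerate(processors):
        if existing.Id == id:
            processors.insert(idx, processor)
            return True
    return False  # no processor with the given ID


def insert_after(processors, id, processor):
    for idx, existing in enumerate(processors):
        if existing.Id == id:
            processors.insert(idx + 1, processor)
            return True
    return False  # no processor with the given ID


chain = [Proc("Parser"), Proc("Sink")]
insert_before(chain, "Sink", Proc("Enricher"))
insert_after(chain, "Parser", Proc("Filter"))
print([p.Id for p in chain])
```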

build(source, *processors)[source]

This method adds a source, Processors, and a sink to create the structure of the Pipeline.

Parameters

source : str

ID of a source.

*processors : str, list optional

ID of a Processor or a list of IDs.

iter_processors()[source]

A Python generator that iterates through all Processors in the Pipeline.

Yields:

A Processor from a list in the Pipeline.

locate_source(address)[source]

Locates a source based on its ID.

Parameters

address : str

ID of the source.

locate_connection(app, connection_id)[source]

Finds a connection by ID.

Parameters

app : Application

The Application object.

connection_id : str

ID of the connection we want to locate.

Returns:

connection

locate_processor(processor_id)[source]

Finds a Processor by ID.

Parameters

processor_id : str

ID of a Processor.

Returns:

processor


start()[source]

Starts the lifecycle of the Pipeline.

async stop()[source]

Gracefully stops the lifecycle of the Pipeline.

rest_get()[source]

Returns information about the status of the Pipeline:

Returns:

The core pipeline class that chains sources, processors, and sinks.

Example:

class MyPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            MySource(app, self),
            MyProcessor(app, self),
            MySink(app, self),
        )

PumpBuilder

class bspump.PumpBuilder(definition)[source]

Bases: object

PumpBuilder creates pipelines together with their connections, processors, and sources. definition is a path to a JSON file containing the description of the pump. Example of such a file:

{
        "pipelines" : [
                {
                        "id": "MyPipeline0",
                        "args": {},
                        "config": {},
                        "sources": [
                                {
                                        "id": "FileCSVSource",
                                        "module": "bspump.file",
                                        "class" : "FileCSVSource",
                                        "args": {},
                                        "config": {"path":"etc/test.csv", "post":"noop"},
                                        "trigger": {
                                                "module": "bspump.trigger",
                                                "class": "OpportunisticTrigger",
                                                "id": "",
                                                "args": {}
                                        }
                                }
                        ],
                        "processors": [
                                {
                                        "module":"bspump-pumpbuilder",
                                        "class": "Processor00",
                                        "args": {},
                                        "config": {}
                                }
                        ],
                        "sink": {
                                "module":"bspump.common",
                                "class": "PPrintSink",
                                "args": {},
                                "config": {}
                        }
                }
        ]
}
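
Each entry in such a definition names a "module" and a "class". One plausible way to turn an entry like that into an object is importlib plus getattr, as in the sketch below. This is an assumption about the general technique, not a description of how PumpBuilder is implemented; the construct helper is hypothetical, and standard-library classes stand in for BSPump components so that the sketch runs on its own.

```python
import importlib
import json

# A cut-down definition; stdlib classes stand in for bspump components.
definition_text = """
{
    "processors": [
        {"module": "collections", "class": "OrderedDict", "args": {}},
        {"module": "fractions", "class": "Fraction", "args": {"numerator": 1, "denominator": 3}}
    ]
}
"""


def construct(entry):
    # Resolve the "module" and "class" keys to a Python class, then call it
    # with the keyword arguments given under "args".
    module = importlib.import_module(entry["module"])
    cls = getattr(module, entry["class"])
    return cls(**entry.get("args", {}))


definition = json.loads(definition_text)
objects = [construct(entry) for entry in definition["processors"]]
print([type(obj).__name__ for obj in objects])
```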

__init__(definition)[source]

Description:


construct_pump(app, svc)[source]

The main method to construct the pump. app is a BSPumpApplication object, svc is a service. Example of use:

app = BSPumpApplication()
svc = app.get_service("bspump.PumpService")
pump_builder = PumpBuilder(definition)
pump_builder.construct_pump(app, svc)
app.run()

construct_connections(app, svc)[source]

Description:


construct_connection(app, svc, connection)[source]

Description:


construct_lookups(app, svc)[source]

Description:


construct_lookup(app, svc, lookup)[source]

Description:


construct_pipelines(app, svc)[source]

Description:


construct_pipeline(app, svc, pipeline_definition)[source]

Description:


construct_sources(app, svc, pipeline, definition)[source]

Description:


construct_source(app, svc, pipeline, definition)[source]

Description:


construct_trigger(app, svc, definition)[source]

Description:


construct_processors(app, svc, pipeline, definition)[source]

Description:


construct_processor(app, svc, pipeline, definition)[source]

Description:


Utility for building pipelines programmatically.

Source

class bspump.Source(app, pipeline, id=None, config=None)[source]

Bases: Configurable

Description:

__init__(app, pipeline, id=None, config=None)[source]

Set the initial ID, Pipeline and Task.

Parameters

app : Application

The ASAB `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ object.

pipeline : address of a pipeline

Name of a Pipeline.

id : str, default None

ID of the Source.

config : compatible config type, default None

Option for adding a configuration file.

async process(event, context=None)[source]

This method is used to emit an event into a Pipeline.

Parameters

event : Data with a timestamp stored in any data type, usually JSON.

Message or information that is passed to the method and emitted into a Pipeline.

context : default None

Additional information.

If there is an error in the processing of the event, the Pipeline is throttled by setting the error, and the exception is raised.

Hint:

The source should catch this exception and fail gracefully.

start(loop)[source]

Starts the Source through the _main method; if the main method is implemented, the coroutine is started using main instead.

Parameters

loop :

The event loop that runs the coroutines.

async stop()[source]

Stops the Source using self.Task. If the processes are not done it cancels them or raises an error.

Returns:

restart(loop)[source]

Restarts the loop of coroutines and returns result() method.

Parameters

loop :

The event loop that runs the coroutines.

async main()[source]

Can be implemented for additional features, else will raise NotImplementedError and _main is called instead.

async stopped()[source]

Waits for all asynchronous tasks to be completed. It is a helper that simplifies the implementation of sources.

Example:

async def main(self):
    # ... initialize resources here
    await self.stopped()
    # ... finalize resources here
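
The pattern above can be tried out without BSPump. In the sketch below, DemoSource is a hypothetical class, and the stop signal is modelled with an asyncio.Event, which is an assumption made for illustration; the real Source may signal the stop differently.

```python
import asyncio


class DemoSource:
    """Hypothetical source; models only the stop signalling."""

    def __init__(self):
        self._stop_event = asyncio.Event()
        self.log = []

    async def stopped(self):
        # Suspend until stop() is called.
        await self._stop_event.wait()

    def stop(self):
        self._stop_event.set()

    async def main(self):
        self.log.append("initialized")   # initialize resources here
        await self.stopped()
        self.log.append("finalized")     # finalize resources here


async def demo():
    source = DemoSource()
    task = asyncio.ensure_future(source.main())
    await asyncio.sleep(0.01)            # let main() reach stopped()
    snapshot = list(source.log)
    source.stop()
    await task
    return snapshot, source.log


snapshot, log = asyncio.run(demo())
print(snapshot, log)
```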

locate_address()[source]

Locates address of a Pipeline.

Returns:

The ID of the Pipeline and the ID of the Source as a single string.

rest_get()[source]
Returns:

ID and class ID

classmethod construct(app, pipeline, definition)[source]

Can create a source based on a specific definition. For example, a JSON file.

Parameters

appApplication

Name of the Application.

pipelinePipeline

Specification of a Pipeline.

definitiondict

Definition that is used to create a source.

Returns:

cls(app, newid, config)

Parameters:

definition (dict)

Base class for event sources.

TriggerSource

class bspump.TriggerSource(app, pipeline, id=None, config=None)[source]

Bases: Source

A specialized source that responds to trigger events.

TriggerSource is designed to work with BitSwan’s trigger system. It waits for trigger events and executes a cycle() method when triggered. This allows for event-driven data processing in pipelines.

Key features:

  • Waits for trigger events using asyncio.Event

  • Executes cycle() method when triggered

  • Manages multiple triggers simultaneously

  • Handles trigger lifecycle events

Usage:

class MyTriggerSource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        # Your trigger logic here
        await self.Pipeline.process({"data": "triggered"})

# Attach to any trigger type
source = MyTriggerSource(app, pipeline)
source.on(some_trigger)
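
The wait-then-cycle loop described above can be sketched with a single asyncio.Event. DemoTriggerSource, fire() and max_cycles are hypothetical names for this illustration; the sketch leaves out multiple triggers and lifecycle handling, and it is not the TriggerSource implementation.

```python
import asyncio


class DemoTriggerSource:
    """Hypothetical source; runs cycle() once per fired trigger."""

    def __init__(self):
        self.trigger_event = asyncio.Event()
        self.cycles = 0

    def fire(self):
        # What a trigger would do when its condition is met.
        self.trigger_event.set()

    async def cycle(self):
        self.cycles += 1

    async def main(self, max_cycles):
        while self.cycles < max_cycles:
            await self.trigger_event.wait()
            self.trigger_event.clear()
            await self.cycle()


async def demo():
    source = DemoTriggerSource()
    task = asyncio.ensure_future(source.main(max_cycles=2))
    for _ in range(2):
        await asyncio.sleep(0.01)   # let main() reach wait()
        source.fire()
    await task
    return source.cycles


cycles = asyncio.run(demo())
print(cycles)
```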

__init__(app, pipeline, id=None, config=None)[source]

Set the initial ID, Pipeline and Task.

Parameters

app : Application

The ASAB `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ object.

pipeline : address of a pipeline

Name of a Pipeline.

id : str, default None

ID of the Source.

config : compatible config type, default None

Option for adding a configuration file.

time()[source]

Method used for measuring an accurate time.

Returns:

App.time()

Hint:

You can find more information about UTC Time in the ASAB documentation

on(trigger)[source]

Sets a Trigger which is a method that waits for a given condition.

Parameters

trigger : keyword of a trigger

Given condition that.

Returns:

Trigger.add(trigger)

async main(*args, **kwags)[source]

Waits for Pipeline, triggers, and calls exceptions when the source is initiated.

Parameters

*args : ?

**kwags : ?


async cycle(*args, **kwags)[source]

Not implemented.

Parameters

*args : ?

**kwags : ?

rest_get()[source]

Description:

Returns:

Base class for trigger-activated sources.

Processor

class bspump.Processor(app, pipeline, id=None, config=None)[source]

Bases: ProcessorBase

Inherits from ProcessorBase.


Base class for event processors.

Generator

class bspump.Generator(app, pipeline, id=None, config=None)[source]

Bases: ProcessorBase

Description:


__init__(app, pipeline, id=None, config=None)[source]

Description:

Parameters

app : Application

The Application object.

pipeline : Pipeline

The Pipeline object.

id : str, default None

ID

config : JSON, default None

Configuration file containing additional information.

set_depth(depth)[source]

Description:

Parameters

depth : int

process(context, event)[source]

Description:

Parameters

context :

event : any data type

Information of any data type with a timestamp.

async generate(context, event, depth)[source]

Description:

Parameters

context :

event : any data type

Information of any data type with a timestamp.

depth : int

Base class for generators that produce multiple events.

Sink

class bspump.Sink(app, pipeline, id=None, config=None)[source]

Bases: ProcessorBase

Sink is basically a processor. It takes an event and sends it to a database, where it is stored.


handle_error(context, event, exception, timestamp)[source]

Base class for event sinks.

Connection

class bspump.Connection(app, id=None, config=None)[source]

Bases: ABC, Configurable

The Connection class is responsible for creating a connection between items or services within the infrastructure of BSPump. Its main use is to provide connections for the main components of BSPump: source, processor and sink.


__init__(app, id=None, config=None)[source]

Description:

Parameters

app : Application

Specification of an Application.

id : default None

config : JSON or other compatible format, default None

It contains important information and data responsible for creating a connection.

time()[source]

Returns accurate time of the asynchronous process.

Hint:

More information in the ASAB documentation in UTC Time.


classmethod construct(app, definition)[source]

Creates a connection based on a specific definition. For example, a JSON file.

Parameters

app : Application

The ASAB `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ object.

definition : definition format

Defines instructions for the method that can be used to create a connection.

Returns:

cls(app, newid, config)

Parameters:

definition (dict)


Base class for shared connections.

Lookup

class bspump.Lookup(app, id=None, config=None, lazy=False)[source]

Bases: Configurable

Description:


Returns:

ConfigDefaults: dict = {'master_lookup_id': '', 'master_url': 'http://localhost:8080', 'master_url_endpoint': '/bspump/v1/lookup/', 'source_url': ''}
__init__(app, id=None, config=None, lazy=False)[source]

Description:

Provider: LookupProviderABC | None
time()[source]

Description:

Returns:

time


ensure_future_update(loop)[source]

Description:

Returns:


async load()[source]

Description:

Return type:

bool

serialize()[source]

Description:

deserialize(data)[source]

Description:


rest_get()[source]

Description:

Returns:

is_master()[source]

Description:

Returns:

??


Base class for lookup tables.

DictionaryLookup

class bspump.DictionaryLookup(app, id=None, config=None, lazy=False)[source]

Bases: MappingLookup

Description:

__init__(app, id=None, config=None, lazy=False)[source]

Description:


serialize()[source]

Description:

Returns:

json data


deserialize(data)[source]

Description:


rest_get()[source]

Description:

Returns:

rest


set(dictionary)[source]

Description:


Parameters:

dictionary (dict)

Simple dictionary-based lookup.

MappingLookup

class bspump.MappingLookup(app, id=None, config=None, lazy=False)[source]

Bases: Lookup, Mapping

Description:


Mapping-based lookup with set/get operations.

ProcessingError

class bspump.ProcessingError[source]

Bases: RuntimeError

Generic exception that indicates an error in pipeline processing.

Exception raised during event processing.

Analyzer

class bspump.Analyzer(app, pipeline, analyze_on_clock=False, id=None, config=None)[source]

Bases: Processor

Description:

ConfigDefaults: dict = {'analyze_period': 60}
__init__(app, pipeline, analyze_on_clock=False, id=None, config=None)[source]

Initializes the parameters.

Parameters

app : object

Application object.

pipeline : Pipeline

The Pipeline object.

id : str, default None

ID of the Analyzer.

config : JSON or other compatible formats, default None

Configuration file.

start_timer(event_type)[source]

Description:

analyze()[source]

Description:

evaluate(context, event)[source]

Records the information from the event into the analyzed object. Specific to each analyzer.

Parameters

context :

event : any data type

Information with a timestamp.

predicate(context, event)[source]

Checks whether the event is worth processing. If it is, the method should return True. Specific to each analyzer; the default implementation always returns True.

Parameters

context :

event : any data type

Information with a timestamp.

Returns:

True

process(context, event)[source]

The event passes through process(context, event) unchanged. Meanwhile, it is evaluated.

Parameters

context :

event : any data type

Information with a timestamp.

Returns:

event
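
How predicate(), evaluate() and process() fit together can be sketched with a stand-alone class. CountingAnalyzer and its counts attribute are hypothetical; the sketch assumes that process() consults predicate() before calling evaluate(), which matches the descriptions above but is not taken from the Analyzer source.

```python
class CountingAnalyzer:
    """Hypothetical analyzer; counts events per user."""

    def __init__(self):
        self.counts = {}

    def predicate(self, context, event):
        # Only events that carry a "user" field are worth analyzing.
        return "user" in event

    def evaluate(self, context, event):
        # Record information from the event into the analyzed object.
        user = event["user"]
        self.counts[user] = self.counts.get(user, 0) + 1

    def process(self, context, event):
        if self.predicate(context, event):
            self.evaluate(context, event)
        return event  # the event continues down the pipeline unchanged


analyzer = CountingAnalyzer()
events = [{"user": "alice"}, {"status": "ok"}, {"user": "alice"}]
passed = [analyzer.process({}, e) for e in events]
print(analyzer.counts)
```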

async on_clock_tick()[source]

Run analysis every tick.

Base class for analyzers.

Matrix

class bspump.Matrix(app, dtype='float_', persistent=False, id=None, config=None)[source]

Bases: ABC, Configurable

Generic Matrix object.

Matrix structure is organized in the following hierarchical order:

Matrix -> Rows -> Columns -> Cells

Cells have unified data format across the whole matrix. This format is specified by a dtype. It can be a simple integer or float but also a complex dictionary type with names and types of the fields.

The description of types that can be used for a dtype of a cell:

Name   Definition
‘b’    Byte
‘i’    Signed integer
‘u’    Unsigned integer
‘f’    Floating point
‘c’    Complex floating point
‘S’    String
‘U’    Unicode string
‘V’    Raw data

Example: ‘i8’ stands for int64.

For more details, see https://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html
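
Both kinds of cell format can be tried directly in numpy. The sketch below is illustrative only: the field names count and mean are made up, and it builds plain numpy arrays rather than a Matrix object.

```python
import numpy as np

# 'i8' is a signed integer stored in 8 bytes, i.e. int64.
simple = np.zeros(3, dtype="i8")
print(simple.dtype == np.int64)

# A structured dtype: every cell holds a named integer and a named float.
cell = np.dtype([("count", "i4"), ("mean", "f8")])
matrix = np.zeros((2, 3), dtype=cell)
matrix["count"][0, 1] = 7
print(matrix["count"][0, 1], matrix.dtype.names)
```

With a structured dtype, a whole field can be addressed at once (matrix["count"]), which keeps per-field computations vectorized.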

Object main attributes:

  • Array is a numpy ndarray, the actual data representation of the matrix object.

  • ClosedRows is a set where some row ids can be stored before deletion during the matrix rebuild.

ConfigDefaults: dict = {'max_closed_rows_capacity': 0.2}
__init__(app, dtype='float_', persistent=False, id=None, config=None)[source]
zeros(rows=1)[source]
flush()[source]

The matrix will be recreated without rows from ClosedRows.

close_rows(row_names, clear=True)[source]
close_row(row_name, clear=True)[source]
add_row()[source]
build_shape(rows=0)[source]

Override this method to have a control over the shape of the matrix.

reshape(shape)[source]
time()[source]
async analyze()[source]

The Matrix itself can run the analyze(). It is not recommended to iterate through the matrix row by row (or cell by cell). Instead, use numpy functions. Examples:

  1. You have a vector with n rows. You need only those row indices where the cell content is more than 10. Use np.where(vector > 10).

  2. You have a matrix with n rows and m columns. You need to find out which rows fully consist of zeros. Use np.where(np.all(matrix == 0, axis=1)) to get those row indexes. Instead of np.all() you can use np.any() to get all row indexes where there is at least one zero.

  3. Use np.mean(matrix, axis=1) to get means for all rows.

  4. Useful numpy functions: np.unique(), np.sum(), np.argmin(), np.argmax().
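
The numpy expressions mentioned above can be checked on small sample arrays. The data below is invented for illustration, and the arrays are plain numpy arrays rather than the Array attribute of a Matrix.

```python
import numpy as np

vector = np.array([3, 12, 7, 25])
matrix = np.array([
    [0, 0, 0],
    [1, 0, 2],
    [0, 0, 0],
    [4, 5, 6],
])

# np.where returns a tuple of index arrays; [0] picks the row indices.
above_ten = np.where(vector > 10)[0]
all_zero_rows = np.where(np.all(matrix == 0, axis=1))[0]
any_zero_rows = np.where(np.any(matrix == 0, axis=1))[0]
row_means = np.mean(matrix, axis=1)

print(above_ten, all_zero_rows, any_zero_rows, row_means)
```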

Matrix data structure for multi-dimensional analysis.