bspump.matrix

Matrix data structures for multi-dimensional analysis.

class bspump.matrix.SessionMatrixExportCSVGenerator(app, pipeline, id=None, config=None)[source]

Bases: Generator

process(context, event)[source]

Description:

Parameters

context :

event : any data type

Information of any data type, with a timestamp.

class bspump.matrix.TimeWindowMatrixExportCSVGenerator(app, pipeline, id=None, config=None)[source]

Bases: Generator

process(context, event)[source]

Description:

Parameters

context :

event : any data type

Information of any data type, with a timestamp.

class bspump.matrix.MatrixSource(app, pipeline, matrix_id, id=None, config=None)[source]

Bases: TriggerSource

__init__(app, pipeline, matrix_id, id=None, config=None)[source]

Set the initial ID, Pipeline and Task.

Parameters

app : Application

The ASAB Application object (https://asab.readthedocs.io/en/latest/asab/application.html).

pipeline : Pipeline

The Pipeline that the source belongs to.

matrix_id : str

ID of the matrix that the source reads from.

id : str, default None

ID of the source.

config : compatible config type, default None

Optional configuration of the source.

async cycle()[source]

Not implemented. The method takes no parameters.

class bspump.matrix.Matrix(app, dtype='float_', persistent=False, id=None, config=None)[source]

Bases: ABC, Configurable

Generic Matrix object.

The matrix structure is organized in the following hierarchical order:

Matrix -> Rows -> Columns -> Cells

Cells share a unified data format across the whole matrix, specified by a dtype. The dtype can be a simple integer or float, but also a complex, dictionary-like type with the names and types of its fields.

The following types can be used for the dtype of a cell:

Name    Definition
‘b’     Byte
‘i’     Signed integer
‘u’     Unsigned integer
‘f’     Floating point
‘c’     Complex floating point
‘S’     String
‘U’     Unicode string
‘V’     Raw data

Example: ‘i8’ stands for int64.

For more details, see https://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html
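The type codes above are ordinary NumPy dtype strings, so they can be tried out with NumPy alone. The sketch below does not use bspump; it shows a scalar cell type, a structured cell type with named fields (the field names are hypothetical), and a subarray type, which is one way to give each row several columns:

```python
import numpy as np

# 'i8' is a signed 8-byte integer, i.e. int64
scalar = np.dtype("i8")

# Structured cell type with named fields (field names are hypothetical)
structured = np.dtype([("user", "U16"), ("bytes_sent", "u8"), ("duration", "f4")])

# Subarray cell type: each cell is a vector of 3 float64 values
vector = np.dtype("(3,)f8")

cells = np.zeros(2, dtype=structured)
cells[0] = ("alice", 1024, 1.5)

# NumPy appends the subarray dimensions to the array shape
grid = np.zeros(4, dtype=vector)
```

With a subarray dtype, a one-dimensional array of rows becomes a two-dimensional rows-by-columns array, so grid above has shape (4, 3).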

Main object attributes: Array is a NumPy ndarray holding the actual data of the matrix. ClosedRows is a set in which row IDs are collected before they are deleted during the matrix rebuild.

ConfigDefaults: dict = {'max_closed_rows_capacity': 0.2}
__init__(app, dtype='float_', persistent=False, id=None, config=None)[source]
zeros(rows=1)[source]
flush()[source]

The matrix will be recreated without rows from ClosedRows.
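Conceptually, flush() drops the rows listed in ClosedRows and rebuilds the array from the rest. The sketch below shows that idea with plain NumPy, assuming ClosedRows holds integer row indices; flush_closed_rows is a hypothetical helper, and any index or name bookkeeping a real matrix keeps is left out:

```python
import numpy as np

def flush_closed_rows(array, closed_rows):
    """Return a new array without the rows whose indices are in closed_rows."""
    keep = np.ones(array.shape[0], dtype=bool)
    keep[list(closed_rows)] = False
    return array[keep]

array = np.arange(12, dtype="f8").reshape(4, 3)
closed_rows = {1, 3}
array = flush_closed_rows(array, closed_rows)
closed_rows.clear()  # the closed rows are gone after the rebuild
```

Boolean masking copies the surviving rows into a new array, so each rebuild costs time proportional to the size of the matrix.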

close_rows(row_names, clear=True)[source]
close_row(row_name, clear=True)[source]
add_row()[source]
build_shape(rows=0)[source]

Override this method to control the shape of the matrix.

reshape(shape)[source]
time()[source]
async analyze()[source]

The matrix can run analyze() itself. Avoid iterating through the matrix row by row (or cell by cell); use NumPy functions instead. Examples:

  1. You have a vector with n rows and need only the indices of rows whose cell content is greater than 10. Use np.where(vector > 10).

  2. You have a matrix with n rows and m columns and need to find the rows that consist entirely of zeros. Use np.where(np.all(matrix == 0, axis=1)) to get those row indices. Use np.any() instead of np.all() to get the indices of rows that contain at least one zero.

  3. Use np.mean(matrix, axis=1) to get the mean of every row.

  4. Other useful NumPy functions: np.unique(), np.sum(), np.argmin(), np.argmax().
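The NumPy idioms listed above, applied to small example arrays (the data is made up for illustration):

```python
import numpy as np

vector = np.array([3, 12, 7, 25])
matrix = np.array([
    [0, 0, 0],
    [1, 0, 2],
    [4, 5, 6],
])

# 1. Indices of cells greater than 10
over_ten = np.where(vector > 10)[0]

# 2. Rows consisting entirely of zeros, and rows with at least one zero
all_zero_rows = np.where(np.all(matrix == 0, axis=1))[0]
any_zero_rows = np.where(np.any(matrix == 0, axis=1))[0]

# 3. Mean of every row
row_means = np.mean(matrix, axis=1)

# 4. Index of the row with the largest sum
busiest_row = np.argmax(np.sum(matrix, axis=1))
```

np.where() returns a tuple with one index array per dimension; [0] selects the row indices.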

class bspump.matrix.PersistentMatrix(app, dtype='float_', id=None, config=None)[source]

Bases: Matrix

ConfigDefaults: dict = {'path': ''}
__init__(app, dtype='float_', id=None, config=None)[source]
create_path()[source]
zeros(rows=1)[source]
flush()[source]

The matrix will be recreated without rows from ClosedRows.

close_row(row_name, clear=True)[source]
class bspump.matrix.NamedMatrix(app, dtype='float_', id=None, config=None)[source]

Bases: Matrix

__init__(app, dtype='float_', id=None, config=None)[source]
zeros()[source]
serialize()[source]
deserialize(data)[source]
flush()[source]

The matrix will be recreated without rows from ClosedRows.

add_row(row_name)[source]
Parameters:

row_name (str)

close_row(row_name, clear=True)[source]
close_rows(row_names, clear=True)[source]
get_row_index(row_name)[source]
Parameters:

row_name (str)

get_row_name(row_index)[source]
Parameters:

row_index (int)
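The row-name methods above imply a two-way mapping between row names and row indices. The following is a hypothetical, self-contained sketch of that bookkeeping on top of a plain NumPy array; it is not the bspump implementation, and the class name NamedRows is invented for illustration:

```python
import numpy as np

class NamedRows:
    """Hypothetical sketch: rows of a NumPy array addressed by name."""

    def __init__(self, columns, dtype="f8"):
        self.Array = np.zeros((0, columns), dtype=dtype)
        self._name_to_index = {}
        self._index_to_name = {}

    def add_row(self, row_name: str) -> int:
        # The new row is appended at the end, so its index is the old row count
        row_index = self.Array.shape[0]
        new_row = np.zeros((1, self.Array.shape[1]), dtype=self.Array.dtype)
        self.Array = np.vstack([self.Array, new_row])
        self._name_to_index[row_name] = row_index
        self._index_to_name[row_index] = row_name
        return row_index

    def get_row_index(self, row_name: str):
        return self._name_to_index.get(row_name)

    def get_row_name(self, row_index: int):
        return self._index_to_name.get(row_index)

rows = NamedRows(columns=3)
idx = rows.add_row("user_001")
rows.Array[idx, 2] = 5.0
```

np.vstack copies the whole array on every call, which keeps the sketch short rather than fast.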

class bspump.matrix.PersistentTimeWindowMatrix(app, dtype='float_', start_time=None, resolution=60, columns=15, clock_driven=False, id=None, config=None)[source]

Bases: PersistentNamedMatrix

__init__(app, dtype='float_', start_time=None, resolution=60, columns=15, clock_driven=False, id=None, config=None)[source]
build_shape(rows=0)[source]

Override this method to control the shape of the matrix.

reshape(shape)[source]
add_row(row_name)[source]

Adds a new row named row_name to the matrix and sets its warming_up_count.

get_column(event_timestamp)[source]

Returns the column into which the timestamp fits. If the timestamp falls earlier or later than the window, returns None. The timestamp must be provided in seconds.
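A sketch of the lookup that get_column() describes, assuming the window starts at start and consists of columns contiguous columns, each resolution seconds wide (the defaults mirror the constructor's resolution=60 and columns=15). The standalone function is hypothetical, not the method itself:

```python
def get_column(event_timestamp, start, resolution=60, columns=15):
    """Map a timestamp in seconds to a window column, or None if it is outside."""
    if event_timestamp < start:
        return None  # earlier than the window
    column = int((event_timestamp - start) // resolution)
    if column >= columns:
        return None  # later than the window
    return column
```

With start=1000 and the defaults, timestamps 1000 to 1059.9 map to column 0 and the window ends at 1000 + 15 * 60 = 1900.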

advance(target_ts)[source]

Advances the time window (adds columns) so that it covers the target timestamp target_ts. If target_ts falls in the top 75% of the last existing column, a new column is added as well.

target_ts must always be in seconds.

[Diagram: target_ts relative to the window Start]
close_row(row_name, clear=True)[source]
flush()[source]

The matrix will be recreated without rows from ClosedRows.

async on_clock_tick()[source]

Reacts to the timer’s tick and advances the window.

zeros()[source]
add_column()[source]

Adds a new time column to the matrix and deletes the first one, simulating the flow of time. The Start and End attributes are advanced as well.
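The time flow that add_column() simulates can be sketched with NumPy: shift every row one column to the left, zero the freed last column, and move the window bounds forward by one resolution step. The function and its start, end and resolution arguments are assumptions for illustration, not the method's real signature:

```python
import numpy as np

def add_column(array, start, end, resolution):
    """Drop the oldest column, append an empty one and shift the window bounds."""
    shifted = np.empty_like(array)
    shifted[:, :-1] = array[:, 1:]  # every column moves one step into the past
    shifted[:, -1] = 0              # the newest column starts empty
    return shifted, start + resolution, end + resolution

array = np.arange(6, dtype="f8").reshape(2, 3)
array, start, end = add_column(array, start=0, end=180, resolution=60)
```

The matrix keeps a fixed number of columns, so memory use stays constant while the window slides forward.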

class bspump.matrix.SessionMatrix(app, dtype='float_', id=None, config=None)[source]

Bases: NamedMatrix

ConfigDefaults: dict = {'primary_name': 'id'}

Matrix specific to SessionAnalyzer.

__init__(app, dtype='float_', id=None, config=None)[source]
store(row_name, event)[source]

Stores the event in the matrix. The event must be prepared so that it matches the data type of the cell (dtype).

Parameters:

row_name (str)

store_event(row_index, event, keys=None)[source]
Parameters:

row_index (int)

decode_row(row_index, keys=None)[source]
Parameters:

row_index (int)
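store() expects the event to match the cell dtype. With a structured dtype, storing and decoding amount to writing a dict's values into named fields of one row and reading them back. The sketch uses plain NumPy; SESSION_DTYPE and the event fields are hypothetical, and keys is interpreted here as an optional subset of field names, which is an assumption about the real parameter:

```python
import numpy as np

# Hypothetical session cell layout
SESSION_DTYPE = np.dtype([("id", "U32"), ("bytes", "i8"), ("duration", "f8")])

def store_event(array, row_index, event, keys=None):
    """Write event fields into one structured row (all fields when keys is None)."""
    names = array.dtype.names if keys is None else keys
    for name in names:
        array[name][row_index] = event[name]  # array[name] is a view of one field

def decode_row(array, row_index, keys=None):
    """Read fields of one structured row back into a plain dict."""
    names = array.dtype.names if keys is None else keys
    return {name: array[name][row_index].item() for name in names}

rows = np.zeros(2, dtype=SESSION_DTYPE)
store_event(rows, 0, {"id": "session-1", "bytes": 2048, "duration": 3.5})
decoded = decode_row(rows, 0)
```

.item() converts NumPy scalars to plain Python str, int and float values, so the decoded row can be serialized directly.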

class bspump.matrix.PersistentSessionMatrix(app, dtype='float_', id=None, config=None)[source]

Bases: PersistentNamedMatrix

ConfigDefaults: dict = {'primary_name': 'id'}
__init__(app, dtype='float_', id=None, config=None)[source]
store(row_name, event)[source]

Stores the event in the matrix. The event must be prepared so that it matches the data type of the cell (dtype).

Parameters:

row_name (str)

store_event(row_index, event, keys=None)[source]
Parameters:

row_index (int)

decode_row(row_index, keys=None)[source]
Parameters:

row_index (int)

class bspump.matrix.GeoMatrix(app, dtype='float_', bbox=None, resolution=5, id=None, config=None)[source]

Bases: Matrix

Matrix specific to GeoAnalyzer. bbox is a dictionary with max_lat, min_lat, max_lon and min_lon, the corner GPS coordinates. GeoMatrix is a 2D projection of real-world coordinates onto a plane.

ConfigDefaults: dict = {'max_lat': 71.26, 'max_lon': 40.6, 'min_lat': 23.33, 'min_lon': -10.1}
__init__(app, dtype='float_', bbox=None, resolution=5, id=None, config=None)[source]
zeros()[source]
is_in_boundaries(lat, lon)[source]

Checks whether the coordinates are within the bbox.

degrees_to_radians(degrees)[source]
get_gps_distance(lat1, lon1, lat2, lon2)[source]

Calculates the distance between two GPS coordinates in km.
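The section does not say which formula get_gps_distance() uses. One common choice for the distance in km between two GPS coordinates is the haversine formula with a mean Earth radius of 6371 km, sketched here as a standalone function; the library's actual formula may differ:

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius

def get_gps_distance(lat1, lon1, lat2, lon2):
    """Great-circle distance in km between two points, using the haversine formula."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))
```

One degree of longitude on the equator comes out at about 111.19 km with this radius.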

project_equirectangular(lat, lon)[source]

Converts latitude and longitude into row and column indexes.

inverse_equirectangular(row, column)[source]

Converts row and column into latitude and longitude.
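A sketch of an equirectangular projection onto the default bbox from ConfigDefaults. It assumes that resolution is the edge length of one cell in km, that row 0 lies on the northern edge (max_lat) and column 0 on the western edge (min_lon), and that longitudes are scaled by the cosine of the bbox's central latitude. None of these choices is confirmed by this section, and the module-level constants are hypothetical:

```python
import math

EARTH_RADIUS_KM = 6371.0
BBOX = {"max_lat": 71.26, "max_lon": 40.6, "min_lat": 23.33, "min_lon": -10.1}
RESOLUTION_KM = 5  # assumed: edge length of one cell in km

# Scale longitudes by the cosine of the bbox's central latitude
COS_CENTER = math.cos(math.radians((BBOX["max_lat"] + BBOX["min_lat"]) / 2))

def project_equirectangular(lat, lon):
    """Map latitude/longitude to (row, column); row 0 is the northern edge."""
    y = EARTH_RADIUS_KM * math.radians(BBOX["max_lat"] - lat)
    x = EARTH_RADIUS_KM * math.radians(lon - BBOX["min_lon"]) * COS_CENTER
    return int(y // RESOLUTION_KM), int(x // RESOLUTION_KM)

def inverse_equirectangular(row, column):
    """Return the latitude/longitude of the centre of cell (row, column)."""
    y = (row + 0.5) * RESOLUTION_KM
    x = (column + 0.5) * RESOLUTION_KM
    lat = BBOX["max_lat"] - math.degrees(y / EARTH_RADIUS_KM)
    lon = BBOX["min_lon"] + math.degrees(x / (EARTH_RADIUS_KM * COS_CENTER))
    return lat, lon
```

Returning cell centres from the inverse keeps the round trip stable: projecting the centre of a cell lands back in the same cell despite floating-point rounding.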

class bspump.matrix.PersistentGeoMatrix(app, dtype='float_', bbox=None, resolution=5, id=None, config=None)[source]

Bases: PersistentMatrix

Matrix specific to GeoAnalyzer. bbox is a dictionary with max_lat, min_lat, max_lon and min_lon, the corner GPS coordinates. GeoMatrix is a 2D projection of real-world coordinates onto a plane, with pointers to IdsToMembers, where objects can be kept.

ConfigDefaults: dict = {'max_lat': 71.26, 'max_lon': 40.6, 'min_lat': 23.33, 'min_lon': -10.1}
__init__(app, dtype='float_', bbox=None, resolution=5, id=None, config=None)[source]
zeros()[source]
reshape(shape)[source]
is_in_boundaries(lat, lon)[source]

Checks whether the coordinates are within the bbox.

degrees_to_radians(degrees)[source]
get_gps_distance(lat1, lon1, lat2, lon2)[source]

Calculates the distance between two GPS coordinates in km.

project_equirectangular(lat, lon)[source]

Converts latitude and longitude into row and column indexes.

inverse_equirectangular(row, column)[source]

Converts row and column into latitude and longitude.

Matrix

In-memory matrix for multi-dimensional data.

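from bspump.matrix import Matrix

# Signature: Matrix(app, dtype='float_', persistent=False, id=None, config=None)
# A "(100,)f8" cell dtype gives each row 100 float64 columns
matrix = Matrix(app, dtype="(100,)f8", id="MyMatrix")
# Assumes zeros(rows) allocates the given number of rows
matrix.zeros(rows=100)

# Access elements through the underlying numpy array
matrix.Array[10, 20] = 5
value = matrix.Array[10, 20]

# Increment
matrix.Array[10, 20] += 1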

PersistentMatrix

Matrix with disk persistence for recovery.

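from bspump.matrix import PersistentMatrix

# Signature: PersistentMatrix(app, dtype='float_', id=None, config=None)
# The storage location is the "path" config option
matrix = PersistentMatrix(
    app,
    dtype="(1000,)f8",
    id="PersistentMatrix",
    config={"path": "/data/matrix.dat"},
)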

The matrix state is automatically saved and restored on restart.

NamedMatrix

Matrix whose rows are addressed by name.

from bspump.matrix import NamedMatrix

matrix = NamedMatrix(app, dtype="(100,)f8", id="NamedMatrix")

# Register a named row, then look up its index
matrix.add_row("user_001")
row_index = matrix.get_row_index("user_001")

# Columns are addressed by integer index
matrix.Array[row_index, 0] = 5
value = matrix.Array[row_index, 0]

PersistentNamedMatrix

Named matrix with persistence.

from bspump.matrix import PersistentNamedMatrix

# Assumed to take the same arguments as PersistentMatrix
matrix = PersistentNamedMatrix(
    app,
    dtype="(1000,)f8",
    id="PersistentNamedMatrix",
    config={"path": "/data/named_matrix.dat"},
)

Using in Analyzers

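from bspump.analyzer import Analyzer
from bspump.matrix import Matrix

class MatrixAnalyzer(Analyzer):
    def __init__(self, app, pipeline, id=None, config=None):
        # Pass id and config by keyword so they cannot fill other positional parameters
        super().__init__(app, pipeline, id=id, config=config)
        self.Matrix = Matrix(app, dtype="(100,)f8", id="AnalysisMatrix")
        self.Matrix.zeros(rows=100)  # assumes zeros(rows) allocates 100 rows

    def evaluate(self, context, event):
        # Fold the coordinates into the 100 x 100 grid
        x = event.get("x", 0) % 100
        y = event.get("y", 0) % 100
        self.Matrix.Array[x, y] += 1
        event["cell_count"] = float(self.Matrix.Array[x, y])
        return event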

Matrix Operations

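# Get shape
shape = matrix.Array.shape

# Sum all elements
total = matrix.Array.sum()

# Get row/column
row = matrix.Array[10, :]
col = matrix.Array[:, 20]

# Reset to zeros, keeping the current row count
# (the documented default of zeros() is rows=1)
matrix.zeros(rows=matrix.Array.shape[0])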

Configuration

[matrix:PersistentMatrix]
path=/data/matrix.dat
# Auto-save interval in seconds
save_interval=60

Best Practices

  1. Use PersistentMatrix for recovery: Important data survives restarts

  2. Size appropriately: Memory use grows with the number of rows times the itemsize of the cell dtype

  3. Use NamedMatrix for clarity: Named rows improve readability

  4. Periodic saves: Configure save_interval for important data
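A rough memory estimate helps with sizing: a NumPy-backed matrix needs about rows times the dtype's itemsize bytes for its data. The helper below is a hypothetical illustration using NumPy only:

```python
import numpy as np

def matrix_nbytes(rows, dtype):
    """Bytes of array data for `rows` cells of the given dtype."""
    return rows * np.dtype(dtype).itemsize

# 1000 rows, each a vector of 1000 float64 values: 8,000,000 bytes (about 7.6 MiB)
estimate = matrix_nbytes(1000, "(1000,)f8")
```

Python object overhead is not included, but for large matrices the array data dominates.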