bspump.matrix¶
Matrix data structures for multi-dimensional analysis.
- class bspump.matrix.SessionMatrixExportCSVGenerator(app, pipeline, id=None, config=None)[source]¶
Bases:
Generator
- class bspump.matrix.TimeWindowMatrixExportCSVGenerator(app, pipeline, id=None, config=None)[source]¶
Bases:
Generator
- class bspump.matrix.MatrixSource(app, pipeline, matrix_id, id=None, config=None)[source]¶
Bases:
TriggerSource- __init__(app, pipeline, matrix_id, id=None, config=None)[source]¶
Set the initial ID,
Pipelineand Task.Parameters
- appApplication
Name of an Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_ .
- pipelineaddress of a pipeline
Name of a
Pipeline.- idstr, default None
Name of a the
Pipeline.- configcompatible config type , default None
Option for adding a configuration file.
- class bspump.matrix.Matrix(app, dtype='float_', persistent=False, id=None, config=None)[source]¶
Bases:
ABC,ConfigurableGeneric Matrix object.
Matrix structure is organized in a following hierarchical order:
Matrix -> Rows -> Columns -> Cells
Cells have unified data format across the whole matrix. This format is specified by a dtype. It can be a simple integer or float but also a complex dictionary type with names and types of the fields.
The description of types that can be used for a dtype of a cell:
Name
Definition
‘b’
Byte
‘i’
Signed integer
‘u’
Unsigned integer
‘f’
Floating point
‘c’
Complex floating point
‘S’
String
‘U’
Unicode string
‘V’
Raw data
Example: ‘i8’ stands for int64.
For more details, see https://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html
Object main attributes: Array is numpy ndarray, the actual data representation of the matrix object. ClosedRows is a set, where some row ids can be stored before deletion during the matrix rebuild.
- async analyze()[source]¶
The Matrix itself can run the analyze(). It is not recommended to iterate through the matrix row by row (or cell by cell). Instead use numpy functions. Examples: 1. You have a vector with n rows. You need only those row indices, where the cell content is more than 10. Use np.where(vector > 10). 2. You have a matrix with n rows and m columns. You need to find out which rows fully consist of zeros. use np.where(np.all(matrix == 0, axis=1)) to get those row indexes. Instead np.all() you can use np.any() to get all row indexes, where there is at least one zero. 3. Use np.mean(matrix, axis=1) to get means for all rows. 4. Useful numpy functions: np.unique(), np.sum(), np.argmin(), np.argmax().
- class bspump.matrix.PersistentMatrix(app, dtype='float_', id=None, config=None)[source]¶
Bases:
Matrix
- class bspump.matrix.PersistentTimeWindowMatrix(app, dtype='float_', start_time=None, resolution=60, columns=15, clock_driven=False, id=None, config=None)[source]¶
Bases:
PersistentNamedMatrix- __init__(app, dtype='float_', start_time=None, resolution=60, columns=15, clock_driven=False, id=None, config=None)[source]¶
- get_column(event_timestamp)[source]¶
Returns the right column, where the timestamp fits. If if falls earlier or later, returns None. The timestamp should be provided in seconds.
- advance(target_ts)[source]¶
Advance time window (add columns) so it covers target timestamp (target_ts) Also, if target_ts is in top 75% of the last existing column, add a new column too.
“target_ts” must always be in seconds
--------------------|----------- target_ts ^ >>> | ^ Start ------------------------------
- class bspump.matrix.SessionMatrix(app, dtype='float_', id=None, config=None)[source]¶
Bases:
NamedMatrix
- class bspump.matrix.PersistentSessionMatrix(app, dtype='float_', id=None, config=None)[source]¶
Bases:
PersistentNamedMatrix
- class bspump.matrix.GeoMatrix(app, dtype='float_', bbox=None, resolution=5, id=None, config=None)[source]¶
Bases:
MatrixMatrix, specific for GeoAnalyzer. bbox is the dictionary with max_lat, min_lat, max_lon and min_lon for corner gps-coordinates. GeoMatrix is 2d projection of real-world coordinates on a plane.
- get_gps_distance(lat1, lon1, lat2, lon2)[source]¶
Calculation of distance between 2 gps-coordinates in km.
- class bspump.matrix.PersistentGeoMatrix(app, dtype='float_', bbox=None, resolution=5, id=None, config=None)[source]¶
Bases:
PersistentMatrixMatrix, specific for GeoAnalyzer. bbox is the dictionary with max_lat, min_lat, max_lon and min_lon for corner gps-coordinates. GeoMatrix is 2d projection of real-world coordinates on a plane with pointers to IdsToMembers, where objects can be kept.
- get_gps_distance(lat1, lon1, lat2, lon2)[source]¶
Calculation of distance between 2 gps-coordinates in km.
Matrix¶
In-memory matrix for multi-dimensional data.
from bspump.matrix import Matrix
# Create a 100x100 matrix
matrix = Matrix(app, "MyMatrix", (100, 100))
# Access elements
matrix[10, 20] = 5
value = matrix[10, 20]
# Increment
matrix[10, 20] += 1
PersistentMatrix¶
Matrix with disk persistence for recovery.
from bspump.matrix import PersistentMatrix
matrix = PersistentMatrix(
app, "PersistentMatrix",
path="/data/matrix.dat",
shape=(1000, 1000)
)
The matrix state is automatically saved and restored on restart.
NamedMatrix¶
Matrix with named dimensions.
from bspump.matrix import NamedMatrix
matrix = NamedMatrix(app, "NamedMatrix", (100, 100))
# Set row/column names
matrix.set_row_name(0, "user_001")
matrix.set_col_name(0, "product_001")
# Access by name
matrix.set("user_001", "product_001", 5)
value = matrix.get("user_001", "product_001")
PersistentNamedMatrix¶
Named matrix with persistence.
from bspump.matrix import PersistentNamedMatrix
matrix = PersistentNamedMatrix(
app, "PersistentNamedMatrix",
path="/data/named_matrix.dat",
shape=(1000, 1000)
)
Using in Analyzers¶
class MatrixAnalyzer(bspump.Analyzer):
def __init__(self, app, pipeline, id=None, config=None):
super().__init__(app, pipeline, id, config)
self.matrix = Matrix(app, "AnalysisMatrix", (100, 100))
def evaluate(self, context, event):
x = event.get("x", 0) % 100
y = event.get("y", 0) % 100
self.matrix[x, y] += 1
event["cell_count"] = self.matrix[x, y]
return event
Matrix Operations¶
# Get shape
shape = matrix.shape
# Reset to zeros
matrix.zeros()
# Sum all elements
total = matrix.sum()
# Get row/column
row = matrix[10, :]
col = matrix[:, 20]
Configuration¶
[matrix:PersistentMatrix]
path=/data/matrix.dat
# Auto-save interval in seconds
save_interval=60
Best Practices¶
Use PersistentMatrix for recovery: Important data survives restarts
Size appropriately: Large matrices consume memory
Use NamedMatrix for clarity: Named dimensions improve readability
Periodic saves: Configure save_interval for important data