Source code for bspump.matrix.matrix

import abc
import logging
import os
import numpy as np

from bspump.asab import Configurable
from .utils.closedrows import ClosedRows, PersistentClosedRows

###

# Module-level logger, named after this module.
L = logging.getLogger(__name__)

###


class Matrix(abc.ABC, Configurable):
    """
    Generic `Matrix` object.

    Matrix structure is organized in the following hierarchical order:

        Matrix -> Rows -> Columns -> Cells

    Cells have unified data format across the whole matrix.
    This format is specified by a `dtype`.
    It can be a simple integer or float but also a complex dictionary type
    with names and types of the fields.

    The description of types that can be used for a `dtype` of a cell:

    +------------+------------------+
    | Name       | Definition       |
    +============+==================+
    | 'b'        | Byte             |
    +------------+------------------+
    | 'i'        | Signed integer   |
    +------------+------------------+
    | 'u'        | Unsigned integer |
    +------------+------------------+
    | 'f'        | Floating point   |
    +------------+------------------+
    | 'c'        | Complex floating |
    |            | point            |
    +------------+------------------+
    | 'S'        | String           |
    +------------+------------------+
    | 'U'        | Unicode string   |
    +------------+------------------+
    | 'V'        | Raw data         |
    +------------+------------------+

    Example: 'i8' stands for int64.
    For more details, see
    https://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html

    Object main attributes:
    `Array` is numpy ndarray, the actual data representation of the matrix
    object.
    `ClosedRows` is a set, where some row ids can be stored before deletion
    during the matrix rebuild.
    """

    ConfigDefaults = {
        "max_closed_rows_capacity": 0.2,
    }

    def __init__(self, app, dtype="float_", persistent=False, id=None, config=None):
        """
        Create the matrix, allocate its initial array and register a gauge.

        :param app: application object; must provide `Loop`, `time()` and
            `get_service()`.
        :param dtype: numpy dtype specification of a cell. A non-string value
            is sliced (`dtype[:]`) so the matrix keeps its own shallow copy.
        :param persistent: accepted for compatibility; not used by this
            constructor.
        :param id: identifier of the matrix; defaults to the class name.
        :param config: configuration passed to `Configurable`.
        """
        if not isinstance(dtype, str):
            dtype = dtype[:]

        self.Id = id if id is not None else self.__class__.__name__
        super().__init__("matrix:{}".format(self.Id), config=config)

        self.App = app
        self.Loop = app.Loop
        self.DType = dtype
        self.MaxClosedRowsCapacity = float(self.Config["max_closed_rows_capacity"])

        self.zeros()

        metrics_service = app.get_service("asab.MetricsService")
        self.Gauge = metrics_service.create_gauge(
            "RowCounter",
            tags={
                "matrix": self.Id,
            },
            init_values={
                "rows.closed": 0,
                "rows.active": 0,
            },
        )

    def zeros(self, rows=1):
        """
        Replace `Array` with a zero-filled array of `build_shape(rows)` and
        start a fresh `ClosedRows` in which row 0 is registered as closed.
        """
        self.Array = np.zeros(self.build_shape(rows), dtype=self.DType)
        self.ClosedRows = ClosedRows()
        self.ClosedRows.add(0)

    def flush(self):
        """
        The matrix will be recreated without rows from `ClosedRows`.

        :return: tuple `(closed_indexes, saved_indexes)` - the set of row
            indexes that were dropped and the sorted list of row indexes
            that were kept.
        """
        indexes = set(range(self.Array.shape[0]))
        closed_indexes = set(self.ClosedRows.get_rows())
        saved_indexes = sorted(indexes - closed_indexes)

        self.Array = self.Array.take(saved_indexes, axis=0)
        self.ClosedRows.flush(self.Array.shape[0])

        self.Gauge.set("rows.closed", 0)
        self.Gauge.set("rows.active", self.Array.shape[0])
        return closed_indexes, saved_indexes

    def close_rows(self, row_names, clear=True):
        """Placeholder; does nothing in this class."""
        pass

    def close_row(self, row_name, clear=True):
        """Placeholder; does nothing in this class."""
        pass

    def add_row(self):
        """
        Take one entry out of `ClosedRows` and return it.

        When `ClosedRows.pop()` raises `KeyError`, the matrix is first grown
        by `max(5, 10 % of the current row count)` rows and the pop is
        retried. The gauge is refreshed on every exit path.
        """
        try:
            return self.ClosedRows.pop()
        except KeyError:
            self._grow_rows(max(5, int(0.10 * self.Array.shape[0])))
            return self.ClosedRows.pop()
        finally:
            crc = len(self.ClosedRows)
            self.Gauge.set("rows.active", self.Array.shape[0] - crc)
            self.Gauge.set("rows.closed", crc)

    def build_shape(self, rows=0):
        """
        Override this method to have a control over the shape of the matrix.
        """
        return (rows,)

    def reshape(self, shape):
        """Return `shape` unchanged; a hook that subclasses may override."""
        return shape

    def _grow_rows(self, rows=1):
        """
        Override this method to gain control on how a new closed rows are
        added to the matrix.

        Appends `rows` rows at the end of the first axis and registers the
        new index range in `ClosedRows`.
        """
        current_rows = self.Array.shape[0]
        # Pad only the row axis, whatever the number of dimensions is; the
        # default `build_shape()` yields a 1-D array, for which a fixed
        # two-axis pad width is rejected by `np.pad`.
        pad_width = ((0, rows),) + ((0, 0),) * (self.Array.ndim - 1)
        # NOTE(review): NaN fill presumes a dtype that can hold NaN - confirm
        # for integer / structured dtypes.
        self.Array = np.pad(
            self.Array, pad_width, "constant", constant_values=np.nan
        )
        self.ClosedRows.extend(current_rows, self.Array.shape[0])

    def time(self):
        """Return `self.App.time()`."""
        return self.App.time()

    async def analyze(self):
        """
        The `Matrix` itself can run the `analyze()`.
        It is not recommended to iterate through the matrix row by row
        (or cell by cell). Instead use numpy functions. Examples:

        1. You have a vector with n rows. You need only those row indices,
           where the cell content is more than 10.
           Use `np.where(vector > 10)`.
        2. You have a matrix with n rows and m columns. You need to find out
           which rows fully consist of zeros.
           Use `np.where(np.all(matrix == 0, axis=1))` to get those row
           indexes. Instead `np.all()` you can use `np.any()` to get all row
           indexes, where there is at least one zero.
        3. Use `np.mean(matrix, axis=1)` to get means for all rows.
        4. Useful numpy functions: `np.unique()`, `np.sum()`, `np.argmin()`,
           `np.argmax()`.
        """
        pass
class PersistentMatrix(Matrix):
    """
    `Matrix` whose `Array` is a `np.memmap` stored in `array.dat` inside the
    directory given by the `path` configuration option.
    """

    ConfigDefaults = {
        "path": "",
    }

    def __init__(self, app, dtype="float_", id=None, config=None):
        """Forward all arguments to `Matrix.__init__`."""
        super().__init__(app, dtype=dtype, id=id, config=config)

    def create_path(self):
        """
        Read `path` from the configuration, make sure the directory exists
        and set `ArrayPath` to `<path>/array.dat`.
        """
        self.Path = self.Config["path"]
        # `exist_ok=True` avoids the race between checking for the directory
        # and creating it.
        os.makedirs(self.Path, exist_ok=True)
        self.ArrayPath = os.path.join(self.Path, "array.dat")

    def zeros(self, rows=1):
        """
        Map `Array` onto `ArrayPath`.

        An existing file is opened in "readwrite" mode and reshaped via
        `self.reshape()`; `rows` is ignored in that case. Otherwise a new
        file of `build_shape(rows)` is created. `ClosedRows` is bound to
        `closed_rows.dat` in the same directory.
        """
        self.create_path()

        if os.path.exists(self.ArrayPath):
            self.Array = np.memmap(self.ArrayPath, dtype=self.DType, mode="readwrite")
            self.Array = self.Array.reshape(self.reshape(self.Array.shape))
        else:
            array = np.zeros(self.build_shape(rows), dtype=self.DType)
            self.Array = np.memmap(
                self.ArrayPath, dtype=self.DType, mode="w+", shape=array.shape
            )

        path = os.path.join(self.Path, "closed_rows.dat")
        self.ClosedRows = PersistentClosedRows(path, size=self.Array.shape[0])

    def flush(self):
        """
        The matrix will be recreated without rows from `ClosedRows`.

        :return: tuple `(closed_indexes, saved_indexes)` - the set of row
            indexes that were dropped and the sorted list of row indexes
            that were kept.
        """
        indexes = set(range(self.Array.shape[0]))
        closed_indexes = set(self.ClosedRows.get_rows())
        saved_indexes = sorted(indexes - closed_indexes)

        # Keep the surviving rows aside, then recreate the backing file with
        # the new shape and write them back.
        self.Array = self.Array.take(saved_indexes, axis=0)
        array = np.memmap(
            self.ArrayPath, dtype=self.DType, mode="w+", shape=self.Array.shape
        )
        array[:] = self.Array[:]
        self.Array = array

        self.ClosedRows.flush(self.Array.shape[0])
        self.Gauge.set("rows.closed", 0)
        self.Gauge.set("rows.active", self.Array.shape[0])
        return closed_indexes, saved_indexes

    def close_row(self, row_name, clear=True):
        """
        Mark the row called `row_name` as closed.

        :param row_name: name resolved through `self.Index.get_row_index()`.
            NOTE(review): `Index` is not assigned in this class - presumably
            supplied by a subclass; confirm.
        :param clear: when true, the row content is overwritten with zeros.
        :return: `False` when the name is unknown or the row is already
            closed, `True` otherwise.
        """
        row_index = self.Index.get_row_index(row_name)
        # Rule out the unknown row first, so that the membership test below
        # is never asked about `None`.
        if row_index is None or row_index in self.ClosedRows:
            return False

        if clear:
            self.Array[row_index] = np.zeros(1, dtype=self.DType)

        self.ClosedRows.add(row_index)

        # Rebuild the matrix once the share of closed rows reaches the limit.
        if len(self.ClosedRows) >= self.MaxClosedRowsCapacity * self.Array.shape[0]:
            self.flush()

        crc = len(self.ClosedRows)
        self.Gauge.set("rows.active", self.Array.shape[0] - crc)
        self.Gauge.set("rows.closed", crc)
        return True

    def _grow_rows(self, rows=1):
        """
        Override this method to gain control on how a new closed rows are
        added to the matrix.

        Appends `rows` zero-filled rows, recreates the backing file with the
        new shape and registers the new index range in `ClosedRows`.
        """
        current_rows = self.Array.shape[0]
        grown_shape = (current_rows + rows,) + self.Array.shape[1:]

        # Stage the data in memory: the "w+" memmap below recreates the file
        # that the current `Array` is mapped onto.
        grown = np.zeros(grown_shape, dtype=self.DType)
        grown[:current_rows] = self.Array[:]

        self.Array = np.memmap(
            self.ArrayPath, dtype=self.DType, mode="w+", shape=grown_shape
        )
        self.Array[:] = grown[:]
        self.ClosedRows.extend(current_rows, self.Array.shape[0])