Source code for bspump.lookup.matrixlookup

import json
import logging

import numpy as np

import bspump.asab as asab
from ..abc.lookup import Lookup
from ..matrix.sessionmatrix import SessionMatrix

###

L = logging.getLogger(__name__)

###


[docs] class MatrixLookup(Lookup): """ Numpy Lookup """ ConfigDefaults = { "update_period": 5, }
[docs] def __init__( self, app, matrix_id=None, dtype="float_", on_clock_update=False, id=None, config=None, lazy=False, ): super().__init__(app, id=id, config=config, lazy=lazy) self.Indexes = {} svc = app.get_service("bspump.PumpService") if matrix_id is None: s_id = self.Id + "Matrix" self.Matrix = SessionMatrix(app, dtype, id=s_id) svc.add_matrix(self.Matrix) else: self.Matrix = svc.locate_matrix(matrix_id) self.MatrixPubSub = None self.Timer = None self.Target = None if self.is_master(): if on_clock_update: self.UpdatePeriod = float(self.Config["update_period"]) self.Timer = asab.Timer(app, self._on_clock_tick, autorestart=True) self.Timer.start(self.UpdatePeriod) else: self.MatrixPubSub = self.Matrix.PubSub self.MatrixPubSub.subscribe("Matrix changed!", self._on_matrix_changed)
async def _on_matrix_changed(self, message): self.update_indexes() async def _on_clock_tick(self): self.update_indexes()
[docs] def update_indexes(self): for index in self.Indexes: self.Indexes[index].update(self.Matrix)
[docs] def search(self, condition, target_column): """ Default search, override if optimized with indexes """ x = np.where(condition) if len(x[0]) == 0: return None return np.asscalar(self.Matrix.Array[x[0][0]][target_column])
[docs] def serialize(self): serialized = {} serialized["Matrix"] = self.Matrix.serialize() serialized["Indexes"] = {} for index in self.Indexes: serialized["Indexes"][index] = self.Indexes[index].serialize() return (json.dumps(serialized)).encode("utf-8")
[docs] def deserialize(self, data_json): data = json.loads(data_json.decode("utf-8")) self.Matrix.deserialize(data["Matrix"]) indexes = data.get("Indexes", {}) for index in indexes: self.Indexes[index].deserialize(indexes[index])
[docs] def rest_get(self): rest = super().rest_get() rest["Indexes"] = {} for index in self.Indexes: rest["Indexes"][index] = self.Indexes[index].serialize() rest["Matrix"] = self.Matrix.serialize() return rest
[docs] def create_index(self, index_class, *args, **kwarg): new_index = index_class(*args, **kwarg) index_id = new_index.Id self.Indexes[index_id] = new_index return new_index