Source code for bspump.matrix.timewindowmatrix

import time
import logging

import numpy as np

from bspump.asab import Timer

import os

from .namedmatrix import NamedMatrix, PersistentNamedMatrix
from .utils.timeconfig import TimeConfig, PersistentTimeConfig
from .utils.warmingupcount import WarmingUpCount, PersistentWarmingUpCount

###

L = logging.getLogger(__name__)

###


class TimeWindowMatrix(NamedMatrix):
    """
            Container, specific for `TimeWindowAnalyzer`.

            The main specific attributes are:
            `Start` is the starting timestamp of the first column of the matrix;
            `End` is the ending timestamp of the last column;
            `Resolution` is the width of the column in seconds.

    .. code-block:: python

            --> Columns (time dimension), column "width" = resolution
            +---+---+---+---+---+---+
            |   |   |   |   |   |   |
            +---+---+---+---+---+---+
            |   |   |   |   |   |   |
            +---+---+---+---+---+---+
            |   |   |   |   |   |   |
            +---+---+---+---+---+---+
            |   |   |   |   |   |   |
            +---+---+---+---+---+---+
            |   |   |   |   |   |   |
            +---+---+---+---+---+---+
            ^                       ^
            End (past)   <          Start (== now)

    """

    def __init__(
        self,
        app,
        dtype="float_",
        start_time=None,
        resolution=60,
        columns=15,
        clock_driven=False,
        id=None,
        config=None,
    ):
        self.Columns = columns
        if start_time is None:
            start_time = time.time()

        start = (1 + (start_time // resolution)) * resolution
        self.Start = start
        self.Resolution = resolution

        super().__init__(app, dtype=dtype, id=id, config=config)

        if clock_driven:
            advance_period = resolution / 4
            self.Timer = Timer(app, self.on_clock_tick, autorestart=True)
            self.Timer.start(advance_period)
        else:
            self.Timer = None

        self.ClockDriven = clock_driven
        metrics_service = app.get_service("asab.MetricsService")
        self.Counters = metrics_service.create_counter(
            "EarlyLateEventCounter",
            tags={
                "matrix": self.Id,
            },
            init_values={
                "events.early": 0,
                "events.late": 0,
            },
        )

    def build_shape(self, rows=0):
        """
        Override this method to have a control over the shape of the matrix.
        """
        return (
            rows,
            self.Columns,
        )

    def reshape(self, shape):
        return (
            int(shape[0] / self.Columns),
            self.Columns,
        )

    def add_row(self, row_name):
        """
        Adds new row with `row_id` to the matrix and sets `warming_up_count`.
        """

        row_index = super().add_row(row_name)
        if self.Array.shape[0] != len(self.WarmingUpCount):
            self.WarmingUpCount.extend(self.Array.shape[0], self.Array.shape[1])
        else:
            self.WarmingUpCount.assign(row_index, self.Array.shape[1])

        return row_index

    def get_column(self, event_timestamp):
        """
        Returns the right column, where the timestamp fits.
        If if falls earlier or later, returns `None`.
        The timestamp should be provided in seconds.
        """

        if event_timestamp <= self.TimeConfig.get_end():
            self.Counters.add("events.late", 1)
            return None

        if event_timestamp >= self.TimeConfig.get_start():
            self.Counters.add("events.early", 1)
            return None

        column_idx = int(
            (event_timestamp - self.TimeConfig.get_end())
            // self.TimeConfig.get_resolution()
        )

        # These are temporal debug lines
        if column_idx < 0:
            L.exception(
                "The column index {} is less then 0, {} event timestamp, {} start time, {} end time, {} resolution, {} num columns".format(
                    column_idx,
                    event_timestamp,
                    self.TimeConfig.get_start(),
                    self.TimeConfig.get_end(),
                    self.TimeConfig.get_resolution(),
                    self.Array.shape[1],
                )
            )
            raise

        if column_idx >= self.Array.shape[1]:
            L.exception(
                "The column index {} is more then columns number, {} event timestamp, {} start time, {} end time, {} resolution, {} num columns".format(
                    column_idx,
                    event_timestamp,
                    self.TimeConfig.get_start(),
                    self.TimeConfig.get_end(),
                    self.TimeConfig.get_resolution(),
                    self.Array.shape[1],
                )
            )
            raise

        return column_idx

    def advance(self, target_ts):
        """
                Advance time window (add columns) so it covers target `timestamp` (`target_ts`)
                Also, if `target_ts` is in top 75% of the last existing column, add a new column too.

                "target_ts" must always be in seconds

        .. code-block:: python

                --------------------|-----------
                target_ts  ^ >>>    |
                                                        ^
                                                        Start
                ------------------------------

        """
        added = 0
        while True:
            dt = (
                self.TimeConfig.get_start() - target_ts
            ) / self.TimeConfig.get_resolution()
            if dt > 0.25:
                break
            self.add_column()
            added += 1

        return added

    def close_row(self, row_name, clear=True):
        closed = super().close_row(row_name, clear=clear)
        if closed:
            row_index = self.Index.get_row_index(row_name)
            if row_index is not None:
                self.WarmingUpCount.assign(row_index, self.Array.shape[1])

        return closed

    def flush(self):
        """
        The matrix will be recreated without rows from `ClosedRows`.
        """
        closed_indexes, saved_indexes = super().flush()
        self.WarmingUpCount.flush(saved_indexes)
        return closed_indexes, saved_indexes

    async def on_clock_tick(self):
        """
        React on timer's tick and advance the window.
        """
        target_ts = time.time()
        self.advance(target_ts)

    def zeros(self):
        super().zeros()
        self.TimeConfig = TimeConfig(self.Resolution, self.Columns, self.Start)
        self.End = self.TimeConfig.get_end()
        self.WarmingUpCount = WarmingUpCount(self.Array.shape[0])

    def add_column(self):
        """
        Adds new time column to the matrix and deletes the first one, simulating
        the time flow. `Start` and `End` attributes are advanced as well.
        """

        self.TimeConfig.add_start(self.TimeConfig.get_resolution())
        self.TimeConfig.add_end(self.TimeConfig.get_resolution())

        if self.Array.shape[0] == 0:
            return

        column = np.empty(
            (
                self.Array.shape[0],
                1,
            )
            + self.Array.shape[2:],
            dtype=self.Array.dtype,
        )
        column[:] = np.nan
        array = self.Array
        array = np.hstack((array, column))
        array = np.delete(array, 0, axis=1)
        self.Array = array

        open_rows = list(
            set(range(0, self.Array.shape[0])) - self.ClosedRows.get_rows()
        )
        self.WarmingUpCount.decrease(open_rows)
        self.Start = self.TimeConfig.get_start()
        self.End = self.TimeConfig.get_end()


[docs] class PersistentTimeWindowMatrix(PersistentNamedMatrix):
[docs] def __init__( self, app, dtype="float_", start_time=None, resolution=60, columns=15, clock_driven=False, id=None, config=None, ): self.Columns = columns if start_time is None: start_time = time.time() start = (1 + (start_time // resolution)) * resolution self.Start = start self.Resolution = resolution super().__init__(app, dtype=dtype, id=id, config=config) if clock_driven: advance_period = resolution / 4 self.Timer = Timer(app, self.on_clock_tick, autorestart=True) self.Timer.start(advance_period) if self.TimeConfig.get_start() != start: self.advance(start) else: self.Timer = None self.ClockDriven = clock_driven self.Start = self.TimeConfig.get_start() self.End = self.TimeConfig.get_end() metrics_service = app.get_service("asab.MetricsService") self.Counters = metrics_service.create_counter( "EarlyLateEventCounter", tags={ "matrix": self.Id, }, init_values={ "events.early": 0, "events.late": 0, }, )
[docs] def build_shape(self, rows=0): """ Override this method to have a control over the shape of the matrix. """ return ( rows, self.Columns, )
[docs] def reshape(self, shape): return ( int(shape[0] / self.Columns), self.Columns, )
[docs] def add_row(self, row_name): """ Adds new row with `row_id` to the matrix and sets `warming_up_count`. """ row_index = super().add_row(row_name) if self.Array.shape[0] != len(self.WarmingUpCount): self.WarmingUpCount.extend(self.Array.shape[0], self.Array.shape[1]) else: self.WarmingUpCount.assign(row_index, self.Array.shape[1]) return row_index
[docs] def get_column(self, event_timestamp): """ Returns the right column, where the timestamp fits. If if falls earlier or later, returns `None`. The timestamp should be provided in seconds. """ if event_timestamp <= self.TimeConfig.get_end(): self.Counters.add("events.late", 1) return None if event_timestamp >= self.TimeConfig.get_start(): self.Counters.add("events.early", 1) return None column_idx = int( (event_timestamp - self.TimeConfig.get_end()) // self.TimeConfig.get_resolution() ) # These are temporal debug lines if column_idx < 0: L.exception( "The column index {} is less then 0, {} event timestamp, {} start time, {} end time, {} resolution, {} num columns".format( column_idx, event_timestamp, self.TimeConfig.get_start(), self.TimeConfig.get_end(), self.TimeConfig.get_resolution(), self.Array.shape[1], ) ) raise if column_idx >= self.Array.shape[1]: L.exception( "The column index {} is more then columns number, {} event timestamp, {} start time, {} end time, {} resolution, {} num columns".format( column_idx, event_timestamp, self.TimeConfig.get_start(), self.TimeConfig.get_end(), self.TimeConfig.get_resolution(), self.Array.shape[1], ) ) raise return column_idx
[docs] def advance(self, target_ts): """ Advance time window (add columns) so it covers target `timestamp` (`target_ts`) Also, if `target_ts` is in top 75% of the last existing column, add a new column too. "target_ts" must always be in seconds .. code-block:: python --------------------|----------- target_ts ^ >>> | ^ Start ------------------------------ """ added = 0 while True: dt = ( self.TimeConfig.get_start() - target_ts ) / self.TimeConfig.get_resolution() if dt > 0.25: break self.add_column() added += 1 return added
[docs] def close_row(self, row_name, clear=True): closed = super().close_row(row_name, clear=clear) if closed: row_index = self.Index.get_row_index(row_name) if row_index is not None: self.WarmingUpCount.assign(row_index, self.Array.shape[1]) return closed
[docs] def flush(self): """ The matrix will be recreated without rows from `ClosedRows`. """ closed_indexes, saved_indexes = super().flush() self.WarmingUpCount.flush(saved_indexes) return closed_indexes, saved_indexes
[docs] async def on_clock_tick(self): """ React on timer's tick and advance the window. """ target_ts = time.time() self.advance(target_ts)
[docs] def zeros(self): super().zeros() path = os.path.join(self.Path, "time_config.dat") self.TimeConfig = PersistentTimeConfig( path, self.Resolution, self.Columns, self.Start ) self.End = self.TimeConfig.get_end() path = os.path.join(self.Path, "warming_up_count.dat") self.WarmingUpCount = PersistentWarmingUpCount(path, self.Array.shape[0]) if self.TimeConfig.get_start() != self.Start: self.advance(self.Start)
[docs] def add_column(self): """ Adds new time column to the matrix and deletes the first one, simulating the time flow. `Start` and `End` attributes are advanced as well. """ self.TimeConfig.add_start(self.TimeConfig.get_resolution()) self.TimeConfig.add_end(self.TimeConfig.get_resolution()) if self.Array.shape[0] == 0: return column = np.zeros( ( self.Array.shape[0], 1, ) + self.Array.shape[2:], dtype=self.Array.dtype, ) array = np.zeros(self.Array.shape, dtype=self.DType) array[:] = self.Array[:] array = np.hstack((array, column)) array = np.delete(array, 0, axis=1) self.Array = np.memmap( self.ArrayPath, dtype=self.DType, mode="w+", shape=array.shape ) self.Array[:] = array[:] open_rows = list( set(range(0, self.Array.shape[0])) - self.ClosedRows.get_rows() ) self.WarmingUpCount.decrease(open_rows) self.Start = self.TimeConfig.get_start() self.End = self.TimeConfig.get_end()