bspump.analyzer

Analyzers for aggregation, statistics, and streaming analysis.

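class bspump.analyzer.Analyzer(app, pipeline, analyze_on_clock=False, id=None, config=None)

Bases: Processor

Description:

Base class for analyzers. An analyzer is a processor that passes events through unchanged while evaluating them. It can also run an analysis periodically on a timer (analyze_on_clock).

ConfigDefaults: dict = {'analyze_period': 60}
__init__(app, pipeline, analyze_on_clock=False, id=None, config=None)

Initializes the analyzer. analyze_period is the analysis interval in seconds.

Parameters

app : object

Application object.

pipeline : Pipeline

The pipeline the analyzer belongs to.

analyze_on_clock : bool, default = False

If True, analyze() is called every analyze_period seconds.

id : str, default = None

ID of the analyzer; it also identifies the analyzer's configuration section.

config : dict, default = None

Configuration values overriding ConfigDefaults.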

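start_timer(event_type)

analyze()

Description:

Performs the analysis over the data the analyzer has recorded. Subclasses override it; when analyze_on_clock is True, on_clock_tick() calls it every analyze_period seconds.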

evaluate(context, event)
Records information from the event into the analyzed object (for example, a matrix).

Each analyzer implements its own version.

Parameters

context

Processing context of the event.

event : any data type

Event data, including a timestamp.

predicate(context, event)

Checks whether the event is worth processing and returns True if it is. Each analyzer can override it; the default implementation always returns True.

Parameters

context

Processing context of the event.

event : any data type

Event data, including a timestamp.

Returns:

bool (always True in the default implementation)

process(context, event)
Passes the event through unchanged.

On the way, if predicate() returns True, the event is evaluated with evaluate().

Parameters

context

Processing context of the event.

event : any data type

Event data, including a timestamp.

Returns:

event
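To make this control flow concrete, here is a dependency-free sketch that mimics, but is not, bspump's Analyzer. It assumes, as the predicate() description indicates, that process() calls evaluate() only when predicate() returns True and always hands on the same event object. SketchAnalyzer and ErrorCounter are hypothetical names used only for illustration.

```python
from typing import Any


class SketchAnalyzer:
    """Mimics the Analyzer contract without needing an app or pipeline."""

    def predicate(self, context: dict, event: Any) -> bool:
        # Default behaviour: every event is worth processing
        return True

    def evaluate(self, context: dict, event: Any) -> None:
        # Specific for each analyzer; the base version records nothing
        pass

    def process(self, context: dict, event: Any) -> Any:
        # Evaluate only accepted events, then pass the same event on
        if self.predicate(context, event):
            self.evaluate(context, event)
        return event


class ErrorCounter(SketchAnalyzer):
    """Hypothetical analyzer counting events whose level is 'error'."""

    def __init__(self) -> None:
        self.errors = 0

    def predicate(self, context: dict, event: Any) -> bool:
        return isinstance(event, dict) and event.get("level") == "error"

    def evaluate(self, context: dict, event: Any) -> None:
        self.errors += 1


analyzer = ErrorCounter()
events = [{"level": "info"}, {"level": "error"}, {"level": "error"}]
passed = [analyzer.process({}, e) for e in events]
```

Splitting the filter (predicate) from the bookkeeping (evaluate) keeps each subclass small: a new analyzer usually overrides one or both hooks and leaves process() alone.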

async on_clock_tick()

Timer callback that runs the analysis (analyze()) on every tick.

class bspump.analyzer.TimeWindowAnalyzer(app, pipeline, matrix_id=None, dtype='float_', columns=15, analyze_on_clock=False, resolution=60, start_time=None, clock_driven=False, persistent=False, id=None, config=None)

Bases: Analyzer

Analyzer for events with a temporal dimension (a timestamp). A configurable sliding window records events into time slots and provides functions to find the exact time slot of an event. When the analyzer is clock-driven, a timer periodically shifts the window by one resolution step, dropping the oldest events.

TimeWindowAnalyzer operates over the TimeWindowMatrix object.

resolution is the number of seconds that fit into one time cell; the default value is 60. columns is the number of time cells in the window; the default value is 15. start_time is the Unix timestamp at which the window starts; the default value, None, means the current time. clock_driven is a boolean specifying how the matrix is advanced: if True, it advances on the timer's tick, otherwise it is advanced manually. The default value is False. matrix_id is the ID of an existing TimeWindowMatrix object to use; if it is not provided, a new matrix is created with an ID derived from the analyzer ID. analyze_on_clock enables analysis by timer.

If the TimeWindowAnalyzer is clock_driven, on_clock_tick() periodically shifts the window; the same method also runs the analysis if it is enabled.
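The slot arithmetic can be illustrated with a toy window in NumPy: with resolution seconds per cell and columns cells, the window spans resolution × columns seconds, a timestamp t falls into cell (t − start) // resolution, and shifting the window drops the oldest cell. SlidingWindow and its methods are hypothetical; this is a sketch of the idea under those assumptions, not TimeWindowMatrix's internals.

```python
from typing import Optional

import numpy as np


class SlidingWindow:
    """Toy sliding window counting events per time cell (not TimeWindowMatrix)."""

    def __init__(self, start_time: float, resolution: int = 60, columns: int = 15):
        self.resolution = resolution  # seconds per time cell
        self.columns = columns        # number of time cells
        # The window covers [end_time - resolution * columns, end_time)
        self.end_time = start_time + resolution * columns
        self.counts = np.zeros(columns, dtype="i8")

    def get_column(self, timestamp: float) -> Optional[int]:
        # Find the time cell of a timestamp; None if it falls outside the window
        start_time = self.end_time - self.resolution * self.columns
        if not start_time <= timestamp < self.end_time:
            return None
        return int((timestamp - start_time) // self.resolution)

    def advance(self) -> None:
        # Shift by one resolution step: the oldest cell is dropped and an
        # empty cell is opened for the newest time slot
        self.counts = np.roll(self.counts, -1)
        self.counts[-1] = 0
        self.end_time += self.resolution


window = SlidingWindow(start_time=0, resolution=60, columns=15)
for ts in (5, 65, 70, 899):
    column = window.get_column(ts)
    if column is not None:
        window.counts[column] += 1
window.advance()
```

After the shift, the event at t = 5 has left the window and a timestamp such as 30 no longer maps to any cell.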

__init__(app, pipeline, matrix_id=None, dtype='float_', columns=15, analyze_on_clock=False, resolution=60, start_time=None, clock_driven=False, persistent=False, id=None, config=None)

class bspump.analyzer.TimeDriftAnalyzer(app, pipeline, id=None, config=None)

Bases: Analyzer

Analyzer that shows how far the time of the stream differs from the current time. The output of the analysis is a metric with the average, median, minimum and maximum time difference and its standard deviation.

Default Config

analyze_period : 5*60

Once every 5 minutes.

history_size : 100

Keep at most 100 values in the history array.

sparse_count : 1

Process every single event.

timestamp_attr : @timestamp

Name of the event attribute holding the timestamp on which the drift analysis is performed.

ConfigDefaults: dict = {'analyze_period': 300, 'history_size': 100, 'sparse_count': 1, 'timestamp_attr': '@timestamp'}
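The following sketch shows one way to compute such a drift metric with the standard library, reusing the config keys above. It is not the bspump implementation: DriftStats is a hypothetical class, sparse_count is assumed to mean "sample every n-th event", and the population standard deviation (statistics.pstdev) is an assumed choice.

```python
import statistics
import time
from collections import deque
from typing import Optional


class DriftStats:
    """Toy time-drift tracker mirroring the TimeDriftAnalyzer config keys."""

    def __init__(self, history_size: int = 100, sparse_count: int = 1,
                 timestamp_attr: str = "@timestamp"):
        self.history = deque(maxlen=history_size)  # keeps only the newest values
        self.sparse_count = sparse_count
        self.timestamp_attr = timestamp_attr
        self.seen = 0

    def evaluate(self, event: dict, now: Optional[float] = None) -> None:
        self.seen += 1
        # Assumed sampling: record only every sparse_count-th event
        if self.seen % self.sparse_count != 0:
            return
        now = time.time() if now is None else now
        # Drift: how far the event's timestamp lags behind the current time
        self.history.append(now - event[self.timestamp_attr])

    def analyze(self) -> dict:
        values = list(self.history)
        if not values:
            return {}
        return {
            "avg": statistics.mean(values),
            "median": statistics.median(values),
            "min": min(values),
            "max": max(values),
            "stddev": statistics.pstdev(values),
        }


drift = DriftStats(history_size=3)
for ts in (90.0, 95.0, 97.0, 99.0):
    drift.evaluate({"@timestamp": ts}, now=100.0)
report = drift.analyze()
```

Because the history is a bounded deque, the oldest drift value (10 seconds here) is discarded once history_size values have been recorded.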
__init__(app, pipeline, id=None, config=None)

Parameters

app : Application

The application object.

pipeline : Pipeline

The pipeline the analyzer belongs to.

id : str, default = None

ID of the analyzer.

config : dict, default = None

Configuration values overriding ConfigDefaults.

predicate(context, event)

Parameters

context

Processing context of the event.

event : any data type

Event data, including a timestamp.

Returns:

True

get_diff(event_timestamp)

Returns the time difference between the event's timestamp and the current time.

Parameters

event_timestamp

Timestamp of the event.

Returns:

diff

evaluate(context, event)

analyze()

Description:

Computes the drift statistics (average, median, minimum, maximum and standard deviation) and outputs them as a metric.

class bspump.analyzer.SessionAnalyzer(app, pipeline, matrix_id=None, dtype='float_', analyze_on_clock=False, persistent=False, id=None, config=None)

Bases: Analyzer

Analyzer for events with multiple different dimensions.

SessionAnalyzer operates over the SessionMatrix object. column_formats is an array in which each element consists of a letter from the table below followed by a number:

Letter   Definition
'b'      Byte
'i'      Signed integer
'u'      Unsigned integer
'f'      Floating point
'c'      Complex floating point
'S'      String
'U'      Unicode string
'V'      Raw data

Example: 'i8' stands for int64. Important: although all of these letters can be used, it is recommended to use only 'i' for integers, 'f' for floats and 'U' for strings; anything else might cause problems in serialization. Elements can also be multidimensional: put a tuple with the shape before the letter. Example: '(6, 3)i8' creates a matrix with n rows, each holding a 6 × 3 array of integers. column_names is an array with the name of each column, with the same length as column_formats. matrix_id is the ID of an existing SessionMatrix object to use instead of creating a new one.
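These letters are NumPy type codes, so their effect can be checked with NumPy alone. The sketch below builds a structured row type from them, independently of SessionMatrix; the field names user, count, duration and grid are hypothetical.

```python
import numpy as np

# Structured dtype built from the format codes above:
# 'U20' = Unicode string (20 characters), 'i8' = int64, 'f8' = float64,
# '(6,3)i8' = a 6 x 3 int64 sub-array stored in every row
session_dtype = np.dtype([
    ("user", "U20"),
    ("count", "i8"),
    ("duration", "f8"),
    ("grid", "(6,3)i8"),
])

sessions = np.zeros(4, dtype=session_dtype)  # n = 4 rows
sessions["user"][0] = "alice"
sessions["count"][0] += 1
sessions["grid"][0, 1, 2] = 7
```

Accessing the grid field returns an array of shape (n, 6, 3), which is the "n rows, 6 × 3 per row" layout described above.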

__init__(app, pipeline, matrix_id=None, dtype='float_', analyze_on_clock=False, persistent=False, id=None, config=None)

Parameters

app : Application

The application object.

pipeline : Pipeline

The pipeline the analyzer belongs to.

matrix_id : str, default = None

ID of an existing SessionMatrix to use.

dtype : str, default = 'float_'

Format of the matrix elements (see the format letters above).

analyze_on_clock : bool, default = False

If True, analyze() is called every analyze_period seconds.

persistent : bool, default = False

id : str, default = None

ID of the analyzer.

config : dict, default = None

Configuration values overriding ConfigDefaults.

class bspump.analyzer.GeoAnalyzer(app, pipeline, matrix_id=None, dtype='float_', analyze_on_clock=False, bbox=None, resolution=5, persistent=False, id=None, config=None)

Bases: Analyzer

Analyzer for events with a dimension of geographical points.

GeoAnalyzer operates over the GeoMatrix object. matrix_id is the ID of an existing GeoMatrix object to use instead of creating a new one.

Config Defaults

max_lat : 71.26
min_lat : 23.33
min_lon : -10.10
max_lon : 40.6

ConfigDefaults: dict = {'max_lat': 71.26, 'max_lon': 40.6, 'min_lat': 23.33, 'min_lon': -10.1}
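A simple way to picture binning points into this bounding box is a regular latitude/longitude grid. The sketch below uses the default bounds above but assumes a cell size in degrees (CELL_DEG, hypothetical) and a hypothetical to_cell() helper; GeoMatrix's actual projection and the unit of its resolution parameter may differ.

```python
import numpy as np

# Bounding box taken from the GeoAnalyzer config defaults above
BBOX = {"max_lat": 71.26, "min_lat": 23.33, "min_lon": -10.1, "max_lon": 40.6}
CELL_DEG = 5.0  # hypothetical cell size in degrees, not GeoMatrix's resolution unit

rows = int(np.ceil((BBOX["max_lat"] - BBOX["min_lat"]) / CELL_DEG))
cols = int(np.ceil((BBOX["max_lon"] - BBOX["min_lon"]) / CELL_DEG))
grid = np.zeros((rows, cols), dtype="i8")


def to_cell(lat: float, lon: float):
    """Return (row, column) for a point, or None if it lies outside the box."""
    if not (BBOX["min_lat"] <= lat <= BBOX["max_lat"]
            and BBOX["min_lon"] <= lon <= BBOX["max_lon"]):
        return None
    # Row 0 is the northern edge, column 0 the western edge
    row = min(int((BBOX["max_lat"] - lat) // CELL_DEG), rows - 1)
    col = min(int((lon - BBOX["min_lon"]) // CELL_DEG), cols - 1)
    return row, col


for lat, lon in [(50.08, 14.42), (48.85, 2.35), (40.71, -74.0)]:
    cell = to_cell(lat, lon)
    if cell is not None:
        grid[cell] += 1
```

Points outside the box (the last one here) are simply ignored, which is one reasonable policy; a real analyzer might count or log them instead.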
__init__(app, pipeline, matrix_id=None, dtype='float_', analyze_on_clock=False, bbox=None, resolution=5, persistent=False, id=None, config=None)

Parameters

app : Application

The application object.

pipeline : Pipeline

The pipeline the analyzer belongs to.

matrix_id : str, default = None

ID of an existing GeoMatrix to use.

dtype : str, default = 'float_'

analyze_on_clock : bool, default = False

If True, analyze() is called every analyze_period seconds.

bbox : default = None

Bounding box of the analyzed area (see the max_lat, min_lat, min_lon and max_lon config defaults).

resolution : int, default = 5

persistent : bool, default = False

id : str, default = None

ID of the analyzer.

config : dict, default = None

Configuration values overriding ConfigDefaults.

class bspump.analyzer.LatchAnalyzer(app, pipeline, query=True, analyze_on_clock=False, inclusive=False, id=None, config=None)

Bases: Analyzer

The LatchAnalyzer accumulates events in the Latch variable. The Latch is a queue whose maximum size is set by the latch_max_size configuration option.

If latch_max_size is 0, the Latch is not limited.

If the number of accumulated events exceeds latch_max_size, the oldest event is dropped.

The Latch is filled according to the query variable (True by default). The query may be:

1. True: all events are added to the Latch.
2. False: all events are skipped.
3. A dictionary following the mongo-like query syntax (see the rules in ContentFilter): only events matching this query are added to the Latch.

The query can be injected with an API call, which allows controlling which events enter the latch.

ConfigDefaults: dict = {'latch_max_size': 50}
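The latch behaviour maps naturally onto a bounded collections.deque. In this sketch, Latch is a hypothetical class and a dict query is simplified to plain equality of every key; bspump's ContentFilter supports a richer mongo-like syntax.

```python
from collections import deque
from typing import Union


class Latch:
    """Toy latch: a bounded queue filled according to a query."""

    def __init__(self, query: Union[bool, dict] = True, latch_max_size: int = 50):
        self.query = query
        # maxlen=None means unlimited, matching latch_max_size = 0
        self.events = deque(maxlen=latch_max_size or None)

    def matches(self, event: dict) -> bool:
        if isinstance(self.query, bool):
            return self.query  # True: latch everything, False: latch nothing
        # Simplification: every key in the query must equal the event's value
        return all(event.get(k) == v for k, v in self.query.items())

    def process(self, event: dict) -> dict:
        if self.matches(event):
            self.events.append(event)  # the oldest event is dropped when full
        return event


latch = Latch(query={"level": "error"}, latch_max_size=2)
for i in range(4):
    latch.process({"level": "error", "n": i})
latch.process({"level": "info", "n": 99})
```

Replacing latch.query at runtime changes which events are latched from then on, which is the kind of control the API injection described above provides.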
__init__(app, pipeline, query=True, analyze_on_clock=False, inclusive=False, id=None, config=None)

Parameters

app : Application

The application object.

pipeline : Pipeline

The pipeline the analyzer belongs to.

query : bool or dict, default = True

Selects which events are added to the Latch (see the query options above).

analyze_on_clock : bool, default = False

If True, analyze() is called every analyze_period seconds.

inclusive : bool, default = False

id : str, default = None

ID of the analyzer.

config : dict, default = None

Configuration values overriding ConfigDefaults.

process(context, event)

Parameters

context

Processing context of the event.

event : any data type

Event data, including a timestamp.

Returns:

event

class bspump.analyzer.AnalyzingSource(app, pipeline, matrix_id, id=None, config=None)

Bases: TriggerSource

AnalyzingSource is a triggered source that expects a matrix_id as input. Each time the trigger fires, it calls the analyze() function of the matrix and expects a complex event as output. A complex event can be an array of events or an aggregation of events (average, max, min, etc.).
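The idea of turning a matrix into one complex event on each trigger fire can be sketched in plain NumPy. analyze_matrix() and on_trigger() are hypothetical stand-ins: in bspump, the matrix's own analyze() and the TriggerSource machinery play these roles, and the aggregate keys below are illustrative.

```python
import numpy as np


def analyze_matrix(array: np.ndarray) -> dict:
    # Hypothetical stand-in for a matrix's analyze(): aggregate the whole
    # matrix into a single complex event
    return {
        "rows": int(array.shape[0]),
        "average": float(array.mean()),
        "max": float(array.max()),
        "min": float(array.min()),
    }


def on_trigger(array: np.ndarray, emitted: list) -> None:
    # Hypothetical trigger handler: each fire analyzes the matrix and
    # emits the resulting complex event downstream (here: into a list)
    emitted.append(analyze_matrix(array))


matrix = np.array([[1.0, 2.0], [3.0, 6.0]])
emitted = []
on_trigger(matrix, emitted)
matrix[0, 0] = 9.0
on_trigger(matrix, emitted)
```

Each fire sees the matrix as it is at that moment, so the second complex event reflects the update made between the two triggers.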

__init__(app, pipeline, matrix_id, id=None, config=None)

Parameters

app : Application

The application object.

pipeline : Pipeline

The pipeline the source belongs to.

matrix_id : str

ID of the matrix.

id : str, default = None

ID of the source.

config : dict, default = None

Configuration values for the source.

async cycle()

Description:

Called each time the trigger fires; runs the matrix analysis described above.

class bspump.analyzer.ThresholdAnalyzer(app, pipeline, id=None, config=None)

Bases: TimeWindowAnalyzer

ThresholdAnalyzer is based on TimeWindowAnalyzer and detects whether any monitored value has exceeded or fallen below the preconfigured bounds. The analyzer can also be used purely through configuration.

predicate method: checks whether the event contains the related attributes.

evaluate method: takes the event attributes and sorts them into the matrix.

analyze method: checks whether any value in the matrix is outside the preconfigured bounds and, if so, checks the occurrence of the symptom in the array and then possibly calls the alarm. To find values out of bounds, the np.where() method is used; it returns the positions of those values within the matrix (x = row positions, y = column positions), which are passed to the alarm method. To detect whether the number of occurrences of the "values out of bounds" symptom is greater than or equal to the number set in the configuration (anomaly_occurrence), an aggregation is used. The result of the aggregation is the number of occurrences and the indices of those values; it is then passed to the alarm method together with the x and y arrays.

alarm method: configurable, takes arguments.

Threshold settings:

exceedance: if lower_bound is set to '-inf' and upper_bound is not set to 'inf', the alarm is called when any value in the matrix exceeds upper_bound.

subceedance: if lower_bound is not set to '-inf' and upper_bound is set to 'inf', the alarm is called when any value in the matrix falls below lower_bound.

range: if lower_bound is not set to '-inf' and upper_bound is not set to 'inf', the alarm is called when any value in the matrix is outside the preconfigured range.

ConfigDefaults: dict = {'analyze_period': 300, 'anomaly_occurrence': 1, 'event_attribute': '', 'event_value': '', 'lower_bound': '-inf', 'upper_bound': 'inf'}
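The bound check and occurrence count can be sketched with np.where() and np.unique(). This is not bspump's code: grouping occurrences per matrix row is an assumed aggregation, and find_anomalies() and alarm() are hypothetical names. The config stores the bounds as strings such as '-inf', which float() converts, so the defaults below use float("-inf") and float("inf").

```python
import numpy as np


def find_anomalies(matrix, lower_bound=float("-inf"), upper_bound=float("inf"),
                   anomaly_occurrence=1):
    """Return (rows, counts, x, y) for values outside [lower_bound, upper_bound].

    rows/counts: rows whose number of out-of-bounds values reaches
    anomaly_occurrence, with those numbers; x/y: positions of all such values.
    """
    # x = row positions, y = column positions of out-of-bounds values
    x, y = np.where((matrix < lower_bound) | (matrix > upper_bound))
    # Assumed aggregation: count out-of-bounds values per row
    rows, counts = np.unique(x, return_counts=True)
    keep = counts >= anomaly_occurrence
    return rows[keep], counts[keep], x, y


def alarm(rows, counts, x, y):
    # Hypothetical alarm: a real one might log or emit an event
    return [f"row {r}: {c} value(s) out of bounds" for r, c in zip(rows, counts)]


matrix = np.array([
    [10.0, 95.0, 97.0],
    [12.0, 15.0, 11.0],
    [99.0, 14.0, 13.0],
])
# Exceedance mode: lower_bound stays -inf, upper_bound is finite
rows, counts, x, y = find_anomalies(matrix, upper_bound=90.0, anomaly_occurrence=2)
messages = alarm(rows, counts, x, y)
```

Row 2 has one value above the bound, but with anomaly_occurrence = 2 only row 0 triggers the alarm.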
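__init__(app, pipeline, id=None, config=None)

Parameters

app : Application

The application object.

pipeline : Pipeline

The pipeline the analyzer belongs to.

id : str, default = None

ID of the analyzer.

config : dict, default = None

Configuration values overriding ConfigDefaults.

predicate(context, event)

Description:

Checks whether the event contains the related attributes.

Parameters

context

Processing context of the event.

event : any data type

Event data, including a timestamp.

Returns:

True if the event contains the related attributes.

evaluate(context, event)

Description:

Takes the event attributes and sorts them into the matrix.

Parameters

context

Processing context of the event.

event : any data type

Event data, including a timestamp.

analyze()

Description:

Checks whether any value in the matrix is outside the preconfigured bounds and, if the symptom occurs often enough, calls alarm().

alarm(*args)

Description:

Called by analyze() when values are out of bounds; configurable, takes arguments.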

Analyzer

Base class for analyzers.

from bspump.analyzer import Analyzer


class MyAnalyzer(Analyzer):
    def evaluate(self, context, event):
        # Record information from the event; the return value is not used,
        # because process() passes the event on unchanged
        pass

TimeWindowAnalyzer

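Aggregate events over a sliding time window.

from bspump.analyzer import TimeWindowAnalyzer


class HourlyAnalyzer(TimeWindowAnalyzer):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(
            app, pipeline,
            columns=60,             # 60 time cells ...
            resolution=60,          # ... of 60 seconds each: a 1-hour window
            clock_driven=True,      # shift the window on the timer's tick
            analyze_on_clock=True,  # call analyze() every analyze_period seconds
            id=id, config=config,
        )

    def evaluate(self, context, event):
        # Record the event into the time window matrix
        pass

    def analyze(self):
        # Called periodically because analyze_on_clock=True
        pass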

SessionAnalyzer

Track user sessions.

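from bspump.analyzer import SessionAnalyzer


class UserSessionAnalyzer(SessionAnalyzer):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(
            app, pipeline,
            # One row per user; formats follow the column format table above
            dtype=[("event_count", "i8"), ("last_seen", "f8")],
            analyze_on_clock=True,  # call analyze() every analyze_period seconds
            id=id, config=config,
        )

    def predicate(self, context, event):
        # Only events carrying a user ID are evaluated
        return "user_id" in event

    def evaluate(self, context, event):
        # Assumption: the SessionMatrix is exposed as self.Sessions with
        # get_row_index()/add_row() and a NumPy array in .Array; verify
        # these names against your bspump version
        user_id = event["user_id"]
        row = self.Sessions.get_row_index(user_id)
        if row is None:
            row = self.Sessions.add_row(user_id)
        self.Sessions.Array["event_count"][row] += 1
        self.Sessions.Array["last_seen"][row] = event.get("@timestamp", 0)

    def analyze(self):
        # Periodically inspect the sessions, e.g. close those idle for 30 minutes
        pass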

GeoAnalyzer

Analyze geographic data.

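import numpy as np
from bspump.analyzer import Analyzer


class GeoGridAnalyzer(Analyzer):
    # Counts events per 1-degree cell in a plain NumPy array
    # (a simplified stand-in for the GeoMatrix used by GeoAnalyzer)
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)
        self.Grid = np.zeros((180, 360), dtype="i8")  # rows: latitude, columns: longitude

    def evaluate(self, context, event):
        lat = float(event.get("lat", 0.0))
        lon = float(event.get("lon", 0.0))
        row = min(max(int(lat + 90), 0), 179)   # latitude -90..90 -> rows 0..179
        col = min(max(int(lon + 180), 0), 359)  # longitude -180..180 -> columns 0..359
        self.Grid[row, col] += 1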

Aggregation Example

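from bspump.analyzer import Analyzer


class CountingAnalyzer(Analyzer):
    def __init__(self, app, pipeline, id=None, config=None):
        # Pass id and config by keyword: the third positional parameter
        # of Analyzer.__init__ is analyze_on_clock
        super().__init__(app, pipeline, id=id, config=config)
        self.counts = {}

    def evaluate(self, context, event):
        key = event.get("type")
        self.counts[key] = self.counts.get(key, 0) + 1
        # The event dict is modified in place, so downstream processors see the count
        event["running_count"] = self.counts[key]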

Statistics Example

from bspump.analyzer import Analyzer


class StatisticsAnalyzer(Analyzer):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)
        self.sum = 0.0
        self.count = 0

    def evaluate(self, context, event):
        value = event.get("value", 0)
        self.sum += value
        self.count += 1
        # Running mean, added to the event in place
        event["mean"] = self.sum / self.count

Using with Pipeline

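import bspump
import bspump.kafka
from bspump.analyzer import SessionAnalyzer, TimeWindowAnalyzer


class AnalysisPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # "KafkaConnection" must be registered as a connection in the application
            bspump.kafka.KafkaSource(app, self, connection="KafkaConnection"),
            # Replace these with your own subclasses (e.g. HourlyAnalyzer);
            # analyzers pass events through unchanged
            TimeWindowAnalyzer(app, self),
            SessionAnalyzer(app, self),
            bspump.kafka.KafkaSink(app, self, connection="KafkaConnection"),
        )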