bspump.analyzer
Analyzers for aggregation, statistics, and streaming analysis.
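- class bspump.analyzer.Analyzer(app, pipeline, analyze_on_clock=False, id=None, config=None)
Bases: Processor
Description: Base class for analyzers. For each event for which predicate() returns True, evaluate() records information from the event into the analyzed object. The event itself is passed on through the pipeline.
- __init__(app, pipeline, analyze_on_clock=False, id=None, config=None)
Initializes the analyzer.
Parameters
- app : Application
The application object.
- pipeline : Pipeline
The pipeline the analyzer belongs to.
- analyze_on_clock : bool, default = False
Enables periodic analysis by timer.
- id : str, default = None
Identifier of the analyzer.
- config : dict, default = None
Configuration options that override the defaults.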
- evaluate(context, event)
Records the information from the event into the analyzed object. The implementation is specific to each analyzer.
Parameters
context :
- event : any data type
Information with a timestamp.
- predicate(context, event)
Checks whether the event is worth processing and returns True if it is. It is specific to each analyzer; the default implementation always returns True.
Parameters
context :
- event : any data type
Information with a timestamp.
- Returns:
True
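The division of labour between predicate() and evaluate() can be illustrated with a plain-Python sketch. SketchAnalyzer and RecentOnly are hypothetical classes that do not import BSPump; their process() method encodes the assumption that evaluate() runs only for events accepted by predicate() and that the event is then passed on.

```python
class SketchAnalyzer:
    """Hypothetical stand-in for the Analyzer contract; not BSPump code."""

    def __init__(self):
        self.seen = []  # the "analyzed object": timestamps recorded so far

    def predicate(self, context, event):
        # Default behaviour described above: every event is worth processing
        return True

    def evaluate(self, context, event):
        # Record information from the event into the analyzed object
        self.seen.append(event["@timestamp"])

    def process(self, context, event):
        # Assumed flow: evaluate only accepted events, then pass the event on
        if self.predicate(context, event):
            self.evaluate(context, event)
        return event


class RecentOnly(SketchAnalyzer):
    """Overrides predicate() to skip events older than a cutoff."""

    def predicate(self, context, event):
        return event.get("@timestamp", 0) >= 1000


analyzer = RecentOnly()
analyzer.process({}, {"@timestamp": 999})   # rejected by predicate()
analyzer.process({}, {"@timestamp": 1500})  # recorded by evaluate()
print(analyzer.seen)  # [1500]
```

Overriding only predicate() keeps the filtering logic separate from the recording logic, which is the split the two methods above describe.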
- class bspump.analyzer.TimeWindowAnalyzer(app, pipeline, matrix_id=None, dtype='float_', columns=15, analyze_on_clock=False, resolution=60, start_time=None, clock_driven=False, persistent=False, id=None, config=None)
Bases: Analyzer
This is the analyzer for events with a temporal dimension (a timestamp). A configurable sliding window records events in time cells and implements functions to find the exact time slot of an event. A timer periodically shifts the window by its resolution, dropping the oldest events.
TimeWindowAnalyzer operates over the TimeWindowMatrix object.
- resolution is the number of seconds covered by one time cell; the default is 60.
- columns is the number of time cells in the window; the default is 15.
- start_time is the Unix timestamp at which the window starts; the default None means the current time.
- clock_driven specifies how the matrix is advanced: if True, it advances on the timer's tick, otherwise manually. The default is False.
- matrix_id is the ID of an existing TimeWindowMatrix object to use instead; if it is not provided, a new matrix is created with an ID derived from the analyzer ID.
- analyze_on_clock enables periodic analysis by timer.
If the TimeWindowAnalyzer is clock_driven, the window is shifted periodically by on_clock_tick(). The same method also runs the analysis if it is enabled.
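To make the cell arithmetic concrete, here is a minimal pure-Python sketch that does not use BSPump. SlidingTimeWindow and its methods are hypothetical; they only mirror the behaviour described above (resolution seconds per cell, columns cells, the oldest cell dropped on each shift) and are not the TimeWindowMatrix API.

```python
from collections import deque


class SlidingTimeWindow:
    """Hypothetical model of a time window: `columns` cells of `resolution` seconds."""

    def __init__(self, start_time, columns=15, resolution=60):
        self.start_time = start_time  # left edge of the oldest cell
        self.columns = columns
        self.resolution = resolution
        # cells[0] is the oldest cell, cells[-1] the newest
        self.cells = deque([0] * columns, maxlen=columns)

    def column_for(self, timestamp):
        """Return the index of the cell holding `timestamp`, or None if outside."""
        if timestamp < self.start_time:
            return None
        column = int((timestamp - self.start_time) // self.resolution)
        return column if column < self.columns else None

    def record(self, timestamp, value=1):
        column = self.column_for(timestamp)
        if column is not None:
            self.cells[column] += value
        return column

    def advance(self):
        """Shift by one cell, as a clock tick would: drop the oldest cell."""
        self.cells.append(0)  # maxlen makes the deque discard cells[0]
        self.start_time += self.resolution


window = SlidingTimeWindow(start_time=1000, columns=3, resolution=60)
window.record(1000)   # cell 0
window.record(1130)   # cell 2 (130 s // 60 s)
window.record(1180)   # None: past the end of the 180-second window
window.advance()
print(list(window.cells), window.start_time)  # [0, 1, 0] 1060
```

The window always spans columns × resolution seconds; with the defaults (15 cells of 60 seconds) that is 15 minutes.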
- class bspump.analyzer.TimeDriftAnalyzer(app, pipeline, id=None, config=None)
Bases: Analyzer
Measures how much the time of the events in the stream differs from the current time. The output of the analysis is a metric with the average, median, minimum and maximum drift and its standard deviation.
Default Config
- analyze_period: 5*60
Once per 5 minutes.
- history_size: 100
Keep at most 100 array members.
- sparse_count: 1
Process every single event.
- timestamp_attr: @timestamp
Timestamp attribute present in the event on which the drift analysis is performed.
- ConfigDefaults: dict = {'analyze_period': 300, 'history_size': 100, 'sparse_count': 1, 'timestamp_attr': '@timestamp'}
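The statistics listed above can be sketched with the standard library alone. DriftTracker is a hypothetical class, not BSPump's TimeDriftAnalyzer; it assumes timestamps in seconds, reads sparse_count as "measure every N-th event", uses history_size as a bounded history, and picks the population standard deviation, which may differ from what BSPump reports.

```python
import statistics
import time
from collections import deque


class DriftTracker:
    """Hypothetical sketch of the drift metric; not BSPump's implementation."""

    def __init__(self, history_size=100, sparse_count=1):
        self.history = deque(maxlen=history_size)  # keeps only the newest drifts
        self.sparse_count = sparse_count
        self.counter = 0

    def record(self, event_timestamp, now=None):
        self.counter += 1
        if self.counter % self.sparse_count != 0:
            return  # only every sparse_count-th event is measured
        now = time.time() if now is None else now
        self.history.append(now - event_timestamp)  # drift in seconds

    def analyze(self):
        if not self.history:
            return None
        drifts = list(self.history)
        return {
            "avg": statistics.mean(drifts),
            "median": statistics.median(drifts),
            "min": min(drifts),
            "max": max(drifts),
            "stddev": statistics.pstdev(drifts),  # population standard deviation
        }


tracker = DriftTracker(history_size=3)
for ts in (90, 80, 70, 60):
    tracker.record(ts, now=100)  # drifts 10, 20, 30, 40; only the last 3 are kept
print(tracker.analyze()["avg"], tracker.analyze()["max"])  # 30 40
```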
- __init__(app, pipeline, id=None, config=None)
Description: Initializes the TimeDriftAnalyzer.
Parameters
- app : Application
The application object.
- pipeline : Pipeline
The pipeline the analyzer belongs to.
- id : str, default = None
Identifier of the analyzer.
- config : dict, default = None
Configuration options that override ConfigDefaults.
- predicate(context, event)
Parameters
context :
- event : any data type
Information with a timestamp.
- Returns:
True
- class bspump.analyzer.SessionAnalyzer(app, pipeline, matrix_id=None, dtype='float_', analyze_on_clock=False, persistent=False, id=None, config=None)
Bases: Analyzer
This is the analyzer for events with multiple different dimensions.
SessionAnalyzer operates over the SessionMatrix object. column_formats is an array; each element consists of one of the following letters followed by a number:
- 'b' : Byte
- 'i' : Signed integer
- 'u' : Unsigned integer
- 'f' : Floating point
- 'c' : Complex floating point
- 'S' : String
- 'U' : Unicode string
- 'V' : Raw data
Example: ‘i8’ stands for int64 (an 8-byte signed integer). Important: although all these letters can be used, it is recommended to use only ‘i’ for integers, ‘f’ for floats and ‘U’ for strings; anything else might cause problems in serialization. An element can also hold a sub-array of a given format: a tuple with the dimensions is placed before the letter. Example: ‘(6, 3)i8’ makes each cell of that column a 6 × 3 array of 64-bit integers, so a matrix with only this column has n rows, 6 columns and a third dimension of size 3. column_names is an array with the name of each column and must have the same length as column_formats. matrix_id is the ID of an existing SessionMatrix object to use instead.
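The format strings above follow NumPy's dtype notation (the matrices are NumPy arrays, as the np.where() usage in ThresholdAnalyzer shows). The NumPy-only sketch below does not create a SessionMatrix; the column names and values are hypothetical. It builds a structured array from column_names and column_formats to show what ‘U40’, ‘i8’ and ‘(6,3)i8’ produce.

```python
import numpy as np

# Hypothetical session columns; formats use the letters listed above
column_names = ["user_id", "clicks", "positions"]
column_formats = ["U40", "i8", "(6,3)i8"]

dtype = np.dtype({"names": column_names, "formats": column_formats})
sessions = np.zeros(4, dtype=dtype)  # n = 4 rows

sessions["user_id"][0] = "alice"
sessions["clicks"][0] += 1

print(sessions.dtype["clicks"])     # int64
print(sessions["positions"].shape)  # (4, 6, 3)
```

Sticking to ‘i’, ‘f’ and ‘U’ keeps every field a plain number or string, which is why the recommendation above favours them for serialization.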
- __init__(app, pipeline, matrix_id=None, dtype='float_', analyze_on_clock=False, persistent=False, id=None, config=None)
Description: Initializes the SessionAnalyzer.
Parameters
- app : Application
The application object.
- pipeline : Pipeline
The pipeline the analyzer belongs to.
matrix_id : str, default = None
dtype : str, default = ‘float_’
analyze_on_clock : bool, default = False
persistent : bool, default = False
id : str, default = None
- config : dict, default = None
Configuration options that override the defaults.
- class bspump.analyzer.GeoAnalyzer(app, pipeline, matrix_id=None, dtype='float_', analyze_on_clock=False, bbox=None, resolution=5, persistent=False, id=None, config=None)
Bases: Analyzer
This is the analyzer for events with a geographical-point dimension.
GeoAnalyzer operates over the GeoMatrix object. matrix_id is the ID of an existing GeoMatrix object to use instead.
Config Defaults
- max_lat : 71.26
- min_lat : 23.33
- min_lon : -10.10
- max_lon : 40.6
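The default bounds above describe a latitude/longitude box. The standalone sketch below shows one way a point can be mapped to a grid cell inside such a box. geo_cell and cell_size_deg are hypothetical: the cell size is in degrees and is not necessarily how GeoAnalyzer's resolution parameter is defined.

```python
# Default bounding box from the Config Defaults above
DEFAULT_BBOX = {"max_lat": 71.26, "min_lat": 23.33, "min_lon": -10.10, "max_lon": 40.6}


def geo_cell(lat, lon, bbox=DEFAULT_BBOX, cell_size_deg=1.0):
    """Map a point to (row, col) in a grid over bbox; None if outside.

    cell_size_deg is a hypothetical parameter, not GeoAnalyzer's resolution.
    """
    inside = (bbox["min_lat"] <= lat <= bbox["max_lat"]
              and bbox["min_lon"] <= lon <= bbox["max_lon"])
    if not inside:
        return None
    row = int((bbox["max_lat"] - lat) // cell_size_deg)  # row 0 is the northern edge
    col = int((lon - bbox["min_lon"]) // cell_size_deg)  # column 0 is the western edge
    return row, col


print(geo_cell(50.0, 14.4))  # (21, 24)
print(geo_cell(0.0, 0.0))    # None: outside the default box
```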
- __init__(app, pipeline, matrix_id=None, dtype='float_', analyze_on_clock=False, bbox=None, resolution=5, persistent=False, id=None, config=None)
Description: Initializes the GeoAnalyzer.
Parameters
- app : Application
The application object.
- pipeline : Pipeline
The pipeline the analyzer belongs to.
matrix_id : str, default = None
dtype : str, default = ‘float_’
analyze_on_clock : bool, default = False
bbox, default = None
Bounding box of the analyzed area; see Config Defaults above.
resolution : int, default = 5
persistent : bool, default = False
id : str, default = None
- config : dict, default = None
Configuration options that override the defaults.
- class bspump.analyzer.LatchAnalyzer(app, pipeline, query=True, analyze_on_clock=False, inclusive=False, id=None, config=None)
Bases: Analyzer
The LatchAnalyzer accumulates events in the Latch variable. The Latch is a queue whose maximum size is set by the latch_max_size configuration option.
If latch_max_size is 0, the Latch is not limited.
If the accumulated events exceed latch_max_size, the oldest event is dropped.
The Latch is filled according to the query variable (True by default). The query may be:
1. True: all events are added to the Latch.
2. False: all events are skipped.
3. A dictionary following the Mongo-like query syntax (see the rules in ContentFilter): only events matching the query are added to the Latch.
The query can be injected with an API call, which allows controlling which events enter the latch.
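The latch rules above map naturally onto a bounded deque. Latch is a hypothetical sketch, not BSPump's LatchAnalyzer; in particular its query matching supports only exact field equality and is a simplified stand-in for the Mongo-like ContentFilter syntax.

```python
from collections import deque


class Latch:
    """Hypothetical sketch of the latch behaviour; not BSPump's LatchAnalyzer."""

    def __init__(self, latch_max_size, query=True):
        # latch_max_size == 0 means unlimited (maxlen=None)
        self.events = deque(maxlen=latch_max_size or None)
        self.query = query

    def matches(self, event):
        if self.query is True:
            return True
        if self.query is False:
            return False
        # Simplified stand-in for ContentFilter: exact field equality only
        return all(event.get(key) == value for key, value in self.query.items())

    def process(self, event):
        if self.matches(event):
            self.events.append(event)  # when full, the oldest event is dropped
        return event


latch = Latch(latch_max_size=2)
for n in range(3):
    latch.process({"n": n})
print(list(latch.events))  # [{'n': 1}, {'n': 2}]

errors_only = Latch(latch_max_size=0, query={"level": "error"})
errors_only.process({"level": "error"})
errors_only.process({"level": "info"})
print(len(errors_only.events))  # 1
```

Because query is a plain attribute here, replacing it at runtime (as an API call would) changes which events enter the latch from the next event on.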
- __init__(app, pipeline, query=True, analyze_on_clock=False, inclusive=False, id=None, config=None)
Description: Initializes the LatchAnalyzer.
Parameters
- app : Application
The application object.
- pipeline : Pipeline
The pipeline the analyzer belongs to.
- query : bool or dict, default = True
Selects which events are added to the Latch: True (all), False (none) or a Mongo-like query dictionary.
analyze_on_clock : bool, default = False
inclusive : bool, default = False
id : str, default = None
- config : dict, default = None
Configuration options that override the defaults.
- class bspump.analyzer.AnalyzingSource(app, pipeline, matrix_id, id=None, config=None)
Bases: TriggerSource
The AnalyzingSource is a triggered source that expects matrix_id as its input. Each time the trigger fires, it calls the analyze() method of the matrix and expects a complex event as the output. A complex event can be an array of events or an aggregation of events (average, max, min, etc.).
- class bspump.analyzer.ThresholdAnalyzer(app, pipeline, id=None, config=None)
Bases: TimeWindowAnalyzer
The ThresholdAnalyzer is based on TimeWindowAnalyzer and detects whether any monitored value exceeded or fell below the preconfigured bounds. It can also be used purely through configuration.
- predicate method: checks whether the event contains the related attributes.
- evaluate method: takes the event attributes and sorts them into the matrix.
- analyze method: checks whether any value in the matrix is outside the preconfigured bounds and, if so, checks how often the symptom occurred and possibly calls the alarm. The out-of-bounds values are located with np.where(), which gives their positions within the matrix (x = row positions, y = column positions); these are passed to the alarm method. To detect whether the number of out-of-bounds occurrences is greater than or equal to the number set in the configuration (anomaly_occurrence), an aggregation is used. Its result, the number of occurrences and the indices of those values, is passed to the alarm method together with the x and y arrays.
- alarm method: configurable; receives the arguments described above.
…
- Threshold settings:
- exceedance: if lower_bound is set to ‘-inf’ and upper_bound is not set to ‘inf’, the alarm is called when any value in the matrix exceeds upper_bound.
- subceedance: if lower_bound is not set to ‘-inf’ and upper_bound is set to ‘inf’, the alarm is called when any value in the matrix falls below lower_bound.
- range: if lower_bound is not set to ‘-inf’ and upper_bound is not set to ‘inf’, the alarm is called when any value in the matrix is outside the preconfigured range.
- ConfigDefaults: dict = {'analyze_period': 300, 'anomaly_occurrence': 1, 'event_attribute': '', 'event_value': '', 'lower_bound': '-inf', 'upper_bound': 'inf'}
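The out-of-bounds check can be sketched with NumPy directly. find_anomalies is a hypothetical function, not BSPump's analyze(): it converts the string bounds from ConfigDefaults with float(), locates values with np.where(), and assumes the aggregation counts out-of-bounds values per row, flagging rows that reach anomaly_occurrence.

```python
import numpy as np


def find_anomalies(matrix, lower_bound="-inf", upper_bound="inf", anomaly_occurrence=1):
    """Hypothetical sketch of the out-of-bounds check; not BSPump's analyze()."""
    # Config values are strings such as '-inf'; float() turns them into numbers
    lower, upper = float(lower_bound), float(upper_bound)
    # x = row positions, y = column positions of out-of-bounds values
    x, y = np.where((matrix < lower) | (matrix > upper))
    # Assumed aggregation: count out-of-bounds values per row
    rows, counts = np.unique(x, return_counts=True)
    flagged_rows = rows[counts >= anomaly_occurrence]
    return flagged_rows, x, y


matrix = np.array([[1, 5, 9],
                   [2, 3, 4]])
rows, x, y = find_anomalies(matrix, upper_bound="4", anomaly_occurrence=2)
print(rows.tolist(), x.tolist(), y.tolist())  # [0] [0, 0] [1, 2]
```

A single comparison covers all three threshold settings: an infinite bound simply never triggers, so exceedance, subceedance and range differ only in which bound is finite.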
- __init__(app, pipeline, id=None, config=None)
Description: Initializes the ThresholdAnalyzer.
Parameters
- app : Application
The application object.
- pipeline : Pipeline
The pipeline the analyzer belongs to.
- id : str, default = None
Identifier of the analyzer.
- config : dict, default = None
Configuration options that override ConfigDefaults.
- predicate(context, event)
Checks whether the event contains the related attributes.
Parameters
context :
- event : any data type
Information with a timestamp.
- Returns:
True if the event contains the related attributes.
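Analyzer
Base class for analyzers.
from bspump.analyzer import Analyzer
class MyAnalyzer(Analyzer):
    def predicate(self, context, event):
        # Only evaluate events that carry a "value" attribute
        return "value" in event
    def evaluate(self, context, event):
        # Record information from the event into the analyzer's state
        pass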
TimeWindowAnalyzer
Aggregate events over fixed time windows.
from bspump.analyzer import TimeWindowAnalyzer
class HourlyAnalyzer(TimeWindowAnalyzer):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(
            app, pipeline,
            columns=60,             # 60 time cells ...
            resolution=60,          # ... of 1 minute each = a 1-hour window
            clock_driven=True,      # shift the window on every timer tick
            analyze_on_clock=True,  # run analyze() periodically
            id=id, config=config,
        )
    def evaluate(self, context, event):
        # Record the event into the time cell matching its timestamp
        pass
    def analyze(self):
        # Called periodically because analyze_on_clock=True
        pass
SessionAnalyzer
Track user sessions.
from bspump.analyzer import SessionAnalyzer
class UserSessionAnalyzer(SessionAnalyzer):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(
            app, pipeline,
            # Assumes dtype accepts NumPy-style (name, format) columns;
            # formats use the letters described for column_formats above
            dtype=[("user_id", "U40"), ("event_count", "i8")],
            id=id, config=config,
        )
    def predicate(self, context, event):
        # Only events that identify a user belong to a session
        return "user_id" in event
    def evaluate(self, context, event):
        user_id = event["user_id"]
        # Find or create the row for user_id in the SessionMatrix
        # and update its columns here
        pass
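GeoAnalyzer
Analyze geographic data. The following custom analyzer (independent of the built-in GeoAnalyzer class) counts events per 1° × 1° cell in a NumPy array.
import numpy as np
from bspump.analyzer import Analyzer
class GeoGridAnalyzer(Analyzer):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)
        # Rows: latitude -90..90, columns: longitude -180..180 (1-degree cells)
        self.Grid = np.zeros((181, 361), dtype=np.int64)
    def predicate(self, context, event):
        # Skip events without coordinates instead of counting them at (0, 0)
        return "lat" in event and "lon" in event
    def evaluate(self, context, event):
        row = int(event["lat"] + 90)
        col = int(event["lon"] + 180)
        self.Grid[row, col] += 1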
Aggregation Example
from bspump.analyzer import Analyzer
class CountingAnalyzer(Analyzer):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)
        self.counts = {}
    def evaluate(self, context, event):
        key = event.get("type")
        # Count events per type and annotate the event with the running count
        self.counts[key] = self.counts.get(key, 0) + 1
        event["running_count"] = self.counts[key]
        return event
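Statistics Example
from bspump.analyzer import Analyzer
class StatisticsAnalyzer(Analyzer):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)
        self.sum = 0
        self.count = 0
    def predicate(self, context, event):
        # Ignore events without a value so they do not skew the mean
        return "value" in event
    def evaluate(self, context, event):
        self.sum += event["value"]
        self.count += 1
        # Annotate the event with the running mean
        event["mean"] = self.sum / self.count
        return event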
Using with Pipeline
import bspump
import bspump.kafka
from bspump.analyzer import TimeWindowAnalyzer, SessionAnalyzer
class AnalysisPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        # "KafkaConnection" must be registered with the application beforehand
        self.build(
            bspump.kafka.KafkaSource(app, self, connection="KafkaConnection"),
            TimeWindowAnalyzer(app, self),
            SessionAnalyzer(app, self),
            bspump.kafka.KafkaSink(app, self, connection="KafkaConnection"),
        )