import logging
import numpy as np
from .analyzer import Analyzer
###
# Module-level logger, named after this module per the standard logging convention.
L = logging.getLogger(name=__name__)
###
class TimeDriftAnalyzer(Analyzer):
	"""
	Analyzer that measures how far event timestamps lag behind the application's current time.

	For every sampled event, the difference ``app.time() - event[timestamp_attr]`` is computed.
	Non-negative differences are kept in a bounded history. Negative differences (events whose
	timestamp is ahead of ``app.time()``) are only counted and excluded from the statistics.
	Each analysis publishes the average, median, standard deviation, minimum and maximum of the
	collected differences to a gauge and then clears the history.

	**Default Config**

	analyze_period : 5*60
		Once per 5 minutes.

	history_size : 100
		Keep at most the 100 most recent differences.

	sparse_count : 1
		Process every single event; a value of ``n`` processes every n-th event
		that carries the timestamp attribute.

	timestamp_attr : @timestamp
		Timestamp attribute present in the event to perform the drift analysis on.
	"""

	ConfigDefaults = {
		"analyze_period": 5 * 60,  # once per 5 minutes
		"history_size": 100,  # keep maximum 100 array members
		"sparse_count": 1,  # process every single event
		"timestamp_attr": "@timestamp",  # timestamp attribute present in the event to perform the drift analyzer on
	}

	def __init__(self, app, pipeline, id=None, config=None):
		"""
		Create the analyzer and register its metrics.

		**Parameters**

		app : Application
			Application object; used for ``get_service("asab.MetricsService")`` and ``time()``.

		pipeline : Pipeline
			Pipeline this analyzer belongs to; its ``Id`` is used as the ``pipeline`` metric tag.

		id : str, default = None
			ID of the analyzer.

		config : dict, default = None
			Optional configuration; presumably merged over ``ConfigDefaults`` by the base ``Analyzer``.

		:raises ValueError: if ``sparse_count`` is smaller than 1 or ``history_size`` is negative.
		"""
		super().__init__(app, pipeline, analyze_on_clock=True, id=id, config=config)
		self.App = app

		# Validate numeric settings up front; invalid values would otherwise only fail
		# later, inside predicate()/evaluate(), on every processed event.
		self.HistorySize = int(self.Config["history_size"])
		if self.HistorySize < 0:
			raise ValueError("'history_size' must be >= 0, got {}".format(self.HistorySize))
		self.SparseCount = int(self.Config["sparse_count"])
		if self.SparseCount < 1:
			raise ValueError("'sparse_count' must be >= 1, got {}".format(self.SparseCount))
		self.TimestampAttr = self.Config["timestamp_attr"]

		self.History = []
		self.EventCount = 0

		metrics_service = app.get_service("asab.MetricsService")
		# Tagged with the pipeline, like the gauge, so counts from different pipelines stay distinct.
		self.DifferenceCounter = metrics_service.create_counter(
			"timedrift.difference",
			tags={
				"pipeline": pipeline.Id,
			},
			init_values={"positive": 0, "negative": 0},
		)
		self.Gauge = metrics_service.create_gauge(
			"timedrift",
			tags={
				"pipeline": pipeline.Id,
			},
			init_values={
				"avg": 0.0,
				"median": 0.0,
				"stddev": 0.0,
				"min": 0.0,
				"max": 0.0,
			},
		)

	def predicate(self, context, event):
		"""
		Decide whether ``event`` should be passed to ``evaluate``.

		Events without the timestamp attribute are rejected and not counted. Among the remaining
		events, only every ``sparse_count``-th one is accepted.

		**Parameters**

		context :
			Pipeline context (not used here).

		event : dict-like
			Event; must support ``in`` membership tests for the timestamp attribute.

		:return: True if the event should be evaluated, otherwise False.
		"""
		if self.TimestampAttr not in event:
			return False
		self.EventCount += 1
		return self.EventCount % self.SparseCount == 0

	def get_diff(self, event_timestamp):
		"""
		Return how far ``event_timestamp`` lags behind the application's current time.

		**Parameters**

		event_timestamp : int or float
			Event timestamp. It must share units and epoch with ``app.time()``
			(presumably a UNIX timestamp in seconds — confirm against the event producers).

		:return: ``app.time() - event_timestamp``; positive when the event lies in the past.
		"""
		return self.App.time() - event_timestamp

	def evaluate(self, context, event):
		"""
		Record the time drift of a single event.

		Negative drifts (event timestamp ahead of the current time) only increment the
		``negative`` counter. Non-negative drifts increment ``positive`` and are appended to the
		history, which is trimmed to the newest ``history_size`` entries.

		**Parameters**

		context :
			Pipeline context (not used here).

		event : dict-like
			Event containing the timestamp attribute (guaranteed by ``predicate``).
		"""
		diff = self.get_diff(event[self.TimestampAttr])
		if diff < 0:
			# Timestamp lies ahead of the current time; count it but keep it out of the statistics.
			self.DifferenceCounter.add("negative", 1)
			return

		self.DifferenceCounter.add("positive", 1)
		self.History.append(diff)

		# Drop the oldest entries with a single slice delete instead of repeated O(n) pop(0) calls.
		excess = len(self.History) - self.HistorySize
		if excess > 0:
			del self.History[:excess]

	def analyze(self):
		"""
		Publish statistics of the collected drifts to the gauge and reset the history.

		Publishes average, median, population standard deviation (NumPy default, ``ddof=0``),
		minimum and maximum. Units match ``get_diff`` (presumably seconds). When no drift was
		collected since the last analysis, every gauge value is set to 0.0.
		"""
		if self.History:
			# Convert once rather than letting each NumPy reduction re-convert the list.
			values = np.asarray(self.History, dtype=float)
			avg = float(np.mean(values))
			median = float(np.median(values))
			stddev = float(np.std(values))
			min_v = float(np.min(values))
			max_v = float(np.max(values))
			self.History = []
		else:
			avg = median = stddev = min_v = max_v = 0.0

		self.Gauge.set("avg", avg)
		self.Gauge.set("median", median)
		self.Gauge.set("stddev", stddev)
		self.Gauge.set("min", min_v)
		self.Gauge.set("max", max_v)