Source code for bspump.analyzer.analyzer
import logging
import time
import bspump.asab as asab
from ..abc.processor import Processor
###
L = logging.getLogger(__name__)
###
[docs]
class Analyzer(Processor):
"""
Description:
"""
ConfigDefaults = {
"analyze_period": 60, # every 60 seconds
}
[docs]
def __init__(self, app, pipeline, analyze_on_clock=False, id=None, config=None):
super().__init__(app, pipeline, id=id, config=config)
self.AnalyzePeriod = float(self.Config["analyze_period"])
self.AnalyzeOnClock = analyze_on_clock
if analyze_on_clock:
self.Timer = asab.Timer(app, self.on_clock_tick, autorestart=True)
app.PubSub.subscribe("Application.run!", self.start_timer)
else:
self.Timer = None
# Implementation interface
[docs]
def start_timer(self, event_type):
"""
Description:
"""
self.Timer.start(self.AnalyzePeriod)
[docs]
def analyze(self):
"""
Description:
"""
pass
[docs]
def evaluate(self, context, event):
"""
The function which records the information from the event into the analyzed object.
Specific for each analyzer.
**Parameters**
context :
event : any data type
information with timestamp.
"""
pass
[docs]
def predicate(self, context, event):
"""
This function is meant to check, if the event is worth to process.
If it is, should return True.
specific for each analyzer, but default one always returns True.
**Parameters**
context :
event : any data type
information with timestamp.
:return: True
"""
return True
[docs]
def process(self, context, event):
"""
The event passes through `process(context, event)` unchanged.
Meanwhile it is evaluated.
**Parameters**
context :
event : any data type
information with timestamp.
:return: event
"""
if self.predicate(context, event):
self.evaluate(context, event)
return event
[docs]
async def on_clock_tick(self):
"""
Run analysis every tick.
"""
t0 = time.perf_counter()
self.analyze()
self.Pipeline.ProfilerCounter["analyzer_" + self.Id].add(
"duration", time.perf_counter() - t0
)
self.Pipeline.ProfilerCounter["analyzer_" + self.Id].add("run", 1)