Source code for bspump.analyzer.latch
import collections
import logging
import mongoquery
from .analyzer import Analyzer
###
L = logging.getLogger(__name__)
###
[docs]
class LatchAnalyzer(Analyzer):
"""
The `LatchAnalyzer` accumulates events in the `Latch` variable.
The `Latch` is a queue of maximum size specified in configuration - `latch_max_size`
If `latch_max_size` is 0 then `Latch` is not limited
If accumulated events exceeds `latch_max_size` then first event is dropped.
`Latch` can be filled based on the `query` variable (True by default).
The query may be:
1. `True`, then all events will be added to `Latch`.
2. `False`, all events will be skipped.
3. Dictionary, following the mongo-like query syntaxis (see the rules in `ContentFilter`).
In this case only events matched with this query will be added to the `Latch`.
The query can be injected with an API call to allow to control events in the latch.
"""
ConfigDefaults = {
"latch_max_size": 50, # 0 means unlimited size
}
[docs]
def __init__(
self,
app,
pipeline,
query=True,
analyze_on_clock=False,
inclusive=False,
id=None,
config=None,
):
"""
Description:
**Parameters**
app : Application
Name of the Application.
pipeline : Pipeline
Name of the Pipeline.
query : bool, default = True
description
analyze_on_clock : bool, default = False
inclusive : bool, default = False
id : str, default = None
config : JSON, default = None
configuration file with additional information.
"""
super().__init__(
app, pipeline, analyze_on_clock=analyze_on_clock, id=id, config=config
)
max_size = int(self.Config.get("latch_max_size"))
if max_size == 0:
self.Latch = collections.deque()
else:
self.Latch = collections.deque(maxlen=max_size)
# Check if the query is correctly implemented
if isinstance(query, bool):
self.Query = query
else:
try:
self.Query = mongoquery.Query(query)
self.Query.match({})
except mongoquery.QueryError:
L.warning("Incorrect query")
raise
self.Inclusive = inclusive
[docs]
def process(self, context, event):
"""
Description:
**Parameters**
context :
event : any data type
information with timestamp.
:return: event
"""
if self.Query is True:
self.Latch.append(event)
elif self.Query is False:
return event
elif self.Query.match(event) != self.Inclusive:
self.Latch.append(event)
return event