Source code for bspump.common.aggregator
from abc import ABC, abstractmethod
from typing import Optional

from bspump import Generator
[docs]
class AggregationStrategy(ABC):
    """
    Abstract interface of a container that accumulates events.

    A concrete strategy has to implement :meth:`append`, :meth:`flush` and
    :meth:`is_empty`; the class cannot be instantiated until all three are
    provided.
    """

    @abstractmethod
    def append(self, context, event):
        """
        Add a single event to the aggregate.

        **Parameters**

        context : context that accompanies the event.

        event : event to be added to the aggregate.
        """
        raise NotImplementedError()

    @abstractmethod
    def flush(self):
        """
        Hand over what has been aggregated so far.

        :return: the aggregate; its type is defined by the concrete strategy.
        """
        raise NotImplementedError()

    @abstractmethod
    def is_empty(self) -> bool:
        """
        Tell whether there is anything to flush.

        :return: ``True`` when nothing is currently aggregated.
        """
        raise NotImplementedError()
[docs]
class ListAggregationStrategy(AggregationStrategy):
    """
    Strategy that collects ``(context, event)`` tuples in a list.
    """

    def __init__(self) -> None:
        """
        Start with an empty list of aggregated items.
        """
        super().__init__()
        self.AggregatedEvent = []

    def append(self, context, event):
        """
        Store the context together with its event as one tuple.

        **Parameters**

        context : context that accompanies the event.

        event : event to be aggregated.
        """
        pair = (context, event)
        self.AggregatedEvent.append(pair)

    def flush(self):
        """
        Return the collected tuples and begin a new, empty list.

        :return: list of ``(context, event)`` tuples in insertion order.
        """
        # The right-hand side is evaluated first, so `collected` receives the
        # filled list before the attribute is rebound to a fresh one.
        collected, self.AggregatedEvent = self.AggregatedEvent, []
        return collected

    def is_empty(self) -> bool:
        """
        Tell whether any tuple is waiting to be flushed.

        :return: ``True`` when the list holds no items.
        """
        return not self.AggregatedEvent
[docs]
class ListEventAggregationStrategy(AggregationStrategy):
    """
    Strategy that collects bare events in a list; contexts are not stored.
    """

    def __init__(self) -> None:
        """
        Start with an empty list of aggregated events.
        """
        super().__init__()
        self.AggregatedEvent = []

    def append(self, context, event):
        """
        Store the event; the context is ignored by this strategy.

        **Parameters**

        context : context that accompanies the event (unused).

        event : event to be aggregated.
        """
        self.AggregatedEvent.append(event)

    def flush(self):
        """
        Return the collected events and begin a new, empty list.

        :return: list of events in insertion order.
        """
        # The right-hand side is evaluated first, so `events` receives the
        # filled list before the attribute is rebound to a fresh one.
        events, self.AggregatedEvent = self.AggregatedEvent, []
        return events

    def is_empty(self) -> bool:
        """
        Tell whether any event is waiting to be flushed.

        :return: ``True`` when the list holds no events.
        """
        return not self.AggregatedEvent
[docs]
class StringAggregationStrategy(AggregationStrategy):
    """
    Strategy that concatenates the string form of events into one string,
    separated by a delimiter.
    """

    def __init__(self, delimiter="\n") -> None:
        """
        Start with an empty aggregate.

        **Parameters**

        delimiter : string placed between aggregated events, default is a
            newline. An empty string joins the events without a separator.
        """
        super().__init__()
        self.Delimiter = delimiter
        self.AggregatedEvent = ""

    def append(self, context, event):
        """
        Append ``str(event)`` followed by the delimiter to the aggregate.

        **Parameters**

        context : context that accompanies the event (unused).

        event : event to be aggregated; it is converted with ``str()``.
        """
        self.AggregatedEvent += str(event) + self.Delimiter

    def flush(self):
        """
        Return the aggregated string without the trailing delimiter and
        reset the aggregate to an empty string.

        :return: aggregated events joined by the delimiter.
        """
        result = self.AggregatedEvent
        # Strip the trailing delimiter only when there is one: for an empty
        # delimiter the slice [:-0] equals [:0] and would discard all data.
        if self.Delimiter:
            result = result[: -len(self.Delimiter)]
        self.AggregatedEvent = ""
        return result

    def is_empty(self) -> bool:
        """
        Tell whether anything is waiting to be flushed.

        :return: ``True`` when the aggregated string has zero length.
        """
        return len(self.AggregatedEvent) == 0
[docs]
class Aggregator(Generator):
    """
    Generator that buffers processed events in an aggregation strategy and
    injects the aggregate back into the pipeline as one event.

    A flush is triggered when ``completion_size`` events have been processed,
    on ``Application.tick!`` once the configured timeout or interval has
    elapsed, and on ``Application.stop!``.
    """

    ConfigDefaults = {
        "completion_size": 10,
        "completion_timeout": 0,  # 0 means no timeout,
        "completion_interval": 0,  # 0 means no completion interval
    }

    def __init__(
        self,
        app,
        pipeline,
        aggregation_strategy: Optional[AggregationStrategy] = None,
        id=None,
        config=None,
    ):
        """
        Read the configuration and subscribe to the application events that
        can trigger a flush.

        **Parameters**

        app : application object; provides ``time()`` and ``PubSub``.

        pipeline : pipeline this processor belongs to.

        aggregation_strategy : strategy that stores the events; when ``None``
            a new :class:`ListAggregationStrategy` is created for this
            instance.

        id : passed to the parent constructor.

        config : passed to the parent constructor.

        :raises ValueError: when both ``completion_timeout`` and
            ``completion_interval`` are greater than zero.
        """
        super().__init__(app, pipeline, id, config)

        self.CompletionSize = int(self.Config["completion_size"])
        # NOTE(review): timeout and interval are compared with differences of
        # App.time() values - presumably seconds; confirm.
        self.CompletionTimeout = int(self.Config["completion_timeout"])
        self.CompletionInterval = int(self.Config["completion_interval"])

        if self.CompletionTimeout > 0 and self.CompletionInterval > 0:
            raise ValueError(
                "completion_timeout and completion_interval cannot be combined"
            )

        # A default written in the signature is evaluated only once and would
        # be shared by every Aggregator, mixing their events in one buffer.
        if aggregation_strategy is None:
            aggregation_strategy = ListAggregationStrategy()
        self.AggregationStrategy = aggregation_strategy

        self.CurrentSize = 0
        self.LastFlushTime = self.App.time()
        self.LastPeriodicFlushTime = self.App.time()

        app.PubSub.subscribe("Application.stop!", self._on_application_stop)

        if self.CompletionTimeout > 0:
            app.PubSub.subscribe("Application.tick!", self._check_timeout)

        if self.CompletionInterval > 0:
            app.PubSub.subscribe("Application.tick!", self._check_periodic_flush)

    def _check_timeout(self, _):
        # Flush a non-empty batch once more than CompletionTimeout has passed
        # since LastFlushTime (which is updated in generate()).
        if (
            self.CurrentSize > 0
            and self.App.time() - self.LastFlushTime > self.CompletionTimeout
        ):
            self.flush()

    def _check_periodic_flush(self, _):
        # Flush a non-empty batch once more than CompletionInterval has passed
        # since the previous periodic flush.
        if (
            self.CurrentSize > 0
            and self.App.time() - self.LastPeriodicFlushTime > self.CompletionInterval
        ):
            self.LastPeriodicFlushTime = self.App.time()
            self.flush()

    def _on_application_stop(self, _, __):
        # Do not leave buffered events behind when the application stops.
        self.flush()

    def flush(self):
        """
        Take the aggregate out of the strategy and schedule its injection
        into the pipeline. Does nothing when the strategy is empty.

        :return: None
        """
        # Every flush ends the current batch, whatever triggered it. Without
        # this reset a timeout/interval flush would leave a stale count and
        # the next size-based flush would fire with fewer events than
        # completion_size.
        self.CurrentSize = 0

        if self.AggregationStrategy.is_empty():
            return

        aggregated = self.AggregationStrategy.flush()
        self.Pipeline.ensure_future(
            self.generate({}, aggregated, self.PipelineDepth + 1)
        )

    def process(self, context, event):
        """
        Add the event to the aggregation strategy and flush when
        ``completion_size`` events have been collected.

        **Parameters**

        context : context that accompanies the event.

        event : event to be aggregated.

        :return: always ``None``; aggregated output is injected by
            :meth:`generate`.
        """
        self.AggregationStrategy.append(context, event)
        self.CurrentSize += 1

        if self.CurrentSize >= self.CompletionSize:
            # Kept here as well so that a subclass overriding flush() still
            # gets the counter reset on the size-triggered path.
            self.CurrentSize = 0
            self.flush()

        return None

    async def generate(self, context, aggregated_event, depth):
        """
        Record the flush time and inject the aggregate into the pipeline.

        **Parameters**

        context : context passed along with the aggregated event.

        aggregated_event : aggregate returned by the strategy's ``flush()``.

        depth : pipeline depth at which the event is injected.
        """
        self.LastFlushTime = self.App.time()
        await self.Pipeline.inject(context, aggregated_event, depth)