Source code for bspump.common.aggregator

from abc import ABC, abstractmethod
from typing import Optional

from bspump import Generator


class AggregationStrategy(ABC):
    """
    Abstract interface of a buffer that accumulates events until it is flushed.

    Concrete strategies decide how appended events are stored and what
    object ``flush()`` hands back.
    """

    @abstractmethod
    def append(self, context, event):
        """
        Add one event to the buffer.

        **Parameters**

        context : context passed along with the event.

        event : the event to be accumulated.
        """
        raise NotImplementedError

    @abstractmethod
    def flush(self):
        """
        Return everything accumulated so far.

        The concrete strategies in this module also reset their buffer here.
        """
        raise NotImplementedError

    @abstractmethod
    def is_empty(self) -> bool:
        """
        Tell whether the buffer currently holds nothing.

        :return: ``True`` when there is nothing to flush.
        """
        raise NotImplementedError
class ListAggregationStrategy(AggregationStrategy):
    """
    Strategy that keeps every appended item as a ``(context, event)`` tuple
    in a list, in arrival order.
    """

    def __init__(self) -> None:
        """Start with an empty list of aggregated pairs."""
        super().__init__()
        self.AggregatedEvent = []

    def append(self, context, event):
        """
        Store the context together with its event.

        **Parameters**

        context : context received with the event; kept alongside it.

        event : the event to be stored.
        """
        pair = (context, event)
        self.AggregatedEvent.append(pair)

    def flush(self):
        """
        Hand over the collected pairs and begin a new, empty list.

        :return: list of ``(context, event)`` tuples appended since the last flush.
        """
        collected, self.AggregatedEvent = self.AggregatedEvent, []
        return collected

    def is_empty(self) -> bool:
        """
        :return: ``True`` when no pair has been appended since the last flush.
        """
        return not self.AggregatedEvent
class ListEventAggregationStrategy(AggregationStrategy):
    """
    Strategy that collects only the events (contexts are dropped) in a list,
    in arrival order.
    """

    def __init__(self) -> None:
        """Start with an empty list of aggregated events."""
        super().__init__()
        self.AggregatedEvent = []

    def append(self, context, event):
        """
        Store the event; the context is ignored by this strategy.

        **Parameters**

        context : unused.

        event : the event to be stored.
        """
        self.AggregatedEvent += [event]

    def flush(self):
        """
        Hand over the collected events and begin a new, empty list.

        :return: list of events appended since the last flush.
        """
        events = self.AggregatedEvent
        self.AggregatedEvent = []
        return events

    def is_empty(self) -> bool:
        """
        :return: ``True`` when no event has been appended since the last flush.
        """
        if self.AggregatedEvent:
            return False
        return True
class StringAggregationStrategy(AggregationStrategy):
    """
    Strategy that concatenates the string form of every event into a single
    string, separating the events with a delimiter.
    """

    def __init__(self, delimiter="\n") -> None:
        """
        **Parameters**

        delimiter : str, default "\\n"
            Text placed between consecutive events. May be an empty string.
        """
        super().__init__()
        self.Delimiter = delimiter
        self.AggregatedEvent = ""

    def append(self, context, event):
        """
        Append ``str(event)`` followed by the delimiter to the buffer.

        **Parameters**

        context : unused.

        event : any object; it is converted with ``str()`` before being stored.
        """
        self.AggregatedEvent += str(event) + self.Delimiter

    def flush(self):
        """
        Return the aggregated string without the trailing delimiter and reset
        the buffer.

        :return: delimiter-joined string of the events appended since the last flush.
        """
        result = self.AggregatedEvent
        # Strip the trailing delimiter only when there is one: with an empty
        # delimiter the slice [0:-0] is [0:0] and would discard the whole buffer.
        if self.Delimiter:
            result = result[: -len(self.Delimiter)]
        self.AggregatedEvent = ""
        return result

    def is_empty(self) -> bool:
        """
        :return: ``True`` when the buffer string is empty.
        """
        return len(self.AggregatedEvent) == 0
class Aggregator(Generator):
    """
    Buffer incoming events through an aggregation strategy and inject the
    aggregated result back into the pipeline when a completion condition is met.

    Configuration options:

    - ``completion_size``: number of processed events that triggers a flush.
    - ``completion_timeout``: when greater than 0, a flush is triggered on an
      application tick once more than this much time (in the units of
      ``App.time()``, presumably seconds) has elapsed since the last flush
      and at least one event is pending. 0 disables it.
    - ``completion_interval``: when greater than 0, a flush is triggered on an
      application tick once more than this much time has elapsed since the
      last periodic flush and at least one event is pending. 0 disables it.

    ``completion_timeout`` and ``completion_interval`` cannot both be enabled.
    Pending events are also flushed when the application stops.
    """

    ConfigDefaults = {
        "completion_size": 10,
        "completion_timeout": 0,  # 0 means no timeout,
        "completion_interval": 0,  # 0 means no completion interval
    }

    def __init__(
        self,
        app,
        pipeline,
        aggregation_strategy: Optional[AggregationStrategy] = None,
        id=None,
        config=None,
    ):
        """
        **Parameters**

        app : application object; provides ``time()`` and ``PubSub``.

        pipeline : pipeline this processor belongs to.

        aggregation_strategy : AggregationStrategy, default None
            Buffer implementation. When omitted, a new
            ``ListAggregationStrategy`` is created for this instance.

        id : passed to the parent constructor.

        config : passed to the parent constructor.

        :raises ValueError: if both ``completion_timeout`` and
            ``completion_interval`` are greater than 0.
        """
        super().__init__(app, pipeline, id, config)

        self.CompletionSize = int(self.Config["completion_size"])
        self.CompletionTimeout = int(self.Config["completion_timeout"])
        self.CompletionInterval = int(self.Config["completion_interval"])
        if self.CompletionTimeout > 0 and self.CompletionInterval > 0:
            raise ValueError(
                "completion_timeout and completion_interval cannot be combined"
            )

        # A default instance in the signature would be created once and shared
        # by every Aggregator, mixing their events; build one per instance.
        if aggregation_strategy is None:
            aggregation_strategy = ListAggregationStrategy()
        self.AggregationStrategy = aggregation_strategy

        self.CurrentSize = 0
        self.LastFlushTime = self.App.time()
        self.LastPeriodicFlushTime = self.App.time()

        app.PubSub.subscribe("Application.stop!", self._on_application_stop)
        if self.CompletionTimeout > 0:
            app.PubSub.subscribe("Application.tick!", self._check_timeout)
        if self.CompletionInterval > 0:
            app.PubSub.subscribe("Application.tick!", self._check_periodic_flush)

    def _check_timeout(self, _):
        # Tick handler: flush pending events once the timeout since the last
        # flush has been exceeded.
        if (
            self.CurrentSize > 0
            and self.App.time() - self.LastFlushTime > self.CompletionTimeout
        ):
            self.flush()

    def _check_periodic_flush(self, _):
        # Tick handler: flush pending events once the interval since the last
        # periodic flush has been exceeded.
        if (
            self.CurrentSize > 0
            and self.App.time() - self.LastPeriodicFlushTime > self.CompletionInterval
        ):
            self.LastPeriodicFlushTime = self.App.time()
            self.flush()

    def _on_application_stop(self, _, __):
        # Do not leave buffered events behind when the application stops.
        self.flush()

    def flush(self):
        """
        Take whatever the strategy has accumulated and schedule its injection
        into the pipeline at the next pipeline depth.

        Does nothing when the strategy is empty.

        :return: None
        """
        # Every flush starts a new batch, whatever triggered it. Without this
        # reset a timeout/interval/stop flush would leave a stale count and
        # the next size-based flush would fire with too few events.
        self.CurrentSize = 0

        if self.AggregationStrategy.is_empty():
            return

        aggregated = self.AggregationStrategy.flush()
        self.Pipeline.ensure_future(
            self.generate({}, aggregated, self.PipelineDepth + 1)
        )

    def process(self, context, event):
        """
        Add the event to the aggregation strategy and flush once
        ``completion_size`` events have been collected.

        **Parameters**

        context : context of the event; handed to the strategy.

        event : the event to aggregate.

        :return: always ``None``; aggregated output is emitted through ``flush()``.
        """
        self.AggregationStrategy.append(context, event)
        self.CurrentSize += 1

        if self.CurrentSize >= self.CompletionSize:
            # Reset here as well so the count is cleared even if a subclass
            # overrides flush().
            self.CurrentSize = 0
            self.flush()

        return None

    async def generate(self, context, aggregated_event, depth):
        """
        Record the flush time and inject the aggregated event into the pipeline.

        **Parameters**

        context : context to inject with the aggregated event.

        aggregated_event : object returned by the strategy's ``flush()``.

        depth : pipeline depth at which the event is injected.
        """
        self.LastFlushTime = self.App.time()
        await self.Pipeline.inject(context, aggregated_event, depth)