Source code for bspump.abc.processor

import bspump.asab as asab


[docs] class ProcessorBase(asab.Configurable): """ Description: | """
[docs] def __init__(self, app, pipeline, id=None, config=None): """ Initializes the Parameters **Parameters** app : object Application object. pipeline : :meth:`Pipeline <bspump.Pipeline()>` Name of the :meth:`Pipeline <bspump.Pipeline()>`. id : str, default=None, ID of the class of config. config : JSON, or other compatible formats, default=None Configuration file. """ super().__init__( "pipeline:{}:{}".format( pipeline.Id, id if id is not None else self.__class__.__name__ ), config=config, ) self.App = app self.Loop = app.Loop self.Id = id if id is not None else self.__class__.__name__ self.Pipeline = pipeline self.EventCount = 0
[docs] def time(self): """ Accurate representation of a time in the :meth:`Pipeline <bspump.Pipeline()>`. :return: App.time() """ return self.App.time()
[docs] @classmethod def construct(cls, app, pipeline, definition: dict): """ Can construct a :meth:`processor <bspump.Processor()>` based on a specific definition. For example, a JSON file. **Parameters** app : Application Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>_`. pipeline : str Name of the :meth:`Pipeline <bspump.Pipeline()>`. definition : dict Set of instructions based on which :meth:`processor <bspump.Processor()>` can be constructed. :return: cls(app, pipeline, id=newid, config=config) | """ newid = definition.get("id") config = definition.get("config") args = definition.get("args") if args is not None: return cls(app, pipeline, id=newid, config=config, **args) else: return cls(app, pipeline, id=newid, config=config)
[docs] def process(self, context, event): """ Can be implemented to return event based on a given logic. **Parameters** context : Additional information passed to the method. event : Data with time stamp stored in any data type, usually it is in JSON. You can specify an event that is passed to the method. """ raise NotImplementedError()
[docs] def locate_address(self): """ Returns an ID of a :meth:`processor <bspump.Processor()>` and a :meth:`Pipeline <bspump.Pipeline()>`. :return: ID of the :meth:`Pipeline <bspump.Pipeline()>` and self.Id. | """ return "{}.{}".format(self.Pipeline.Id, self.Id)
[docs] def rest_get(self): """ Description: :return: """ return { "Id": self.Id, "Class": self.__class__.__name__, "PipelineId": self.Pipeline.Id, }
def __repr__(self): return "%s(%r)" % (self.__class__.__name__, self.locate_address())
[docs] class Processor(ProcessorBase): """ Inherits from ProcessorBase. | """ pass