Source code for bspump.service

import logging
import asyncio
from bspump.asab import Service


L = logging.getLogger(__file__)


[docs] class BSPumpService(Service): """ Service registry based on Service object. Read more in ASAB documentation `Service <https://asab.readthedocs.io/en/latest/asab/service.html`_. """
[docs] def __init__(self, app, service_name="bspump.PumpService"): """ Initializes parameters passed to the Service class. **Parameters** app : Application Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_. service_name : str, Service name string variable containing ""bspump.PumpService | """ super().__init__(app, service_name) self.Pipelines = dict() self.Connections = dict() self.Lookups = dict() self.LookupFactories = [] self.Matrixes = dict() self.App = app
[docs] def locate(self, address): """ Locates a pipeline, source or processor based on the addressed parameter. **Parameters** address : str, ID Address of an pipeline component. 1. To locate a Pipeline, use the pipeline ID. 2. To locate a Processor or a Sink, use the pipeline ID and the component ID separated by a dot, like 'pipelineId.componentId'. 3. To locate a Source, add '*' before ID of the source, like 'pipeline.*SourceId'. **Returns** Pipeline, Source or Processor object if it is found in the components list, otherwise None. **Usage** >>> pipeline = service.locate("SuperCoolPipeline") >>> pipeline.Id SuperCoolPipeline >>> processor = service.locate("SuperCoolPipeline.PPrintProcessor") >>> processor.Id PPrintProcessor >>> source = service.locate("SuperCoolPipeline.*MySource") >>> source.Id MySource """ if "." in address: p, t = address.split(".", 1) else: p = address t = None pipeline = self.Pipelines.get(p) if pipeline is None: return None elif t is None: return pipeline if t[:1] == "*": for source in pipeline.Sources: if source.Id == t[1:]: return source else: for processor in pipeline.iter_processors(): if processor.Id == t: return processor return None
# Pipelines
[docs] def add_pipeline(self, pipeline): """ Adds a pipeline to the BSPump. **Parameters** pipeline : Pipeline Name of the Pipeline. """ if pipeline.Id in self.Pipelines: raise RuntimeError( "Pipeline with id '{}' is already registered".format(pipeline.Id) ) self.Pipelines[pipeline.Id] = pipeline
[docs] def add_pipelines(self, *pipelines): """ Adds a pipelines the BSPump. **Parameters** *pipelines : list List of pipelines that are add to the BSPump. """ for pipeline in pipelines: self.add_pipeline(pipeline)
[docs] def del_pipeline(self, pipeline): """ Deletes a pipeline from a list of Pipelines. **Parameters* pipeline : str, ID ID of a pipeline. """ del self.Pipelines[pipeline.Id]
# Connections
[docs] def add_connection(self, connection): """ Adds a connection to the Connection dictionary. **Parameters** connection : str, ID ID of a connection. :return: connection """ if connection.Id in self.Connections: raise RuntimeError("Connection '{}' already created".format(connection.Id)) self.Connections[connection.Id] = connection return connection
[docs] def add_connections(self, *connections): """ Adds a connections to the Connection dictionary. **Parameters** *connection : str, ID list of IDs of a connections. """ for connection in connections: self.add_connection(connection)
[docs] def locate_connection(self, connection_id): """ Locates connection based on connection ID. **Parameters** connection_id : ID Connection ID. """ from .abc.connection import Connection if isinstance(connection_id, Connection): return connection_id try: return self.Connections[connection_id] except KeyError: raise KeyError( "Cannot find connection id '{}' (did you call add_connection() before add_pipeline() ?)".format( connection_id ) )
# Lookups
[docs] def add_lookup(self, lookup): """ Sets a lookup based on Lookup. **Parameters** lookup : Lookup Name of the Lookup. :return: lookup """ if lookup.Id in self.Lookups: raise RuntimeError("Lookup '{}' already created".format(lookup.Id)) self.Lookups[lookup.Id] = lookup return lookup
[docs] def add_lookups(self, *lookups): """ Adds a list of lookups to the Pipeline. **Parameters** lookup : Lookup List of Lookups. """ for lookup in lookups: self.add_lookup(lookup)
[docs] def locate_lookup(self, lookup_id, context=None): """ Locates lookup based on ID. **Parameters** lookup_id : ID ID of a Lookup. context : ,default = None Additional information. :return: lookup from the lookup service or form the internal dictionary. """ from .abc.lookup import Lookup if isinstance(lookup_id, Lookup): return lookup_id # TODO: Make sure the lookup is always properly returned # #1 - Return lookup from the lookup service for lookup_factory in self.LookupFactories: lookup = lookup_factory.locate_lookup(lookup_id, context) if lookup is not None: return lookup # #2 - Return lookup from the internal dictionary try: return self.Lookups[lookup_id] except KeyError: pass raise KeyError( "Cannot find lookup id '{}' (did you call add_lookup() ?)".format(lookup_id) )
[docs] def add_lookup_factory(self, lookup_factory): """ Adds a lookup factory **Parameters** lookup_factory : Name of lookup factory. """ self.LookupFactories.append(lookup_factory)
# Matrixes
[docs] def add_matrix(self, matrix): """ Adds a matrix to the Pipeline. **Parameters** matrix : Matrix Name of Matrix. :return: matrix """ if matrix.Id in self.Matrixes: raise RuntimeError("Matrix '{}' already created".format(matrix.Id)) self.Matrixes[matrix.Id] = matrix return matrix
[docs] def add_matrixes(self, *matrixes): """ Adds a list of Matrices to the Pipeline. **Parameters** *matrixes : list List of matrices. """ for matrix in matrixes: self.add_matrix(matrix)
[docs] def locate_matrix(self, matrix_id): """ Locates a matrix based on matrix ID **Parameters** matrix_id : str, ID ID of a matrix. """ from .matrix.matrix import Matrix if isinstance(matrix_id, Matrix): return matrix_id try: return self.Matrixes[matrix_id] except KeyError: raise KeyError( "Cannot find matrix id '{}' (did you call add_matrix() ?)".format( matrix_id ) )
#
[docs] async def initialize(self, app): """ Initializes an Application based on ASAB `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_ **Parameters** app : Application Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_ """ # Run initialization of lookups lookup_update_tasks = [] for lookup in self.Lookups.values(): if not lookup.Lazy: lookup_update_tasks.append(lookup.ensure_future_update(app.Loop)) # Await all lookups if len(lookup_update_tasks) > 0: done, pending = await asyncio.wait(lookup_update_tasks) # Start all pipelines for pipeline in self.Pipelines.values(): pipeline.start() if self.App.MQTTService is not None: self.App.MQTTService.components_initialize()
[docs] async def finalize(self, app): """ Stops all the pipelines **Parameters** app : Application Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_ :return: """ # Stop all started pipelines if len(self.Pipelines) > 0: await asyncio.gather( *[pipeline.stop() for pipeline in self.Pipelines.values()] )