Source code for bspump.common.tee

import logging
from .routing import InternalSource, RouterProcessor


# Module-level logger, named after this module via ``__name__``.
L = logging.getLogger(__name__)


class TeeSource(InternalSource):
    """
    Internal source that attaches itself to one or more ``TeeProcessor``
    instances for as long as its ``main()`` coroutine runs.

    Example::

        class SamplePipeline(bspump.Pipeline):

            def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                    bspump.socket.TCPStreamSource(app, self, config={'port': 7000}),
                    bspump.common.TeeProcessor(app, self).bind("SampleTeePipeline.*TeeSource"),
                    bspump.common.PPrintSink(app, self)
                )

        class SampleTeePipeline(bspump.Pipeline):

            def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                    bspump.common.TeeSource(app, self),
                    bspump.common.PPrintSink(app, self)
                )
    """

    def __init__(self, app, pipeline, id=None, config=None):
        """
        Initialize the source with an empty target list.

        :param app: Application object; used to obtain ``bspump.PumpService``.
        :param pipeline: Pipeline this source belongs to.
        :param id: Optional identifier, forwarded to the parent class.
        :param config: Optional configuration, forwarded to the parent class.
        """
        super().__init__(app, pipeline, id=id, config=config)
        # Locate strings of the processors this source attaches to in main().
        self.Targets = []
        self._svc = app.get_service("bspump.PumpService")

    def bind(self, target):
        """
        Remember ``target`` so that ``main()`` attaches to it later.

        :param target: String passed to ``PumpService.locate()``; it has to
            resolve to a ``TeeProcessor``.
        :return: ``self``, so that calls can be chained.
        """
        self.Targets.append(target)
        return self

    async def main(self):
        """
        Bind this source to every registered target, run the parent's
        ``main()``, and unbind again on exit.

        If a target cannot be located or is not a ``TeeProcessor``, a warning
        is logged and the method returns without running the parent's
        ``main()``.  Processors that were already bound before the failing
        target are unbound in that case as well, so none of them keeps
        routing to a source that is not running.

        :return: None
        """
        unbind_processor = []
        try:
            # Binding happens inside the ``try`` so that the ``finally``
            # clause also cleans up after an early return (or an exception)
            # part-way through the target list.
            for target in self.Targets:
                processor = self._svc.locate(target)

                if processor is None:
                    L.warning(
                        "TeeSource '{}' cannot find processor '{}'".format(self.Id, target)
                    )
                    return

                if not isinstance(processor, TeeProcessor):
                    L.warning(
                        "TeeSource '{}' requires TeeProcessor as target, not '{}'".format(
                            self.Id, target
                        )
                    )
                    return

                processor.bind(self.locate_address())
                unbind_processor.append(processor)

            await super().main()

        finally:
            for processor in unbind_processor:
                processor.unbind(self.locate_address())
#
class TeeProcessor(RouterProcessor):
    """
    Processor that routes every event it processes to each bound target
    and then returns the same event.

    See TeeSource for a usage example.
    """

    ConfigDefaults = {}

    def __init__(self, app, pipeline, id=None, config=None):
        """
        Initialize the processor with no bound targets.

        :param app: Application object, forwarded to the parent class.
        :param pipeline: Pipeline this processor belongs to.
        :param id: Optional identifier, forwarded to the parent class.
        :param config: Optional configuration, forwarded to the parent class.
        """
        super().__init__(app, pipeline, id=id, config=config)
        # Addresses that process() routes each event to.
        self.Targets = []

    def bind(self, target: str):
        """
        Add ``target`` to the list of routing destinations.

        :param target: A ``bspump.PumpService.locate()`` string.
        :return: ``self``, so that calls can be chained.
        """
        self.Targets.append(target)
        return self

    def unbind(self, target: str):
        """
        Remove ``target`` from the routing destinations and call
        ``unlocate()`` for it.

        :param target: A string previously passed to ``bind()``.
        :return: ``self``, so that calls can be chained.
        :raises ValueError: if ``target`` is not currently bound
            (raised by ``list.remove``).
        """
        # Order matters: remove() raises before unlocate() for unknown targets.
        self.Targets.remove(target)
        self.unlocate(target)
        return self

    def process(self, context, event):
        """
        Route ``event`` to every bound target.

        :param context: Processing context, passed through to ``route()``.
        :param event: The event to route.
        :return: The ``event`` object that was passed in.
        """
        for address in self.Targets:
            self.route(context, event, address)
        return event