Source code for bspump.common.tee
import logging
from .routing import InternalSource, RouterProcessor
L = logging.getLogger(__name__)
[docs]
class TeeSource(InternalSource):
    """
    Source that registers itself with one or more :class:`TeeProcessor`
    instances for the duration of its :meth:`main` run.

    While registered, each of those processors has this source's locate
    address in its target list (see :meth:`TeeProcessor.process`).

    Example::

        class SamplePipeline(bspump.Pipeline):
            def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                    bspump.socket.TCPStreamSource(app, self, config={'port': 7000}),
                    bspump.common.TeeProcessor(app, self).bind("SampleTeePipeline.*TeeSource"),
                    bspump.common.PPrintSink(app, self)
                )

        class SampleTeePipeline(bspump.Pipeline):
            def __init__(self, app, pipeline_id):
                super().__init__(app, pipeline_id)
                self.build(
                    bspump.common.TeeSource(app, self),
                    bspump.common.PPrintSink(app, self)
                )
    """

    def __init__(self, app, pipeline, id=None, config=None):
        """
        Initialise the source with an empty target list.

        :param app: Application object; its ``bspump.PumpService`` service
            is fetched here and kept for later lookups in :meth:`main`.
        :param pipeline: Forwarded unchanged to the parent constructor.
        :param id: Forwarded unchanged to the parent constructor.
        :param config: Forwarded unchanged to the parent constructor.
        """
        super().__init__(app, pipeline, id=id, config=config)
        # Locate strings of the processors this source should attach to.
        self.Targets = []
        self._svc = app.get_service("bspump.PumpService")

    def bind(self, target):
        """
        Remember ``target`` as a processor to attach to when :meth:`main` runs.

        :param target: Value later passed to ``PumpService.locate()``;
            it must resolve to a :class:`TeeProcessor`.
        :return: ``self``, so calls can be chained.
        """
        self.Targets.append(target)
        return self

    async def main(self):
        """
        Attach to every configured target, run the parent ``main()``, and
        detach again.

        If a target cannot be located, or is not a :class:`TeeProcessor`,
        a warning is logged and the method returns without running the
        parent ``main()``.

        :return: ``None``
        """
        bound_processors = []
        try:
            # The binding loop lives inside the ``try`` so that an early
            # return (or an exception) part-way through still unbinds the
            # processors that were already bound in previous iterations.
            for target in self.Targets:
                processor = self._svc.locate(target)

                if processor is None:
                    L.warning(
                        "TeeSource '{}' cannot find processor '{}'".format(self.Id, target)
                    )
                    return

                if not isinstance(processor, TeeProcessor):
                    L.warning(
                        "TeeSource '{}' requires TeeProcessor as target, not '{}'".format(
                            self.Id, target
                        )
                    )
                    return

                processor.bind(self.locate_address())
                bound_processors.append(processor)

            await super().main()

        finally:
            # Runs on normal exit, early return, exception and cancellation.
            for processor in bound_processors:
                processor.unbind(self.locate_address())
#
[docs]
class TeeProcessor(RouterProcessor):
    """
    Processor that hands every event to each bound target and then passes
    the event on unchanged.

    See TeeSource for a usage example.
    """

    ConfigDefaults = {}

    def __init__(self, app, pipeline, id=None, config=None):
        """
        Initialise the processor with no bound targets.

        All arguments are forwarded unchanged to the parent constructor.
        """
        super().__init__(app, pipeline, id=id, config=config)
        # Locate strings of the sources that should receive routed events.
        self.Targets = []

    def bind(self, target: str):
        """
        Add ``target`` to the list of routing destinations.

        :param target: A ``bspump.PumpService.locate()`` string.
        :return: ``self``, so calls can be chained.
        """
        destinations = self.Targets
        destinations.append(target)
        return self

    def unbind(self, target: str):
        """
        Remove ``target`` from the routing destinations and call
        ``self.unlocate(target)`` (inherited; presumably drops the resolved
        source for that address - confirm in RouterProcessor).

        :param target: A locate string previously given to :meth:`bind`.
        :return: ``self``, so calls can be chained.
        :raises ValueError: if ``target`` is not currently bound
            (raised by ``list.remove``).
        """
        destinations = self.Targets
        # Removal comes first: if it raises, ``unlocate`` is not called.
        destinations.remove(target)
        self.unlocate(target)
        return self

    def process(self, context, event):
        """
        Call ``self.route(context, event, address)`` for every bound target
        address, in binding order.

        :param context: Passed through to ``route()`` unchanged.
        :param event: Passed through to ``route()`` unchanged.
        :return: The same ``event`` object that was passed in.
        """
        for address in self.Targets:
            self.route(context, event, address)
        return event