Source code for bspump.file.filelinesource

import logging

from .fileabcsource import FileABCSource

#

L = logging.getLogger(__file__)

#


[docs] class FileLineSource(FileABCSource): """ Description: | """
[docs] def __init__(self, app, pipeline, id=None, config=None): """ Description: **Parameters** app: Application Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_ pipeline : Pipeline Name of the Pipeline id : ID, default = None config : JSON, default = None Configuration file with additional information """ super().__init__(app, pipeline, id=id, config=config)
[docs] async def read(self, filename, f): """ Description: **Parameters** filename : f : """ for line in f: await self.process(line, {"filename": filename}) await self.simulate_event()
#
[docs] class FileMultiLineSource(FileABCSource): """ Description: Read file line by line but try to join multi-line events by separator. Separator is a (fixed) pattern that should present at the begin of the line, if it is a new event. Example: <133>1 2018-03-24T02:37:01+00:00 machine program 22068 - Start of the multiline event 2nd line of the event 3rd line of the event <133>1 2018-03-24T02:37:01+00:00 machine program 22068 - New event The separatpr is '<' string in this case | """
[docs] def __init__(self, app, pipeline, separator, id=None, config=None): """ Description: **Parameters** app: Application Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_ pipeline : Pipeline Name of the Pipeline separator : id : ID, default = None config : JSON, default = None Configuration file with additional information """ super().__init__(app, pipeline, id=id, config=config) if isinstance(separator, str): separator = separator.encode("utf-8") self._separator = separator
# TODO: self._max_latch_size = 10000
[docs] async def read(self, filename, f): """ Description: **Parameters** filename : f : """ latch = None for line in f: if line.startswith(self._separator) and latch is not None: await self.process(latch) latch = line else: if latch is None: latch = line else: latch = latch + b"\n" + line await self.simulate_event() if latch is not None: await self.process(latch, {"filename": filename})