Source code for bspump.file.filelinesource
import logging
from .fileabcsource import FileABCSource
#
L = logging.getLogger(__file__)
#
[docs]
class FileLineSource(FileABCSource):
"""
Description:
|
"""
[docs]
def __init__(self, app, pipeline, id=None, config=None):
"""
Description:
**Parameters**
app: Application
Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_
pipeline : Pipeline
Name of the Pipeline
id : ID, default = None
config : JSON, default = None
Configuration file with additional information
"""
super().__init__(app, pipeline, id=id, config=config)
[docs]
async def read(self, filename, f):
"""
Description:
**Parameters**
filename :
f :
"""
for line in f:
await self.process(line, {"filename": filename})
await self.simulate_event()
#
[docs]
class FileMultiLineSource(FileABCSource):
"""
Description: Read file line by line but try to join multi-line events by separator.
Separator is a (fixed) pattern that should present at the begin of the line, if it is a new event.
Example:
<133>1 2018-03-24T02:37:01+00:00 machine program 22068 - Start of the multiline event
2nd line of the event
3rd line of the event
<133>1 2018-03-24T02:37:01+00:00 machine program 22068 - New event
The separatpr is '<' string in this case
|
"""
[docs]
def __init__(self, app, pipeline, separator, id=None, config=None):
"""
Description:
**Parameters**
app: Application
Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_
pipeline : Pipeline
Name of the Pipeline
separator :
id : ID, default = None
config : JSON, default = None
Configuration file with additional information
"""
super().__init__(app, pipeline, id=id, config=config)
if isinstance(separator, str):
separator = separator.encode("utf-8")
self._separator = separator
# TODO: self._max_latch_size = 10000
[docs]
async def read(self, filename, f):
"""
Description:
**Parameters**
filename :
f :
"""
latch = None
for line in f:
if line.startswith(self._separator) and latch is not None:
await self.process(latch)
latch = line
else:
if latch is None:
latch = line
else:
latch = latch + b"\n" + line
await self.simulate_event()
if latch is not None:
await self.process(latch, {"filename": filename})