Source code for bspump.file.filecsvsource
import csv
import logging
import os
from .fileabcsource import FileABCSource
L = logging.getLogger(__file__)
[docs]
class FileCSVSource(FileABCSource):
"""
Description:
"""
ConfigDefaults = {
"mode": "r",
"newline": "", # Required by CSV parser
"dialect": "excel",
"delimiter": ",",
"doublequote": True,
"escapechar": "",
"lineterminator": os.linesep,
"quotechar": '"',
"quoting": csv.QUOTE_MINIMAL,
"skipinitialspace": False,
"strict": False,
}
[docs]
def __init__(self, app, pipeline, fieldnames=None, id=None, config=None):
super().__init__(app, pipeline, id=id, config=config)
self.Dialect = csv.get_dialect(self.Config["dialect"])
self.FieldNames = fieldnames
[docs]
def reader(self, f):
"""
Description:
**Parameters**
f :
:returns: ??
|
"""
kwargs = {}
v = self.Config.get("delimiter")
if v is not None:
kwargs["delimiter"] = v
v = self.Config.get("doublequote")
if v is not None:
kwargs["doublequote"] = v
v = self.Config.get("escapechar")
if v is not None:
kwargs["escapechar"] = v
v = self.Config.get("lineterminator")
if v is not None:
kwargs["lineterminator"] = v
v = self.Config.get("quotechar")
if v is not None:
kwargs["quotechar"] = v
v = self.Config.get("quoting")
if v is not None:
kwargs["quoting"] = v
v = self.Config.get("skipinitialspace")
if v is not None:
kwargs["skipinitialspace"] = v
v = self.Config.get("strict")
if v is not None:
kwargs["strict"] = v
return csv.DictReader(
f, dialect=self.Dialect, fieldnames=self.FieldNames, **kwargs
)
[docs]
async def read(self, filename, f):
"""
Description:
**Parameters**
filename :
f :
"""
for line in self.reader(f):
await self.process(line, {"filename": filename})
await self.simulate_event()