Source code for bspump.file.filecsvsource

import csv
import logging
import os

from .fileabcsource import FileABCSource


L = logging.getLogger(__file__)


[docs] class FileCSVSource(FileABCSource): """ Description: """ ConfigDefaults = { "mode": "r", "newline": "", # Required by CSV parser "dialect": "excel", "delimiter": ",", "doublequote": True, "escapechar": "", "lineterminator": os.linesep, "quotechar": '"', "quoting": csv.QUOTE_MINIMAL, "skipinitialspace": False, "strict": False, }
[docs] def __init__(self, app, pipeline, fieldnames=None, id=None, config=None): super().__init__(app, pipeline, id=id, config=config) self.Dialect = csv.get_dialect(self.Config["dialect"]) self.FieldNames = fieldnames
[docs] def reader(self, f): """ Description: **Parameters** f : :returns: ?? | """ kwargs = {} v = self.Config.get("delimiter") if v is not None: kwargs["delimiter"] = v v = self.Config.get("doublequote") if v is not None: kwargs["doublequote"] = v v = self.Config.get("escapechar") if v is not None: kwargs["escapechar"] = v v = self.Config.get("lineterminator") if v is not None: kwargs["lineterminator"] = v v = self.Config.get("quotechar") if v is not None: kwargs["quotechar"] = v v = self.Config.get("quoting") if v is not None: kwargs["quoting"] = v v = self.Config.get("skipinitialspace") if v is not None: kwargs["skipinitialspace"] = v v = self.Config.get("strict") if v is not None: kwargs["strict"] = v return csv.DictReader( f, dialect=self.Dialect, fieldnames=self.FieldNames, **kwargs )
[docs] async def read(self, filename, f): """ Description: **Parameters** filename : f : """ for line in self.reader(f): await self.process(line, {"filename": filename}) await self.simulate_event()