Source code for bspump.file.filecsvsink

import csv
import logging
import os

from ..abc.sink import Sink

#

L = logging.getLogger(__file__)


#


[docs] class FileCSVSink(Sink): """ Description: ** Default Config** path : '' dialect : 'excel' delimiter : ',' doublequote : True escapechar : "" lineterminator : os.linesep quotechar : '"' quoting : csv.QUOTE_MINIMAL skipinitialspace : False strict : False """ ConfigDefaults = { "path": "", "dialect": "excel", "delimiter": ",", "doublequote": True, "escapechar": "", "lineterminator": os.linesep, "quotechar": '"', "quoting": csv.QUOTE_MINIMAL, # 0 - 3 for [QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE] "skipinitialspace": False, "strict": False, }
[docs] def __init__(self, app, pipeline, id=None, config=None): """ Description: """ super().__init__(app, pipeline, id=id, config=config) self.Dialect = csv.get_dialect(self.Config["dialect"]) self._csv_writer = None
[docs] def get_file_name(self, context, event): """ Description: Override this method to gain control over output file name. **Parameters** context : event : :return: path of context and config | """ # Here we are able to modify the sink behavior from outside using context. # If provided, the filename (and path) is taken from the context instead of the config return context.get("path", self.Config["path"])
[docs] def writer(self, f, fieldnames): """ Description: **Parameters** f : fieldnames : file Name of the file. :return: dialect and fieldnames | """ kwargs = dict() kwargs["delimiter"] = self.Config.get("delimiter") kwargs["doublequote"] = bool(self.Config.get("doublequote")) escape_char = self.Config.get("escapechar") escape_char = None if escape_char == "" else escape_char kwargs["escapechar"] = escape_char kwargs["lineterminator"] = self.Config.get("lineterminator") kwargs["quotechar"] = self.Config.get("quotechar") kwargs["quoting"] = int(self.Config.get("quoting")) kwargs["skipinitialspace"] = bool(self.Config.get("skipinitialspace")) kwargs["strict"] = bool(self.Config.get("strict")) return csv.DictWriter(f, dialect=self.Dialect, fieldnames=fieldnames, **kwargs)
[docs] def process(self, context, event): """ Description: **Parameters** context : event : any data type Information with timestamp. """ if self._csv_writer is None: # Open CSV file if needed fieldnames = event.keys() fname = self.get_file_name(context, event) self.fd = open(fname, "w", newline="") self._csv_writer = self.writer(self.fd, fieldnames) self._csv_writer.writeheader() self._csv_writer.writerow(event) self.fd.flush()
[docs] def rotate(self): """ Description: Call this to close the currently open file. """ del self._csv_writer self._csv_writer = None