Source code for bspump.file.fileblocksink

import os
import logging
from ..abc.sink import Sink


L = logging.getLogger(__file__)


[docs] class FileBlockSink(Sink): """ Description: ** Config Defaults ** path : '' mode : wb flags : O_CREAT """ ConfigDefaults = { "path": "", "mode": "wb", "flags": "O_CREAT", } OFlagDict = {"O_CREAT": os.O_CREAT, "O_EXCL": os.O_EXCL}
[docs] def __init__(self, app, pipeline, id=None, config=None): """ **Parameters** app : Application Name of the Application pipeline : Pipeline Name of the Pipeline. id : ID, default = None ID config : JSON, default = None Configuration file with additional information. """ super().__init__(app, pipeline, id=id, config=config) self._oflags = 0 for flag in self.Config["flags"].split(","): flag = flag.strip() try: self._oflags |= self.OFlagDict[flag] except KeyError: L.warning("Unknown oflag '{}'".format(flag))
[docs] def get_file_name(self, context, event): """ Override this method to gain control over output file name. **Parameters** context : event : any type a single unit of information that is propagated through the pipeline :return: config path | """ return self.Config["path"]
[docs] def process(self, context, event): """ Opens a file. **Parameters** context : event : any type a single unit of information that is propagated through the pipeline """ fname = self.get_file_name(context, event) fd = os.open(fname, os.O_WRONLY | self._oflags) with os.fdopen(fd, self.Config["mode"]) as fo: fo.write(event)