bspump.file

File-based sources and sinks for BSPump.

class bspump.file.FileBlockSink(app, pipeline, id=None, config=None)[source]

Bases: Sink

Description:

Config Defaults

path : ‘’

mode : wb

flags : O_CREAT

ConfigDefaults: dict = {'flags': 'O_CREAT', 'mode': 'wb', 'path': ''}
OFlagDict = {'O_CREAT': 64, 'O_EXCL': 128}
__init__(app, pipeline, id=None, config=None)[source]

Parameters

app : Application

Name of the Application.

pipeline : Pipeline

Name of the Pipeline.

id : ID, default = None

ID

config : JSON, default = None

Configuration file with additional information.

get_file_name(context, event)[source]

Override this method to gain control over output file name.

Parameters

context :

event : any type

A single unit of information that is propagated through the pipeline.

Returns:

config path


process(context, event)[source]

Opens a file.

Parameters

context :

event : any type

A single unit of information that is propagated through the pipeline.
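
The names in OFlagDict appear to correspond to the operating-system open flags of the same name (on Linux, os.O_CREAT is 64 and os.O_EXCL is 128). The sketch below illustrates only the underlying OS behaviour using the standard library; open_block_file is a hypothetical helper and is not part of bspump, and how the sink itself combines flags is not shown in this reference.

```python
import os
import tempfile


def open_block_file(path, flag_names=("O_CREAT",)):
    """Hypothetical helper: build an os.open() bitmask from flag names."""
    flags = os.O_WRONLY
    for name in flag_names:
        flags |= getattr(os, name)
    return os.open(path, flags, 0o644)


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "block.bin")

    # O_CREAT alone: the file is created if it is missing.
    fd = open_block_file(target, ("O_CREAT",))
    os.write(fd, b"first block")
    os.close(fd)

    # O_CREAT combined with O_EXCL: opening an existing file fails.
    try:
        fd = open_block_file(target, ("O_CREAT", "O_EXCL"))
        os.close(fd)
        exclusive_failed = False
    except FileExistsError:
        exclusive_failed = True

    with open(target, "rb") as f:
        written = f.read()
```

With O_EXCL added, an existing output file is never silently reused, which can be useful when each block must land in a fresh file.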

class bspump.file.FileBlockSource(app, pipeline, id=None, config=None)[source]

Bases: FileABCSource

Description:

__init__(app, pipeline, id=None, config=None)[source]

Description:

Parameters

app : Application

Name of the Application.

pipeline : Pipeline

Name of the Pipeline.

id : ID, default = None

ID

config : JSON, default = None

Configuration file with additional information.

async read(filename, f)[source]

Loads a file.

Parameters

filename : file

Name of the file.

f :

class bspump.file.FileCSVSink(app, pipeline, id=None, config=None)[source]

Bases: Sink

Description:

Default Config

path : ‘’

dialect : ‘excel’

delimiter : ‘,’

doublequote : True

escapechar : “”

lineterminator : os.linesep

quotechar : ‘"’

quoting : csv.QUOTE_MINIMAL

skipinitialspace : False

strict : False

ConfigDefaults: dict = {'delimiter': ',', 'dialect': 'excel', 'doublequote': True, 'escapechar': '', 'lineterminator': '\n', 'path': '', 'quotechar': '"', 'quoting': 0, 'skipinitialspace': False, 'strict': False}
__init__(app, pipeline, id=None, config=None)[source]

Description:

get_file_name(context, event)[source]

Description: Override this method to gain control over output file name.

Parameters

context :

event :

Returns:

path of context and config


writer(f, fieldnames)[source]

Description:

Parameters

f :

fieldnames : list

Names of the CSV columns.

Returns:

dialect and fieldnames


process(context, event)[source]

Description:

Parameters

context :

event : any data type

Information with timestamp.

rotate()[source]

Description: Call this to close the currently open file.
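
The defaults listed for FileCSVSink mirror the formatting parameters of the standard csv module. As a rough illustration of what those values mean, the sketch below feeds them to csv.DictWriter. The helper make_writer and the sample row are assumptions made for this example; the sink's actual writer() implementation is not reproduced here.

```python
import csv
import io

# Values copied from the FileCSVSink ConfigDefaults ('path' is omitted).
csv_config = {
    "dialect": "excel",
    "delimiter": ",",
    "doublequote": True,
    "escapechar": "",
    "lineterminator": "\n",
    "quotechar": '"',
    "quoting": 0,  # same value as csv.QUOTE_MINIMAL
    "skipinitialspace": False,
    "strict": False,
}


def make_writer(f, fieldnames, config):
    """Hypothetical helper: build a csv.DictWriter from a config mapping."""
    return csv.DictWriter(
        f,
        fieldnames=fieldnames,
        dialect=config["dialect"],
        delimiter=config["delimiter"],
        doublequote=config["doublequote"],
        # Recent Python versions reject an empty escapechar, so '' maps to None.
        escapechar=config["escapechar"] or None,
        lineterminator=config["lineterminator"],
        quotechar=config["quotechar"],
        quoting=config["quoting"],
        skipinitialspace=config["skipinitialspace"],
        strict=config["strict"],
    )


buffer = io.StringIO()
writer = make_writer(buffer, ["id", "name"], csv_config)
writer.writeheader()
writer.writerow({"id": 1, "name": 'Smith, "Jo"'})
csv_text = buffer.getvalue()
```

With minimal quoting and doublequote enabled, only the field that contains the delimiter and quote characters is quoted, and its embedded quotes are doubled.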

class bspump.file.FileCSVSource(app, pipeline, fieldnames=None, id=None, config=None)[source]

Bases: FileABCSource

Description:

ConfigDefaults: dict = {'delimiter': ',', 'dialect': 'excel', 'doublequote': True, 'escapechar': '', 'lineterminator': '\n', 'mode': 'r', 'newline': '', 'quotechar': '"', 'quoting': 0, 'skipinitialspace': False, 'strict': False}
__init__(app, pipeline, fieldnames=None, id=None, config=None)[source]

Description:

Parameters

app : Application

Name of the Application.

pipeline : Pipeline

Name of the Pipeline.

id : ID, default = None

ID

config : JSON, default = None

Configuration file with additional information.

path : str (required)

Path to the file. Can be a glob pattern to select multiple files.

mode : str, default = ‘rb’

Mode in which the file is opened.

newline : str, default = os.linesep

Newline character.

post : str, default = ‘move’

One of ‘delete’, ‘noop’ and ‘move’.

exclude : str, default = ‘’

Glob of filenames that should be excluded (has precedence over ‘include’).

include : str, default = ‘’

Glob of filenames that should be included.

encoding : str, default = ‘’

Encoding of the file.

move_destination : str, default = ‘’

Destination folder for ‘move’. Make sure it’s outside of the glob search.

lines_per_event : int, default = 10000

The number of lines after which the read method enters the idle state to allow other operations to perform their tasks.

event_idle_time : float, default = 0.01

The time for which the read method enters the idle state (see above).

files_per_cycle : int, default = 1

The number of files that are processed in one cycle.
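
The interplay of lines_per_event and event_idle_time can be pictured with plain asyncio. This is a minimal sketch of the idea only, assuming the reader simply sleeps after every lines_per_event lines; read_lines and its handle callback are hypothetical names and do not come from bspump.

```python
import asyncio


async def read_lines(lines, handle, lines_per_event=10000, event_idle_time=0.01):
    """Hypothetical reader: pause periodically so other tasks can run."""
    idle_count = 0
    for number, line in enumerate(lines, start=1):
        handle(line)
        if number % lines_per_event == 0:
            # Yield control to the event loop for a short time.
            await asyncio.sleep(event_idle_time)
            idle_count += 1
    return idle_count


received = []
pauses = asyncio.run(
    read_lines(
        (f"line {i}" for i in range(25)),
        received.append,
        lines_per_event=10,
        event_idle_time=0.001,
    )
)
```

Lower values of lines_per_event make the reader more cooperative at the cost of throughput; higher values do the opposite.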

reader(f)[source]

Description:

Parameters

f :

Returns:

??


async read(filename, f)[source]

Description:

Parameters

filename :

f :

class bspump.file.FileJSONSource(app, pipeline, id=None, config=None)[source]

Bases: FileABCSource

Description: This file source is optimized to load and parse even large JSON files. Loading and parsing are off-loaded to a worker thread so that they do not block the IO loop.

__init__(app, pipeline, id=None, config=None)[source]

Description:

Parameters

app :

pipeline :

id : ID, default = None

ID

config : JSON, default = None

Configuration file with additional information.

async read(filename, f)[source]

Description:

Parameters

filename :

f :
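
The worker-thread approach described for FileJSONSource can be sketched with the standard library alone. The example below assumes the blocking load is handed to the event loop's default executor; load_json and read_json are illustrative names, not bspump functions, and the real source may organise this differently.

```python
import asyncio
import json
import os
import tempfile


def load_json(path):
    """Blocking part: read and parse the whole file."""
    with open(path, "rb") as f:
        return json.load(f)


async def read_json(path):
    loop = asyncio.get_running_loop()
    # Passing None selects the loop's default thread pool executor.
    return await loop.run_in_executor(None, load_json, path)


with tempfile.TemporaryDirectory() as tmp:
    sample = os.path.join(tmp, "input.json")
    with open(sample, "w", encoding="utf-8") as f:
        json.dump({"items": list(range(1000))}, f)

    parsed = asyncio.run(read_json(sample))
```

While the worker thread parses the file, the event loop stays free to serve other pipelines and timers.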

class bspump.file.FileLineSource(app, pipeline, id=None, config=None)[source]

Bases: FileABCSource

Description:


__init__(app, pipeline, id=None, config=None)[source]

Description:

Parameters

app: Application

Name of the Application

pipeline : Pipeline

Name of the Pipeline

id : ID, default = None

config : JSON, default = None

Configuration file with additional information

async read(filename, f)[source]

Description:

Parameters

filename :

f :

class bspump.file.FileMultiLineSource(app, pipeline, separator, id=None, config=None)[source]

Bases: FileABCSource

Description: Reads a file line by line and joins multi-line events using a separator. The separator is a fixed pattern that must appear at the beginning of every line that starts a new event.

Example:

<133>1 2018-03-24T02:37:01+00:00 machine program 22068 - Start of the multiline event
2nd line of the event
3rd line of the event
<133>1 2018-03-24T02:37:01+00:00 machine program 22068 - New event

The separator is the ‘<’ string in this case.
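
The joining rule can be sketched as a small generator. This is an illustration of the described behaviour under the assumption that a line starting with the separator always opens a new event; join_multiline is a hypothetical function, not the method used by FileMultiLineSource.

```python
def join_multiline(lines, separator):
    """Hypothetical helper: group lines into events split at the separator."""
    event = None
    for line in lines:
        if line.startswith(separator):
            # A new event begins, so emit the one collected so far.
            if event is not None:
                yield event
            event = line
        elif event is not None:
            # Continuation line: append it to the current event.
            event += line
        else:
            # Leading line without a separator: start an event anyway.
            event = line
    if event is not None:
        yield event


lines = [
    "<133>1 2018-03-24T02:37:01+00:00 machine program 22068 - Start of the multiline event\n",
    "2nd line of the event\n",
    "3rd line of the event\n",
    "<133>1 2018-03-24T02:37:01+00:00 machine program 22068 - New event\n",
]
events = list(join_multiline(lines, "<"))
```

Here the first event spans three lines and the second event consists of the final line only.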


__init__(app, pipeline, separator, id=None, config=None)[source]

Description:

Parameters

app: Application

Name of the Application

pipeline : Pipeline

Name of the Pipeline

separator :

id : ID, default = None

config : JSON, default = None

Configuration file with additional information

async read(filename, f)[source]

Description:

Parameters

filename :

f :

class bspump.file.FileBatchLookupProvider(lookup, url, id=None, config=None)[source]

Bases: LookupBatchProviderABC

Loads lookup data from a file on local filesystem.


__init__(lookup, url, id=None, config=None)[source]

Description:

async load()[source]

Description:

Returns:

result


load_on_thread()[source]

Description:


async save(data)[source]

Parameters

data :

Sources

FileLineSource

Reads files line by line.

import bspump.file

source = bspump.file.FileLineSource(app, pipeline, config={
    "path": "/data/input.txt"
})

Configuration:

[pipeline:MyPipeline:FileLineSource]
path=/data/input.txt
encoding=utf-8

FileCSVSource

Reads CSV files with automatic parsing.

source = bspump.file.FileCSVSource(app, pipeline, config={
    "path": "/data/input.csv"
})

Configuration:

[pipeline:MyPipeline:FileCSVSource]
path=/data/input.csv
delimiter=,
has_header=true

Each row becomes a dictionary with column names as keys.
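
The row-to-dictionary mapping matches what csv.DictReader from the standard library produces. The sketch below assumes the column names are taken from the first row, which is what DictReader does when no field names are supplied; the sample data is invented for illustration.

```python
import csv
import io

# In-memory stand-in for /data/input.csv (sample content is made up).
sample = io.StringIO("id,name,value\n1,alpha,10\n2,beta,20\n", newline="")

# Without explicit fieldnames, the first row supplies the keys.
reader = csv.DictReader(sample, delimiter=",")
rows = [dict(row) for row in reader]
```

Note that every value arrives as a string, so numeric columns need an explicit conversion in a downstream processor.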

FileJSONSource

Reads JSON files.

source = bspump.file.FileJSONSource(app, pipeline, config={
    "path": "/data/input.json"
})

Sinks

FileLineSink

Writes events as lines to a file.

sink = bspump.file.FileLineSink(app, pipeline, config={
    "path": "/data/output.txt"
})

Configuration:

[pipeline:MyPipeline:FileLineSink]
path=/data/output.txt
mode=w

Modes:

  • w - Write (overwrite)

  • a - Append
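
The two modes behave like the corresponding modes of Python's built-in open(). A minimal sketch, assuming each event is written as one line; write_lines is a hypothetical helper used only for this illustration.

```python
import os
import tempfile


def write_lines(path, lines, mode):
    """Hypothetical helper: write each item as one line using the given mode."""
    with open(path, mode, encoding="utf-8") as f:
        for line in lines:
            f.write(line + "\n")


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "output.txt")
    write_lines(target, ["first"], "w")
    write_lines(target, ["second"], "w")  # replaces the previous content
    write_lines(target, ["third"], "a")   # added after the existing content

    with open(target, encoding="utf-8") as f:
        content = f.read()
```

Append mode is usually the safer choice when a pipeline may be restarted and earlier output must be kept.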

FileCSVSink

Writes events as CSV rows.

sink = bspump.file.FileCSVSink(app, pipeline, config={
    "path": "/data/output.csv"
})

Configuration:

[pipeline:MyPipeline:FileCSVSink]
path=/data/output.csv
columns=id,name,value
delimiter=,

Glob Patterns

Process multiple files:

[pipeline:MyPipeline:FileLineSource]
path=/data/logs/*.log
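
How a glob pattern combines with the include and exclude options can be approximated with the glob and fnmatch modules. This sketch assumes the filters are applied to the base name of each file and that exclude takes precedence; select_files is a hypothetical helper, and bspump's own matching may differ in detail.

```python
import fnmatch
import glob
import os
import tempfile


def select_files(path, include="", exclude=""):
    """Hypothetical helper: expand a glob, then apply include/exclude filters."""
    selected = []
    for filename in sorted(glob.glob(path)):
        base = os.path.basename(filename)
        # Exclusion is checked first, so it wins over inclusion.
        if exclude and fnmatch.fnmatch(base, exclude):
            continue
        if include and not fnmatch.fnmatch(base, include):
            continue
        selected.append(filename)
    return selected


with tempfile.TemporaryDirectory() as tmp:
    for name in ("app.log", "db.log", "db.log.bak", "notes.txt"):
        open(os.path.join(tmp, name), "w").close()

    matched = select_files(os.path.join(tmp, "*.log"), exclude="db*")
    names = [os.path.basename(p) for p in matched]
```

Only app.log survives: the glob drops the non-log files and the exclude pattern removes db.log.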

Configuration Options

Common Source Options:

  • path - File path or glob pattern

  • encoding - File encoding (default: ‘’)

CSV Options:

  • delimiter - Field delimiter (default: ,)

  • has_header - Whether file has header row (default: true)

Sink Options:

  • path - Output file path

  • mode - Write mode (w or a)

  • encoding - File encoding (default: utf-8)

Example Pipeline

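import bspump
import bspump.file


class TransformProcessor(bspump.Processor):
    # Placeholder processor (an assumption; the original example leaves it undefined).
    def process(self, context, event):
        # Pass each CSV row through unchanged; put the transformation here.
        return event


class CSVProcessingPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        # Source, processor and sink are chained in the order given.
        self.build(
            bspump.file.FileCSVSource(app, self, config={
                "path": "/data/input.csv"
            }),
            TransformProcessor(app, self),
            bspump.file.FileCSVSink(app, self, config={
                "path": "/data/output.csv"
            }),
        )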