bspump.file
File-based sources and sinks for BSPump.
- class bspump.file.FileBlockSink(app, pipeline, id=None, config=None)
Bases: Sink
Config Defaults:
path : ''
mode : wb
flags : O_CREAT
- OFlagDict = {'O_CREAT': 64, 'O_EXCL': 128}
- __init__(app, pipeline, id=None, config=None)
Parameters
- app : Application
The Application object.
- pipeline : Pipeline
The Pipeline this component belongs to.
- id : str, default = None
ID of the component.
- config : dict, default = None
Configuration options that override the defaults.
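OFlagDict maps flag names to numeric `os.open()` flags (64 and 128 are the values of `os.O_CREAT` and `os.O_EXCL` on Linux). The sketch below shows how flag names can be resolved and combined with a bitwise OR. It assumes the names arrive as a list; how FileBlockSink itself parses the `flags` option is not documented here, and the helper `resolve_flags` is hypothetical.

```python
import os
import tempfile

# Hypothetical mapping that mirrors the keys of FileBlockSink.OFlagDict,
# using the platform's own os constants instead of hard-coded numbers.
O_FLAGS = {"O_CREAT": os.O_CREAT, "O_EXCL": os.O_EXCL}


def resolve_flags(names):
    """Combine flag names into one integer for os.open() (hypothetical helper)."""
    value = 0
    for name in names:
        value |= O_FLAGS[name.strip()]
    return value


flags = resolve_flags(["O_CREAT", "O_EXCL"])

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "out.bin")
    # O_CREAT creates the file if it is missing; 0o644 is its permission mode.
    fd = os.open(path, os.O_WRONLY | flags, 0o644)
    os.write(fd, b"block")
    os.close(fd)
    # With O_EXCL, opening an existing file fails instead of reusing it.
    try:
        os.open(path, os.O_WRONLY | flags, 0o644)
        second_open_failed = False
    except FileExistsError:
        second_open_failed = True
```

With only `O_CREAT` (the default in the Config Defaults above), the second `os.open()` would succeed and reuse the existing file.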
- class bspump.file.FileBlockSource(app, pipeline, id=None, config=None)
Bases: FileABCSource
- class bspump.file.FileCSVSink(app, pipeline, id=None, config=None)
Bases: Sink
Default Config:
path : ''
dialect : 'excel'
delimiter : ','
doublequote : True
escapechar : ''
lineterminator : os.linesep
quotechar : '"'
quoting : csv.QUOTE_MINIMAL
skipinitialspace : False
strict : False
- ConfigDefaults: dict = {'delimiter': ',', 'dialect': 'excel', 'doublequote': True, 'escapechar': '', 'lineterminator': '\n', 'path': '', 'quotechar': '"', 'quoting': 0, 'skipinitialspace': False, 'strict': False}
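- get_file_name(context, event)
Description: Override this method to gain control over the output file name.
Parameters
- context : dict
The event context.
- event
The event being written.
- Returns:
The output file path: the 'path' value from the context if present, otherwise the 'path' option from the configuration.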
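A common reason to override `get_file_name` is to route events into different files. The sketch below uses a small stand-in class instead of the real FileCSVSink so that it runs without BSPump. The stand-in's base method follows the Returns description above; the class names, the per-day naming scheme and the event's `"date"` key are all hypothetical.

```python
import os


class CSVSinkStandIn:
    """Hypothetical stand-in for FileCSVSink; only get_file_name is modelled."""

    def __init__(self, config):
        self.Config = config

    def get_file_name(self, context, event):
        # Path from the context if present, otherwise the configured path.
        return context.get("path", self.Config["path"])


class DailyCSVSink(CSVSinkStandIn):
    """Writes each event into a per-day file next to the base path."""

    def get_file_name(self, context, event):
        base = super().get_file_name(context, event)
        root, ext = os.path.splitext(base)
        # Assumes every event carries a "date" key such as "2018-03-24".
        return "{}-{}{}".format(root, event["date"], ext)


sink = DailyCSVSink({"path": "/data/output.csv"})
print(sink.get_file_name({}, {"date": "2018-03-24"}))
# /data/output-2018-03-24.csv
```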
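- writer(f, fieldnames)
Parameters
- f : file object
The open file to write to.
- fieldnames : list
Names of the CSV columns.
- Returns:
A CSV writer (csv.DictWriter) configured with the dialect, the fieldnames and the formatting options from the configuration.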
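To see what the ConfigDefaults listed above produce, the sketch below passes the same options to Python's standard `csv.DictWriter` and writes to an in-memory buffer. It assumes, rather than confirms, that the sink forwards these options to the `csv` module. The sample events are made up, and `make_writer` is a hypothetical helper. It maps the empty `escapechar` to `None`, because Python 3.11 and later reject an empty escape character.

```python
import csv
import io

# The sink defaults listed above (quoting 0 == csv.QUOTE_MINIMAL).
config = {
    "dialect": "excel",
    "delimiter": ",",
    "doublequote": True,
    "escapechar": "",
    "lineterminator": "\n",
    "quotechar": '"',
    "quoting": csv.QUOTE_MINIMAL,
    "skipinitialspace": False,
    "strict": False,
}


def make_writer(f, fieldnames, config):
    """Build a csv.DictWriter from sink-style options (hypothetical helper)."""
    options = {k: v for k, v in config.items() if k != "dialect"}
    # None means "no escape character"; '' is not accepted on newer Pythons.
    options["escapechar"] = options["escapechar"] or None
    return csv.DictWriter(f, fieldnames=fieldnames, dialect=config["dialect"], **options)


buffer = io.StringIO()
writer = make_writer(buffer, ["id", "name"], config)
writer.writeheader()
writer.writerow({"id": 1, "name": "plain"})
writer.writerow({"id": 2, "name": "has, comma"})
# QUOTE_MINIMAL quotes only the field that contains the delimiter.
print(buffer.getvalue())
```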
- class bspump.file.FileCSVSource(app, pipeline, fieldnames=None, id=None, config=None)
Bases: FileABCSource
- ConfigDefaults: dict = {'delimiter': ',', 'dialect': 'excel', 'doublequote': True, 'escapechar': '', 'lineterminator': '\n', 'mode': 'r', 'newline': '', 'quotechar': '"', 'quoting': 0, 'skipinitialspace': False, 'strict': False}
- __init__(app, pipeline, fieldnames=None, id=None, config=None)
Parameters
- app : Application
The Application object.
- pipeline : Pipeline
The Pipeline this source belongs to.
- fieldnames : list, default = None
Column names used as dictionary keys for each row. If omitted, they are taken from the header row of the file.
- id : str, default = None
ID of the component.
- config : dict, default = None
Configuration options that override the defaults:
- path : str (required)
Path to the file. Can be a glob pattern to select multiple files.
- mode : str, default = 'rb'
Mode in which the file is opened (FileCSVSource overrides this to 'r' in its ConfigDefaults).
- newline : str, default = os.linesep
Newline character (FileCSVSource overrides this to '' in its ConfigDefaults).
- post : str, default = 'move'
What to do with a file after it has been processed: one of 'delete', 'noop' and 'move'.
- exclude : str, default = ''
Glob of file names to exclude (takes precedence over 'include').
- include : str, default = ''
Glob of file names to include.
- encoding : str, default = ''
Encoding of the file.
- move_destination : str, default = ''
Destination folder for 'move'. Make sure it lies outside the glob search.
- lines_per_event : int, default = 10000
Number of lines after which the read method idles so that other operations can perform their tasks.
- event_idle_time : float, default = 0.01
How long, in seconds, the read method idles (see above).
- files_per_cycle : int, default = 1
Number of files processed in one cycle.
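The `lines_per_event` and `event_idle_time` options describe cooperative yielding. After every `lines_per_event` lines, the reader pauses for `event_idle_time` seconds so that the asyncio event loop can serve other tasks. The sketch below shows this pattern with asyncio, reading from an in-memory file. The functions `read_lines` and `heartbeat` are hypothetical and not the BSPump implementation.

```python
import asyncio
import io


async def read_lines(f, process, lines_per_event=10000, event_idle_time=0.01):
    """Pass each line to process(), pausing after every lines_per_event lines."""
    count = 0
    for line in f:
        process(line.rstrip("\n"))
        count += 1
        if count % lines_per_event == 0:
            # Yield to the event loop so other coroutines can run.
            await asyncio.sleep(event_idle_time)
    return count


async def main():
    ticks = []

    async def heartbeat():
        # Only makes progress while the reader is paused.
        while True:
            ticks.append(1)
            await asyncio.sleep(0)

    hb = asyncio.create_task(heartbeat())
    received = []
    f = io.StringIO("".join("line {}\n".format(i) for i in range(25)))
    total = await read_lines(f, received.append, lines_per_event=10, event_idle_time=0.001)
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return total, received, ticks


total, received, ticks = asyncio.run(main())
```

With 25 lines and `lines_per_event=10`, the reader pauses twice (after lines 10 and 20). The heartbeat task runs only during those pauses. Without the pauses, a long file would keep the loop busy until the whole file had been read.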
- class bspump.file.FileJSONSource(app, pipeline, id=None, config=None)
Bases: FileABCSource
Description: This file source is optimized to load and parse even large JSON files. Loading and parsing are off-loaded to a worker thread so that they don't block the I/O loop.
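Off-loading a blocking load-and-parse step to a worker thread is a standard asyncio technique. The sketch below uses `loop.run_in_executor` with the loop's default thread pool. The helper names are hypothetical, and FileJSONSource's internals may differ.

```python
import asyncio
import json
import os
import tempfile


def load_json(path):
    # Blocking work: read the whole file and parse it. Runs in a worker thread.
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


async def read_json_without_blocking(path):
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor; the loop stays free
    # to run other tasks until the parsed result is ready.
    return await loop.run_in_executor(None, load_json, path)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "input.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump({"events": [1, 2, 3]}, f)
    data = asyncio.run(read_json_without_blocking(path))

print(data)
# {'events': [1, 2, 3]}
```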
- class bspump.file.FileLineSource(app, pipeline, id=None, config=None)
Bases: FileABCSource
- __init__(app, pipeline, id=None, config=None)
Parameters
- app : Application
The Application object.
- pipeline : Pipeline
The Pipeline this source belongs to.
- id : str, default = None
ID of the component.
- config : dict, default = None
Configuration options that override the defaults.
- class bspump.file.FileMultiLineSource(app, pipeline, separator, id=None, config=None)
Bases: FileABCSource
Description: Reads a file line by line, but tries to join multi-line events using a separator. The separator is a fixed pattern that must be present at the beginning of a line that starts a new event.
Example:
<133>1 2018-03-24T02:37:01+00:00 machine program 22068 - Start of the multiline event
2nd line of the event
3rd line of the event
<133>1 2018-03-24T02:37:01+00:00 machine program 22068 - New event
The separator is the string '<' in this case.
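The joining rule can be sketched as a generator. A line that starts with the separator begins a new event, and any other line is appended to the current event. This sketch assumes that lines are joined with a newline, and the function name `join_multiline` is hypothetical. The description above does not say how lines before the first separator are handled, so the sketch simply collects them into an event of their own.

```python
def join_multiline(lines, separator):
    """Group lines into events; a line starting with `separator` opens a new event."""
    current = []
    for line in lines:
        line = line.rstrip("\n")
        if line.startswith(separator) and current:
            # The previous event is complete; emit it before starting a new one.
            yield "\n".join(current)
            current = []
        current.append(line)
    if current:
        # Flush the last event at end of input.
        yield "\n".join(current)


log = [
    "<133>1 2018-03-24T02:37:01+00:00 machine program 22068 - Start of the multiline event\n",
    "2nd line of the event\n",
    "3rd line of the event\n",
    "<133>1 2018-03-24T02:37:01+00:00 machine program 22068 - New event\n",
]
events = list(join_multiline(log, "<"))
print(len(events))
# 2
```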
- __init__(app, pipeline, separator, id=None, config=None)
Parameters
- app : Application
The Application object.
- pipeline : Pipeline
The Pipeline this source belongs to.
- separator : str
Fixed pattern at the beginning of a line that marks the start of a new event.
- id : str, default = None
ID of the component.
- config : dict, default = None
Configuration options that override the defaults.
- class bspump.file.FileBatchLookupProvider(lookup, url, id=None, config=None)
Bases: LookupBatchProviderABC
Description: Loads lookup data from a file on the local filesystem.
Sources
FileLineSource
Reads files line by line.
import bspump.file
source = bspump.file.FileLineSource(app, pipeline, config={
"path": "/data/input.txt"
})
Configuration:
[pipeline:MyPipeline:FileLineSource]
path=/data/input.txt
encoding=utf-8
FileCSVSource
Reads CSV files with automatic parsing.
source = bspump.file.FileCSVSource(app, pipeline, config={
"path": "/data/input.csv"
})
Configuration:
[pipeline:MyPipeline:FileCSVSource]
path=/data/input.csv
delimiter=,
has_header=true
Each row becomes a dictionary with column names as keys.
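Python's `csv.DictReader` behaves this way. It takes the column names from the header row unless `fieldnames` are given explicitly. The self-contained sketch below uses made-up data and opens the input with `newline=''`, as the `csv` module recommends; this matches the `newline` default in FileCSVSource's ConfigDefaults. It assumes, rather than confirms, that FileCSVSource is built on `csv.DictReader`.

```python
import csv
import io

data = "id,name,value\n1,alpha,10\n2,beta,20\n"

# With fieldnames omitted, DictReader uses the first row as the header.
rows = list(csv.DictReader(io.StringIO(data, newline=""), delimiter=","))
print(rows[0])
# {'id': '1', 'name': 'alpha', 'value': '10'}

# With explicit fieldnames, every row (including the first) is treated as data.
raw = list(csv.DictReader(io.StringIO("1,alpha,10\n", newline=""),
                          fieldnames=["id", "name", "value"]))
```

All values come back as strings; converting `value` to a number is left to a processor further down the pipeline.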
FileJSONSource
Reads JSON files.
source = bspump.file.FileJSONSource(app, pipeline, config={
"path": "/data/input.json"
})
Sinks
FileLineSink
Writes events as lines to a file.
sink = bspump.file.FileLineSink(app, pipeline, config={
"path": "/data/output.txt"
})
Configuration:
[pipeline:MyPipeline:FileLineSink]
path=/data/output.txt
mode=w
Modes:
- w: Write (overwrite)
- a: Append
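Assuming the sink passes `mode` through to Python's built-in `open()`, the two modes behave as follows: `w` truncates the file each time it is opened, and `a` keeps the existing content and appends to it. The self-contained illustration below uses a temporary file; `write_lines` is a hypothetical helper.

```python
import os
import tempfile


def write_lines(path, lines, mode):
    # mode is "w" (overwrite) or "a" (append), as in the sink's mode option.
    with open(path, mode, encoding="utf-8") as f:
        for line in lines:
            f.write(line + "\n")


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "output.txt")
    write_lines(path, ["first run"], "w")
    write_lines(path, ["second run"], "a")  # kept alongside the first run
    with open(path, encoding="utf-8") as f:
        appended = f.read()
    write_lines(path, ["third run"], "w")   # replaces everything written so far
    with open(path, encoding="utf-8") as f:
        overwritten = f.read()

print(repr(appended), repr(overwritten))
# 'first run\nsecond run\n' 'third run\n'
```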
FileCSVSink
Writes events as CSV rows.
sink = bspump.file.FileCSVSink(app, pipeline, config={
"path": "/data/output.csv"
})
Configuration:
[pipeline:MyPipeline:FileCSVSink]
path=/data/output.csv
columns=id,name,value
delimiter=,
Glob Patterns
Process multiple files:
[pipeline:MyPipeline:FileLineSource]
path=/data/logs/*.log
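A path such as /data/logs/*.log expands to every matching file. The sketch below combines that expansion with the `include` and `exclude` options described for FileCSVSource, with `exclude` taking precedence over `include`. It uses the standard `glob` and `fnmatch` modules. Two choices are assumptions of this sketch: include/exclude are matched against the base file name, and an empty `include` means "include everything". The helper `select_files` is hypothetical.

```python
import fnmatch
import glob
import os
import tempfile


def select_files(pattern, include="", exclude=""):
    """Expand a glob pattern and apply include/exclude filters (exclude wins)."""
    selected = []
    for path in sorted(glob.glob(pattern)):
        name = os.path.basename(path)
        if exclude and fnmatch.fnmatch(name, exclude):
            continue
        if include and not fnmatch.fnmatch(name, include):
            continue
        selected.append(name)
    return selected


with tempfile.TemporaryDirectory() as tmp:
    for name in ["app.log", "db.log", "debug-app.log", "notes.txt"]:
        open(os.path.join(tmp, name), "w").close()
    pattern = os.path.join(tmp, "*.log")
    all_logs = select_files(pattern)
    filtered = select_files(pattern, include="*app*", exclude="debug-*")

print(all_logs)  # ['app.log', 'db.log', 'debug-app.log']
print(filtered)  # ['app.log']
```

Here debug-app.log matches both filters and is dropped, because the exclude check runs first.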
Configuration Options
Common Source Options:
- path: File path or glob pattern
- encoding: File encoding (default: utf-8)
CSV Options:
- delimiter: Field delimiter (default: ,)
- has_header: Whether the file has a header row (default: true)
Sink Options:
- path: Output file path
- mode: Write mode (w or a)
- encoding: File encoding (default: utf-8)
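The [pipeline:...] sections above use INI syntax. The sketch below parses one such section with Python's standard `configparser` to show how it reads and why boolean-looking values need explicit conversion. BSPump reads its configuration through ASAB, which may differ in details, and the section content is sample data.

```python
import configparser

ini = """
[pipeline:MyPipeline:FileCSVSource]
path=/data/input.csv
delimiter=,
has_header=true
"""

config = configparser.ConfigParser()
config.read_string(ini)
section = config["pipeline:MyPipeline:FileCSVSource"]

path = section["path"]            # '/data/input.csv'
delimiter = section["delimiter"]  # ','
# Raw values are strings ('true'); getboolean() converts them.
has_header = section.getboolean("has_header")
```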
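Example Pipeline
import bspump
import bspump.file
import bspump.trigger


class TransformProcessor(bspump.Processor):
    # Placeholder processor: replace the body with your own transformation.
    def process(self, context, event):
        return event


class CSVProcessingPipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            # File sources are trigger sources: they read only when their trigger fires.
            bspump.file.FileCSVSource(app, self, config={
                "path": "/data/input.csv",
            }).on(bspump.trigger.OpportunisticTrigger(app)),
            TransformProcessor(app, self),
            bspump.file.FileCSVSink(app, self, config={
                "path": "/data/output.csv",
            }),
        )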