File Integration

BSPump provides file-based sources and sinks for processing CSV, JSON, and line-based files.

Components

  • FileLineSource: Reads files line by line

  • FileCSVSource: Reads CSV files

  • FileJSONSource: Reads JSON files

  • FileLineSink: Writes files line by line

  • FileCSVSink: Writes CSV files

FileLineSource

Reads a file line by line.

import bspump.file

source = bspump.file.FileLineSource(app, pipeline, config={
    "path": "/data/input.txt"
})

Configuration:

[pipeline:MyPipeline:FileLineSource]
path=/data/input.txt
# Encoding (optional)
encoding=utf-8

FileCSVSource

Reads CSV files with automatic parsing.

import bspump.file

source = bspump.file.FileCSVSource(app, pipeline, config={
    "path": "/data/input.csv"
})

Configuration:

[pipeline:MyPipeline:FileCSVSource]
path=/data/input.csv
# CSV options
delimiter=,
has_header=true

Each row becomes a dictionary with column names as keys.

FileJSONSource

Reads JSON files (one object per line or array).

import bspump.file

source = bspump.file.FileJSONSource(app, pipeline, config={
    "path": "/data/input.json"
})

Configuration:

[pipeline:MyPipeline:FileJSONSource]
path=/data/input.json

FileLineSink

Writes events to a file, one per line.

import bspump.file

sink = bspump.file.FileLineSink(app, pipeline, config={
    "path": "/data/output.txt"
})

Configuration:

[pipeline:MyPipeline:FileLineSink]
path=/data/output.txt
mode=w  # w for write, a for append

FileCSVSink

Writes events as CSV rows.

import bspump.file

sink = bspump.file.FileCSVSink(app, pipeline, config={
    "path": "/data/output.csv"
})

Configuration:

[pipeline:MyPipeline:FileCSVSink]
path=/data/output.csv
delimiter=,
# Columns to write (optional, defaults to all keys)
columns=id,name,value

Complete Example

Process a CSV file and output results:

from bspump.jupyter import *
import bspump.file

auto_pipeline(
    source=lambda app, pipeline: bspump.file.FileCSVSource(
        app, pipeline, config={"path": "/data/input.csv"}
    ),
    sink=lambda app, pipeline: bspump.file.FileCSVSink(
        app, pipeline, config={"path": "/data/output.csv"}
    ),
    name="CSVProcessingPipeline",
)

# Process each row (event is a dict with column names as keys)
event["processed"] = True
event["value"] = float(event["value"]) * 2

Glob Patterns

Process multiple files using glob patterns:

[pipeline:MyPipeline:FileLineSource]
path=/data/logs/*.log

File Watching

Watch for new files in a directory:

import bspump.file

source = bspump.file.FileWatchSource(app, pipeline, config={
    "path": "/data/incoming/",
    "pattern": "*.csv"
})

Processing Large Files

For large files, use streaming:

class StreamingSource(bspump.file.FileLineSource):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        # Process in chunks
        self.batch_size = 1000

Temporary Files

Write to temporary files and move on completion:

import bspump.file
import shutil

class SafeFileSink(bspump.file.FileLineSink):
    def __init__(self, app, pipeline, id=None, config=None):
        self.final_path = config.get("path")
        config["path"] = self.final_path + ".tmp"
        super().__init__(app, pipeline, id, config)

    async def on_completed(self):
        shutil.move(self.path, self.final_path)

Configuration Reference

Source Options

Option

Default

Description

path

(required)

File path or glob pattern

encoding

utf-8

File encoding

delimiter

, (CSV only)

CSV delimiter

has_header

true (CSV only)

Whether CSV has header row

Sink Options

Option

Default

Description

path

(required)

Output file path

mode

w

Write mode (w or a)

encoding

utf-8

File encoding