Sink

Sinks are the exit points of a pipeline. They receive processed events and write them to external systems, files, or other destinations.

Basic Sink

A sink implements the process method:

import bspump

class MySink(bspump.Sink):
    def process(self, context, event):
        # Write event to destination
        self.write_to_database(event)

Async Sinks

For I/O-bound operations, use async sinks:

class AsyncSink(bspump.Sink):
    async def process(self, context, event):
        await self.async_write(event)

Built-in Sinks

BSPump includes many built-in sinks:

Debug/Development

import bspump.common

# Pretty print events to console
sink = bspump.common.PPrintSink(app, pipeline)

# Null sink (discard events)
sink = bspump.common.NullSink(app, pipeline)

Kafka

import bspump.kafka

sink = bspump.kafka.KafkaSink(
    app, pipeline,
    connection="KafkaConnection"
)

HTTP

import bspump.http.client

sink = bspump.http.client.HTTPClientSink(
    app, pipeline,
    config={"url": "https://api.example.com/events"}
)

File Output

import bspump.file

# Line-by-line output
sink = bspump.file.FileLineSink(app, pipeline, config={
    "path": "/data/output.txt"
})

See Integrations for the full list of available sinks.

Batching

Some sinks support batching for efficiency:

sink = bspump.kafka.KafkaSink(
    app, pipeline,
    connection="KafkaConnection",
    config={
        "batch_size": 100,
        "batch_timeout": 1.0  # seconds
    }
)

The sink will accumulate events and write them in batches.

Backpressure

Sinks can signal backpressure to the pipeline when they cannot keep up:

class SlowSink(bspump.Sink):
    async def process(self, context, event):
        # If the destination is slow, this creates backpressure
        await self.slow_write(event)

When backpressure occurs, the pipeline throttles the source.

Error Handling

Handle sink errors appropriately:

class ResilientSink(bspump.Sink):
    async def process(self, context, event):
        for attempt in range(3):
            try:
                await self.write(event)
                return
            except ConnectionError:
                await asyncio.sleep(1 * attempt)

        # After retries, handle failure
        await self.send_to_dead_letter_queue(event)

Multiple Outputs

To send events to multiple destinations, use a routing processor or multiple pipelines sharing a source.

Sink Configuration

Sinks can be configured via pipelines.conf:

[pipeline:MyPipeline:MySink]
url=https://api.example.com/events
batch_size=100
timeout=30