Processor

Processors transform, filter, enrich, or route events as they flow through a pipeline. They sit between the source and sink.

Basic Processor

A processor subclasses bspump.Processor and overrides its process method:

import bspump

class MyProcessor(bspump.Processor):
    def process(self, context, event):
        # Transform the event
        event["processed"] = True
        return event

The process method receives:

  • context: A dictionary with metadata about the event

  • event: The event to process

It must return one of:

  • The transformed event

  • None to drop the event (filter it out)
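Because process is an ordinary method taking (context, event), its logic can be exercised directly without a running pipeline. The sketch below copies the logic into a plain class, MarkProcessed (a hypothetical name that deliberately does not subclass bspump.Processor, so it runs without BSPump installed). It also adds an illustrative rule, an assumption for this sketch, that drops events lacking an "id" so that both return outcomes are visible.

```python
class MarkProcessed:
    """Standalone stand-in for a processor with the same process(context, event) signature."""

    def process(self, context, event):
        # Illustrative rule (assumption for this sketch): drop events without an "id"
        if "id" not in event:
            return None
        # Otherwise transform the event and pass it on
        event["processed"] = True
        return event


processor = MarkProcessed()
context = {}  # metadata dictionary; empty is enough for this logic

kept = processor.process(context, {"id": 1})
dropped = processor.process(context, {"value": 42})

print(kept)     # {'id': 1, 'processed': True}
print(dropped)  # None
```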

Filtering Events

Return None to filter out events:

class FilterProcessor(bspump.Processor):
    def process(self, context, event):
        if event.get("type") == "spam":
            return None  # Drop spam events
        return event
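When several filters differ only in their condition, the rule can be factored out into a predicate. The following is a standalone sketch: PredicateFilter is a hypothetical class, not part of BSPump, and it mirrors only the process(context, event) convention where returning None drops the event.

```python
class PredicateFilter:
    """Hypothetical standalone filter: keeps events for which predicate(event) is true."""

    def __init__(self, predicate):
        self.predicate = predicate

    def process(self, context, event):
        # Returning None signals "drop this event"
        return event if self.predicate(event) else None


not_spam = PredicateFilter(lambda event: event.get("type") != "spam")

events = [
    {"type": "login", "user": "alice"},
    {"type": "spam"},
    {"user": "bob"},  # no "type" key: .get() returns None, so the event is kept
]
kept = [e for e in events if not_spam.process({}, e) is not None]
print(len(kept))  # 2
```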

Async Processors

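The pipeline calls process() synchronously, so a plain processor cannot await inside it. For I/O-bound operations such as HTTP requests or SQL queries, use a Generator instead: its generate method is a coroutine, and it passes the enriched event on with self.Pipeline.inject():

class AsyncEnricher(bspump.Generator):
    async def generate(self, context, event, depth):
        # fetch_enrichment_data is a placeholder for your own async lookup
        event["enriched"] = await self.fetch_enrichment_data(event["id"])
        # Pass the enriched event to the processors after this generator
        self.Pipeline.inject(context, event, depth)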
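The benefit of doing a lookup in a coroutine is that many lookups can wait on I/O at the same time. This standalone asyncio sketch simulates that: fetch_enrichment_data is a hypothetical stand-in that sleeps instead of making a real network call, and no BSPump API is involved.

```python
import asyncio
import time


async def fetch_enrichment_data(event_id):
    # Hypothetical lookup: simulate 0.1 s of network latency
    await asyncio.sleep(0.1)
    return {"owner": f"user-{event_id}"}


async def enrich(event):
    # Await the I/O without blocking other coroutines on the event loop
    event["enriched"] = await fetch_enrichment_data(event["id"])
    return event


async def main():
    events = [{"id": i} for i in range(10)]
    start = time.perf_counter()
    # Run all ten lookups concurrently on one event loop
    results = await asyncio.gather(*(enrich(e) for e in events))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results[3]["enriched"])  # {'owner': 'user-3'}
print(f"{elapsed:.2f} s")      # roughly 0.1 s rather than 1.0 s, because the sleeps overlap
```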

Generator (1-to-Many)

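When you need to produce multiple events from a single input, subclass bspump.Generator and override its generate coroutine instead of process. Each call to self.Pipeline.inject() passes one event to the processors that follow the generator: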

import bspump

class SplitGenerator(bspump.Generator):
    async def generate(self, context, event, depth):
        # Split a batch into individual events
        for item in event["items"]:
            self.Pipeline.inject(context, item, depth)
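The fan-out can be simulated without BSPump. In this sketch, inject is a hypothetical callback standing in for self.Pipeline.inject, and the "downstream" simply records what it receives; depth is carried along unchanged, as in the example above.

```python
import asyncio


async def split_generate(context, event, depth, inject):
    # One input event in, one inject() call per item out
    for item in event["items"]:
        inject(context, item, depth)


async def main():
    received = []

    def inject(context, item, depth):
        # Stand-in for the pipeline: record what the next stage would receive
        received.append((depth, item))

    batch = {"items": [{"n": 1}, {"n": 2}, {"n": 3}]}
    await split_generate({}, batch, 1, inject)
    return received


received = asyncio.run(main())
print(received)  # [(1, {'n': 1}), (1, {'n': 2}), (1, {'n': 3})]
```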

Built-in Processors

BSPump ships utility processors in the bspump.common module, including:

JSON Parsing

import bspump.common

# Parse JSON bytes to dict
processor = bspump.common.JsonBytesToDictParser(app, pipeline)

# Convert dict to JSON bytes
processor = bspump.common.DictToJsonBytesParser(app, pipeline)
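Conceptually, the two parsers perform the conversions below. The sketch uses only Python's standard json module to show the round trip; it illustrates the transformation, not how bspump.common implements it (the encoding or separators BSPump uses may differ).

```python
import json

raw = b'{"type": "login", "user": "alice"}'

# bytes -> dict, the direction of JsonBytesToDictParser
event = json.loads(raw)  # json.loads accepts UTF-8 encoded bytes
event["processed"] = True

# dict -> bytes, the direction of DictToJsonBytesParser
out = json.dumps(event).encode("utf-8")
print(out)  # b'{"type": "login", "user": "alice", "processed": true}'
```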

Mapping and Transformation

# Map keys in each event (here, "old_key" is mapped to "new_key")
processor = bspump.common.MappingProcessor(app, pipeline, mapping={
    "old_key": "new_key"
})
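Assuming the mapping is meant to rename event keys, the effect on a single event looks like the plain-Python sketch below. rename_keys is a hypothetical helper, not MappingProcessor's implementation; check the bspump.common reference for the processor's exact semantics.

```python
def rename_keys(event, mapping):
    """Return a new dict with keys renamed per mapping; unmapped keys are kept as-is."""
    return {mapping.get(key, key): value for key, value in event.items()}


event = {"old_key": 1, "other": 2}
print(rename_keys(event, {"old_key": "new_key"}))  # {'new_key': 1, 'other': 2}
```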

Routing

# Route events to different pipelines
processor = bspump.common.RouterProcessor(app, pipeline, routing={
    "type_a": "PipelineA",
    "type_b": "PipelineB"
})
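The routing table above reads as a lookup from an event's type to a target pipeline ID. The sketch below shows that decision in plain Python under two assumptions: the type lives in event["type"], and unknown types go to a fallback. choose_pipeline is hypothetical and does not model how RouterProcessor actually hands events to the other pipeline.

```python
def choose_pipeline(event, routing, default=None):
    # Look up the target pipeline ID by the event's "type" field (assumption);
    # unknown or missing types fall back to `default`
    return routing.get(event.get("type"), default)


routing = {"type_a": "PipelineA", "type_b": "PipelineB"}
print(choose_pipeline({"type": "type_a"}, routing))                      # PipelineA
print(choose_pipeline({"type": "type_c"}, routing, default="Fallback"))  # Fallback
```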

Processor Chains

Processors are chained by passing them to self.build() in the pipeline's constructor, between the source and the sink:

class MyPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            MySource(app, self),
            ValidateProcessor(app, self),      # Step 1
            EnrichProcessor(app, self),        # Step 2
            TransformProcessor(app, self),     # Step 3
            MySink(app, self),
        )

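Each event passes through the processors in the order they are listed. If a processor returns None, the event is dropped and the processors after it never see it.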
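That ordering rule can be modelled in a few lines of plain Python. This is a simplified sketch, not BSPump's pipeline code (which also handles sinks, depth, and metrics); run_chain is hypothetical, and validate, enrich, and transform are plain-function stand-ins for the three processors above.

```python
def run_chain(processors, context, event):
    """Simplified model: apply processors in order, stop as soon as one returns None."""
    for processor in processors:
        event = processor(context, event)
        if event is None:
            return None  # dropped; later processors never see it
    return event


def validate(context, event):
    return event if "id" in event else None


def enrich(context, event):
    event["source"] = context.get("source", "unknown")
    return event


def transform(context, event):
    event["id"] = str(event["id"])
    return event


steps = [validate, enrich, transform]
print(run_chain(steps, {"source": "api"}, {"id": 7}))  # {'id': '7', 'source': 'api'}
print(run_chain(steps, {}, {"name": "no id"}))          # None
```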

Error Handling

Handle errors inside a processor by catching exceptions in process():

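import logging

import bspump

L = logging.getLogger(__name__)


class SafeProcessor(bspump.Processor):
    def process(self, context, event):
        try:
            # risky_transform is your own method that may raise
            return self.risky_transform(event)
        except Exception as e:
            L.error(f"Processing error: {e}")
            # Return None to drop the event, or return event to pass the original on
            return None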
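The choice between dropping and passing the original on can be made a setting. The standalone sketch below shows both policies; SafeTransform and its drop_on_error option are hypothetical names, not BSPump API, and the class does not subclass bspump.Processor so that it runs on its own.

```python
import logging

logging.basicConfig(level=logging.INFO)
L = logging.getLogger(__name__)


class SafeTransform:
    """Standalone sketch: wraps a risky function with a drop-or-pass-through policy."""

    def __init__(self, transform, drop_on_error=True):
        self.transform = transform
        self.drop_on_error = drop_on_error  # hypothetical option for this sketch
        self.errors = 0

    def process(self, context, event):
        try:
            return self.transform(event)
        except Exception:
            self.errors += 1
            L.exception("Processing error")  # logs the message plus the traceback
            # Drop the event, or pass the untouched original downstream
            return None if self.drop_on_error else event


def risky_transform(event):
    # Builds a new dict, so the original event is never modified
    return {**event, "ratio": event["a"] / event["b"]}


dropper = SafeTransform(risky_transform)
passer = SafeTransform(risky_transform, drop_on_error=False)
bad = {"a": 1, "b": 0}
print(dropper.process({}, bad))  # None
print(passer.process({}, bad))   # {'a': 1, 'b': 0}
```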

If failed events should be kept for inspection or reprocessing, consider routing them to a dead-letter queue instead of dropping them.
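A dead-letter queue can be pictured with the minimal in-memory sketch below. DeadLetterQueue and process_with_dlq are hypothetical names; a production setup would write failed events to durable storage or a separate pipeline rather than a bounded deque.

```python
from collections import deque


class DeadLetterQueue:
    """Hypothetical in-memory dead-letter queue; a real one would persist events."""

    def __init__(self, maxlen=1000):
        self.items = deque(maxlen=maxlen)  # oldest entries are discarded when full

    def put(self, event, error):
        self.items.append({"event": event, "error": repr(error)})


def process_with_dlq(transform, event, dlq):
    try:
        return transform(event)
    except Exception as e:
        dlq.put(event, e)  # keep the failed event for inspection or replay
        return None        # and remove it from the main flow


def parse(event):
    return {**event, "value": int(event["value"])}


dlq = DeadLetterQueue()
print(process_with_dlq(parse, {"value": "12"}, dlq))  # {'value': 12}
print(process_with_dlq(parse, {"value": "x"}, dlq))   # None
print(len(dlq.items))                                 # 1
```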