Processor
Processors transform, filter, enrich, or route events as they flow through a pipeline. They sit between the source and sink.
Basic Processor
A processor implements the process method:
import bspump


class MyProcessor(bspump.Processor):
    def process(self, context, event):
        # Transform the event
        event["processed"] = True
        return event
The process method receives:
context: A dictionary with metadata about the event
event: The event to process
It must return:
The transformed event
None to drop the event (filter it out)
Filtering Events
Return None to filter out events:
class FilterProcessor(bspump.Processor):
    def process(self, context, event):
        if event.get("type") == "spam":
            return None  # Drop spam events
        return event
Async Processors
For I/O-bound operations, use async processors:
class AsyncProcessor(bspump.Processor):
    async def process(self, context, event):
        # Async operations are supported
        result = await self.fetch_enrichment_data(event["id"])
        event["enriched"] = result
        return event
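The fetch_enrichment_data helper used above is not defined in this section. The following is a minimal sketch of what such a helper might look like, assuming a hypothetical in-memory lookup table in place of a real network call. The names fetch_enrichment_data, enrich, and _FAKE_SERVICE are illustrative only, and the timeout behaviour is an assumption rather than something BSPump prescribes.

```python
import asyncio

# Hypothetical lookup table standing in for a remote enrichment service.
_FAKE_SERVICE = {1: {"country": "CZ"}, 2: {"country": "DE"}}


async def fetch_enrichment_data(event_id, delay=0.01):
    """Simulate an I/O-bound lookup; a real helper would query a database or HTTP API."""
    await asyncio.sleep(delay)
    return _FAKE_SERVICE.get(event_id)


async def enrich(event, timeout=1.0):
    """Mirror the body of AsyncProcessor.process, with a timeout around the await."""
    try:
        event["enriched"] = await asyncio.wait_for(
            fetch_enrichment_data(event["id"]), timeout=timeout
        )
    except asyncio.TimeoutError:
        # Assumption: on timeout the event is passed on without enrichment.
        event["enriched"] = None
    return event


print(asyncio.run(enrich({"id": 1})))
```

Bounding the await with asyncio.wait_for keeps a slow lookup from holding up an event indefinitely. Whether a timed-out event should be passed on, dropped, or retried depends on the pipeline.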
Generator (1-to-Many)
When you need to produce multiple events from a single input:
import bspump


class SplitGenerator(bspump.Generator):
    async def generate(self, context, event, depth):
        # Split a batch into individual events
        for item in event["items"]:
            self.Pipeline.inject(context, item, depth)
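To see what the 1-to-many pattern produces without running a full pipeline, the sketch below replaces the pipeline with a hypothetical RecordingPipeline stub whose inject method only records what it receives. Both RecordingPipeline and SplitGeneratorSketch are stand-ins invented for illustration and are not part of BSPump.

```python
import asyncio


class RecordingPipeline:
    """Hypothetical stand-in for a pipeline: records every injected event."""

    def __init__(self):
        self.injected = []

    def inject(self, context, event, depth):
        self.injected.append((event, depth))


class SplitGeneratorSketch:
    """Same generate() logic as SplitGenerator, without the bspump base class."""

    def __init__(self, pipeline):
        self.Pipeline = pipeline

    async def generate(self, context, event, depth):
        # .get() tolerates batches that lack an "items" key
        for item in event.get("items", []):
            self.Pipeline.inject(context, item, depth)


pipeline = RecordingPipeline()
generator = SplitGeneratorSketch(pipeline)
batch = {"items": [{"n": 1}, {"n": 2}]}
asyncio.run(generator.generate({}, batch, depth=1))
print(pipeline.injected)
```

One input batch results in one inject call per item, each carrying the same context and depth that the generator received.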
Built-in Processors
BSPump includes many utility processors:
JSON Parsing
import bspump.common

# Parse JSON bytes to dict
processor = bspump.common.JsonBytesToDictParser(app, pipeline)

# Convert dict to JSON bytes
processor = bspump.common.DictToJsonBytesParser(app, pipeline)
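The section does not describe how these two parsers work internally. As a rough mental model only, and assuming they behave like thin wrappers around the standard json module, the conversion in each direction could be sketched as follows. The function names are hypothetical.

```python
import json


def json_bytes_to_dict(raw: bytes) -> dict:
    """Decode UTF-8 JSON bytes into a Python dict."""
    return json.loads(raw.decode("utf-8"))


def dict_to_json_bytes(event: dict) -> bytes:
    """Serialize a dict to UTF-8 encoded JSON bytes."""
    return json.dumps(event).encode("utf-8")


raw = b'{"type": "login", "user": "alice"}'
event = json_bytes_to_dict(raw)
print(event["user"])
print(dict_to_json_bytes(event))
```

Placing the bytes-to-dict step right after the source and the dict-to-bytes step right before the sink lets every processor in between work with plain dictionaries.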
Mapping and Transformation
# Apply a function to each event
processor = bspump.common.MappingProcessor(app, pipeline, mapping={
    "old_key": "new_key"
})
Routing
# Route events to different pipelines
processor = bspump.common.RouterProcessor(app, pipeline, routing={
    "type_a": "PipelineA",
    "type_b": "PipelineB"
})
Processor Chains
Processors are chained together in the pipeline:
class MyPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            MySource(app, self),
            ValidateProcessor(app, self),   # Step 1
            EnrichProcessor(app, self),     # Step 2
            TransformProcessor(app, self),  # Step 3
            MySink(app, self),
        )
Events flow through processors in order.
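The ordering rule, together with the rule that returning None drops an event, can be modelled in a few lines of plain Python. This is a simplified sketch and not BSPump's actual scheduling code. The run_chain helper and the three step functions are hypothetical stand-ins for the processors named above.

```python
def run_chain(processors, context, event):
    """Pass an event through processors in order; stop as soon as one returns None."""
    for process in processors:
        event = process(context, event)
        if event is None:
            return None  # dropped: later processors never see it
    return event


def validate(context, event):
    # Step 1: drop events that have no "id"
    return event if "id" in event else None


def enrich(context, event):
    # Step 2: copy a value from the context into the event
    event["source"] = context.get("source", "unknown")
    return event


def transform(context, event):
    # Step 3: normalise the id to a string
    event["id"] = str(event["id"])
    return event


chain = [validate, enrich, transform]
print(run_chain(chain, {"source": "demo"}, {"id": 7}))
print(run_chain(chain, {"source": "demo"}, {"name": "no id"}))
```

Because validation runs first, the enrich and transform steps can assume that "id" is present. Reordering the chain would break that assumption.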
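Error Handling
Catch exceptions inside process so that a failing event is logged and dropped instead of raising out of the processor:
import logging

import bspump

L = logging.getLogger(__name__)


class SafeProcessor(bspump.Processor):
    def process(self, context, event):
        try:
            # risky_transform is a placeholder for your own logic
            return self.risky_transform(event)
        except Exception as e:
            L.error(f"Processing error: {e}")
            # Return None to drop the event, or return `event` to pass the original on unchanged
            return None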
For recoverable errors, consider routing failed events to a dead-letter queue.
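A dead-letter queue can be as simple as a separate store that keeps the failed event alongside the error that caused the failure. The sketch below uses a hypothetical in-memory DeadLetterQueue class and a process_with_dlq helper. Neither is a BSPump API, and in practice the store would more likely be a file, a message topic, or a second pipeline.

```python
import logging

L = logging.getLogger(__name__)


class DeadLetterQueue:
    """Hypothetical in-memory dead-letter store, for illustration only."""

    def __init__(self):
        self.entries = []

    def put(self, event, error):
        self.entries.append({"event": event, "error": repr(error)})


def process_with_dlq(transform, event, dlq):
    """Apply transform; on failure, park the original event in the DLQ and drop it."""
    try:
        return transform(event)
    except Exception as e:
        L.warning("Processing error, sending event to dead-letter queue: %s", e)
        dlq.put(event, e)
        return None


def risky_transform(event):
    # Raises KeyError if "amount" is missing, ValueError if it is not numeric
    event["amount"] = float(event["amount"])
    return event


dlq = DeadLetterQueue()
ok = process_with_dlq(risky_transform, {"amount": "12.5"}, dlq)
bad = process_with_dlq(risky_transform, {"amount": "n/a"}, dlq)
print(ok, bad, len(dlq.entries))
```

Keeping the original event and the error together makes it possible to inspect failures later and replay the events once the cause is fixed.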