bspump.common
Common utilities, processors, and sinks for general use.
- class bspump.common.BytesToStringParser(app, pipeline, id=None, config=None)
Bases: Processor
- class bspump.common.StringToBytesParser(app, pipeline, id=None, config=None)
Bases: Processor
** Default Config **
encoding : utf-8
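The conversion these two parsers are named after can be illustrated with plain Python. This is a minimal sketch using only the standard library, not the processors themselves; the sample text is an arbitrary illustration, and the encoding mirrors the default config value listed above.

```python
ENCODING = "utf-8"  # mirrors the default config value listed above

text = "Zürich"  # arbitrary sample containing a non-ASCII character

# String -> bytes, the direction StringToBytesParser is named after
as_bytes = text.encode(ENCODING)

# Bytes -> string, the direction BytesToStringParser is named after
round_trip = as_bytes.decode(ENCODING)

print(as_bytes)
print(round_trip)
```

Both directions must use the same encoding, otherwise the round trip can fail or produce different text.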
- class bspump.common.FlattenDictProcessor(app, pipeline, id=None, config=None)
Bases: Processor
Description: ….
Inspired by https://github.com/amirziai/flatten
Example:
    {
        "person": {
            "details": {
                "first_name": "John",
                "last_name": "Doe"
            },
            "address": {
                "country": "GB",
                "city": "London",
                "postal_code": "WC2N 5DU"
            }
        }
    }
Gets converted to:
    {
        "person.details.first_name": "John",
        "person.details.last_name": "Doe",
        "person.address.country": "GB",
        "person.address.city": "London",
        "person.address.postal_code": "WC2N 5DU"
    }
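The transformation in the example can be sketched as a small recursive function. This is an illustrative stand-in, not the FlattenDictProcessor implementation; the function name `flatten` and its `separator` and `prefix` parameters are assumptions made for this sketch.

```python
def flatten(nested, separator=".", prefix=""):
    """Flatten nested dictionaries into one level with joined key paths.

    Hypothetical helper for illustration only; empty nested dictionaries
    produce no keys in the result.
    """
    flat = {}
    for key, value in nested.items():
        # Build the dotted path for this key
        path = f"{prefix}{separator}{key}" if prefix else str(key)
        if isinstance(value, dict):
            # Recurse into nested dictionaries, carrying the path along
            flat.update(flatten(value, separator, path))
        else:
            flat[path] = value
    return flat


event = {
    "person": {
        "details": {"first_name": "John", "last_name": "Doe"},
        "address": {"country": "GB", "city": "London", "postal_code": "WC2N 5DU"},
    }
}
flat_event = flatten(event)
print(flat_event)
```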
- class bspump.common.HexlifyProcessor(app, pipeline, id=None, config=None)
Bases: Processor
- class bspump.common.IteratorGenerator(app, pipeline, id=None, config=None)
Bases: Generator
- class bspump.common.IteratorSource(app, pipeline, iterator, id=None, config=None)
Bases: TriggerSource
- Parameters:
iterator (Iterator)
- class bspump.common.StdDictToJsonParser(app, pipeline, id=None, config=None)
Bases: Processor
- class bspump.common.StdJsonToDictParser(app, pipeline, id=None, config=None)
Bases: Processor
- class bspump.common.MappingKeysGenerator(app, pipeline, id=None, config=None)
Bases: Generator
- class bspump.common.MappingValuesGenerator(app, pipeline, id=None, config=None)
Bases: Generator
- class bspump.common.MappingItemsGenerator(app, pipeline, id=None, config=None)
Bases: Generator
- class bspump.common.MappingKeysProcessor(app, pipeline, id=None, config=None)
Bases: Processor
Description: Mapping Keys Processor
- class bspump.common.MappingValuesProcessor(app, pipeline, id=None, config=None)
Bases: Processor
- class bspump.common.MappingItemsProcessor(app, pipeline, id=None, config=None)
Bases: Processor
- class bspump.common.PrintSink(app, pipeline, id=None, config=None, stream=None)
Bases: Sink
- class bspump.common.PPrintSink(app, pipeline, id=None, config=None, stream=None)
Bases: Sink
- class bspump.common.PrintProcessor(app, pipeline, id=None, config=None, stream=None)
Bases: Processor
- class bspump.common.PPrintProcessor(app, pipeline, id=None, config=None, stream=None)
Bases: Processor
- class bspump.common.PrintContextProcessor(app, pipeline, id=None, config=None, stream=None)
Bases: Processor
- class bspump.common.PPrintContextProcessor(app, pipeline, id=None, config=None, stream=None)
Bases: Processor
- class bspump.common.InternalSource(app, pipeline, id=None, config=None)
Bases: Source
- put(context, event, copy_context=False, copy_event=False)
Description: The context can be an empty dictionary if none is provided.
If you get an asyncio.queues.QueueFull exception, you most likely have not implemented backpressure handling. The simplest approach is to use RouterSink / RouterProcessor.
- async put_async(context, event, copy_context=False, copy_event=False)
Description: Puts an event into InternalSource asynchronously. Because processing in the pipeline is synchronous, this method is mainly useful when an event is created outside of pipeline processing. It is designed to handle the situation where the queue is becoming full.
The context can be an empty dictionary if none is provided.
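The difference between a synchronous put that fails on a full queue and an asynchronous put that waits can be shown with a plain asyncio.Queue. This sketch is a stand-in for the general pattern and does not use InternalSource; the helper name `put_nowait_or_report`, the queue size, and the integer events are assumptions chosen for the demonstration.

```python
import asyncio


def put_nowait_or_report(queue, event):
    """Synchronous put: fails immediately when the queue is full."""
    try:
        queue.put_nowait(event)
        return True
    except asyncio.QueueFull:
        return False


async def demo():
    queue = asyncio.Queue(maxsize=2)  # small bound to force the full condition

    # The third synchronous put is rejected because the queue is full
    accepted = [put_nowait_or_report(queue, n) for n in range(3)]

    async def consumer():
        await asyncio.sleep(0)  # let the producer start waiting first
        return await queue.get()  # frees one slot

    # Asynchronous put: suspends until the consumer frees a slot
    consumed, _ = await asyncio.gather(consumer(), queue.put(2))
    return accepted, consumed, queue.qsize()


accepted, consumed, size = asyncio.run(demo())
print(accepted, consumed, size)
```

The awaiting producer is slowed down to the consumer's pace instead of raising, which is the essence of backpressure handling.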
- class bspump.common.RouterProcessor(app, pipeline, id=None, config=None)
Bases: Processor, RouterMixIn
Description: Abstract Processor that dispatches events to other internal sources. Override the process() method and call route() with the target source id.
- class bspump.common.RouterSink(app, pipeline, id=None, config=None)
Bases: Sink, RouterMixIn
Description: Abstract Sink that dispatches events to other internal sources. Override the process() method and call route() with the target source id.
- class bspump.common.TeeProcessor(app, pipeline, id=None, config=None)
Bases: RouterProcessor
Description: See TeeSource for details.
- class bspump.common.TeeSource(app, pipeline, id=None, config=None)
Bases: InternalSource
Description:
    import bspump
    import bspump.common
    import bspump.socket


    class SamplePipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
            super().__init__(app, pipeline_id)
            self.build(
                bspump.socket.TCPStreamSource(app, self, config={'port': 7000}),
                # Send a copy of each event to the TeeSource in SampleTeePipeline
                bspump.common.TeeProcessor(app, self).bind("SampleTeePipeline.*TeeSource"),
                bspump.common.PPrintSink(app, self)
            )


    class SampleTeePipeline(bspump.Pipeline):

        def __init__(self, app, pipeline_id):
            super().__init__(app, pipeline_id)
            self.build(
                bspump.common.TeeSource(app, self),
                bspump.common.PPrintSink(app, self)
            )
- class bspump.common.TimeZoneNormalizer(app, pipeline, id=None, config=None)
Bases: Processor
Description: Normalizes a datetime from the local time zone (set, for example, in the configuration) to UTC, which is the preferred internal datetime form.
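The normalization described here can be sketched with the standard datetime module. This is not the TimeZoneNormalizer implementation; `LOCAL_TZ` is a hypothetical fixed UTC+1 offset standing in for a configured time zone, and `normalize_to_utc` is an illustrative helper name.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical local zone: a fixed UTC+1 offset standing in for a configured one
LOCAL_TZ = timezone(timedelta(hours=1))


def normalize_to_utc(naive_local):
    """Interpret a naive datetime as local time and convert it to UTC."""
    return naive_local.replace(tzinfo=LOCAL_TZ).astimezone(timezone.utc)


normalized = normalize_to_utc(datetime(2024, 1, 15, 12, 0, 0))
print(normalized.isoformat())
```

A real time zone with daylight saving rules would need a zone database rather than a fixed offset.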
- class bspump.common.MappingTransformator(app, pipeline, id=None, config=None)
Bases: Processor
- class bspump.common.DirectSource(app, pipeline, id=None, config=None)
Bases: Source
Description: This source processes each inserted event synchronously.
- class bspump.common.Aggregator(app, pipeline, aggregation_strategy=<bspump.common.aggregator.ListAggregationStrategy object>, id=None, config=None)
Bases: Generator
- Parameters:
aggregation_strategy (AggregationStrategy)
- __init__(app, pipeline, aggregation_strategy=<bspump.common.aggregator.ListAggregationStrategy object>, id=None, config=None)
- Parameters:
aggregation_strategy (AggregationStrategy)
- class bspump.common.ListAggregationStrategy
Bases: AggregationStrategy
Description: … test
- class bspump.common.ListEventAggregationStrategy
Bases: AggregationStrategy
- class bspump.common.StringAggregationStrategy(delimiter='\n')
Bases: AggregationStrategy
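The strategies above carry no description, so the following is only a guess at the idea their names suggest: collecting several events into a list, or joining string events with a delimiter. The function names are hypothetical and the code does not reproduce the bspump classes; only the default delimiter is taken from the signature above.

```python
def aggregate_as_list(events):
    """Assumed list-style aggregation: collect events into one list."""
    return list(events)


def aggregate_as_string(events, delimiter="\n"):
    """Assumed string-style aggregation: join string events with a delimiter."""
    return delimiter.join(events)


lines = ["first", "second", "third"]
as_list = aggregate_as_list(lines)
as_string = aggregate_as_string(lines)
print(as_list)
print(repr(as_string))
```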
- class bspump.common.DictToJsonBytesParser(app, pipeline, id=None, config=None)
Bases: Processor
Description: DictToJsonBytesParser transforms a dictionary into a JSON string encoded as bytes. The character encoding can be specified in the encoding field of the configuration.
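The two steps named in the description, serializing to a JSON string and encoding it as bytes, can be sketched with the standard library. The helper `dict_to_json_bytes` is hypothetical, and the utf-8 default is an assumption, since the default for this parser is not stated here.

```python
import json


def dict_to_json_bytes(event, encoding="utf-8"):
    """Serialize a dictionary to a JSON string, then encode it as bytes.

    The utf-8 default is an assumption made for this sketch.
    """
    return json.dumps(event).encode(encoding)


payload = dict_to_json_bytes({"key": "value"})
print(payload)
```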
Sinks
PPrintSink
Pretty-prints events to console. Useful for debugging.
import bspump.common
sink = bspump.common.PPrintSink(app, pipeline)
NullSink
Discards all events. Useful for testing or when output is handled elsewhere.
sink = bspump.common.NullSink(app, pipeline)
Processors
JsonBytesToDictParser
Parses JSON bytes into a Python dictionary.
processor = bspump.common.JsonBytesToDictParser(app, pipeline)
# Input: b'{"key": "value"}'
# Output: {"key": "value"}
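The input and output shown above correspond to what json.loads does with bytes in plain Python. This sketch uses only the standard library and is not the parser's implementation; the malformed input is an arbitrary sample.

```python
import json

raw = b'{"key": "value"}'

# json.loads accepts bytes directly and detects the encoding
parsed = json.loads(raw)
print(parsed)

# Malformed input raises json.JSONDecodeError, which a caller may want to handle
try:
    json.loads(b"{not json")
    malformed_rejected = False
except json.JSONDecodeError:
    malformed_rejected = True
print(malformed_rejected)
```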
DictToJsonBytesParser
Converts a Python dictionary to JSON bytes.
processor = bspump.common.DictToJsonBytesParser(app, pipeline)
# Input: {"key": "value"}
# Output: b'{"key": "value"}'
MappingProcessor
Maps/renames event keys.
processor = bspump.common.MappingProcessor(app, pipeline, mapping={
    "old_key": "new_key",
    "source_field": "dest_field"
})
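The effect of such a key mapping on a single event can be sketched in plain Python. The `rename_keys` helper and the sample event are hypothetical; keeping unmapped keys unchanged is an assumption of this sketch, not documented behavior.

```python
MAPPING = {"old_key": "new_key", "source_field": "dest_field"}


def rename_keys(event, mapping):
    """Return a copy of the event with keys renamed according to the mapping.

    Keys missing from the mapping are kept as they are (an assumption).
    """
    return {mapping.get(key, key): value for key, value in event.items()}


renamed = rename_keys({"old_key": 1, "source_field": "a", "other": True}, MAPPING)
print(renamed)
```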
StdJsonToDictParser
Standard JSON parsing with error handling.
processor = bspump.common.StdJsonToDictParser(app, pipeline)
StdDictToJsonParser
Standard dictionary to JSON conversion.
processor = bspump.common.StdDictToJsonParser(app, pipeline)
StringToBytesParser
Converts a string to bytes.
processor = bspump.common.StringToBytesParser(app, pipeline)
BytesToStringParser
Converts bytes to a string.
processor = bspump.common.BytesToStringParser(app, pipeline)
RoutingProcessor
Routes events to different pipelines.
processor = bspump.common.RoutingProcessor(app, pipeline, routing={
"type_a": "PipelineA",
"type_b": "PipelineB"
})
FilterProcessor
Filters events based on a condition.
class MyFilter(bspump.common.FilterProcessor):
    def filter(self, context, event):
        return event.get("valid", False)
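The predicate used in MyFilter can be tried on its own, outside any pipeline. In this sketch `is_valid` is a hypothetical standalone copy of the condition and the sample events are made up; it assumes that a truthy result means the event is kept.

```python
def is_valid(event):
    """Same condition as in MyFilter: a missing 'valid' key counts as False."""
    return event.get("valid", False)


events = [
    {"id": 1, "valid": True},
    {"id": 2, "valid": False},
    {"id": 3},  # no 'valid' key at all
]

# Assumption: events for which the predicate is truthy pass through
kept = [event for event in events if is_valid(event)]
print(kept)
```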
TeeProcessor
Duplicates events to another pipeline.
# The target pipeline must contain a TeeSource, as in the TeeSource example above
processor = bspump.common.TeeProcessor(app, pipeline).bind("AuditPipeline.*TeeSource")
Utility Functions
These utilities are commonly used in pipelines:
- JSON parsing and serialization
- String/bytes conversion
- Event routing and filtering
- Debug output