bspump.common

Common utilities, processors, and sinks for general use.

class bspump.common.BytesToStringParser(app, pipeline, id=None, config=None)[source]

Bases: Processor

Description:


ConfigDefaults: dict = {'encoding': 'utf-8'}
__init__(app, pipeline, id=None, config=None)[source]

Description:


process(context, event)[source]

Description:

Returns:

event.decode(self.Encoding)


class bspump.common.StringToBytesParser(app, pipeline, id=None, config=None)[source]

Bases: Processor

Description:


Default Config:

encoding : utf-8

ConfigDefaults: dict = {'encoding': 'utf-8'}
__init__(app, pipeline, id=None, config=None)[source]

Description:


process(context, event)[source]

Description:

Returns:

event.encode(self.Encoding)


class bspump.common.FlattenDictProcessor(app, pipeline, id=None, config=None)[source]

Bases: Processor

Description: Flattens a nested dictionary into a single-level dictionary. Nested keys are joined with the configured separator (default ".").

Example:

{
    "person": {
        "details": {
            "first_name": "John",
            "last_name": "Doe"
        },
        "address": {
            "country": "GB",
            "city": "London",
            "postal_code": "WC2N 5DU"
        }
    }
}

Gets converted to:

{
    "person.details.first_name": "John",
    "person.details.last_name": "Doe",
    "person.address.country": "GB",
    "person.address.city": "London",
    "person.address.postal_code": "WC2N 5DU"
}

ConfigDefaults: dict = {'separator': '.'}
__init__(app, pipeline, id=None, config=None)[source]

Description:


flatten(nested_dict)[source]

Description:

Returns:

flattened_dict


process(context, event)[source]

Description:

Returns:

event


class bspump.common.HexlifyProcessor(app, pipeline, id=None, config=None)[source]

Bases: Processor

Description:


process(context, event)[source]

Description:

Returns:

binascii.hexlify(event)


class bspump.common.IteratorGenerator(app, pipeline, id=None, config=None)[source]

Bases: Generator

Description:


async generate(context, event, depth)[source]

Description:


class bspump.common.IteratorSource(app, pipeline, iterator, id=None, config=None)[source]

Bases: TriggerSource

Description:


Parameters:

iterator (Iterator)

__init__(app, pipeline, iterator, id=None, config=None)[source]

Description:


Parameters:

iterator (Iterator)

async cycle(*args, **kwags)[source]

Description:


class bspump.common.StdDictToJsonParser(app, pipeline, id=None, config=None)[source]

Bases: Processor

Description:


process(context, event)[source]

Description:

Returns:

The dictionary event serialized to a JSON string.


class bspump.common.StdJsonToDictParser(app, pipeline, id=None, config=None)[source]

Bases: Processor

Description:


process(context, event)[source]

Description:

Returns:

The JSON event parsed into a dictionary.


class bspump.common.MappingKeysGenerator(app, pipeline, id=None, config=None)[source]

Bases: Generator

Description:


async generate(context, event, depth)[source]

Description:

class bspump.common.MappingValuesGenerator(app, pipeline, id=None, config=None)[source]

Bases: Generator

Description:


async generate(context, event, depth)[source]

Description:


class bspump.common.MappingItemsGenerator(app, pipeline, id=None, config=None)[source]

Bases: Generator

Description:


async generate(context, event, depth)[source]

Description:


class bspump.common.MappingKeysProcessor(app, pipeline, id=None, config=None)[source]

Bases: Processor

Description: Mapping Keys Processor


process(context, event)[source]

Description: Returns the keys of the mapping event.

Returns:

event.keys()

Parameters:

event (Mapping)

Return type:

list


class bspump.common.MappingValuesProcessor(app, pipeline, id=None, config=None)[source]

Bases: Processor

Description:


process(context, event)[source]

Description:

Returns:

event.values()

Parameters:

event (Mapping)

Return type:

list


class bspump.common.MappingItemsProcessor(app, pipeline, id=None, config=None)[source]

Bases: Processor

Description:


process(context, event)[source]

Description:

Returns:

event.items()

Parameters:

event (Mapping)

Return type:

list


class bspump.common.NullSink(app, pipeline, id=None, config=None)[source]

Bases: Sink

Description:


process(context, event)[source]

Description:


class bspump.common.PrintSink(app, pipeline, id=None, config=None, stream=None)[source]

Bases: Sink

Description:


__init__(app, pipeline, id=None, config=None, stream=None)[source]

Description:


process(context, event)[source]

Description:


class bspump.common.PPrintSink(app, pipeline, id=None, config=None, stream=None)[source]

Bases: Sink

Description:


__init__(app, pipeline, id=None, config=None, stream=None)[source]

Description:


process(context, event)[source]

Description:


class bspump.common.PrintProcessor(app, pipeline, id=None, config=None, stream=None)[source]

Bases: Processor

Description:


__init__(app, pipeline, id=None, config=None, stream=None)[source]

Description:


process(context, event)[source]

Description:


Returns:

event

class bspump.common.PPrintProcessor(app, pipeline, id=None, config=None, stream=None)[source]

Bases: Processor

Description:


__init__(app, pipeline, id=None, config=None, stream=None)[source]

Description:


process(context, event)[source]

Description:


Returns:

event

class bspump.common.PrintContextProcessor(app, pipeline, id=None, config=None, stream=None)[source]

Bases: Processor

Description:


__init__(app, pipeline, id=None, config=None, stream=None)[source]

Description:


process(context, event)[source]

Description:


Returns:

event

class bspump.common.PPrintContextProcessor(app, pipeline, id=None, config=None, stream=None)[source]

Bases: Processor

Description:


__init__(app, pipeline, id=None, config=None, stream=None)[source]

Description:


process(context, event)[source]

Description:


Returns:

event

class bspump.common.InternalSource(app, pipeline, id=None, config=None)[source]

Bases: Source

Description:


ConfigDefaults: dict = {'backpressure': 0.8, 'queue_max_size': 10}
__init__(app, pipeline, id=None, config=None)[source]

Description:


put(context, event, copy_context=False, copy_event=False)[source]

Description: Context can be an empty dictionary if it is not provided.

If you are getting an asyncio.queues.QueueFull exception, you likely did not implement backpressure handling. The simplest approach is to use RouterSink / RouterProcessor.


async put_async(context, event, copy_context=False, copy_event=False)[source]

Description: This method puts an event into the InternalSource asynchronously. Since processing in the pipeline is synchronous, this method is mainly useful when an event is created outside of the pipeline processing. It is designed to handle the situation when the queue is becoming full.

Context can be an empty dictionary if it is not provided.


async main()[source]

Description:


rest_get()[source]

Description:

Returns:

rest


class bspump.common.RouterProcessor(app, pipeline, id=None, config=None)[source]

Bases: Processor, RouterMixIn

Description: Abstract Processor that dispatches events to other internal sources. One should override the process() method and call route() with target source id.


__init__(app, pipeline, id=None, config=None)[source]

Description:


class bspump.common.RouterSink(app, pipeline, id=None, config=None)[source]

Bases: Sink, RouterMixIn

Description: Abstract Sink that dispatches events to other internal sources. One should override the process() method and call route() with target source id.


__init__(app, pipeline, id=None, config=None)[source]

Initializes the component.

Parameters

app : object

Application object.

pipeline : Pipeline

The Pipeline object this component belongs to.

id : str, default=None

ID of the component, used to identify its configuration.

config : JSON, or other compatible formats (e.g. a dict), default=None

Configuration options, e.g. config={'port': 7000}.

class bspump.common.TeeProcessor(app, pipeline, id=None, config=None)[source]

Bases: RouterProcessor

Description: See TeeSource for details.


ConfigDefaults: dict = {}
__init__(app, pipeline, id=None, config=None)[source]

Description:


bind(target)[source]

Description: Target is a bspump.PumpService.locate() string

Returns:

?

Parameters:

target (str)


unbind(target)[source]

Description:

Returns:

?

Parameters:

target (str)


process(context, event)[source]

Description:

Returns:

event


class bspump.common.TeeSource(app, pipeline, id=None, config=None)[source]

Bases: InternalSource

Description: Source that receives events duplicated by a TeeProcessor bound to it.

Example:

class SamplePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.socket.TCPStreamSource(app, self, config={'port': 7000}),
            bspump.common.TeeProcessor(app, self).bind("SampleTeePipeline.*TeeSource"),
            bspump.common.PPrintSink(app, self)
        )


class SampleTeePipeline(bspump.Pipeline):

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.common.TeeSource(app, self),
            bspump.common.PPrintSink(app, self)
        )


__init__(app, pipeline, id=None, config=None)[source]

Description:


bind(target)[source]

Description:

Returns:


async main()[source]

Description:

Returns:


class bspump.common.TimeZoneNormalizer(app, pipeline, id=None, config=None)[source]

Bases: Processor

Description: Normalizes a datetime from the local timezone (e.g. set in config) to UTC, which is the preferred internal datetime form.


ConfigDefaults: dict = {'timezone': 'CET'}
__init__(app, pipeline, id=None, config=None)[source]

Description:


normalize(local_time)[source]

Description: If local_time doesn’t contain a time zone (i.e. it is naive), the timezone from the config is added.

Returns:

Normalized local_time in UTC

Parameters:

local_time (datetime)

Return type:

datetime


process(context, event)[source]

Description: Abstract method to process the event. Must be customized.

Example

>>> native_time = event["@timestamp"]
>>> local_time = self.normalize(native_time)
>>> event["@timestamp"] = local_time

class bspump.common.MappingTransformator(app, pipeline, id=None, config=None)[source]

Bases: Processor

Description:


__init__(app, pipeline, id=None, config=None)[source]

Description:


build(app)[source]

Description:


process(context, event)[source]

Description:

Returns:

dict(map(self._map, event.items()))

Parameters:

event (Mapping)

Return type:

dict


class bspump.common.DirectSource(app, pipeline, id=None, config=None)[source]

Bases: Source

Description: This source processes inserted event synchronously.


__init__(app, pipeline, id=None, config=None)[source]

Description:


put(context, event, copy_context=False, copy_event=False)[source]

This method serves to put events into the pipeline and process them right away.

Context can be an empty dictionary if it is not provided.


async main()[source]

Description:


class bspump.common.AggregationStrategy[source]

Bases: ABC

Aggregation Strategy


abstractmethod append(context, event)[source]

Appends an event (with its context) to the aggregation.

Parameters

context :

event :

abstractmethod flush()[source]

Flushes the aggregated data.


abstractmethod is_empty()[source]

Description:


Return type:

bool

class bspump.common.Aggregator(app, pipeline, aggregation_strategy=<bspump.common.aggregator.ListAggregationStrategy object>, id=None, config=None)[source]

Bases: Generator

Description:


Parameters:

aggregation_strategy (AggregationStrategy)

ConfigDefaults: dict = {'completion_interval': 0, 'completion_size': 10, 'completion_timeout': 0}
__init__(app, pipeline, aggregation_strategy=<bspump.common.aggregator.ListAggregationStrategy object>, id=None, config=None)[source]

Description:


Parameters:

aggregation_strategy (AggregationStrategy)

flush()[source]

Description:

Returns:

??


process(context, event)[source]

Description:

Parameters

context :

event :

async generate(context, aggregated_event, depth)[source]

Description:

Parameters

context :

aggregated_event :

depth :

class bspump.common.ListAggregationStrategy[source]

Bases: AggregationStrategy

Description: List-based aggregation strategy.


__init__()[source]

Description:


Return type:

None

append(context, event)[source]

Description:

Parameters

context :

event :

flush()[source]

Description:

Returns:

result


is_empty()[source]

Description:

Returns:

True if nothing has been aggregated yet, otherwise False.

Return type:

bool


class bspump.common.ListEventAggregationStrategy[source]

Bases: AggregationStrategy

Description:


__init__()[source]

Description:


Return type:

None

append(context, event)[source]

Description:

Parameters

context :

event :

flush()[source]

Description:

Returns:

result


is_empty()[source]

Description:

Returns:

True if nothing has been aggregated yet, otherwise False.

Return type:

bool


class bspump.common.StringAggregationStrategy(delimiter='\n')[source]

Bases: AggregationStrategy

Description:


__init__(delimiter='\n')[source]

Description:

Return type:

None

append(context, event)[source]

Description:

Parameters

context :

event : Data with a timestamp, stored in any data type; usually it is JSON.

You can specify an event that is passed to the method.

flush()[source]

Description:

Returns:

result


is_empty()[source]

Description:

Returns:

True if nothing has been aggregated yet, otherwise False.

Return type:

bool


class bspump.common.DictToJsonBytesParser(app, pipeline, id=None, config=None)[source]

Bases: Processor

DictToJsonBytesParser transforms a dictionary into a JSON string encoded as bytes. The encoding charset can be specified in the `encoding` field of the configuration.

ConfigDefaults: dict = {'encoding': 'utf-8'}
__init__(app, pipeline, id=None, config=None)[source]

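Initializes the component.

Parameters

app : object

Application object.

pipeline : Pipeline

The Pipeline object this component belongs to.

id : str, default=None

ID of the component, used to identify its configuration.

config : JSON, or other compatible formats (e.g. a dict), default=None

Configuration options, e.g. config={'encoding': 'utf-8'}.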

process(context, event)[source]

Serializes the dictionary event to JSON and encodes it to bytes using the configured encoding.

Parameters

context :

Additional information passed to the method.

event : Data with a timestamp, stored in any data type; usually it is JSON.

You can specify an event that is passed to the method.

Sinks

PPrintSink

Pretty-prints events to console. Useful for debugging.

import bspump.common

sink = bspump.common.PPrintSink(app, pipeline)

NullSink

Discards all events. Useful for testing or when output is handled elsewhere.

sink = bspump.common.NullSink(app, pipeline)

Processors

JsonBytesToDictParser

Parses JSON bytes to Python dictionary.

processor = bspump.common.JsonBytesToDictParser(app, pipeline)

# Input: b'{"key": "value"}'
# Output: {"key": "value"}

DictToJsonBytesParser

Converts Python dictionary to JSON bytes.

processor = bspump.common.DictToJsonBytesParser(app, pipeline)

# Input: {"key": "value"}
# Output: b'{"key": "value"}'

MappingProcessor

Maps/renames event keys.

processor = bspump.common.MappingProcessor(app, pipeline, mapping={
    "old_key": "new_key",
    "source_field": "dest_field"
})

StdJsonToDictParser

Standard JSON parsing with error handling.

processor = bspump.common.StdJsonToDictParser(app, pipeline)

StdDictToJsonParser

Standard dictionary to JSON conversion.

processor = bspump.common.StdDictToJsonParser(app, pipeline)

StringToBytesParser

Converts string to bytes.

processor = bspump.common.StringToBytesParser(app, pipeline)

BytesToStringParser

Converts bytes to string.

processor = bspump.common.BytesToStringParser(app, pipeline)

RoutingProcessor

Routes events to different pipelines.

processor = bspump.common.RoutingProcessor(app, pipeline, routing={
    "type_a": "PipelineA",
    "type_b": "PipelineB"
})

FilterProcessor

Filters events based on a condition.

class MyFilter(bspump.common.FilterProcessor):
    def filter(self, context, event):
        return event.get("valid", False)

TeeProcessor

Duplicates events to another pipeline.

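processor = bspump.common.TeeProcessor(app, pipeline)
# The target is a bspump.PumpService.locate() string; AuditPipeline must contain a bspump.common.TeeSource
processor.bind("AuditPipeline.*TeeSource")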

Utility Functions

These utilities are commonly used in pipelines:

  • JSON parsing and serialization

  • String/bytes conversion

  • Event routing and filtering

  • Debug output