bspump.elasticsearch

Elasticsearch integration for BSPump.

class bspump.elasticsearch.ElasticSearchConnection(app, id=None, config=None)

Bases: Connection

Description:

Connection to an Elasticsearch cluster.

Sample Config

url : str, default = 'http://localhost:9200/'

URL of an Elasticsearch node, in the form 'http://{ip or localhost}:{port}'. Several URLs may be given, separated by ';', each pointing to a node in the Elasticsearch cluster.

username : str, default = ''

Used when authentication is required.

password : str, default = ''

Used when authentication is required.

loader_per_url : int, default = 4

Number of parallel loaders per URL.

output_queue_max_size : int, default = 10

Maximum size of the output queue.

bulk_out_max_size : int, default = 12 * 1024 * 1024 (12582912)

Maximum size of an outgoing bulk, in bytes (12 MiB by default).

timeout : int, default = 300

Timeout value, in seconds.

fail_log_max_size : int, default = 20

Maximum size of failed log messages.

precise_error_handling : bool, default = False

If True, all errors are logged; if False, soft errors are omitted from the logs.

ConfigDefaults: dict = {'bulk_out_max_size': 12582912, 'fail_log_max_size': 20, 'loader_per_url': 4, 'output_queue_max_size': 10, 'password': '', 'precise_error_handling': False, 'timeout': 300, 'url': 'http://localhost:9200/', 'username': ''}
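The url option accepts several node URLs separated by ';'. The helper below is a hypothetical sketch (split_node_urls is not a bspump function) of how such a value can be turned into a list of node URLs, like the one get_url() is documented to return. The trailing-'/' normalization is an assumption modeled on the default 'http://localhost:9200/'.

```python
from typing import List


def split_node_urls(url_config: str) -> List[str]:
    """Hypothetical helper: split a multi-URL `url` value into node URLs.

    Uses the ';' separator described for the `url` option. Each URL is
    normalized to end with '/' (an assumption, mirroring the default value).
    """
    urls = []
    for url in url_config.split(";"):
        url = url.strip()
        if not url:
            continue  # tolerate empty segments such as a trailing ';'
        if not url.endswith("/"):
            url += "/"
        urls.append(url)
    return urls


print(split_node_urls("http://es1:9200;http://es2:9200/"))
# ['http://es1:9200/', 'http://es2:9200/']
```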
__init__(app, id=None, config=None)

Parameters

app : Application

The application instance.

id : str, default = None

Identifier of the connection.

config : JSON or dict, default = None

Configuration with additional options for the connection.

get_url()

Returns:

List of URLs of the nodes in the cluster.

get_session()

Returns a new aiohttp client session that uses the connection's authentication and event loop.

Returns:

aiohttp.ClientSession(auth=self._auth, loop=self.Loop)

consume(index, data_feeder_generator, bulk_class=<class 'bspump.elasticsearch.connection.ElasticSearchBulk'>)

Checks the content of data_feeder_generator and of the bulk; if there is data to be sent, it calls the enqueue method.

Parameters

index : str

Name of the target Elasticsearch index.

data_feeder_generator : generator

Generator that yields the serialized bulk lines for one event (as produced by the sink's data_feeder).

bulk_class : type, default = ElasticSearchBulk

Class used to create a bulk instance.

flush(forced=False)

Iterates through the list of bulks and calls enqueue for each of them.

Parameters

forced : bool, default = False

enqueue(bulk)

Puts the bulk into the output queue.

Parameters

bulk : ElasticSearchBulk

The bulk to be sent.
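To make the interplay of consume, flush and enqueue more concrete, here is a simplified, self-contained model. It is not bspump's implementation: BulkBufferModel and ConnectionModel are hypothetical names, the per-index bulk dictionary and the "full when the byte size reaches the cap" rule are assumptions, and the forced flag of flush is not modeled.

```python
from collections import deque
from typing import Dict, Iterable, List


class BulkBufferModel:
    """Hypothetical size-capped bulk; not bspump's ElasticSearchBulk."""

    def __init__(self, max_size: int):
        self.max_size = max_size  # plays the role of bulk_out_max_size (bytes)
        self.items: List[bytes] = []
        self.size = 0

    def add(self, lines: Iterable[bytes]) -> None:
        for line in lines:
            self.items.append(line)
            self.size += len(line)

    def is_full(self) -> bool:
        return self.size >= self.max_size


class ConnectionModel:
    """Hypothetical stand-in for consume()/flush()/enqueue() on the connection."""

    def __init__(self, max_size: int):
        self.max_size = max_size
        self.bulks: Dict[str, BulkBufferModel] = {}  # index name -> open bulk
        self.queue = deque()  # plays the role of the output queue

    def consume(self, index: str, data_feeder_generator: Iterable[bytes]) -> None:
        bulk = self.bulks.get(index)
        if bulk is None:
            bulk = self.bulks[index] = BulkBufferModel(self.max_size)
        bulk.add(data_feeder_generator)
        if bulk.is_full():
            # A full bulk is handed over for sending and a new one starts later
            self.enqueue(index, bulk)
            del self.bulks[index]

    def flush(self) -> None:
        # Send whatever is buffered, even if the bulks are not full yet
        for index, bulk in list(self.bulks.items()):
            if bulk.items:
                self.enqueue(index, bulk)
        self.bulks.clear()

    def enqueue(self, index: str, bulk: BulkBufferModel) -> None:
        self.queue.append((index, b"".join(bulk.items)))
```

For example, consuming two short lines into a 64-byte model leaves the queue empty until flush() is called, while a 10-byte model enqueues its bulk immediately.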

class bspump.elasticsearch.ElasticSearchSink(app, pipeline, connection, id=None, config=None, bulk_class=<class 'bspump.elasticsearch.connection.ElasticSearchBulk'>, data_feeder=<function data_feeder_create_or_index>)

Bases: Sink

ElasticSearchSink allows you to insert events into Elasticsearch through POST requests.

The following attribute can be set in the context to override the default behavior of the sink:

es_index (str): name of the Elasticsearch index

data_feeder accepts the event as its only parameter and yields the data as a Python generator. The default implementation is:

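import orjson


def data_feeder_create_or_index(event):
    _id = event.pop("_id", None)

    if _id is None:
        # No document ID: use the "create" action
        yield b'{"create":{}}\n'
    else:
        # Explicit document ID: use the "index" action
        yield orjson.dumps(
            {"index": {"_id": _id}}, option=orjson.OPT_APPEND_NEWLINE
        )

    # The document itself follows its action line
    yield orjson.dumps(event, option=orjson.OPT_APPEND_NEWLINE)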
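The default data_feeder depends on orjson. As a sketch that runs with the standard library only, the hypothetical variant below (data_feeder_create_or_index_json is not part of bspump) produces the same newline-delimited action/document pairs, and the usage lines join the output for two events into one bulk body. In Elasticsearch's bulk format, a "create" action without an _id lets Elasticsearch generate the ID, while "index" with an _id creates or overwrites that document.

```python
import json
from typing import Iterator


def data_feeder_create_or_index_json(event: dict) -> Iterator[bytes]:
    # Hypothetical stdlib-only variant of data_feeder_create_or_index
    _id = event.pop("_id", None)
    if _id is None:
        # No explicit ID: Elasticsearch generates one ("create" action)
        yield b'{"create":{}}\n'
    else:
        # Explicit ID: create or overwrite the document ("index" action)
        action = {"index": {"_id": _id}}
        yield (json.dumps(action, separators=(",", ":")) + "\n").encode()
    # The document source always follows its action line
    yield (json.dumps(event, separators=(",", ":")) + "\n").encode()


events = [{"_id": "a1", "msg": "hello"}, {"msg": "world"}]
body = b"".join(
    line for e in events for line in data_feeder_create_or_index_json(e)
)
print(body.decode())
# {"index":{"_id":"a1"}}
# {"msg":"hello"}
# {"create":{}}
# {"msg":"world"}
```

A function with this shape could be passed as the data_feeder argument of ElasticSearchSink; note that, like the default, it removes _id from the event it receives.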


ConfigDefaults: dict = {'index': 'bspump_', 'index_prefix': 'bspump_'}
__init__(app, pipeline, connection, id=None, config=None, bulk_class=<class 'bspump.elasticsearch.connection.ElasticSearchBulk'>, data_feeder=<function data_feeder_create_or_index>)

Parameters

app : Application

The application instance.

pipeline : Pipeline

The pipeline the sink belongs to.

connection : Connection

The ElasticSearchConnection (or its ID) used by the sink.

id : str, default = None

Identifier of the sink.

config : JSON or dict, default = None

Configuration with additional options.

bulk_class : type, default = ElasticSearchBulk

Class used to create bulk instances.

data_feeder : callable, default = data_feeder_create_or_index

Function that turns an event into bulk request lines.


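process(context, event)

Description:

Passes the event to the connection, which adds it to a bulk for the index named by es_index in the context, or for the sink's configured index otherwise.

Parameters

context : dict

Processing context; may contain es_index.

event : dict

The event (document) to be indexed.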

class bspump.elasticsearch.ElasticSearchSource(app, pipeline, connection, request_body=None, paging=True, id=None, config=None)

Bases: TriggerSource

Description:

Queries documents from Elasticsearch and injects them into the pipeline.

ConfigDefaults: dict = {'index': 'index-*', 'scroll_timeout': '1m'}
__init__(app, pipeline, connection, request_body=None, paging=True, id=None, config=None)

Parameters

app : Application

The application instance.

pipeline : Pipeline

The pipeline the source belongs to.

connection : Connection

The ElasticSearchConnection (or its ID) to query.

request_body : dict (JSON), default = None

Request body for the search API call.

paging : bool, default = True

Whether the results are fetched page by page.

id : str, default = None

Identifier of the source.

config : JSON or dict, default = None

Configuration with additional options.

async cycle()

Fetches documents from Elasticsearch and injects them into the pipeline.
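Paging through a large result set in Elasticsearch is typically done with the scroll API: an initial search opens a scroll context and returns a _scroll_id, and follow-up requests with that ID return the next page until a page comes back empty. The sketch below shows that generic loop; it is not bspump's code. The search callable is a hypothetical stand-in for the HTTP requests, and whether ElasticSearchSource injects the whole hit or only its _source is not stated here (the sketch yields _source).

```python
from typing import Callable, Dict, Iterator, Optional


def scroll_documents(search: Callable[[Optional[str]], Dict]) -> Iterator[Dict]:
    """Yield every document from a scrolled search.

    `search(scroll_id)` is a hypothetical stand-in: with scroll_id=None it
    performs the initial search request, otherwise the follow-up scroll request.
    """
    scroll_id = None
    while True:
        response = search(scroll_id)
        hits = response["hits"]["hits"]
        if not hits:
            break  # an empty page means the scroll is exhausted
        for hit in hits:
            yield hit["_source"]
        scroll_id = response["_scroll_id"]


# Fake responses standing in for three pages returned by Elasticsearch
pages = iter([
    {"_scroll_id": "s1", "hits": {"hits": [{"_source": {"n": 1}}, {"_source": {"n": 2}}]}},
    {"_scroll_id": "s1", "hits": {"hits": [{"_source": {"n": 3}}]}},
    {"_scroll_id": "s1", "hits": {"hits": []}},
])


def fake_search(scroll_id):
    return next(pages)


print([doc["n"] for doc in scroll_documents(fake_search)])
# [1, 2, 3]
```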

class bspump.elasticsearch.ElasticSearchAggsSource(app, pipeline, connection, request_body=None, id=None, config=None)

Bases: TriggerSource

Description:

Runs an aggregation query and injects an event for each aggregation value into the pipeline.

ConfigDefaults: dict = {'index': 'index-*'}
__init__(app, pipeline, connection, request_body=None, id=None, config=None)

Parameters

app : Application

The application instance.

pipeline : Pipeline

The pipeline the source belongs to.

connection : Connection

The ElasticSearchConnection (or its ID) to query.

request_body : dict (JSON), default = None

Request body for the aggregation API call.

id : str, default = None

Identifier of the source.

config : JSON or dict, default = None

Configuration with additional options.

async cycle()

Sets the request body and path for the aggregation query call.


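async process_aggs(path, aggs_name, aggs)

Description:

Processes a single aggregation result: descends into its buckets, or emits its value.

Parameters

path :

Bucket keys collected so far on the way to this aggregation.

aggs_name :

Name of the aggregation.

aggs :

The aggregation result.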

async process_buckets(path, parent, buckets)

Recursive function for processing buckets. It iterates through the keys of the dictionary, looking for 'buckets' or 'value'. If there are 'buckets', it calls itself; if there is a 'value', it calls process_aggs and sends an event to be processed.

Parameters

path :

Bucket keys collected so far.

parent :

Name of the parent aggregation.

buckets :

List of buckets returned for the parent aggregation.
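The recursion described above can be illustrated with a small, self-contained sketch. walk_aggs and walk_buckets are hypothetical names, not bspump functions; the sketch assumes that path maps each parent aggregation name to the key of the bucket being visited, and that every leaf 'value' becomes one flat event.

```python
from typing import Any, Dict, Iterator, List

# Bucket fields that describe the bucket itself rather than a sub-aggregation
SKIP_KEYS = {"key", "key_as_string", "doc_count"}


def walk_aggs(path: Dict[str, Any], name: str, agg: Dict[str, Any]) -> Iterator[Dict[str, Any]]:
    """Hypothetical counterpart of process_aggs."""
    if "buckets" in agg:
        yield from walk_buckets(path, name, agg["buckets"])
    if "value" in agg:
        # Leaf metric: emit one flat event with the bucket keys seen so far
        yield {**path, name: agg["value"]}


def walk_buckets(path: Dict[str, Any], parent: str, buckets: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Hypothetical counterpart of process_buckets."""
    for bucket in buckets:
        for key, sub in bucket.items():
            if key in SKIP_KEYS or not isinstance(sub, dict):
                continue
            # Record which bucket of the parent aggregation we are inside
            yield from walk_aggs({**path, parent: bucket["key"]}, key, sub)


aggregations = {
    "by_host": {
        "buckets": [
            {"key": "web-1", "doc_count": 10, "avg_latency": {"value": 12.5}},
            {"key": "web-2", "doc_count": 4, "avg_latency": {"value": 30.0}},
        ]
    }
}
events = [e for name, agg in aggregations.items() for e in walk_aggs({}, name, agg)]
print(events)
# [{'by_host': 'web-1', 'avg_latency': 12.5}, {'by_host': 'web-2', 'avg_latency': 30.0}]
```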

class bspump.elasticsearch.ElasticSearchLookup(app, connection, id=None, config=None, cache=None, lazy=False)

Bases: MappingLookup, AsyncLookupMixin

A lookup linked to Elasticsearch. It provides a mapping (dictionary-like) interface to pipelines and feeds lookup data from Elasticsearch using a query. It also has a simple cache to reduce the number of database hits.

Configuration options:

index - Elasticsearch index

key - field name to match

timefield - field name to use for sorting

sort_order - order of sorting (default is 'desc')

scroll_timeout - timeout of a single scroll request (default is '1m'). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units

Example:

The ElasticSearchLookup can then be located and used inside a custom enricher:

import bspump


class AsyncEnricher(bspump.Generator):

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        svc = app.get_service("bspump.PumpService")
        # Use the ID under which the ElasticSearchLookup was registered
        # ("MyElasticSearchLookup" is a hypothetical ID)
        self.Lookup = svc.locate_lookup("MyElasticSearchLookup")

    async def generate(self, context, event, depth):
        if 'user' not in event:
            return None

        info = await self.Lookup.get(event['user'])
        event['user_info'] = info  # hypothetical field for the looked-up value

        # Inject the enriched event into the next depth of the pipeline
        self.Pipeline.inject(context, event, depth)

ConfigDefaults: dict = {'cache_non_existent': False, 'index': '', 'key': '', 'scroll_timeout': '1m', 'sort_order': 'desc'}
__init__(app, connection, id=None, config=None, cache=None, lazy=False)

Parameters

app : Application

The application instance.

connection : Connection

The ElasticSearchConnection (or its ID) used by the lookup.

id : str, default = None

Identifier of the lookup.

config : JSON or dict, default = None

Configuration with additional options.

cache : default = None

Cache used to store looked-up values.

lazy : bool, default = False

async get(key)

Obtains the value from the lookup asynchronously.

Parameters

key

The key to look up.

Returns:

value
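The "simple cache to reduce the number of database hits" mentioned in the class description can be modeled as follows. This is a hypothetical, self-contained sketch (CachedLookupModel is not a bspump class); the fetch callable stands in for the Elasticsearch query, and not caching misses is an assumption based on the name of the cache_non_existent option and its default of False.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class CachedLookupModel:
    """Hypothetical lookup with a simple cache; not bspump's class."""

    def __init__(self, fetch: Callable[[Any], Awaitable[Any]]):
        self._fetch = fetch              # stands in for the Elasticsearch query
        self.Cache: Dict[Any, Any] = {}  # key -> value

    async def get(self, key: Any) -> Any:
        if key in self.Cache:
            return self.Cache[key]       # cache hit: no database round trip
        value = await self._fetch(key)
        if value is not None:
            # Misses are not cached (assumed cache_non_existent=False behavior)
            self.Cache[key] = value
        return value


async def fake_fetch(key):
    # Hypothetical data source standing in for Elasticsearch
    return {"alice": {"name": "Alice"}}.get(key)


async def main():
    lookup = CachedLookupModel(fake_fetch)
    return await lookup.get("alice"), await lookup.get("alice")


print(asyncio.run(main()))
# ({'name': 'Alice'}, {'name': 'Alice'})
```

Only the first get("alice") reaches fake_fetch; the second is answered from the cache.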


build_find_one_query(key)

Override this method to build your own lookup query.

Parameters

key

The key to look up.

Returns:

Default single-key query

Return type:

dict
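As an example of what an override might return, the hypothetical helper below (build_latest_match_query is not a bspump function) builds an Elasticsearch query that fetches the newest document whose key field equals the looked-up key, using the lookup's key, timefield and sort_order options as inputs. It is a sketch of one possible query, not the default one.

```python
from typing import Any, Dict


def build_latest_match_query(key_field: str, key: Any, timefield: str,
                             sort_order: str = "desc") -> Dict[str, Any]:
    """Hypothetical body for a build_find_one_query() override."""
    return {
        "size": 1,                                     # one matching document is enough
        "query": {"term": {key_field: key}},           # exact match on the key field
        "sort": [{timefield: {"order": sort_order}}],  # newest first when 'desc'
    }


print(build_latest_match_query("user", "alice", "@timestamp"))
```

In a subclass of ElasticSearchLookup, build_find_one_query would return such a dict, taking the field names from the lookup's configuration. A term query matches exact values, which suits keyword fields; for analyzed text fields a match query would be the usual choice.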


async load()

Sets Count to the length of the cache.

Returns:

True


classmethod construct(app, definition)

Constructs the lookup's ID, configuration, and connection from the given definition.

Parameters

app : Application

The application instance.

definition : dict

Definition of the lookup (its ID, configuration, and connection).

Returns:

cls(app, newid, connection, config)


ElasticSearchConnection

Connection to Elasticsearch cluster.

import bspump.elasticsearch

connection = bspump.elasticsearch.ElasticSearchConnection(
    app, "ElasticSearchConnection"
)

Configuration:

[connection:ElasticSearchConnection]
url=http://localhost:9200

Options:

  • url - Elasticsearch URL(s), comma-separated for multiple nodes

URL Examples:

# Single node
url=http://localhost:9200

# With authentication
url=https://user:password@localhost:9200

# Multiple nodes
url=http://node1:9200,http://node2:9200,http://node3:9200

ElasticSearchSource

Queries documents from Elasticsearch.

source = bspump.elasticsearch.ElasticSearchSource(
    app, pipeline,
    connection="ElasticSearchConnection",
    # The query and page size go into the request body
    request_body={"query": {"match_all": {}}, "size": 1000},
)

Configuration:

[pipeline:MyPipeline:ElasticSearchSource]
index=events
scroll_timeout=5m

ElasticSearchSink

Indexes documents to Elasticsearch.

sink = bspump.elasticsearch.ElasticSearchSink(
    app, pipeline,
    connection="ElasticSearchConnection"
)

Configuration:

[pipeline:MyPipeline:ElasticSearchSink]
index=events
bulk_size=500
bulk_timeout=5.0

Time-Based Indices:

index=events-%Y-%m-%d
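The %Y-%m-%d placeholders look like Python strftime directives. Assuming the index name is expanded that way (whether the current local time or UTC is used is not stated here), the hypothetical helper below shows which daily index a given timestamp would map to.

```python
from datetime import datetime, timezone


def expand_index_pattern(pattern: str, when: datetime) -> str:
    # Assumption: the index name is treated as a strftime() pattern
    return when.strftime(pattern)


print(expand_index_pattern("events-%Y-%m-%d", datetime(2024, 3, 7, tzinfo=timezone.utc)))
# events-2024-03-07
```

With this pattern, each calendar day gets its own index, which makes it easy to drop old data by deleting whole indices.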

Dynamic Index Routing

class IndexRouter(bspump.Processor):
    def process(self, context, event):
        context["es_index"] = f"events-{event['type']}"
        return event

Document ID

To set the document ID, store it in the event under the _id key:

class DocumentProcessor(bspump.Processor):
    def process(self, context, event):
        # The default data_feeder pops "_id" and uses it as the document ID
        if "id" in event:
            event["_id"] = event.pop("id")
        return event

Example Pipeline

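import json
import time

import bspump
import bspump.elasticsearch
import bspump.kafka


class TimestampProcessor(bspump.Processor):
    # Hypothetical processor (not part of bspump) that adds a UNIX timestamp
    def process(self, context, event):
        if isinstance(event, (bytes, str)):
            event = json.loads(event)  # assumes JSON-encoded Kafka messages
        event["@timestamp"] = time.time()
        return event


class KafkaToElasticPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, connection="KafkaConnection"),
            TimestampProcessor(app, self),
            bspump.elasticsearch.ElasticSearchSink(
                app, self, connection="ElasticSearchConnection"
            ),
        )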