Elasticsearch Integration

BSPump provides Elasticsearch integration through bspump.elasticsearch.

Installation

pip install "elasticsearch[async]"

Components

  • ElasticSearchConnection: Shared connection to Elasticsearch

  • ElasticSearchSource: Reads documents from Elasticsearch

  • ElasticSearchSink: Writes documents to Elasticsearch

ElasticSearchConnection

import bspump.elasticsearch

connection = bspump.elasticsearch.ElasticSearchConnection(
    app, "ElasticSearchConnection"
)

Configuration:

[connection:ElasticSearchConnection]
url=http://localhost:9200

# Authentication (optional)
# url=https://user:password@localhost:9200

# Multiple nodes
# url=http://node1:9200,http://node2:9200,http://node3:9200
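Credentials are embedded in the URL, so a password containing characters such as @, : or , must be percent-encoded first. Multiple nodes go into a single comma-separated value. The sketch below shows both using only the Python standard library. connection_url is a hypothetical helper. It assumes the connection decodes percent-encoded credentials the same way standard URL parsers do. If your configuration file is read with interpolation that treats % specially, the percent signs may also need escaping.

```python
from urllib.parse import quote, unquote, urlsplit


def connection_url(nodes, user=None, password=None, scheme="https"):
    """Build a comma-separated url= value for one or more host:port nodes.

    Hypothetical helper: percent-encodes the credentials so characters
    such as '@', ':' or ',' do not break URL or list parsing.
    """
    auth = ""
    if user is not None:
        auth = quote(user, safe="")
        if password is not None:
            auth += ":" + quote(password, safe="")
        auth += "@"
    return ",".join(f"{scheme}://{auth}{node}" for node in nodes)


# Example: two nodes sharing the same credentials
value = connection_url(["node1:9200", "node2:9200"], "user", "p@ss:word")
print(f"url={value}")

# Standard URL parsing recovers the original password after unquoting
first = urlsplit(value.split(",")[0])
print(unquote(first.password))
```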

ElasticSearchSink

Writes (indexes) documents into Elasticsearch.

import bspump.elasticsearch

sink = bspump.elasticsearch.ElasticSearchSink(
    app, pipeline,
    connection="ElasticSearchConnection"
)

Configuration:

[pipeline:MyPipeline:ElasticSearchSink]
index=events
# Time-based index pattern (optional)
# index=events-%Y-%m-%d
# Bulk settings
bulk_size=500
bulk_timeout=5.0

ElasticSearchSource

Reads documents from Elasticsearch by running a query and paging through the results with a scroll search.

import bspump.elasticsearch

source = bspump.elasticsearch.ElasticSearchSource(
    app, pipeline,
    connection="ElasticSearchConnection"
)

Configuration:

[pipeline:MyPipeline:ElasticSearchSource]
index=events
query={"match_all": {}}
scroll=5m
size=1000
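These options correspond to the parameters of an Elasticsearch scroll search. index selects the target, query is a query DSL clause written as JSON, size is the page size, and scroll sets how long Elasticsearch keeps the search context alive between pages. The sketch below builds the first request such a configuration describes, parsed with the standard configparser module. scroll_request is a hypothetical helper, not BSPump's own code.

```python
import configparser
import json

CONFIG = """
[pipeline:MyPipeline:ElasticSearchSource]
index=events
query={"match_all": {}}
scroll=5m
size=1000
"""


def scroll_request(section, base_url="http://localhost:9200"):
    """Return the URL and JSON body of the first scroll search request."""
    url = f"{base_url}/{section['index']}/_search?scroll={section['scroll']}"
    body = {
        "size": section.getint("size"),
        # query= holds a query DSL clause serialized as JSON
        "query": json.loads(section["query"]),
    }
    return url, body


# interpolation=None keeps characters such as % literal
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(CONFIG)
url, body = scroll_request(parser["pipeline:MyPipeline:ElasticSearchSource"])
print("POST", url)
print(json.dumps(body))
```

Subsequent pages are fetched from the _search/scroll endpoint using the returned scroll ID. This continues until a page comes back empty.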

Complete Example

from bspump.jupyter import *
import bspump.elasticsearch
import bspump.kafka
import json

@register_connection
def es_connection(app):
    return bspump.elasticsearch.ElasticSearchConnection(
        app, "ElasticSearchConnection"
    )

@register_connection
def kafka_connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")

auto_pipeline(
    source=lambda app, pipeline: bspump.kafka.KafkaSource(
        app, pipeline, connection="KafkaConnection"
    ),
    sink=lambda app, pipeline: bspump.elasticsearch.ElasticSearchSink(
        app, pipeline, connection="ElasticSearchConnection"
    ),
    name="KafkaToElasticPipeline",
)

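from datetime import datetime, timezone

# Processor that turns each Kafka message (raw bytes) into a document for indexing
class PrepareDocument(bspump.Processor):
    def process(self, context, event):
        data = json.loads(event.decode("utf-8"))
        return {
            "_id": data.get("id"),  # Document ID
            "@timestamp": datetime.now(timezone.utc).isoformat(),  # UTC, timezone-aware
            **data,
        }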

Time-Based Indices

Create daily indices for time-series data:

[pipeline:MyPipeline:ElasticSearchSink]
index=logs-%Y-%m-%d

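The placeholders %Y, %m and %d in the index name are replaced with the current date. Documents written on different days therefore land in separate indices, for example logs-2024-03-15.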
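The placeholders look like Python strftime codes. Assuming the sink expands them that way, the sketch below shows which index a given date maps to. index_name is a hypothetical helper, and the date is passed in explicitly so the result is reproducible.

```python
from datetime import date


def index_name(pattern: str, day: date) -> str:
    """Expand strftime-style placeholders such as %Y-%m-%d for one day."""
    return day.strftime(pattern)


print(index_name("logs-%Y-%m-%d", date(2024, 3, 15)))  # logs-2024-03-15
```

Because each day gets its own index, old data can be removed by deleting whole indices rather than individual documents.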

Dynamic Index Routing

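Route events to different indices based on their content. This processor derives the index name from each event's type field and stores it in the event context: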

import bspump

class IndexRouter(bspump.Processor):
    def process(self, context, event):
        event_type = event.get("type", "default")
        # Target index for this event, e.g. events-login
        context["elasticsearch_index"] = f"events-{event_type}"
        return event
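Because the index name is built from event data, it is worth normalizing. Elasticsearch index names must be lowercase and cannot contain characters such as \ / * ? " < > | , # or spaces. The sketch below expresses the naming rule as a plain function so it can be unit-tested without a pipeline. index_for is a hypothetical helper that the processor above could call instead of building the f-string inline.

```python
import re

# Characters Elasticsearch rejects in index names
_INVALID_INDEX_CHARS = re.compile(r'[\\/*?"<>|,# ]')


def index_for(event: dict, prefix: str = "events") -> str:
    """Derive a valid target index name from the event's "type" field."""
    event_type = str(event.get("type", "default"))
    # Index names must be lowercase; replace forbidden characters with "_"
    safe_type = _INVALID_INDEX_CHARS.sub("_", event_type.lower())
    return f"{prefix}-{safe_type}"


print(index_for({"type": "Login"}))  # events-login
print(index_for({"type": "a b/c"}))  # events-a_b_c
print(index_for({}))                 # events-default
```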

Bulk Operations

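Documents are sent to Elasticsearch in bulk requests. Raising bulk_size reduces the number of requests at the cost of larger ones:

[pipeline:MyPipeline:ElasticSearchSink]
index=events
# Documents per bulk request
bulk_size=1000
# Timeout for bulk operations, in seconds
bulk_timeout=10.0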
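Each bulk request is sent to Elasticsearch's _bulk endpoint as newline-delimited JSON. An action line names the index (and optionally the document ID) and is followed by the document itself, with a trailing newline at the end of the body. The sketch below shows how documents could be grouped into batches of bulk_size and serialized. It illustrates the wire format only and is not BSPump's implementation. It assumes, as in the complete example, that an _id field carries the document ID. That field is moved into the action line because Elasticsearch does not accept _id inside the document source.

```python
import json
from itertools import islice


def batches(docs, bulk_size):
    """Yield lists of at most bulk_size documents."""
    it = iter(docs)
    while batch := list(islice(it, bulk_size)):
        yield batch


def bulk_body(docs, index):
    """Serialize documents as an Elasticsearch _bulk request body (NDJSON)."""
    lines = []
    for doc in docs:
        doc = dict(doc)  # copy so the caller's event is not modified
        action = {"_index": index}
        doc_id = doc.pop("_id", None)
        if doc_id is not None:
            action["_id"] = doc_id
        lines.append(json.dumps({"index": action}))
        lines.append(json.dumps(doc))
    return "\n".join(lines) + "\n"  # the body must end with a newline


events = [{"_id": str(i), "value": i} for i in range(5)]
for batch in batches(events, bulk_size=2):
    print(bulk_body(batch, "events"))
```

Larger batches mean fewer HTTP round trips, at the cost of larger requests held in memory.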

Configuration Reference

Connection Options

Option    Default                  Description
url       http://localhost:9200    Elasticsearch URL, or a comma-separated list of node URLs

Sink Options

Option          Default       Description
index           (required)    Index name or date pattern (e.g. events-%Y-%m-%d)
bulk_size       500           Documents per bulk request
bulk_timeout    5.0           Bulk timeout in seconds

Source Options

Option    Default              Description
index     (required)           Index to query
query     {"match_all": {}}    Elasticsearch query DSL clause, as JSON
scroll    5m                   How long Elasticsearch keeps the scroll context alive between requests
size      1000                 Documents per scroll request