bspump.elasticsearch
Elasticsearch integration for BSPump.
- class bspump.elasticsearch.ElasticSearchConnection(app, id=None, config=None)
Bases: Connection
Connection to an Elasticsearch cluster, used by the Elasticsearch sources, sink, and lookup.
Configuration options:
- url 'http://{ip/localhost}:{port}'
URL of an Elasticsearch node. Several URLs may be given, separated by ';', one per node in the Elasticsearch cluster.
- username (string), default = ''
Used when authentication is required.
- password (string), default = ''
Used when authentication is required.
- loader_per_url (int), default = 4
Number of parallel loaders per URL.
- output_queue_max_size (int), default = 10
Maximum size of the output queue.
- bulk_out_max_size (int), default = 12 * 1024 * 1024
Maximum size of a bulk request in bytes (12 MiB by default). Once a bulk reaches this size, it is queued for sending.
- timeout (int), default = 300
Request timeout in seconds.
- fail_log_max_size (int), default = 20
Maximum size of failed log messages.
- precise_error_handling (bool), default = False
If True, all errors are logged; if False, soft errors are omitted from the logs.
- ConfigDefaults: dict = {'bulk_out_max_size': 12582912, 'fail_log_max_size': 20, 'loader_per_url': 4, 'output_queue_max_size': 10, 'password': '', 'precise_error_handling': False, 'timeout': 300, 'url': 'http://localhost:9200/', 'username': ''}
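The following minimal sketch shows how the url and loader_per_url options combine. It is not BSPump code. It layers user overrides over a copy of ConfigDefaults and splits a multi-node url on ';'. The helper names effective_config and split_node_urls are hypothetical, and normalizing each URL to end in '/' is an assumption.

```python
# Hypothetical helpers, not part of bspump: they only model how the
# connection options documented above combine.

CONFIG_DEFAULTS = {
    "url": "http://localhost:9200/",
    "username": "",
    "password": "",
    "loader_per_url": 4,
    "output_queue_max_size": 10,
    "bulk_out_max_size": 12 * 1024 * 1024,
    "timeout": 300,
    "fail_log_max_size": 20,
    "precise_error_handling": False,
}


def effective_config(overrides=None):
    """Return a copy of the defaults with user overrides applied on top."""
    config = dict(CONFIG_DEFAULTS)
    config.update(overrides or {})
    return config


def split_node_urls(url_option):
    """Split a ';'-separated url option into node URLs ending in '/'."""
    return [
        part.strip().rstrip("/") + "/"
        for part in url_option.split(";")
        if part.strip()
    ]


config = effective_config({"url": "http://es1:9200; http://es2:9200/"})
node_urls = split_node_urls(config["url"])
# loader_per_url applies to every node, so the total grows with the node count
total_loaders = len(node_urls) * config["loader_per_url"]
print(node_urls, total_loaders)
```

With two nodes and the default loader_per_url of 4, the sketch ends up with 8 loaders in total.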
- __init__(app, id=None, config=None)
Parameters
- app (Application)
The application object.
- id (str), default = None
Identifier of the connection.
- config (dict), default = None
Configuration overrides for the connection.
- get_session()
Creates a client session that uses the connection's authentication and event loop.
- Returns:
aiohttp.ClientSession(auth=self._auth, loop=self.Loop)
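- consume(index, data_feeder_generator, bulk_class=<class 'bspump.elasticsearch.connection.ElasticSearchBulk'>)
Adds the data produced by data_feeder_generator to the pending bulk for index. If no bulk exists yet, it creates a new bulk_class instance. When the bulk is ready to be sent, it passes the bulk to the enqueue() method.
Parameters
- index
Name of the target index.
- data_feeder_generator
Generator yielding the bulk request lines for one event (see data_feeder in ElasticSearchSink).
- bulk_class, default = ElasticSearchBulk
Class instantiated when a new bulk is needed.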
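The accumulate-then-enqueue behavior of consume() can be modeled in a few lines. This simplified sketch assumes a bulk counts as full once the byte length of its items reaches the limit (bulk_out_max_size). SketchBulk and SketchConnection are hypothetical stand-ins, not the real ElasticSearchBulk or connection classes. Sending is replaced by appending to a list.

```python
# Simplified, hypothetical model of size-bounded bulk accumulation.


class SketchBulk:
    def __init__(self, index, max_size):
        self.index = index
        self.capacity = max_size  # bytes left before the bulk counts as full
        self.items = []

    def consume(self, data_feeder_generator):
        """Append every line from the generator; return True once full."""
        for item in data_feeder_generator:
            self.items.append(item)
            self.capacity -= len(item)
        return self.capacity <= 0


class SketchConnection:
    def __init__(self, bulk_out_max_size):
        self.bulk_out_max_size = bulk_out_max_size
        self.pending = {}  # index name -> bulk still being filled
        self.queue = []    # full bulks waiting to be sent

    def consume(self, index, data_feeder_generator, bulk_class=SketchBulk):
        if data_feeder_generator is None:
            return
        bulk = self.pending.get(index)
        if bulk is None:
            bulk = bulk_class(index, self.bulk_out_max_size)
            self.pending[index] = bulk
        if bulk.consume(data_feeder_generator):
            # The bulk is full: stop filling it and queue it for sending
            del self.pending[index]
            self.queue.append(bulk)


conn = SketchConnection(bulk_out_max_size=40)
for n in range(3):
    conn.consume("events", iter([b'{"create":{}}\n', b'{"n":%d}\n' % n]))
print(len(conn.queue), len(conn.pending))
```

With a 40-byte limit, each event contributes 22 bytes. The second event therefore fills the first bulk, and the third event starts a new one.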
- class bspump.elasticsearch.ElasticSearchSink(app, pipeline, connection, id=None, config=None, bulk_class=<class 'bspump.elasticsearch.connection.ElasticSearchBulk'>, data_feeder=<function data_feeder_create_or_index>)
Bases: Sink
ElasticSearchSink inserts events into Elasticsearch through POST requests.
The following attribute can be set in the context to override the default behavior of the sink:
es_index (string): Elasticsearch index name.
data_feeder accepts the event as its only parameter and yields the data as a Python generator. The default implementation is:
import orjson

def data_feeder_create_or_index(event):
    _id = event.pop("_id", None)
    if _id is None:
        # No document ID: Elasticsearch generates one
        yield b'{"create":{}}\n'
    else:
        # Explicit ID: index (create or replace) the document with that ID
        yield orjson.dumps(
            {"index": {"_id": _id}}, option=orjson.OPT_APPEND_NEWLINE
        )
    yield orjson.dumps(event, option=orjson.OPT_APPEND_NEWLINE)
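If orjson is not at hand, the same feeder can be sketched with the standard json module. Compact separators mimic orjson's output. Joining the yielded lines gives the newline-delimited body of a bulk request. This is an illustration, not the function shipped with BSPump.

```python
import json


def data_feeder_create_or_index(event):
    """Yield bulk lines: a 'create' action, or 'index' when _id is present."""
    _id = event.pop("_id", None)
    if _id is None:
        yield b'{"create":{}}\n'
    else:
        yield json.dumps({"index": {"_id": _id}}, separators=(",", ":")).encode() + b"\n"
    # The document itself follows its action line
    yield json.dumps(event, separators=(",", ":")).encode() + b"\n"


body = b"".join(data_feeder_create_or_index({"_id": "42", "user": "alice"}))
print(body.decode(), end="")
```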
- __init__(app, pipeline, connection, id=None, config=None, bulk_class=<class 'bspump.elasticsearch.connection.ElasticSearchBulk'>, data_feeder=<function data_feeder_create_or_index>)
Parameters
- app (Application)
The application object.
- pipeline (Pipeline)
The pipeline the sink belongs to.
- connection (Connection)
The connection, or its ID.
- id (str), default = None
Identifier of the sink.
- config (dict), default = None
Configuration overrides for the sink.
- bulk_class, default = ElasticSearchBulk
Class used to accumulate events into bulk requests.
- data_feeder, default = data_feeder_create_or_index
Function that converts an event into bulk request lines.
- class bspump.elasticsearch.ElasticSearchSource(app, pipeline, connection, request_body=None, paging=True, id=None, config=None)
Bases: TriggerSource
Queries documents from Elasticsearch each time the source is triggered.
- __init__(app, pipeline, connection, request_body=None, paging=True, id=None, config=None)
Parameters
- app (Application)
The application object.
- pipeline (Pipeline)
The pipeline the source belongs to.
- connection (Connection)
The connection, or its ID.
- request_body (dict), default = None
Request body for the search API call.
- paging (bool), default = True
If True, the source pages through all results; if False, only the first page is read.
- id (str), default = None
Identifier of the source.
- config (dict), default = None
Configuration overrides for the source.
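This sketch shows the control flow that paging implies. It assumes that paging=True means the source keeps following the scroll cursor until an empty page comes back, and that paging=False stops after the first page. fetch_page and fake_fetch are hypothetical stand-ins for the search and scroll HTTP requests, not bspump functions.

```python
# Hypothetical model of the paging loop; fetch_page stands in for the
# search / scroll HTTP requests.


def read_all_hits(fetch_page, paging=True):
    """Collect hits page by page, following _scroll_id while paging is on."""
    hits = []
    scroll_id = None
    while True:
        response = fetch_page(scroll_id)
        page = response["hits"]["hits"]
        if not page:
            break  # an empty page means the scroll is exhausted
        hits.extend(page)
        if not paging:
            break  # only the first page is wanted
        scroll_id = response["_scroll_id"]
    return hits


PAGES = {
    None: {"_scroll_id": "s1", "hits": {"hits": [{"_id": "1"}, {"_id": "2"}]}},
    "s1": {"_scroll_id": "s2", "hits": {"hits": [{"_id": "3"}]}},
}


def fake_fetch(scroll_id):
    # Canned responses; an unknown scroll id yields an empty page
    return PAGES.get(scroll_id, {"_scroll_id": scroll_id, "hits": {"hits": []}})


print(len(read_all_hits(fake_fetch)), len(read_all_hits(fake_fetch, paging=False)))
```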
- class bspump.elasticsearch.ElasticSearchAggsSource(app, pipeline, connection, request_body=None, id=None, config=None)
Bases: TriggerSource
Runs an aggregation query when triggered and sends the aggregation results into the pipeline as events.
- __init__(app, pipeline, connection, request_body=None, id=None, config=None)
Parameters
- app (Application)
The application object.
- pipeline (Pipeline)
The pipeline the source belongs to.
- connection (Connection)
The connection, or its ID.
- request_body (dict), default = None
Request body for the search API call.
- id (str), default = None
Identifier of the source.
- config (dict), default = None
Configuration overrides for the source.
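- async process_aggs(path, aggs_name, aggs)
Processes one aggregation from the response: nested 'buckets' are passed to process_buckets(), and a 'value' is sent into the pipeline as an event.
Parameters
- path
Current position in the aggregation tree.
- aggs_name
Name of the aggregation.
- aggs
The aggregation result.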
- async process_buckets(path, parent, buckets)
Recursive function for bucket processing. It iterates through the keys of each bucket, looking for 'buckets' or 'value'. If there are 'buckets', it calls itself; if there is a 'value', it calls process_aggs() and sends an event to be processed.
Parameters
- path
Current position in the aggregation tree.
- parent
Name of the aggregation the buckets belong to.
- buckets
List of buckets to process.
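The recursion can be illustrated with a synchronous sketch. It flattens a terms aggregation with a metric sub-aggregation into one event per bucket. The event layout (one key per enclosing aggregation plus the metric value) is an assumption for illustration. Events are collected in a list rather than sent into a pipeline.

```python
# Synchronous, hypothetical model of recursive aggregation processing.


def process_aggs(path, aggs_name, aggs, events):
    if "buckets" in aggs:
        process_buckets(path, aggs_name, aggs["buckets"], events)
    if "value" in aggs:
        # A metric aggregation: emit one event carrying the bucket path
        event = dict(path)
        event[aggs_name] = aggs["value"]
        events.append(event)


def process_buckets(path, parent, buckets, events):
    for bucket in buckets:
        for name, value in bucket.items():
            if isinstance(value, dict):
                # Sub-aggregation: descend with this bucket's key on the path
                process_aggs(path + [(parent, bucket["key"])], name, value, events)


response_aggs = {
    "by_host": {
        "buckets": [
            {"key": "web1", "doc_count": 3, "avg_ms": {"value": 12.5}},
            {"key": "web2", "doc_count": 1, "avg_ms": {"value": 40.0}},
        ]
    }
}

events = []
for name, aggs in response_aggs.items():
    process_aggs([], name, aggs, events)
print(events)
```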
- class bspump.elasticsearch.ElasticSearchLookup(app, connection, id=None, config=None, cache=None, lazy=False)
Bases: MappingLookup, AsyncLookupMixin
A lookup linked to Elasticsearch. It provides a mapping (dictionary-like) interface to pipelines, feeds lookup data from Elasticsearch using a query, and keeps a simple cache to reduce the number of database hits.
Configuration options:
- index - Elasticsearch index
- key - field name to match
- timefield - field name to use for sorting
- sort_order - order of sorting (default is 'desc')
- scroll_timeout - timeout of a single scroll request (default is '1m'). Allowed time units: https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units
Example:
The ElasticSearchLookup can then be located and used inside a custom enricher:
import bspump

class AsyncEnricher(bspump.Generator):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        svc = app.get_service("bspump.PumpService")
        # Use the ID under which the lookup was registered
        self.Lookup = svc.locate_lookup("ElasticSearchLookup")

    async def generate(self, context, event, depth):
        if 'user' not in event:
            return None

        info = await self.Lookup.get(event['user'])
        # Enrich the event ('user_info' is an example field name)
        event['user_info'] = info

        # Inject the event into the next depth of the pipeline
        self.Pipeline.inject(context, event, depth)
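The caching idea behind the lookup can be sketched independently of Elasticsearch. In this hypothetical CachedLookupSketch, fetch stands in for the query against the configured index. Missing keys are not cached, an assumption modeled on the cache_non_existent option defaulting to False.

```python
import asyncio


class CachedLookupSketch:
    """Hypothetical dictionary-like lookup with a simple in-memory cache."""

    def __init__(self, fetch):
        self._fetch = fetch  # coroutine function: key -> value or None
        self.cache = {}
        self.backend_calls = 0

    async def get(self, key):
        if key in self.cache:
            return self.cache[key]
        self.backend_calls += 1
        value = await self._fetch(key)
        if value is not None:
            self.cache[key] = value  # only existing values are cached
        return value


async def fake_es_fetch(key):
    # Stand-in for a query against the configured Elasticsearch index
    return {"alice": {"department": "sales"}}.get(key)


async def main():
    lookup = CachedLookupSketch(fake_es_fetch)
    first = await lookup.get("alice")
    second = await lookup.get("alice")  # served from the cache
    return first, second, lookup.backend_calls


print(asyncio.run(main()))
```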
- ConfigDefaults: dict = {'cache_non_existent': False, 'index': '', 'key': '', 'scroll_timeout': '1m', 'sort_order': 'desc'}
- __init__(app, connection, id=None, config=None, cache=None, lazy=False)
Parameters
- app (Application)
The application object.
- connection (Connection)
The connection, or its ID.
- id (str), default = None
Identifier of the lookup.
- config (dict), default = None
Configuration overrides for the lookup.
- cache (dict), default = None
Optional dictionary to use as the lookup cache.
- lazy (bool), default = False
- async get(key)
Obtains the value for key from the lookup asynchronously.
Parameters
- key
Value to match against the configured key field.
- Returns:
value
- build_find_one_query(key)
Override this method to build your own lookup query.
Parameters
- key
Value to match against the configured key field.
- Returns:
Default single-key query
- Return type:
dict
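An override might look like the function below. It is shown standalone so it can run without BSPump. This is a sketch: the field names user_id and @timestamp are hypothetical. The term query plus sort clause is one possible choice, using the timefield and sort_order options described for this lookup.

```python
# Hypothetical override of build_find_one_query(), written as a plain function.


def build_find_one_query(key, key_field="user_id", timefield="@timestamp",
                         sort_order="desc"):
    """Return an Elasticsearch query body that matches one document by key."""
    return {
        "size": 1,
        "query": {"term": {key_field: key}},
        # Pick the newest (or oldest) matching document according to sort_order
        "sort": [{timefield: {"order": sort_order}}],
    }


print(build_find_one_query("alice"))
```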
- classmethod construct(app, definition)
Constructs the lookup from a definition, taking its ID, connection, and configuration from the definition.
Parameters
- app (Application)
The application object.
- definition (dict)
Definition from which the ID, connection, and configuration of the lookup are read.
- Returns:
cls(app, newid, connection, config)
ElasticSearchConnection
Connection to Elasticsearch cluster.
import bspump.elasticsearch
connection = bspump.elasticsearch.ElasticSearchConnection(
app, "ElasticSearchConnection"
)
Configuration:
[connection:ElasticSearchConnection]
url=http://localhost:9200
Options:
url - Elasticsearch URL(s), comma-separated for multiple nodes
URL Examples:
# Single node
url=http://localhost:9200
# With authentication
url=https://user:password@localhost:9200
# Multiple nodes
url=http://node1:9200,http://node2:9200,http://node3:9200
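The standard library can show what a URL value carries, including credentials embedded as in the authentication example. This only illustrates URL structure. It does not reproduce how BSPump parses the url option, and describe_node is a hypothetical helper.

```python
from urllib.parse import urlsplit


def describe_node(url):
    """Break a node URL into scheme, host, port and optional credentials."""
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "username": parts.username,
        "password": parts.password,
    }


plain = describe_node("http://node1:9200")
secured = describe_node("https://user:password@localhost:9200")
print(plain)
print(secured)
```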
ElasticSearchSource
Queries documents from Elasticsearch.
source = bspump.elasticsearch.ElasticSearchSource(
app, pipeline,
connection="ElasticSearchConnection"
)
Configuration:
[pipeline:MyPipeline:ElasticSearchSource]
index=events
query={"match_all": {}}
scroll=5m
size=1000
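The section is plain INI, so it can be sanity-checked with Python's configparser before deployment. Note that the query value must be valid JSON. This is a standalone check, not the way BSPump itself reads its configuration. Interpolation is disabled here so that a literal '%' would be read as-is.

```python
import configparser
import json

CONFIG_TEXT = """
[pipeline:MyPipeline:ElasticSearchSource]
index=events
query={"match_all": {}}
scroll=5m
size=1000
"""

parser = configparser.ConfigParser(interpolation=None)
parser.read_string(CONFIG_TEXT)
section = parser["pipeline:MyPipeline:ElasticSearchSource"]

query = json.loads(section["query"])  # the query value is JSON text
size = section.getint("size")
print(section["index"], query, section["scroll"], size)
```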
ElasticSearchSink
Indexes documents to Elasticsearch.
sink = bspump.elasticsearch.ElasticSearchSink(
app, pipeline,
connection="ElasticSearchConnection"
)
Configuration:
[pipeline:MyPipeline:ElasticSearchSink]
index=events
bulk_size=500
bulk_timeout=5.0
Time-Based Indices:
index=events-%Y-%m-%d
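The %Y-%m-%d placeholders look like Python strftime codes. Assuming that is how the pattern is expanded, the resulting index name for a given day can be previewed as below. Whether the sink uses UTC or local time is not stated here, so the sketch passes an explicit UTC datetime. resolve_index is a hypothetical helper.

```python
from datetime import datetime, timezone

INDEX_PATTERN = "events-%Y-%m-%d"  # value of the index option


def resolve_index(pattern, when):
    """Expand strftime placeholders in an index name for the given time."""
    return when.strftime(pattern)


print(resolve_index(INDEX_PATTERN, datetime(2024, 3, 7, tzinfo=timezone.utc)))
```

For 7 March 2024 this yields events-2024-03-07, so each day's events land in their own index.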
Dynamic Index Routing
class IndexRouter(bspump.Processor):
    def process(self, context, event):
        # ElasticSearchSink reads the target index from context["es_index"]
        context["es_index"] = f"events-{event['type']}"
        return event
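The effect of routing can be modeled without BSPump. A processor writes the index name into the context, and the sink falls back to its configured index when none is set. The sketch assumes the context key es_index documented for ElasticSearchSink. DEFAULT_INDEX and the helper functions are hypothetical.

```python
ROUTING_KEY = "es_index"  # context key documented for ElasticSearchSink
DEFAULT_INDEX = "events"  # hypothetical index configured on the sink


def route_event(context, event):
    """Choose the index from the event type, as a routing processor would."""
    context[ROUTING_KEY] = f"events-{event['type']}"
    return event


def target_index(context):
    """Index the sink would write to: the routed one, else its default."""
    return context.get(ROUTING_KEY, DEFAULT_INDEX)


routed = {}
route_event(routed, {"type": "login"})
print(target_index(routed), target_index({}))
```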
Document ID
Set the document ID:
class DocumentProcessor(bspump.Processor):
    def process(self, context, event):
        # The default data_feeder pops "_id" from the event and uses it as the document ID
        if "id" in event:
            event["_id"] = event.pop("id")
        return event
Example Pipeline
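import datetime

import bspump
import bspump.elasticsearch
import bspump.kafka


class TimestampProcessor(bspump.Processor):
    # Minimal example processor: stamps each event with the current UTC time
    def process(self, context, event):
        event["@timestamp"] = datetime.datetime.now(datetime.timezone.utc).isoformat()
        return event


class KafkaToElasticPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, connection="KafkaConnection"),
            TimestampProcessor(app, self),
            bspump.elasticsearch.ElasticSearchSink(
                app, self, connection="ElasticSearchConnection"
            ),
        )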