Source code for bspump.elasticsearch.sink

import logging

from ..abc.sink import Sink
from .connection import ElasticSearchBulk

from .data_feeder import data_feeder_create_or_index

#

L = logging.getLogger(__name__)

#


[docs] class ElasticSearchSink(Sink): """ ElasticSearchSink allows you to insert events into ElasticSearch through POST requests The following attributes can be passed to the context and thus override the default behavior of the sink: es_index (STRING): ElasticSearch index name data_feeder accepts the event as its only parameter and yields data as Python generator The example implementation is: def data_feeder_create_or_index(event): _id = event.pop("_id", None) if _id is None: yield b'{"create":{}}\n' else: yield orjson.dumps( {"index": {"_id": _id}}, option=orjson.OPT_APPEND_NEWLINE ) yield orjson.dumps(event, option=orjson.OPT_APPEND_NEWLINE) | """ ConfigDefaults = { "index_prefix": "bspump_", # Obsolete, use 'index' "index": "bspump_", }
[docs] def __init__( self, app, pipeline, connection, id=None, config=None, bulk_class=ElasticSearchBulk, data_feeder=data_feeder_create_or_index, ): """ Description: **Parameters** app : Application Name of the Application pipeline : Pipeline Name of the Pipeline connection : Connection Name of the Connection id : ID, default= None ID config : JSON, default= None Configuration file with additional information. bulk_class=ElasticBulk : data_feeder=data_feeder_create_or_index : | """ super().__init__(app, pipeline, id=id, config=config) self.Connection = pipeline.locate_connection(app, connection) self.BulkClass = bulk_class self.Index = self.Config.get("index") # intex_prefix is obsolete. It is supported currently ensure backward compatibility if ( self.Index == "bspump_" and self.Config.get("index_prefix") != "bspump_" and len(self.Config.get("index_prefix")) > 0 ): L.warning( "The 'index_prefix' has been renamed to 'index', please adjust the configuration." ) self.Index = self.Config.get("index_prefix") if data_feeder is None: raise RuntimeError("data_feeder must not be None.") self.__data_feeder = data_feeder app.PubSub.subscribe( "ElasticSearchConnection.pause!", self._connection_throttle ) app.PubSub.subscribe( "ElasticSearchConnection.unpause!", self._connection_throttle )
[docs] def process(self, context, event): """ Description: **Parameters** context : event : any data type Information with timestamp. """ try: _id = event.pop("_id", None) except TypeError: if isinstance(event, dict) is False: L.error( "You are trying to pass event of type: {} to ElasticSearchSink, but only dict is supported".format( type(event) ) ) raise self.Connection.consume( context.get("es_index", self.Index), self.__data_feeder(event, _id), bulk_class=self.BulkClass, )
def _connection_throttle(self, event_name, connection): if connection != self.Connection: return if event_name == "ElasticSearchConnection.pause!": self.Pipeline.throttle(self, True) elif event_name == "ElasticSearchConnection.unpause!": self.Pipeline.throttle(self, False) else: raise RuntimeError("Unexpected event name '{}'".format(event_name))