Source code for bspump.elasticsearch.source

import logging

from ..abc.source import TriggerSource

L = logging.getLogger(__name__)


class ElasticSearchSource(TriggerSource):
    """
    Trigger source that reads documents from ElasticSearch using scrolled
    search requests and passes the ``_source`` of every hit to the pipeline.

    Configuration options (see ``ConfigDefaults``):

    index : index name or pattern placed into the search URL.

    scroll_timeout : value sent as the ``scroll`` parameter of each request.
    """

    ConfigDefaults = {
        "index": "index-*",
        "scroll_timeout": "1m",
    }

    def __init__(
        self,
        app,
        pipeline,
        connection,
        request_body=None,
        paging=True,
        id=None,
        config=None,
    ):
        """
        Store the connection, the configured index / scroll timeout and the
        search request body.

        **Parameters**

        app : Application
                `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_
                handed to the base class and to ``pipeline.locate_connection``.

        pipeline : Pipeline
                Pipeline this source belongs to; also used to locate the connection.

        connection : Connection
                Connection reference resolved through ``pipeline.locate_connection``.

        request_body : JSON, default = None
                Body of the initial search request. When None, a ``match_all``
                query is used.

        paging : bool, default = True
                When false, ``cycle`` stops after the first page of hits.

        id : ID, default = None
                Passed to the base class unchanged.

        config : JSON/dict, default = None
                Passed to the base class unchanged.
        """
        super().__init__(app, pipeline, id=id, config=config)

        self.Connection = pipeline.locate_connection(app, connection)
        self.Index = self.Config["index"]
        self.ScrollTimeout = self.Config["scroll_timeout"]
        self.Paging = paging
        self.RequestBody = (
            {"query": {"bool": {"must": {"match_all": {}}}}}
            if request_body is None
            else request_body
        )

    async def cycle(self):
        """
        Fetch hits from ElasticSearch page by page and inject them into the
        pipeline.

        The first request goes to ``<index>/_search`` with the configured
        request body; every following request goes to ``_search/scroll`` with
        the scroll id taken from the previous reply. The loop ends when a
        reply has a status other than 200 (the failure is logged), when the
        reply carries no ``_scroll_id``, when a page contains no hits, or
        after the first page if paging is switched off.
        """
        scroll_id = None

        while True:
            if scroll_id is not None:
                # Continue a scroll that a previous request has opened.
                endpoint = "_search/scroll"
                payload = {"scroll": self.ScrollTimeout, "scroll_id": scroll_id}
            else:
                # Very first request: open the scroll on the configured index.
                endpoint = "{}/_search?scroll={}".format(self.Index, self.ScrollTimeout)
                payload = self.RequestBody

            url = self.Connection.get_url() + endpoint

            async with self.Connection.get_session() as session:
                async with session.post(
                    url,
                    json=payload,
                    headers={"Content-Type": "application/json"},
                ) as response:
                    if response.status != 200:
                        error_text = await response.text()
                        L.error(
                            "Failed to fetch data from ElasticSearch: {} from {}\n{}".format(
                                response.status, url, error_text
                            )
                        )
                        return

                    reply = await response.json()

            scroll_id = reply.get("_scroll_id")
            if scroll_id is None:
                return

            page = reply["hits"]["hits"]
            if len(page) == 0:
                return

            # Feed messages into a pipeline
            for document in page:
                await self.process(document["_source"])

            if not self.Paging:
                return
class ElasticSearchAggsSource(TriggerSource):
    """
    Trigger source that sends one search request to ElasticSearch and turns
    the aggregation tree of the reply into flat events.

    Every ``value`` found in the tree produces one event. The event contains
    the value under the name of its aggregation, plus the bucket ``key`` of
    every enclosing bucket aggregation under that aggregation's name.

    Configuration options (see ``ConfigDefaults``):

    index : index name or pattern placed into the search URL.
    """

    ConfigDefaults = {
        "index": "index-*",
    }

    def __init__(
        self, app, pipeline, connection, request_body=None, id=None, config=None
    ):
        """
        Store the connection, the configured index and the search request body.

        **Parameters**

        app : Application
                `Application <https://asab.readthedocs.io/en/latest/asab/application.html>`_
                handed to the base class and to ``pipeline.locate_connection``.

        pipeline : Pipeline
                Pipeline this source belongs to; also used to locate the connection.

        connection : Connection
                Connection reference resolved through ``pipeline.locate_connection``.

        request_body : JSON, default = None
                Body of the search request. When None, a ``match_all`` query
                without any aggregation is used, in which case ``cycle``
                produces no events.

        id : ID, default = None
                Passed to the base class unchanged.

        config : JSON/dict, default = None
                Passed to the base class unchanged.
        """
        super().__init__(app, pipeline, id=id, config=config)
        self.Connection = pipeline.locate_connection(app, connection)
        self.Index = self.Config["index"]

        if request_body is not None:
            self.RequestBody = request_body
        else:
            self.RequestBody = {"query": {"bool": {"must": {"match_all": {}}}}}

    async def cycle(self):
        """
        Post the request body to ``<index>/_search?`` and process the first
        aggregation of the reply.

        A reply with a status other than 200 is logged and ends the cycle.
        A reply with a missing or empty ``aggregations`` section ends the
        cycle without producing events. Only the first top-level aggregation
        is walked; any further top-level aggregations are not processed.
        """
        request_body = self.RequestBody
        url = self.Connection.get_url() + "{}/_search?".format(self.Index)

        async with self.Connection.get_session() as session:
            async with session.post(
                url, json=request_body, headers={"Content-Type": "application/json"}
            ) as response:
                if response.status != 200:
                    data = await response.text()
                    L.error(
                        "Failed to fetch data from ElasticSearch: {} from {}\n{}".format(
                            response.status, url, data
                        )
                    )
                    return

                msg = await response.json()

        # The section is absent when the request body defines no aggregation
        # (which is the case for the default request body).
        aggs = msg.get("aggregations")
        if not aggs:
            return

        start_name = next(iter(aggs))
        await self.process_aggs({}, start_name, aggs[start_name])

    async def process_aggs(self, path, aggs_name, aggs):
        """
        Process one aggregation node of the reply.

        If the node has ``buckets``, they are handed to ``process_buckets``.
        If the node has a ``value``, one event is sent to ``process``.

        **Parameters**

        path : dict
                Bucket keys of the enclosing bucket aggregations, indexed by
                aggregation name.

        aggs_name : str
                Name of this aggregation; used as the event field for ``value``
                and as the ``parent`` of its buckets.

        aggs : dict
                The aggregation node itself.
        """
        if "buckets" in aggs:
            await self.process_buckets(path, aggs_name, aggs["buckets"])

        if "value" in aggs:
            # Build the event as a copy so the shared path is left untouched.
            event = dict(path)
            event[aggs_name] = aggs["value"]
            await self.process(event)

    async def process_buckets(self, path, parent, buckets):
        """
        Process the buckets of one bucket aggregation.

        For each bucket, its ``key`` is stored in ``path`` under ``parent`` and
        every other dict-valued entry of the bucket is treated as a nested
        aggregation and handed to ``process_aggs``. Entries that are not
        dicts are ignored.

        **Parameters**

        path : dict
                Bucket keys of the enclosing bucket aggregations, indexed by
                aggregation name. It is modified while the buckets are walked
                and the ``parent`` entry is removed before returning.

        parent : str
                Name of the aggregation that owns these buckets.

        buckets : list of dict
                Buckets of the aggregation, iterated in order.
        """
        for bucket in buckets:
            # Record the key before descending, so nested events carry the
            # key of this bucket regardless of the entry order in the bucket.
            if "key" in bucket:
                path[parent] = bucket["key"]

            for name, node in bucket.items():
                if name != "key" and isinstance(node, dict):
                    await self.process_aggs(path, name, node)

        # Drop the key of the last bucket so it cannot leak into events of
        # aggregations processed after this one.
        path.pop(parent, None)