# Source code for bspump.elasticsearch.lookup

import json
import logging

import requests

from ..abc.lookup import MappingLookup
from ..abc.lookup import AsyncLookupMixin
from ..cache import CacheDict


# Module-level logger, named after this module, used by the lookup below.
L = logging.getLogger(__name__)


class ElasticSearchLookup(MappingLookup, AsyncLookupMixin):
    """
    The lookup that is linked with an ElasticSearch index.

    It provides a mapping (dictionary-like) interface to pipelines.
    It feeds lookup data from ElasticSearch using a query.
    It also has a simple cache to reduce the number of database hits.

    **configs**

    *index* - Elastic's index

    *key* - field name to match

    *timefield* - field name to use for sorting (optional; no sorting when unset)

    *sort_order* - order of sorting (default is 'desc')

    *cache_non_existent* - when true, a key that is not found is cached as None
    (default is False)

    *scroll_timeout* - Timeout of single scroll request (default is '1m').
    Allowed time units:
    https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units

    Example:

    The ElasticSearchLookup can be then located and used inside a custom enricher:

    .. code:: python

        class AsyncEnricher(bspump.Generator):

            def __init__(self, app, pipeline, id=None, config=None):
                super().__init__(app, pipeline, id, config)
                svc = app.get_service("bspump.PumpService")
                # Use the id under which the lookup was registered
                self.Lookup = svc.locate_lookup("ElasticSearchLookup")

            async def generate(self, context, event, depth):
                if 'user' not in event:
                    return None

                info = await self.Lookup.get(event['user'])

                # Inject a new event into a next depth of the pipeline
                self.Pipeline.inject(context, event, depth)
    """

    ConfigDefaults = {
        "index": "",  # Specify an index
        "key": "",  # Specify field name to match
        "cache_non_existent": False,  # True - if key is not found, it will be cached as None
        "sort_order": "desc",
        "scroll_timeout": "1m",
    }

    @staticmethod
    def _to_bool(value, default=False):
        """
        Interpret a configuration value as a boolean.

        Real booleans are returned unchanged. Strings are compared
        case-insensitively against the spellings accepted by
        ``configparser`` ("true"/"yes"/"on"/"1" and "false"/"no"/"off"/"0").
        Anything else yields *default*.
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            normalized = value.strip().lower()
            if normalized in ("true", "yes", "on", "1"):
                return True
            if normalized in ("false", "no", "off", "0"):
                return False
        return default

    def __init__(self, app, connection, id=None, config=None, cache=None, lazy=False):
        """
        Initialize the lookup, its cache and its metrics counters.

        **Parameters**

        app : Application
            Application object; used to obtain "asab.MetricsService".

        connection : Connection
            Connection object; it must provide `get_url()` and `get_session()`.

        id : ID, default= None
            Identifier passed to the parent initializer.

        config : JSON, default= None
            Configuration overriding `ConfigDefaults`.

        cache : default= None
            Mapping used as the cache; a new `CacheDict` is created when None.

        lazy : default= False
            Passed unchanged to the parent initializer.
        """
        super().__init__(app, id=id, config=config, lazy=lazy)
        self.Connection = connection

        self.Index = self.Config["index"]
        self.ScrollTimeout = self.Config["scroll_timeout"]
        self.Key = self.Config["key"]
        # The value may arrive as a real bool (ConfigDefaults, config dict)
        # or as a string (configuration file); accept both.
        self.CacheNonExistent = self._to_bool(self.Config["cache_non_existent"])
        self.Timefield = self.Config.get("timefield")
        self.SortOrder = self.Config["sort_order"]

        # Stays at -1 until load() is called
        self.Count = -1

        self.Cache = CacheDict() if cache is None else cache

        metrics_service = app.get_service("asab.MetricsService")
        self.CacheCounter = metrics_service.create_counter(
            "es.lookup.cache", tags={}, init_values={"hit": 0, "miss": 0}
        )
        self.SuccessCounter = metrics_service.create_counter(
            "es.lookup.success", tags={}, init_values={"hit": 0, "miss": 0}
        )

    async def _find_one(self, key):
        """
        Query ElasticSearch for a single document matching *key*.

        :return: the `_source` of the first hit, or None when the request
            fails (non-200 status) or the response contains no hit.
        """
        request = {"size": 1, "query": self.build_find_one_query(key)}
        if self.Timefield:
            request["sort"] = [{self.Timefield: self.SortOrder}]

        url = self.Connection.get_url() + "{}/{}".format(self.Index, "_search")

        async with self.Connection.get_session() as session:
            async with session.post(
                url, json=request, headers={"Content-Type": "application/json"}
            ) as response:
                if response.status != 200:
                    data = await response.text()
                    L.error(
                        "Failed to fetch data from ElasticSearch: {} from {}\n{}".format(
                            response.status, url, data
                        )
                    )
                    # An error body carries no hits; do not try to parse it as a result
                    return None

                msg = await response.json()

        try:
            hit = msg["hits"]["hits"][0]
        except Exception:
            # No hit (or an unexpected response shape) means "not found"
            return None

        return hit["_source"]

    async def get(self, key):
        """
        Obtain the value from lookup asynchronously.

        The cache is consulted first; on a cache miss the value is fetched
        from ElasticSearch and stored in the cache. Exceptions raised during
        the fetch are logged and result in None being returned.

        **Parameters**

        key : value matched against the configured `key` field

        :return: the cached or fetched document, or None when not found
        """
        value = None
        try:
            value = self.Cache[key]
            self.CacheCounter.add("hit", 1)
        except KeyError:
            try:
                value = await self._find_one(key)
                if value is not None:
                    self.Cache[key] = value
                    self.CacheCounter.add("miss", 1)
                elif self.CacheNonExistent:
                    # Remember that the key does not exist to avoid repeated queries
                    self.Cache[key] = None
                    self.CacheCounter.add("miss", 1)
            except Exception as e:
                L.warning("There was an exception {}".format(e))

        if value is None:
            self.SuccessCounter.add("miss", 1)
        else:
            self.SuccessCounter.add("hit", 1)

        return value

    def build_find_one_query(self, key) -> dict:
        """
        Override this method to build your own lookup query

        **Parameters**

        key : value to be matched against the configured `key` field

        :return: Default single-key query
        """
        return {"match": {self.Key: key}}

    async def _count(self):
        """
        Ask ElasticSearch for the number of documents in the index.

        A non-200 response is logged and the body is still parsed, so a body
        without a "count" field raises KeyError.

        :return: document count as int
        """
        request = {"query": {"match_all": {}}}
        url = self.Connection.get_url() + "{}/{}".format(self.Index, "_count")

        async with self.Connection.get_session() as session:
            async with session.post(
                url, json=request, headers={"Content-Type": "application/json"}
            ) as response:
                if response.status != 200:
                    data = await response.text()
                    L.error(
                        "Failed to fetch data from ElasticSearch: {} from {}\n{}".format(
                            response.status, url, data
                        )
                    )

                msg = await response.json()

        return int(msg["count"])

    async def load(self):
        """
        Sets Count to the current length of the Cache.

        :return: True
        """
        self.Count = len(self.Cache)
        return True

    def __len__(self):
        """Return Count as set by `load()` (-1 before the first load)."""
        return self.Count

    def __getitem__(self, key):
        """Synchronous item access is not supported; use `await get(key)`."""
        # To avoid synchronous operations completely
        raise NotImplementedError()

    def __iter__(self):
        """
        Fetch all documents of the index using scroll requests and prepare
        iteration over them.

        The requests are made synchronously with `requests`. Fetching stops
        on a non-200 response, on a response without `_scroll_id`, or on an
        empty page; hits collected up to that point are iterated.
        """
        scroll_id = None
        request = {"size": 10000, "query": {"match_all": {}}}
        all_hits = []

        while True:
            if scroll_id is None:
                # First request opens the scroll
                path = "{}/_search?scroll={}".format(self.Index, self.ScrollTimeout)
                request_body = request
            else:
                # Following requests continue the opened scroll
                path = "_search/scroll"
                request_body = {"scroll": self.ScrollTimeout, "scroll_id": scroll_id}

            url = self.Connection.get_url() + path
            response = requests.post(url, json=request_body)

            if response.status_code != 200:
                # `text` is a property of requests' Response, not a method
                data = response.text
                L.error(
                    "Failed to fetch data from ElasticSearch: {} from {}\n{}".format(
                        response.status_code, url, data
                    )
                )
                break

            data = json.loads(response.text)

            scroll_id = data.get("_scroll_id")
            if scroll_id is None:
                break

            hits = data["hits"]["hits"]
            if len(hits) == 0:
                break

            all_hits.extend(hits)

        self.Iterator = iter(all_hits)
        return self

    def __next__(self):
        """
        Return the key of the next fetched document and store the document
        in the cache. Returns None for a document without the key field.
        """
        element = next(self.Iterator)

        key = element["_source"].get(self.Key)
        if key is not None:
            self.Cache[key] = element["_source"]
        return key

    @classmethod
    def construct(cls, app, definition: dict):
        """
        Construct the lookup from a declarative definition.

        **Parameters**

        app : Application
            Application object.

        definition : dict
            Definition with optional "id" and "config" entries and a
            mandatory `definition["args"]["connection"]` entry.

        :return: cls(app, connection, id=newid, config=config)
        """
        newid = definition.get("id")
        config = definition.get("config")
        # NOTE(review): the value is passed to __init__ as-is; if the definition
        # carries only a connection id, it has to be resolved first - confirm.
        connection = definition["args"]["connection"]
        # Keyword arguments keep id/connection/config in the slots __init__ expects
        return cls(app, connection, id=newid, config=config)