import json
import logging
import requests
from ..abc.lookup import MappingLookup
from ..abc.lookup import AsyncLookupMixin
from ..cache import CacheDict
# Module-level logger; the lookup below reports failed ElasticSearch requests through it.
L = logging.getLogger(__name__)
class ElasticSearchLookup(MappingLookup, AsyncLookupMixin):
    """
    The lookup that is linked with ES (ElasticSearch).

    It provides a mapping (dictionary-like) interface to pipelines.
    It feeds lookup data from ES using a query.
    It also has a simple cache to reduce the number of database hits.

    **configs**

    *index* - Elastic's index

    *key* - field name to match

    *timefield* - field name to use for sorting

    *sort_order* - order of sorting (default is 'desc')

    *cache_non_existent* - if true, a key that is not found is cached as None (default is False)

    *scroll_timeout* - Timeout of single scroll request (default is '1m'). Allowed time units:
    https://www.elastic.co/guide/en/elasticsearch/reference/current/common-options.html#time-units

    Example:

    The ElasticSearchLookup can be then located and used inside a custom enricher:

    .. code:: python

        class AsyncEnricher(bspump.Generator):

            def __init__(self, app, pipeline, id=None, config=None):
                super().__init__(app, pipeline, id, config)
                svc = app.get_service("bspump.PumpService")
                self.Lookup = svc.locate_lookup("ElasticSearchLookup")

            async def generate(self, context, event, depth):
                if 'user' not in event:
                    return None

                info = await self.Lookup.get(event['user'])

                # Inject a new event into a next depth of the pipeline
                self.Pipeline.inject(context, event, depth)
    """

    ConfigDefaults = {
        "index": "",  # Specify an index
        "key": "",  # Specify field name to match
        "cache_non_existent": False,  # True - if key is not found, it will be cached as None
        "sort_order": "desc",
        "scroll_timeout": "1m",
    }

    def __init__(self, app, connection, id=None, config=None, cache=None, lazy=False):
        """
        Description: Read the configuration, set up the cache and register the metrics counters.

        **Parameters**

        app : Application
            Application object; it is asked for the "asab.MetricsService" service.

        connection : Connection
            Connection object; its `get_url()` and `get_session()` methods are used for every request.

        id : ID, default= None
            ID of the lookup, forwarded to the parent constructor.

        config : JSON, default= None
            Configuration with additional information, forwarded to the parent constructor.

        cache : dict-like, default= None
            Cache to use; a new `CacheDict` is created when None.

        lazy : bool, default= False
            Forwarded to the parent constructor.
        """
        super().__init__(app, id=id, config=config, lazy=lazy)
        self.Connection = connection

        self.Index = self.Config["index"]
        self.ScrollTimeout = self.Config["scroll_timeout"]
        self.Key = self.Config["key"]
        # The default is a real bool while configured values may be strings,
        # so both representations have to be understood here.
        self.CacheNonExistent = self._parse_bool(self.Config["cache_non_existent"])
        self.Timefield = self.Config.get("timefield")
        self.SortOrder = self.Config["sort_order"]

        # -1 means "not loaded yet"; load() replaces it with the cache size.
        self.Count = -1
        self.Cache = CacheDict() if cache is None else cache

        metrics_service = app.get_service("asab.MetricsService")
        self.CacheCounter = metrics_service.create_counter(
            "es.lookup.cache", tags={}, init_values={"hit": 0, "miss": 0}
        )
        self.SuccessCounter = metrics_service.create_counter(
            "es.lookup.success", tags={}, init_values={"hit": 0, "miss": 0}
        )

    @staticmethod
    def _parse_bool(value):
        """Interpret a configuration value given as a bool or as a string such as "True"/"yes"/"1"."""
        if isinstance(value, bool):
            return value
        return str(value).strip().lower() in ("true", "yes", "on", "1")

    async def _post_json(self, path, request):
        """
        POST `request` as JSON to `path` (relative to the connection URL) and return the decoded reply.

        Raises RuntimeError when the server does not answer with HTTP 200, so that
        an error body is never mistaken for a search result.
        """
        url = self.Connection.get_url() + path
        async with self.Connection.get_session() as session:
            async with session.post(
                url, json=request, headers={"Content-Type": "application/json"}
            ) as response:
                if response.status != 200:
                    data = await response.text()
                    L.error(
                        "Failed to fetch data from ElasticSearch: {} from {}\n{}".format(
                            response.status, url, data
                        )
                    )
                    raise RuntimeError(
                        "ElasticSearch request to {} failed with status {}".format(
                            url, response.status
                        )
                    )
                return await response.json()

    async def _find_one(self, key):
        """
        Fetch the `_source` of the first document matching `key`.

        Returns None when the search yields no hit.
        Raises RuntimeError when the request fails; `get()` catches it, which
        keeps a failed request from being cached as a non-existent key.
        """
        request = {"size": 1, "query": self.build_find_one_query(key)}
        if self.Timefield:
            request["sort"] = [{self.Timefield: self.SortOrder}]

        msg = await self._post_json("{}/{}".format(self.Index, "_search"), request)

        try:
            hit = msg["hits"]["hits"][0]
        except (KeyError, IndexError, TypeError):
            # No hit (or a reply without the expected structure)
            return None
        return hit["_source"]

    async def get(self, key):
        """
        Obtain the value from lookup asynchronously.

        The cache is consulted first; on a cache miss ElasticSearch is queried
        and the result is stored in the cache (a missing key is stored as None
        only when `cache_non_existent` is enabled). Exceptions raised by the
        query are logged as warnings and not propagated.

        **Parameters**

        key : value of the configured `key` field to look for

        :return: value (the `_source` of the matching document) or None

        |

        """
        value = None
        try:
            value = self.Cache[key]
        except KeyError:
            # Every lookup that is not served from the cache is a cache miss,
            # regardless of whether the result ends up being stored.
            self.CacheCounter.add("miss", 1)
            try:
                value = await self._find_one(key)
                if value is not None or self.CacheNonExistent:
                    self.Cache[key] = value
            except Exception as e:
                L.warning("There was an exception {}".format(e))
        else:
            self.CacheCounter.add("hit", 1)

        if value is None:
            self.SuccessCounter.add("miss", 1)
        else:
            self.SuccessCounter.add("hit", 1)

        return value

    def build_find_one_query(self, key) -> dict:
        """
        Override this method to build your own lookup query

        **Parameters**

        key : value to match against the configured `key` field

        :return: Default single-key query

        |

        """
        return {"match": {self.Key: key}}

    async def _count(self):
        """
        Ask ElasticSearch for the number of documents in the index.

        Raises RuntimeError when the request fails.
        """
        request = {"query": {"match_all": {}}}
        msg = await self._post_json("{}/{}".format(self.Index, "_count"), request)
        return int(msg["count"])

    async def load(self):
        """
        Sets Count to the current number of items in the Cache.

        :return: True

        |

        """
        self.Count = len(self.Cache)
        return True

    def __len__(self):
        # Count is -1 until load() has run; len() must never be negative,
        # otherwise len(lookup) and bool(lookup) raise ValueError.
        return max(self.Count, 0)

    def __getitem__(self, key):
        # To avoid synchronous operations completely
        raise NotImplementedError("Synchronous access is not supported, use 'await get(key)'")

    def __iter__(self):
        """
        Download all documents of the index with the scroll API and prepare the iteration.

        The download is synchronous (uses `requests`) and all hits are held in
        memory. On a failed request the error is logged and the iteration
        covers only the hits collected so far.
        """
        request = {"size": 10000, "query": {"match_all": {}}}
        scroll_id = None
        all_hits = []

        while True:
            if scroll_id is None:
                # The first request opens the scroll
                path = "{}/_search?scroll={}".format(self.Index, self.ScrollTimeout)
                request_body = request
            else:
                # Follow-up requests continue the scroll
                path = "_search/scroll"
                request_body = {"scroll": self.ScrollTimeout, "scroll_id": scroll_id}

            url = self.Connection.get_url() + path
            response = requests.post(url, json=request_body)

            if response.status_code != 200:
                # `text` is a property of requests.Response, not a method
                L.error(
                    "Failed to fetch data from ElasticSearch: {} from {}\n{}".format(
                        response.status_code, url, response.text
                    )
                )
                break

            data = json.loads(response.text)

            scroll_id = data.get("_scroll_id")
            if scroll_id is None:
                break

            hits = data["hits"]["hits"]
            if len(hits) == 0:
                break

            all_hits.extend(hits)

        self.Iterator = iter(all_hits)
        return self

    def __next__(self):
        """
        Return the key of the next downloaded document and store the document in the cache.

        A document without the configured key field yields None and is not cached.
        """
        element = next(self.Iterator)
        source = element["_source"]
        key = source.get(self.Key)
        if key is not None:
            self.Cache[key] = source
        return key

    @classmethod
    def construct(cls, app, definition: dict):
        """
        Constructs the lookup from a definition (id, config and connection).

        **Parameters**

        app : Application
            Application object.

        definition : dict
            Definition with the optional items "id" and "config" and the
            mandatory item definition["args"]["connection"].
            NOTE(review): the connection is handed to the constructor as-is;
            confirm that the definition carries a connection object, not its name.

        :return: cls(app, connection, id=newid, config=config)

        |

        """
        newid = definition.get("id")
        config = definition.get("config")
        connection = definition["args"]["connection"]
        # Keyword arguments keep id and connection from being swapped:
        # the constructor signature is (app, connection, id=None, config=None, ...)
        return cls(app, connection, id=newid, config=config)