Source code for bspump.elasticsearch.connection

import asyncio
import logging
import random
import re

import aiohttp

from ..abc.connection import Connection

#

L = logging.getLogger(__name__)

#


class ElasticSearchBulk(object):
    """
    Description:

    """

    def __init__(self, connection, index, max_size):
        """
        Initializes the variables

        **Parameters**

        connection : Connection
                        Name of the Connection.

        index : str
                        ???

        max_size : int
                        Maximal size of bulks.

        """
        self.Index = index
        self.Aging = 0
        self.Capacity = max_size
        self.Items = []
        self.InsertMetric = connection.InsertMetric
        self.FailLogMaxSize = connection.FailLogMaxSize
        self.FilterPath = connection.FilterPath

    def consume(self, data_feeder_generator):
        """
        Appends all items in data_feeder_generator to Items list. Consumer also resets Aging and Capacity.

        **Parameters**

        data_feeder_generator : list
                        list of our data that will be passed to a generator and later Uploaded to ElasticSearch.

        :return: self.Capacity <= 0

        """
        for item in data_feeder_generator:
            self.Items.append(item)
            self.Capacity -= len(item)

        self.Aging = 0
        return self.Capacity <= 0

    async def _get_data_from_items(self):
        """
        Description:

        :return:
        """
        for item in self.Items:
            yield item

    async def upload(self, url, session, timeout):
        """
        Uploads data to Elastic Search.

        **Parameters**

        url : string
                        Uses URL from config to connect to ElasticSearch Rest API.

        session : ?
                        ?

        timeout : int
                        uses timeout value from config. Value of time for how long we want to be connected to ElasticSearch.

        :return: ?

        |

        """
        items_count = len(self.Items)
        if items_count == 0:
            return

        url = url + "{}/_bulk?filter_path={}".format(self.Index, self.FilterPath)

        try:
            resp = await session.post(
                url,
                data=self._get_data_from_items(),
                headers={"Content-Type": "application/json"},
                timeout=timeout,
            )
        except OSError as e:
            # This means that there was a hard error such as network or DNS failure
            # Likely no communication took place with ElasticSearch
            L.warn("{}".format(e))
            return False

        # Obtain the response from ElasticSearch, which should always be a json
        try:
            resp_body = await resp.json()
        except Exception as e:
            # We received something else than JSON, that's bad
            # Let's assume that the bulk did not reach ElasticSearch
            L.warn("{}".format(e))
            return False

        if resp.status == 200:
            # Check that all documents were successfully inserted to ElasticSearch
            # If there are no error messages, we are done here
            if not resp_body.get("errors", False):
                self.InsertMetric.add("ok", items_count)
                return True

            # Some of the documents were not inserted properly,
            # usually because of attributes mismatch, see:
            # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html

            response_items = resp_body.get("items")
            self.partial_error_callback(response_items)

            # When the log handling is not precise,
            # the iteration only happens on error items natively
            # Log first 20 errors
            counter = 0
            for response_item in response_items:
                if "error" not in response_item.get("index", ""):
                    continue

                if counter < self.FailLogMaxSize:
                    L.error(
                        "Failed to insert document into ElasticSearch: '{}'".format(
                            str(response_item)
                        )
                    )

                counter += 1

            # Show remaining log messages
            if counter > self.FailLogMaxSize:
                L.error(
                    "Failed to insert document into ElasticSearch: '{}' more insertions of documents failed".format(
                        counter - self.FailLogMaxSize
                    )
                )

            # Insert metrics
            self.InsertMetric.add("fail", counter)
            self.InsertMetric.add("ok", items_count - counter)

        else:
            # The major ElasticSearch error occurred while inserting documents, response was not 200
            self.InsertMetric.add("fail", items_count)
            L.error(
                "Failed to insert document into ElasticSearch status:{} body:{}".format(
                    resp.status, resp_body
                )
            )
            return self.full_error_callback(self.Items, resp.status)

        return True

    def partial_error_callback(self, response_items):
        """
        Description: When an upload to ElasticSearch fails for error items (document could not be inserted),
        this callback is called.

        **Parameters**

        response_items :

        :param response_items: list with dict items: {"index": {"_id": ..., "error": ...}}

        :return:
        """
        L.error(
            "Failed to insert items in the elasticsearch: {}".format(
                response_items[:10]
            )
        )

    def full_error_callback(self, bulk_items, return_code):
        """
        Description: When an upload to ElasticSearch fails b/c of ElasticSearch error,
        this callback is called.

        **Parameters**

        bulk_items : list
                        list with tuple items: (_id, data)

        return_code :
                        ElasticSearch return code

        :return: False if the bulk is to be resumbitted again

        |

        """
        return False


[docs] class ElasticSearchConnection(Connection): """ Description: **Sample Config** url : ''http'://{ip/localhost}:{port}' URL of the source. Could be multi-URL. Each URL should be separated by ';' to a node in ElasticSearch cluster. username : 'string' , default = ' ' Used when authentication is required password : 'string', default = ' ' Used when authentication is required loader_per_url : int, default = 4 Number of parallel loaders per URL. output_queue_max_size : int, default = 10 Maximum queue size. bulk_out_max_size : ? * ? * ?, default = 12 * 1024 * 1024 ?? timeout : int, default = 300 Timeout value. fail_log_max_size : int, default = 20 Maximum size of failed log messages. precise_error_handling : bool, default = False If True all Errors will be logged, If false soft errors will be omitted in the Logs. """ ConfigDefaults = { "url": "http://localhost:9200/", # Could be multi-URL. Each URL should be separated by ';' to a node in ElasticSearch cluster "username": "", "password": "", "loader_per_url": 4, # Number of parallel loaders per URL "output_queue_max_size": 10, "bulk_out_max_size": 12 * 1024 * 1024, "timeout": 300, "fail_log_max_size": 20, "precise_error_handling": False, }
[docs] def __init__(self, app, id=None, config=None): """ Description: **Parameters** app : Application Name of the Application id : ID, default= None ID config : JSON or dict, default= None configuration file with additional information for the methods. """ super().__init__(app, id=id, config=config) self._output_queue_max_size = int(self.Config["output_queue_max_size"]) self._output_queue = asyncio.Queue() username = self.Config.get("username") password = self.Config.get("password") if username == "": self._auth = None else: self._auth = aiohttp.BasicAuth(login=username, password=password) # Contains URLs of each node in the cluster self.node_urls = [] # for url in self.Config['url'].split(';'): for url in re.split(r"\s+", self.Config["url"]): url = url.strip() if len(url) == 0: continue if url[-1] != "/": url += "/" self.node_urls.append(url) self._loader_per_url = int(self.Config["loader_per_url"]) self._bulk_out_max_size = int(self.Config["bulk_out_max_size"]) self._bulks = {} self._timeout = float(self.Config["timeout"]) self._started = True self.Loop = app.Loop self.PubSub = app.PubSub self.PubSub.subscribe("Application.run!", self._start) self.PubSub.subscribe("Application.exit!", self._on_exit) self._futures = [] for url in self.node_urls: for i in range(self._loader_per_url): self._futures.append((url, None)) self.FailLogMaxSize = int(self.Config["fail_log_max_size"]) # Precise error handling if self.Config.getboolean("precise_error_handling"): self.FilterPath = "errors,took,items.*.error,items.*._id" else: self.FilterPath = "errors,took,items.*.error" # Create metrics counters metrics_service = app.get_service("asab.MetricsService") self.InsertMetric = metrics_service.create_counter( "elasticsearch.insert", init_values={ "ok": 0, "fail": 0, }, ) self.QueueMetric = metrics_service.create_gauge( "elasticsearch.outputqueue", init_values={ "size": 0, }, )
[docs] def get_url(self): """ :return: list of URLS of nodes connected to the cluster """ return random.choice(self.node_urls)
[docs] def get_session(self): """ Returns current Client Session Authentication and Loop :return: aiohttp.ClientSession(auth=self._auth, loop=self.Loop) :return: """ return aiohttp.ClientSession(auth=self._auth)
[docs] def consume(self, index, data_feeder_generator, bulk_class=ElasticSearchBulk): """ Checks the content of data_feeder_generator and bulk and if There is data to be send it calls enqueue method. **Parameters** index : data_feeder_generator : bulk_class=ElasticSearchBulk : creates a instance of the ElasticSearchBulk class """ if data_feeder_generator is None: return bulk = self._bulks.get(index) if bulk is None: bulk = bulk_class(self, index, self._bulk_out_max_size) self._bulks[index] = bulk if bulk.consume(data_feeder_generator): # Bulk is ready, schedule to be send del self._bulks[index] self.enqueue(bulk)
def _start(self, event_name): """ Description: :return: """ self.PubSub.subscribe("Application.tick!", self._on_tick) self._on_tick("simulated!") async def _on_exit(self, event_name): """ Description: :return: """ # Wait till the queue is empty self.flush(forced=True) while self._output_queue.qsize() > 0: self.flush(forced=True) await asyncio.sleep(1) if self._output_queue.qsize() > 0: L.warn( "Still have {} bulk in output queue".format( self._output_queue.qsize() ) ) self._started = False # Wait till the _loader() terminates (one after another) pending = [item[1] for item in self._futures] self._futures = [] while len(pending) > 0: # By sending None via queue, we signalize end of life await self._output_queue.put(None) done, pending = await asyncio.wait( pending, return_when=asyncio.FIRST_COMPLETED ) def _on_tick(self, event_name): """ Description: :return: """ self.QueueMetric.set("size", int(self._output_queue.qsize())) for i in range(len(self._futures)): # 1) Check for exited futures url, future = self._futures[i] if future is not None and future.done(): # Ups, _loader() task crashed during runtime, we need to restart it try: future.result() if self._started: L.error("ElasticSearch issue detected, will retry shortly") except Exception as e: L.exception( f"ElasticSearch issue detected '{e}', will retry shortly" ) self._futures[i] = (url, None) # 2) Start _loader() futures that are exited if self._started: url, future = self._futures[i] if future is None: future = asyncio.ensure_future(self._loader(url)) self._futures[i] = (url, future) self.flush()
[docs] def flush(self, forced=False): """ It goes through the list of bulks and calls enqueue for each of them. **Parameters** forced : bool, default = False """ aged = [] for index, bulk in self._bulks.items(): if bulk is None: continue bulk.Aging += 1 if (bulk.Aging >= 2) or forced: aged.append(index) for index in aged: bulk = self._bulks.pop(index) self.enqueue(bulk)
[docs] def enqueue(self, bulk): """ Properly enqueue the bulk. **Parameters** bulk : """ self._output_queue.put_nowait(bulk) # Signalize need for throttling if self._output_queue.qsize() == self._output_queue_max_size: self.PubSub.publish("ElasticSearchConnection.pause!", self)
async def _loader(self, url): """ Description: :return: """ async with self.get_session() as session: # Preflight check try: async with session.get(url + "_cluster/health") as resp: await resp.json() if resp.status != 200: L.error( "Cluster is not ready", struct_data={"status": resp.status} ) await asyncio.sleep(5) # Throttle a bit before next try return except aiohttp.client_exceptions.ServerDisconnectedError: L.error("Cluster is not ready, server disconnected or not ready") await asyncio.sleep(5) # Throttle a bit before next try return except OSError as e: L.error("{}, cluster is not ready".format(e)) await asyncio.sleep(5) # Throttle a bit before next try return except aiohttp.client_exceptions.ContentTypeError as e: L.error("Failed communication {}".format(e)) await asyncio.sleep(20) # Throttle a lot before next try return except GeneratorExit as e: L.info("Generator exited {}".format(e)) return # Push bulks into the ElasticSearch while self._started: bulk = await self._output_queue.get() if bulk is None: break if self._output_queue.qsize() == self._output_queue_max_size - 1: self.PubSub.publish("ElasticSearchConnection.unpause!", self) success = await bulk.upload(url, session, self._timeout) if not success: # Requeue the bulk for another delivery attempt to ES self.enqueue(bulk) await asyncio.sleep(5) # Throttle a bit before next try break # Exit the loader (new will be started automatically) # Make sure the memory is emptied bulk.Items = []