Source code for bspump.abc.lookup
import asyncio
import collections.abc
import json
import logging
from typing import Optional
import bspump.asab as asab
from .lookupprovider import LookupProviderABC
###
L = logging.getLogger(__name__)
###
[docs]
class Lookup(asab.Configurable):
"""
Description:
|
:return:
"""
ConfigDefaults = {
"source_url": "", # Specifies complete url to source file
# zk://zookeeper1:2181/base/path/to/config.yaml
# zk:///base/path/to/config.yaml == zk:/base/path/to/config.yaml
# zk:///./path/to/config.yaml == zk:/./path/to/config.yaml
# http://localhost:8080/path/to/config.yaml
# file:/root/path/to/config.yaml == /root/path/to/config.yaml
# Backwards compatibility
"master_url": "http://localhost:8080",
"master_url_endpoint": "/bspump/v1/lookup/",
"master_lookup_id": "", # If not empty, it specify the lookup id that will be used for loading from master
}
[docs]
def __init__(self, app, id=None, config=None, lazy=False):
"""
Description:
"""
_id = id if id is not None else self.__class__.__name__
super().__init__("lookup:{}".format(_id), config=config)
self.Id = _id
self.App = app
self.Loop = app.Loop
self.Lazy = lazy
self.PubSub = asab.PubSub(app)
self.MasterURL = None
self.Provider: Optional[LookupProviderABC] = None
url = self.Config.get("source_url", "").strip()
if len(url) == 0:
# Construct URL from the old "master_" params
server = self.Config.get("master_url", "").strip()
if len(server) == 0:
L.error(
"Neither `source_url` nor `master_url` specified in lookup {} config.".format(
self.Id
)
)
return
if not server.startswith("http:"):
server = "http://{}".format(server)
server = server.strip("/")
master_url_endpoint = self.Config["master_url_endpoint"].strip("/")
master_lookup_id = self.Config["master_lookup_id"]
if master_lookup_id == "":
master_lookup_id = self.Id
url = "{}/{}/{}".format(server, master_url_endpoint, master_lookup_id)
self._create_provider(url)
def __getitem__(self, key):
raise NotImplementedError(
"Lookup '{}' __getitem__() method not implemented".format(self.Id)
)
def __iter__(self):
raise NotImplementedError(
"Lookup '{}' __iter__() method not implemented".format(self.Id)
)
def __len__(self):
raise NotImplementedError(
"Lookup '{}' __len__() method not implemented".format(self.Id)
)
def __contains__(self, item):
raise NotImplementedError(
"Lookup '{}' __contains__() method not implemented".format(self.Id)
)
def _create_provider(self, path: str):
"""
Description:
:return:
"""
if path.startswith("zk:"):
from bspump.zookeeper import ZooKeeperBatchLookupProvider
self.Provider = ZooKeeperBatchLookupProvider(self, path)
self.MasterURL = path
elif path.startswith("http:") or path.startswith("https:"):
from bspump.http import HTTPBatchLookupProvider
config = {}
if "use_cache" in self.Config:
config["use_cache"] = self.Config.getboolean("use_cache")
if "cache_dir" in self.Config:
config["cache_dir"] = self.Config.get("source_url", None)
self.Provider = HTTPBatchLookupProvider(self, path, config=config)
self.MasterURL = path
else:
from bspump.file import FileBatchLookupProvider
# Local file source -> lookup is considered master
self.Provider = FileBatchLookupProvider(self, path)
self.MasterURL = None
[docs]
def time(self):
"""
Description:
:return: time
|
"""
return self.App.time()
[docs]
def ensure_future_update(self, loop):
"""
Description:
:return:
|
"""
return asyncio.ensure_future(self._do_update())
async def _do_update(self):
"""
Description:
:return:
"""
updated = await self.load()
if updated:
L.warning(f"{self.Id} bspump.Lookup.changed!")
self.PubSub.publish("bspump.Lookup.changed!")
[docs]
async def load(self) -> bool:
"""
Description:
"""
data = await self.Provider.load()
if data is None or data is False:
L.warning("No data loaded from {}.".format(self.Provider.Id))
return False
self.deserialize(data)
return True
[docs]
def serialize(self):
"""
Description:
"""
raise NotImplementedError(
"Lookup '{}' serialize() method not implemented".format(self.Id)
)
[docs]
def deserialize(self, data):
"""
Description:
|
"""
raise NotImplementedError(
"Lookup '{}' deserialize() method not implemented".format(self.Id)
)
[docs]
def rest_get(self):
"""
Description:
:return:
"""
response = {"Id": self.Id}
if self.Provider.ETag is not None:
response["ETag"] = self.Provider.ETag
if self.MasterURL is not None:
response["MasterURL"] = self.MasterURL
return response
[docs]
def is_master(self):
"""
Description:
:return: ??
|
"""
return self.MasterURL is None
[docs]
class MappingLookup(Lookup, collections.abc.Mapping):
"""
Description:
|
"""
pass
[docs]
class AsyncLookupMixin(Lookup):
"""
Description:
"""
[docs]
async def get(self, key):
raise NotImplementedError()
[docs]
class DictionaryLookup(MappingLookup):
"""
Description:
"""
[docs]
def __init__(self, app, id=None, config=None, lazy=False):
"""
Description:
|
"""
self.Dictionary = {}
super().__init__(app, id, config=config, lazy=lazy)
def __getitem__(self, key):
return self.Dictionary.__getitem__(key)
def __iter__(self):
return self.Dictionary.__iter__()
def __len__(self):
return self.Dictionary.__len__()
[docs]
def serialize(self):
"""
Description:
:return: json data
|
"""
return (json.dumps(self.Dictionary)).encode("utf-8")
[docs]
def deserialize(self, data):
"""
Description:
|
"""
self.Dictionary.update(json.loads(data.decode("utf-8")))
# REST
[docs]
def rest_get(self):
"""
Description:
:return: rest
|
"""
rest = super().rest_get()
rest["Dictionary"] = self.Dictionary
return rest
[docs]
def set(self, dictionary: dict):
"""
Description:
|
"""
if self.is_master() is False:
L.warning("'master_url' provided, set() method can not be used")
self.Dictionary.clear()
self.Dictionary.update(dictionary)