Source code for bspump.file.lookupprovider
import logging
import os
import bspump.asab.proactor as proactor
from bspump.abc.lookupprovider import LookupBatchProviderABC
###
L = logging.getLogger(__name__)
###
[docs]
class FileBatchLookupProvider(LookupBatchProviderABC):
"""
Loads lookup data from a file on local filesystem.
|
"""
[docs]
def __init__(self, lookup, url, id=None, config=None):
super().__init__(lookup, url, id, config)
self.App.add_module(proactor.Module)
self.ProactorService = self.App.get_service("asab.ProactorService")
[docs]
async def load(self):
"""
Description:
:returns: result
|
"""
result = await self.ProactorService.execute(
self.load_on_thread,
)
return result
[docs]
def load_on_thread(self):
"""
Description:
|
"""
if not os.path.isfile(self.URL):
L.warning("Source '{}' is not a file".format(self.URL))
return None
if not os.access(self.URL, os.R_OK):
L.warning("Insufficient permissions: '{}'".format(self.URL))
return None
try:
with open(self.URL, "rb") as f:
data = f.read()
return data
except Exception as e:
L.warning("Failed to read content of file '{}': {}".format(self.URL, e))
return None
[docs]
async def save(self, data):
"""
**Parameters**
data :
"""
dirname = os.path.dirname(self.URL)
if not os.path.isdir(dirname):
os.makedirs(dirname)
with open(self.URL, "wb") as fo:
fo.write(data)