bspump.lookup¶
Lookup tables for data enrichment.
- class bspump.lookup.IPGeoLookup(app, id=None, config=None)[source]¶
Bases: DictionaryLookup
This lookup translates an IP address into a geographical location using a file database from ip2location.com. It resolves both IPv4 and IPv6 addresses. NOTICE: The IPv6 database also includes all IPv4 locations; see the “ipv4mapped” configuration option.
Free versions: IP2LOCATION-LITE-DB5.IPV6.CSV and IP2LOCATION-LITE-DB5.IPV4.CSV. For better precision, visit https://lite.ip2location.com to buy a commercial version of the database.
Usage: specify the path to the database in CSV format in the configuration.
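The general idea behind a range-based IP geolocation database can be sketched with the standard library alone. This is not the IPGeoLookup implementation: the row layout (ip_from, ip_to, country code, city), the `locate` helper, and the sample rows are assumptions made up for illustration.

```python
import bisect
import ipaddress


def ip_to_int(ip):
    """Convert a dotted IPv4 string into its integer value."""
    return int(ipaddress.ip_address(ip))


# Hypothetical sample rows (ip_from, ip_to, country_code, city), sorted by ip_from.
# The values are invented for this sketch and do not come from a real database.
RANGES = [
    (ip_to_int("10.0.0.0"), ip_to_int("10.0.0.255"), "AA", "Alpha City"),
    (ip_to_int("10.0.1.0"), ip_to_int("10.0.1.255"), "BB", "Beta Town"),
]
STARTS = [row[0] for row in RANGES]


def locate(ip):
    """Return (country_code, city) for an address, or None if no range matches."""
    value = ip_to_int(ip)
    # Pick the last range whose start is less than or equal to the address.
    index = bisect.bisect_right(STARTS, value) - 1
    if index < 0:
        return None
    ip_from, ip_to, country_code, city = RANGES[index]
    if value > ip_to:
        return None
    return country_code, city
```

Calling `locate("10.0.1.7")` falls into the second sample range, while an address outside every range yields `None`.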
- class bspump.lookup.MatrixLookup(app, matrix_id=None, dtype='float_', on_clock_update=False, id=None, config=None, lazy=False)[source]¶
Bases: Lookup
NumPy-based lookup.
- class bspump.lookup.BitMapIndex(column, matrix, id=None)[source]¶
Bases: Index
- class bspump.lookup.SliceIndex(column_start, column_end, matrix, resolution=None, id=None)[source]¶
Bases: Index
Base Classes¶
Lookup¶
Base class for all lookups.
import bspump

class MyLookup(bspump.Lookup):
    def get(self, key):
        return self.data.get(key)
DictionaryLookup¶
Simple dictionary-based lookup.
lookup = bspump.DictionaryLookup(app, "StatusLookup", {
    "1": "active",
    "2": "inactive",
    "3": "pending"
})
MappingLookup¶
Lookup with set/get operations.
lookup = bspump.MappingLookup(app, "MappingLookup")
lookup.set("key1", {"field": "value"})
lookup.set("key2", {"field": "other"})
File-based Lookups¶
JSONLookup¶
Load lookup data from JSON file.
import bspump.lookup

lookup = bspump.lookup.JSONLookup(app, "JSONLookup", config={
    "path": "/data/lookup.json"
})
CSVLookup¶
Load lookup data from CSV file.
lookup = bspump.lookup.CSVLookup(app, "CSVLookup", config={
    "path": "/data/lookup.csv",
    "key_column": "id"
})
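To illustrate what keying a CSV file by a column means, here is a minimal standard-library sketch. It is not the CSVLookup implementation; the `load_csv_lookup` helper and the sample file contents are hypothetical, and only the `key_column` name is taken from the configuration above.

```python
import csv
import io

# Hypothetical CSV contents standing in for /data/lookup.csv.
CSV_TEXT = """id,name,status
1,Alice,active
2,Bob,inactive
"""


def load_csv_lookup(stream, key_column):
    """Build a dict that maps each row's key column to the remaining columns."""
    table = {}
    for row in csv.DictReader(stream):
        key = row.pop(key_column)  # remove the key from the row and use it as the index
        table[key] = dict(row)
    return table


table = load_csv_lookup(io.StringIO(CSV_TEXT), "id")
```

Note that keys read from CSV are strings, so `table["1"]` matches but `table[1]` does not.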
Using Lookups¶
Access lookups in processors:
class EnrichProcessor(bspump.Processor):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        svc = app.get_service("bspump.PumpService")
        self.lookup = svc.locate_lookup("UserLookup")

    def process(self, context, event):
        user_id = event.get("user_id")
        user_info = self.lookup.get(user_id)
        if user_info:
            event["user_name"] = user_info.get("name")
        return event
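The enrichment step in `process` can be tried without a running pipeline. The sketch below assumes a plain dict stands in for the lookup (any object with a dict-like `get` would do); the `enrich` function and the sample events are hypothetical.

```python
# A plain dict standing in for the "UserLookup" lookup (assumption for this sketch).
USER_LOOKUP = {
    "u1": {"name": "Alice"},
    "u2": {"name": "Bob"},
}


def enrich(event, lookup):
    """Add user_name to the event when the user is known; always return the event."""
    user_info = lookup.get(event.get("user_id"))
    if user_info:
        event["user_name"] = user_info.get("name")
    return event


known = enrich({"user_id": "u1", "action": "login"}, USER_LOOKUP)
unknown = enrich({"user_id": "u9", "action": "login"}, USER_LOOKUP)
```

Returning the event even when the lookup misses keeps unmatched events flowing through the pipeline instead of silently dropping them.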
Registering Lookups¶
app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")

lookup = bspump.DictionaryLookup(app, "StatusLookup", {
    "1": "active",
    "2": "inactive"
})
svc.add_lookup(lookup)
Jupyter Registration¶
import bspump
from bspump.jupyter import register_lookup

@register_lookup
def user_lookup(app):
    return bspump.DictionaryLookup(app, "UserLookup", {
        "u1": {"name": "Alice"},
        "u2": {"name": "Bob"}
    })
Configuration¶
[lookup:JSONLookup]
path=/data/lookup.json
reload_interval=3600
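The section above is ordinary INI syntax, so it can be inspected with Python's `configparser`. This is only a sketch of how the values parse; it does not show how the library reads its configuration, and the unit of `reload_interval` is not stated in the source.

```python
import configparser

# The same fragment as above, embedded as a string for the sketch.
CONFIG_TEXT = """
[lookup:JSONLookup]
path=/data/lookup.json
reload_interval=3600
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)

section = parser["lookup:JSONLookup"]
path = section["path"]
reload_interval = section.getint("reload_interval")  # parsed as an integer
```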
Custom Lookups¶
import json

import bspump

class RedisLookup(bspump.Lookup):
    def __init__(self, app, lookup_id, redis_client):
        super().__init__(app, lookup_id)
        self.redis = redis_client

    def get(self, key):
        # Values are stored as JSON strings; decode them on the way out.
        value = self.redis.get(key)
        if value:
            return json.loads(value)
        return None

    def set(self, key, value):
        # Serialize the value to JSON before storing it.
        self.redis.set(key, json.dumps(value))
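The JSON round trip used by the custom lookup can be checked without Redis or BSPump. In this sketch, `InMemoryStore` is a hypothetical stand-in for a Redis client that offers only `get` and `set`, and `JsonStoreLookup` mirrors the `get`/`set` logic above without inheriting from `bspump.Lookup`.

```python
import json


class InMemoryStore:
    """Hypothetical stand-in for a Redis client: stores strings by key."""

    def __init__(self):
        self._items = {}

    def get(self, key):
        return self._items.get(key)

    def set(self, key, value):
        self._items[key] = value


class JsonStoreLookup:
    """Stores values as JSON strings and decodes them on read."""

    def __init__(self, store):
        self.store = store

    def get(self, key):
        value = self.store.get(key)
        if value is None:
            return None  # key is not present in the store
        return json.loads(value)

    def set(self, key, value):
        self.store.set(key, json.dumps(value))


store = InMemoryStore()
lookup = JsonStoreLookup(store)
lookup.set("u1", {"name": "Alice"})
```

After the `set` call, the store holds the serialized string, while `lookup.get("u1")` returns the decoded dictionary.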