Lookup¶
Lookups enrich events with context. A processor takes a value from the event, uses it as a key, and retrieves the related record from a lookup, which can be backed by various sources.
What are Lookups?¶
Lookups are key-value stores that can be:
Loaded from files (JSON, CSV)
Populated from databases
Built dynamically at runtime
Shared across pipelines
Using Lookups¶
Access lookups in processors:
import bspump

class EnrichProcessor(bspump.Processor):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        svc = app.get_service("bspump.PumpService")
        self.lookup = svc.locate_lookup("UserLookup")

    def process(self, context, event):
        user_id = event.get("user_id")
        user_info = self.lookup.get(user_id)
        if user_info:
            event["user_name"] = user_info.get("name")
        return event
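The enrichment step can be exercised without a running pipeline. The following is a minimal, framework-free sketch: a plain dict stands in for the lookup (an assumption made for illustration, not a BSPump object), and the hypothetical enrich function mirrors the body of process above.

```python
# A plain dict stands in for the lookup; BSPump is not required to run this sketch.
user_lookup = {
    "u1": {"name": "Alice"},
    "u2": {"name": "Bob"},
}


def enrich(lookup, event):
    """Add user_name to the event when the user_id is known."""
    user_info = lookup.get(event.get("user_id"))
    if user_info:
        event["user_name"] = user_info.get("name")
    # The event is returned either way, so unknown users pass through unchanged.
    return event


known = enrich(user_lookup, {"user_id": "u1"})
unknown = enrich(user_lookup, {"user_id": "u9"})
```

Returning the event on a miss keeps it flowing through the pipeline. Whether to drop or flag such events instead is a design decision for the processor.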
Built-in Lookup Types¶
DictionaryLookup
Simple in-memory key-value lookup:
import bspump

lookup = bspump.DictionaryLookup(app, "StatusLookup", {
    "1": "active",
    "2": "inactive",
    "3": "pending"
})
MappingLookup
For more complex mapping scenarios:
import bspump
lookup = bspump.MappingLookup(app, "MappingLookup")
lookup.set("key1", {"field": "value"})
Loading from Files¶
Load lookup data from external files:
import bspump.lookup

# Load from JSON file
lookup = bspump.lookup.JSONLookup(app, "JSONLookup", config={
    "path": "/data/lookup.json"
})

# Load from CSV file
lookup = bspump.lookup.CSVLookup(app, "CSVLookup", config={
    "path": "/data/lookup.csv",
    "key_column": "id"
})
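To make the role of key_column concrete, here is a small standard-library sketch of indexing CSV rows by one column. It is a conceptual illustration only and does not show how CSVLookup is implemented; the CSV content and the rows_by_key helper are hypothetical.

```python
import csv
import io

# Hypothetical CSV content; a real lookup would read it from the configured path.
CSV_TEXT = "id,name,email\n1,Alice,alice@example.com\n2,Bob,bob@example.com\n"


def rows_by_key(csv_text, key_column):
    """Index CSV rows by the value found in key_column."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return {row[key_column]: row for row in reader}


table = rows_by_key(CSV_TEXT, "id")
```

Values read by the csv module are strings, so in this sketch the keys are "1" and "2". An event that carries an integer id would need converting before the lookup.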
Database-backed Lookups¶
Lookups can be populated from databases:
import bspump.postgresql

lookup = bspump.postgresql.PostgreSQLLookup(
    app, "PostgreSQLLookup",
    connection="PostgreSQLConnection",
    config={
        "query": "SELECT id, name, email FROM users",
        "key": "id"
    }
)
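The idea of populating a lookup from a query can be sketched with the standard library alone. The example below substitutes an in-memory SQLite database for PostgreSQL so that it runs anywhere; the table contents and the load_lookup helper are hypothetical, and it does not reflect how PostgreSQLLookup works internally.

```python
import sqlite3

# SQLite stands in for PostgreSQL here so the sketch needs no server.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
conn.executemany(
    "INSERT INTO users (id, name, email) VALUES (?, ?, ?)",
    [(1, "Alice", "alice@example.com"), (2, "Bob", "bob@example.com")],
)


def load_lookup(connection, query, key):
    """Run the query once and index the resulting rows by the key column."""
    return {row[key]: dict(row) for row in connection.execute(query)}


users = load_lookup(conn, "SELECT id, name, email FROM users", "id")
conn.close()
```

Loading everything up front trades memory for fast per-event access. For large tables, querying per key with a cache may be the better fit.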
Registering Lookups¶
Register lookups with the application:
import bspump

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")

lookup = bspump.DictionaryLookup(app, "StatusLookup", {
    "1": "active",
    "2": "inactive"
})
svc.add_lookup(lookup)
Jupyter Lookup Registration¶
In Jupyter notebooks:
import bspump
from bspump.jupyter import *

@register_lookup
def status_lookup(app):
    return bspump.DictionaryLookup(app, "StatusLookup", {
        "1": "active",
        "2": "inactive"
    })
Lookup Updates¶
Lookups can be updated at runtime:
import bspump

class DynamicLookup(bspump.MappingLookup):
    async def load(self):
        # Reload data periodically
        # fetch_latest_data() is not defined here; implement it to return a dict
        data = await self.fetch_latest_data()
        self.clear()
        for key, value in data.items():
            self.set(key, value)
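The clear-and-refill pattern above can be tried outside BSPump. The sketch below is a standard-library illustration under stated assumptions: ReloadableTable is a hypothetical stand-in for a lookup, and fetch_latest_data is a hypothetical coroutine that returns the next version of the data.

```python
import asyncio


class ReloadableTable:
    """Stand-in for a lookup whose content is refreshed by load()."""

    def __init__(self, fetch):
        self._fetch = fetch  # hypothetical coroutine function returning a dict
        self.data = {}

    async def load(self):
        fresh = await self._fetch()
        # There is no await between clear() and update(), so other coroutines
        # on the same event loop never observe a half-filled table.
        self.data.clear()
        self.data.update(fresh)


versions = iter([{"1": "active"}, {"1": "inactive", "2": "pending"}])


async def fetch_latest_data():
    return next(versions)


async def main():
    table = ReloadableTable(fetch_latest_data)
    await table.load()
    first = dict(table.data)
    await table.load()
    return first, dict(table.data)


first, second = asyncio.run(main())
```

Fetching before clearing matters: if the fetch fails, the previous content stays in place instead of leaving the table empty.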
Lookup Configuration¶
Configure lookups in pipelines.conf:
[lookup:StatusLookup]
path=/data/status.json
reload_interval=3600
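Assuming the file uses INI syntax, as the section above suggests, its values can be inspected with Python's configparser. This sketch only shows how such a section parses; it does not show how BSPump itself consumes the options.

```python
import configparser

CONFIG_TEXT = """
[lookup:StatusLookup]
path=/data/status.json
reload_interval=3600
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)

section = parser["lookup:StatusLookup"]
path = section["path"]
# Values are stored as strings; getint() converts on access.
reload_interval = section.getint("reload_interval")
```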
Custom Lookups¶
Create custom lookups for specialized needs:
import bspump

class RedisLookup(bspump.Lookup):
    def __init__(self, app, lookup_id, redis_connection):
        super().__init__(app, lookup_id)
        self.redis = redis_connection

    def get(self, key):
        return self.redis.get(key)

    def set(self, key, value):
        self.redis.set(key, value)
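One practical detail with a Redis-backed store is that a default redis-py client returns bytes from get and None for a missing key, so structured values need serializing. The sketch below illustrates one way to handle that with JSON. FakeRedis and JsonRedisTable are hypothetical names introduced here, and the fake client exists only so the example runs without a server.

```python
import json


class FakeRedis:
    """In-memory stand-in exposing only get() and set(); get() returns bytes."""

    def __init__(self):
        self._store = {}

    def get(self, key):
        return self._store.get(key)

    def set(self, key, value):
        self._store[key] = value.encode("utf-8") if isinstance(value, str) else value


class JsonRedisTable:
    """Hypothetical wrapper that stores structured values as JSON strings."""

    def __init__(self, client):
        self.client = client

    def get(self, key, default=None):
        raw = self.client.get(key)
        return default if raw is None else json.loads(raw)

    def set(self, key, value):
        self.client.set(key, json.dumps(value))


table = JsonRedisTable(FakeRedis())
table.set("u1", {"name": "Alice"})
hit = table.get("u1")
miss = table.get("u9")
```

The same get and set bodies could be placed in a custom lookup class such as the RedisLookup above, with a real client passed in instead of the fake one.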