bspump.lookup

Lookup tables for data enrichment.

class bspump.lookup.IPGeoLookup(app, id=None, config=None)[source]

Bases: DictionaryLookup

This lookup translates an IP address into a geographical location. It uses a file database from ip2location.com. The lookup provides locations for addresses in both IPv4 and IPv6 formats. NOTICE: The IPv6 database also includes all IPv4 locations; see the “ipv4mapped” configuration option.

Free versions: IP2LOCATION-LITE-DB5.IPV6.CSV and IP2LOCATION-LITE-DB5.IPV4.CSV. For better precision, visit https://lite.ip2location.com to buy a commercial version of the database.

Usage: in the configuration, specify the path to the database in CSV format.

ConfigDefaults: dict = {'ipv4mapped': 'no', 'path': ''}
__init__(app, id=None, config=None)[source]

Description:


async load()[source]

Description:

set(tree)[source]

Description:


rest_get()[source]

Description:

Returns:

rest


sorted_array_to_bst(arr)[source]
search(value)[source]
lookup_location_ipv4(address)[source]
lookup_location_ipv6(address)[source]
lookup_location(address)[source]
class bspump.lookup.MatrixLookup(app, matrix_id=None, dtype='float_', on_clock_update=False, id=None, config=None, lazy=False)[source]

Bases: Lookup

Numpy Lookup

ConfigDefaults: dict = {'update_period': 5}
__init__(app, matrix_id=None, dtype='float_', on_clock_update=False, id=None, config=None, lazy=False)[source]

Description:

update_indexes()[source]
search(condition, target_column)[source]

Default search; override it if the search can be optimized with indexes.

serialize()[source]

Description:

deserialize(data_json)[source]

Description:


rest_get()[source]

Description:

Returns:

create_index(index_class, *args, **kwarg)[source]
class bspump.lookup.Index(id=None)[source]

Bases: object

__init__(id=None)[source]
update(matrix)[source]
serialize()[source]
search(*args)[source]
Return type:

set

class bspump.lookup.BitMapIndex(column, matrix, id=None)[source]

Bases: Index

__init__(column, matrix, id=None)[source]

Make sure that the column values are discrete. Note also that the creation procedure is relatively slow.

search(value)[source]

Returns set of matrix indexes.

update(matrix)[source]
serialize()[source]
deserialize(data)[source]
class bspump.lookup.TreeRangeIndex(column_start, column_end, matrix, id=None)[source]

Bases: Index

__init__(column_start, column_end, matrix, id=None)[source]
search(value)[source]
sorted_array_to_bst(matrix, arr, path, mask)[source]
update(matrix)[source]
serialize()[source]
deserialize(data)[source]
class bspump.lookup.SliceIndex(column_start, column_end, matrix, resolution=None, id=None)[source]

Bases: Index

__init__(column_start, column_end, matrix, resolution=None, id=None)[source]
search(value)[source]
update(matrix)[source]
serialize()[source]
deserialize(data)[source]

Base Classes

Lookup

Base class for all lookups.

import bspump

class MyLookup(bspump.Lookup):
    """Example lookup that answers ``get`` from its ``data`` attribute."""

    def get(self, key):
        """Return ``self.data.get(key)`` for the given ``key``."""
        entries = self.data
        return entries.get(key)

DictionaryLookup

Simple dictionary-based lookup.

# Build a DictionaryLookup from an inline mapping of status codes
# (string keys) to status labels.
# "StatusLookup" is passed as the second argument (presumably the lookup
# id -- confirm against the DictionaryLookup signature).
lookup = bspump.DictionaryLookup(app, "StatusLookup", {
    "1": "active",
    "2": "inactive",
    "3": "pending"
})

MappingLookup

Lookup with set/get operations.

# Create the lookup first, then populate it one entry at a time with
# set(key, value); here each value is a small dict.
lookup = bspump.MappingLookup(app, "MappingLookup")
lookup.set("key1", {"field": "value"})
lookup.set("key2", {"field": "other"})

File-based Lookups

JSONLookup

Load lookup data from JSON file.

import bspump.lookup

# The location of the JSON file is supplied through the "path" config option.
lookup = bspump.lookup.JSONLookup(app, "JSONLookup", config={
    "path": "/data/lookup.json"
})

CSVLookup

Load lookup data from CSV file.

# The location of the CSV file is supplied through the "path" config option.
# "key_column" presumably names the CSV column whose values become the
# lookup keys -- confirm against the CSVLookup implementation.
lookup = bspump.lookup.CSVLookup(app, "CSVLookup", config={
    "path": "/data/lookup.csv",
    "key_column": "id"
})

Using Lookups

Access lookups in processors:

class EnrichProcessor(bspump.Processor):
    """Processor that adds a ``user_name`` field to events using the "UserLookup" lookup."""

    def __init__(self, app, pipeline, id=None, config=None):
        """Initialise the processor and resolve "UserLookup" through the pump service."""
        super().__init__(app, pipeline, id, config)
        pump_service = app.get_service("bspump.PumpService")
        self.lookup = pump_service.locate_lookup("UserLookup")

    def process(self, context, event):
        """Return ``event``, setting ``user_name`` when the lookup holds a truthy entry for its ``user_id``."""
        record = self.lookup.get(event.get("user_id"))
        if not record:
            # No (or empty) entry for this user: pass the event through untouched.
            return event
        event["user_name"] = record.get("name")
        return event

Registering Lookups

# Create the application and fetch its pump service by name.
app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")

# Build the lookup, then hand it to the pump service with add_lookup().
lookup = bspump.DictionaryLookup(app, "StatusLookup", {
    "1": "active",
    "2": "inactive"
})
svc.add_lookup(lookup)

Jupyter Registration

from bspump.jupyter import register_lookup

@register_lookup
def user_lookup(app):
    """Build the "UserLookup" dictionary lookup; the factory is decorated with ``register_lookup``."""
    users = {
        "u1": {"name": "Alice"},
        "u2": {"name": "Bob"}
    }
    return bspump.DictionaryLookup(app, "UserLookup", users)

Configuration

# Settings for the lookup with id "JSONLookup" (the section name is
# presumably "lookup:" followed by the lookup id -- confirm).
[lookup:JSONLookup]
# Path of the JSON file to load.
path=/data/lookup.json
# NOTE(review): presumably the reload period in seconds -- confirm the unit.
reload_interval=3600

Custom Lookups

import json


class RedisLookup(bspump.Lookup):
    """Custom lookup that keeps JSON-encoded values in an external client.

    Values are serialised with ``json.dumps`` on ``set`` and parsed with
    ``json.loads`` on ``get``; storage is delegated to the ``get``/``set``
    methods of the client object passed to the constructor.
    """

    def __init__(self, app, lookup_id, redis_client):
        """Initialise the base lookup and remember the storage client.

        Args:
            app: Application object forwarded to ``bspump.Lookup``.
            lookup_id: Identifier forwarded to ``bspump.Lookup``.
            redis_client: Object providing ``get(key)`` and ``set(key, value)``.
        """
        super().__init__(app, lookup_id)
        self.redis = redis_client

    def get(self, key):
        """Return the decoded value stored under ``key``, or ``None``.

        ``None`` is returned whenever the client yields a falsy result
        (for example a missing key), so nothing is passed to the JSON parser.
        """
        raw = self.redis.get(key)
        if not raw:
            return None
        return json.loads(raw)

    def set(self, key, value):
        """Store ``value`` under ``key`` as a JSON document."""
        self.redis.set(key, json.dumps(value))