Source code for bspump.lookup.ipgeolookup

import logging
import csv
import ipaddress

from bspump.abc.lookup import DictionaryLookup

###

L = logging.getLogger(__name__)

###


[docs] class IPGeoLookup(DictionaryLookup): """ This lookup performs transformation of IP address into a geographical location. It uses a file database from ip2location.com. Lookup provides locations both in ipv4 and ipv6 formats. NOTICE: IPv6 database includes also all IPv4 locations, see "ipv4mapped" config. option. Free versions: IP2LOCATION-LITE-DB5.IPV6.CSV and IP2LOCATION-LITE-DB5.IPV4.CSV For better precision visit https://lite.ip2location.com to buy a commercial version of database. Usage: specify in configuration the path to the database in csv format.""" ConfigDefaults = { "path": "", "ipv4mapped": "no", # IPv4-mapped IPv6 address (enables to use IPv6 lookups for IPv4 addresses) }
[docs] def __init__(self, app, id=None, config=None): super().__init__(app, id=id, config=config) self.TreeRoot = None self.Locations = {} if self.Config["ipv4mapped"].lower() == "yes": self.IP4Mapped = True else: self.IP4Mapped = False
[docs] async def load(self): fname = self.Config["path"] if fname == "": return with open(fname, "r") as f: array = [] for line in csv.reader(f, delimiter=","): ip_int_address_start = int(line[0]) ip_int_address_end = int(line[1]) array.append(ip_int_address_start) array.append(ip_int_address_end) lat = float(line[6]) lon = float(line[7]) if (lat == 0.0) or (lon == 0.0): d = {"lat": None, "lon": None} else: d = {"lat": lat, "lon": lon} if line[2] != "-": d["country"] = line[2] if line[4] != "-": d["region"] = line[4] if line[5] != "-": d["city"] = line[5] self.Locations[ip_int_address_start] = d self.Locations[ip_int_address_end] = d self.TreeRoot = self.sorted_array_to_bst(array) del array L.debug("IPGeoLookup {} was successfully created".format(self.Id)) return True
[docs] def set(self, tree): self.TreeRoot = tree
# REST
[docs] def rest_get(self): rest = super().rest_get() rest["TreeRoot"] = self.TreeRoot rest["Locations"] = self.Locations rest["IP4Mapped"] = self.IP4Mapped return rest
[docs] def sorted_array_to_bst(self, arr): if not arr: return None mid = int(len(arr) / 2) root = Node(arr[mid]) root.left = self.sorted_array_to_bst(arr[:mid]) root.right = self.sorted_array_to_bst(arr[mid + 1 :]) return root
[docs] def search(self, value): root = self.TreeRoot while True: if root.data == value: return root.data if root.data > value: left = root.left if left is None: return root.data else: root = left if root.data < value: right = root.right if right is None: return root.data else: root = right
[docs] def lookup_location_ipv4(self, address): if self.TreeRoot is None: # L.warning("Cannot enrich the location") return None address_int = int(ipaddress.IPv4Address(address)) if self.IP4Mapped: # https://blog.ip2location.com/knowledge-base/ipv4-mapped-ipv6-address/ # 191.239.213.197 -> ::ffff:191.239.213.197 address_int += 281470681743360 value = self.search(address_int) return self.Locations.get(value)
[docs] def lookup_location_ipv6(self, address): if self.TreeRoot is None: # L.warning("Cannot enrich the location") return None address_int = int(ipaddress.IPv6Address(address)) value = self.search(address_int) return self.Locations.get(value)
[docs] def lookup_location(self, address): if "." in address: return self.lookup_location_ipv4(address) elif ":" in address: return self.lookup_location_ipv6(address) else: raise ValueError("Invalid IPv4/IPv6 format")
class Node: def __init__(self, d): self.data = d self.left = None self.right = None