Source code for bspump.kafka.keyfilter
import bspump
import collections.abc
[docs]
class KafkaKeyFilter(bspump.Processor):
"""
KafkaKeyFilter reduces the incoming event stream from Kafka based on a
key provided in each event.
Every Kafka message has a key, KafkaKeyFilter selects only those events where
the event key matches one of provided 'keys', other events will be discarded.
Set filtering `keys` as a parameter (in bytes) in the KafkaKeyFilter constructor.
KafkaKeyFilter is meant to be inserted after KafkaSource in a Pipeline.
"""
[docs]
def __init__(self, app, pipeline, keys, id=None, config=None):
"""
Initializes variables
**Parameters**
app : Application
Name of the `Application <https://asab.readthedocs.io/en/latest/asab/application.html`_.
pipeline : Pipeline
Name of the Pipeline.
keys : bytes
keys used to filter out events from the event stream.
id : , default = None
config : JSON, default = None
configuration file in JSON
"""
super().__init__(app, pipeline, id, config)
if not isinstance(keys, collections.abc.Iterable) or isinstance(keys, bytes):
self.Keys = frozenset([keys])
else:
self.Keys = frozenset(keys)
[docs]
def process(self, context, event):
kafka_key = context.get("kafka_key")
assert kafka_key is not None
if kafka_key is not None and kafka_key in self.Keys:
return event
else:
return None