bspump.mongodb

MongoDB database integration for BSPump.

class bspump.mongodb.MongoDBChangeStreamSource(app, pipeline, connection, id=None, config=None)[source]

Bases: Source

MongoDBChangeStreamSource listens to the specified database and, optionally, a single collection. If no collection is configured, events are aggregated from all collections in the database. The source emits update, insert, delete, invalidate, dropDatabase, drop, rename and replace events. Example events are documented at https://docs.mongodb.com/manual/reference/change-events/

WARNING: Make sure the MongoDB version is >= 4.0.0 and that a replica set is enabled.
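A downstream processor usually has to branch on the kind of change it receives. The following is a minimal sketch of such a dispatch step, assuming the event handed to the pipeline is a change event document in the shape shown in the MongoDB manual (fields operationType, ns and documentKey). The helper name summarize_change_event is hypothetical and not part of bspump.

```python
# Operation types listed in the class description above.
KNOWN_OPERATIONS = {
    "insert", "update", "replace", "delete",
    "drop", "rename", "dropDatabase", "invalidate",
}


def summarize_change_event(event):
    """Return an (operation, namespace, key) summary of a change event.

    Hypothetical helper: it only reads fields documented for MongoDB
    change events and does not talk to a database.
    """
    operation = event.get("operationType")
    if operation not in KNOWN_OPERATIONS:
        raise ValueError("unexpected operationType: {!r}".format(operation))

    # "ns" carries the database and, for collection-level events, the collection.
    ns = event.get("ns", {})
    namespace = ".".join(part for part in (ns.get("db"), ns.get("coll")) if part)

    # "documentKey" is only present for document-level events.
    key = event.get("documentKey", {}).get("_id")
    return operation, namespace, key


# Hand-written sample events, not captured from a real change stream.
insert_event = {
    "operationType": "insert",
    "ns": {"db": "mydb", "coll": "users"},
    "documentKey": {"_id": 1},
    "fullDocument": {"_id": 1, "name": "a"},
}
drop_database_event = {"operationType": "dropDatabase", "ns": {"db": "mydb"}}
```

Events without a document key, such as dropDatabase or invalidate, yield None as the key, so a consumer can tell document-level changes from structural ones.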

ConfigDefaults: dict = {'collection': '', 'database': '', 'full_document': ''}
__init__(app, pipeline, connection, id=None, config=None)[source]

Set the initial ID, Pipeline and Task.

Parameters

app : Application

Name of an Application (see https://asab.readthedocs.io/en/latest/asab/application.html).

pipeline : address of a pipeline

Name of a Pipeline.

id : str, default None

Name of the Pipeline.

config : compatible config type, default None

Option for adding a configuration file.

async main()[source]

Can be implemented for additional features, else will raise NotImplementedError and _main is called instead.

class bspump.mongodb.MongoDBConnection(app, id=None, config=None)[source]

Bases: Connection

Examples of configurations:

[connection:Mongo]
host=localhost
port=27017

[connection:Mongo]
host=mongodb://localhost:27017

[connection:Mongo]
host=mongodb://host1,host2/?replicaSet=my-replicaset-name
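The three configurations above differ only in how the host option is written. As a rough illustration of the file format, the sketch below reads each variant with the standard-library configparser and reports whether host is a plain host name or a full mongodb:// URI. BSPump reads its configuration through ASAB, so this is not the library's own parsing code, and read_host is a hypothetical helper.

```python
import configparser

# The three example sections, one configuration file each.
EXAMPLES = [
    "[connection:Mongo]\nhost=localhost\nport=27017\n",
    "[connection:Mongo]\nhost=mongodb://localhost:27017\n",
    "[connection:Mongo]\nhost=mongodb://host1,host2/?replicaSet=my-replicaset-name\n",
]


def read_host(text, section="connection:Mongo"):
    """Return (host, port, is_uri) for one configuration snippet."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    host = parser.get(section, "host")
    # "port" is only given in the first variant; values are read as strings.
    port = parser.get(section, "port", fallback=None)
    return host, port, host.startswith("mongodb://")


parsed = [read_host(text) for text in EXAMPLES]
```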

ConfigDefaults: dict = {'connect_timeout': '', 'database': 'database', 'heartbeat_frequency': 10000, 'host': 'localhost', 'max_idle_time': '', 'max_pool_size': 100, 'min_pool_size': 0, 'password': '', 'port': 27017, 'server_selection_timeout': '', 'socket_timeout': '', 'username': '', 'wait_queue_multiple': '', 'wait_queue_timeout': ''}
__init__(app, id=None, config=None)[source]

Description:

Parameters

app : Application

Specification of an Application.

id : default None

config : JSON or other compatible format, default None

Contains the settings used to create the connection.

class bspump.mongodb.MongoDBLookup(app, connection, id=None, config=None, cache=None)[source]

Bases: MappingLookup, AsyncLookupMixin

A lookup backed by MongoDB. It provides a mapping (dictionary-like) interface to pipelines, feeds lookup data from MongoDB using a query, and keeps a simple cache to reduce the number of database hits.

configs

database - Mongo database name

collection - Mongo collection name

key - field name to match

Example:

The MongoDBLookup can be then located and used inside a custom enricher:

class AsyncEnricher(bspump.Generator):

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        svc = app.get_service("bspump.PumpService")
        self.Lookup = svc.locate_lookup("MyMongoLookup")

    async def generate(self, context, event, depth):
        if 'user' not in event:
            return None

        info = await self.Lookup.get(event['user'])

        # Inject a new event into a next depth of the pipeline
        self.Pipeline.inject(context, event, depth)

ConfigDefaults: dict = {'changestream': False, 'collection': '', 'database': '', 'key': ''}
classmethod construct(app, definition, connection_id='MongoDBConnection')[source]

Usage:

---
lookup: UserPasswordLookup
config:
    database: lookups
    collection: userpassword
    key: user

Parameters:

definition (dict)
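Since construct takes the definition as a dict, the usage snippet above can also be written as a Python literal. This is only a sketch of the data shape. The describe helper is hypothetical and merely reads the dict; what construct does with each entry is not documented here.

```python
# The usage snippet, written out as the dict it would parse to.
definition = {
    "lookup": "UserPasswordLookup",
    "config": {
        "database": "lookups",
        "collection": "userpassword",
        "key": "user",
    },
}


def describe(definition):
    """Hypothetical helper: render the definition as one readable line."""
    config = definition.get("config", {})
    return "{} -> {}.{} keyed by {}".format(
        definition["lookup"],
        config.get("database"),
        config.get("collection"),
        config.get("key"),
    )


summary = describe(definition)
```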

__init__(app, connection, id=None, config=None, cache=None)[source]

Description:

build_query(key)[source]
async get(key)[source]

Obtain the value from lookup asynchronously.
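The class description mentions a simple cache that reduces database hits. The sketch below illustrates that general pattern with an in-memory stand-in: get consults a local cache first and only awaits the backend on a miss. The names CachedAsyncLookup and fake_find_one are hypothetical, and the query shape returned by build_query (an equality match on the key field) is an assumption, not the library's implementation.

```python
import asyncio


class CachedAsyncLookup:
    """Stand-in for a lookup: consult a local cache, then a slow backend."""

    def __init__(self, fetch, key_field):
        self._fetch = fetch          # async callable taking a query dict
        self._key_field = key_field  # plays the role of the 'key' config option
        self._cache = {}

    def build_query(self, key):
        # Assumed shape: match documents whose key field equals the key.
        return {self._key_field: key}

    async def get(self, key):
        if key in self._cache:
            return self._cache[key]
        value = await self._fetch(self.build_query(key))
        if value is not None:
            self._cache[key] = value
        return value


USERS = [{"user": "alice", "role": "admin"}, {"user": "bob", "role": "viewer"}]
backend_calls = []


async def fake_find_one(query):
    """In-memory replacement for a database round trip."""
    backend_calls.append(query)
    for doc in USERS:
        if all(doc.get(name) == value for name, value in query.items()):
            return doc
    return None


async def demo():
    lookup = CachedAsyncLookup(fake_find_one, key_field="user")
    first = await lookup.get("alice")
    second = await lookup.get("alice")  # served from the cache
    return first, second


first, second = asyncio.run(demo())
```

Only the first call reaches the backend, so backend_calls holds a single query after both lookups.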

async load()[source]

Description:

class bspump.mongodb.MongoDBSource(app, pipeline, connection, query_parms=None, id=None, config=None)[source]

Bases: TriggerSource

MongoDB database source

ConfigDefaults: dict = {'collection': '', 'database': '', 'output_queue_max_size': 2}
__init__(app, pipeline, connection, query_parms=None, id=None, config=None)[source]
Create new instance
Parameters:

app (Application): application

pipeline (Pipeline): pipeline

connection (Connection): Connection to the database

query_parms (Dictionary): Query parameters (filter, projection, number of records)

id (int): Id

config (Dictionary): Connection configuration
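The three kinds of query parameters can be pictured on plain Python data. The sketch below emulates a filter, a projection and a record limit on a list of dicts. The key names "filter", "projection" and "limit" are assumptions chosen for illustration, and apply_query_parms is a hypothetical helper, so check the source for the names MongoDBSource actually reads.

```python
# Key names are assumptions chosen for illustration.
query_parms = {
    "filter": {"status": "pending"},
    "projection": ["_id", "status"],
    "limit": 2,
}


def apply_query_parms(documents, parms):
    """Emulate filter / projection / record limit on plain dicts."""
    wanted = parms.get("filter", {})
    fields = parms.get("projection")
    limit = parms.get("limit")

    # Keep documents whose fields equal every value in the filter.
    selected = [
        doc for doc in documents
        if all(doc.get(name) == value for name, value in wanted.items())
    ]
    # Keep only the projected fields, when a projection is given.
    if fields is not None:
        selected = [{k: doc[k] for k in fields if k in doc} for doc in selected]
    return selected if limit is None else selected[:limit]


documents = [
    {"_id": 1, "status": "pending", "payload": "a"},
    {"_id": 2, "status": "done", "payload": "b"},
    {"_id": 3, "status": "pending", "payload": "c"},
    {"_id": 4, "status": "pending", "payload": "d"},
]
result = apply_query_parms(documents, query_parms)
```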

async cycle()[source]

Not implemented.

Parameters

*args : ?

**kwargs : ?

class bspump.mongodb.MongoDBSink(app, pipeline, connection, id=None, config=None)[source]

Bases: Sink

MongoDBSink is a sink processor that forwards the event to a MongoDB specified by a MongoDBConnection object.

MongoDBSink expects either a dictionary or a list of dictionaries as an input.
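A processor placed before the sink may want to check this input contract early. The following sketch, with the hypothetical helper as_documents, normalizes an event to a list of dictionaries and rejects anything else. It illustrates the stated contract and is not code taken from the sink.

```python
def as_documents(event):
    """Return the event as a list of dict documents, or raise TypeError."""
    if isinstance(event, dict):
        return [event]
    if isinstance(event, list) and all(isinstance(item, dict) for item in event):
        return list(event)
    raise TypeError(
        "expected a dict or a list of dicts, got {}".format(type(event).__name__)
    )


single = as_documents({"user": "alice"})
batch = as_documents([{"user": "alice"}, {"user": "bob"}])
```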

Example code can be found in the examples section under bspump-mongo-sink.py

The connection determines which MongoDB database is used. The collection inside that database is specified on the sink itself, by overriding the collection entry of ConfigDefaults when instantiating the class.
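The override idea can be sketched with plain dictionaries: values passed at instantiation take precedence over the class defaults, and untouched defaults stay in place. The helper effective_config is hypothetical and only mirrors the concept. The real merging is handled by the configuration machinery behind ConfigDefaults.

```python
# Defaults copied from the ConfigDefaults entry of MongoDBSink.
SINK_DEFAULTS = {"collection": "collection", "output_queue_max_size": 100}


def effective_config(overrides=None):
    """Hypothetical helper: overlay user-supplied values on the defaults."""
    merged = dict(SINK_DEFAULTS)
    merged.update(overrides or {})
    return merged


config = effective_config({"collection": "processed_events"})
```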

ConfigDefaults: dict = {'collection': 'collection', 'output_queue_max_size': 100}
__init__(app, pipeline, connection, id=None, config=None)[source]

Initializes the Parameters

Parameters

app : object

Application object.

pipeline : Pipeline

Name of the Pipeline.

id : str, default None

ID of the class of config.

config : JSON or other compatible format, default None

Configuration file.

process(context, event)[source]

Can be implemented to return an event based on given logic.

Parameters

context :

Additional information passed to the method.

event : Data with time stamp stored in any data type, usually JSON.

You can specify an event that is passed to the method.

Parameters:

event (dict or list)

MongoDBConnection

Connection to MongoDB.

import bspump.mongodb

connection = bspump.mongodb.MongoDBConnection(app, "MongoDBConnection")

Configuration:

[connection:MongoDBConnection]
host=mongodb://localhost:27017
database=mydb

Options:

  • host - MongoDB host name or connection URI

  • database - Default database name

URI Examples:

# Local
host=mongodb://localhost:27017

# With authentication
host=mongodb://user:password@localhost:27017/mydb?authSource=admin

# Replica set
host=mongodb://host1:27017,host2:27017,host3:27017/mydb?replicaSet=rs0
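To make the parts of these connection strings explicit, the sketch below splits a MongoDB URI with the standard-library urllib.parse. The helper describe_uri is hypothetical and deliberately simple: it does no percent-decoding or validation, so it is an aid for reading URIs rather than a replacement for the driver's own parser.

```python
from urllib.parse import parse_qs, urlsplit


def describe_uri(uri):
    """Split a MongoDB connection URI into its visible parts."""
    parts = urlsplit(uri)
    # Split off "user:password@" by hand; several hosts may follow it.
    userinfo, _, hostlist = parts.netloc.rpartition("@")
    username, _, password = userinfo.partition(":")
    return {
        "scheme": parts.scheme,
        "username": username or None,
        "password": password or None,
        "hosts": hostlist.split(","),
        "database": parts.path.lstrip("/") or None,
        "options": {name: values[0] for name, values in parse_qs(parts.query).items()},
    }


authenticated = describe_uri(
    "mongodb://user:password@localhost:27017/mydb?authSource=admin"
)
replica_set = describe_uri(
    "mongodb://host1:27017,host2:27017,host3:27017/mydb?replicaSet=rs0"
)
```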

MongoDBSource

Reads documents from MongoDB.

source = bspump.mongodb.MongoDBSource(
    app, pipeline,
    connection="MongoDBConnection"
)

Configuration:

[pipeline:MyPipeline:MongoDBSource]
collection=events
query={"status": "pending"}
batch_size=100

MongoDBSink

Writes documents to MongoDB.

sink = bspump.mongodb.MongoDBSink(
    app, pipeline,
    connection="MongoDBConnection"
)

Configuration:

[pipeline:MyPipeline:MongoDBSink]
collection=processed_events
mode=insert

Modes:

  • insert - Insert new documents

  • upsert - Update or insert

  • replace - Replace existing documents
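The difference between the three modes can be pictured on an in-memory store keyed by _id. The sketch below is one reading of the short descriptions above, under the assumptions that insert fails on an existing _id, upsert merges fields into an existing document, and replace only touches documents that already exist. The function write is hypothetical and does not reflect the sink's actual code.

```python
def write(store, document, mode):
    """Apply one document to a dict-based store according to the mode."""
    key = document["_id"]
    if mode == "insert":
        # Assumed: inserting an existing _id is an error.
        if key in store:
            raise KeyError("duplicate _id: {!r}".format(key))
        store[key] = dict(document)
    elif mode == "upsert":
        # Merge into the existing document, or create it when absent.
        store.setdefault(key, {}).update(document)
    elif mode == "replace":
        # Swap the whole document, but only if it already exists.
        if key in store:
            store[key] = dict(document)
    else:
        raise ValueError("unknown mode: {!r}".format(mode))
    return store


store = {}
write(store, {"_id": 1, "a": 1}, "insert")
write(store, {"_id": 1, "b": 2}, "upsert")
after_upsert = dict(store[1])
write(store, {"_id": 1, "c": 3}, "replace")
write(store, {"_id": 2, "x": 1}, "replace")  # no such document, nothing happens
```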

MongoDBLookup

Lookup table backed by MongoDB.

lookup = bspump.mongodb.MongoDBLookup(
    app,
    connection="MongoDBConnection",
    id="ProductLookup",
    config={
        "collection": "products",
        "key": "product_id"
    }
)

Upsert Example

class UpsertProcessor(bspump.Processor):
    def process(self, context, event):
        # Set upsert key
        context["mongodb_upsert_key"] = {"_id": event["_id"]}
        return event

Example Pipeline

import bspump
import bspump.mongodb
import bspump.kafka

class KafkaToMongoPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, connection="KafkaConnection"),
            DocumentProcessor(app, self),  # user-defined processor, defined elsewhere
            bspump.mongodb.MongoDBSink(app, self, connection="MongoDBConnection"),
        )