bspump.mongodb
MongoDB database integration for BSPump.
- class bspump.mongodb.MongoDBChangeStreamSource(app, pipeline, connection, id=None, config=None)
Bases: Source
MongoDBChangeStreamSource listens to the specified database and, optionally, a collection (if no collection is configured, events are aggregated from all collections). It outputs update, insert, delete, invalidate, dropDatabase, drop, rename and replace events. See https://docs.mongodb.com/manual/reference/change-events/ for examples of these events.
WARNING: Make sure that the MongoDB version is >= 4.0.0 and that a replica set is enabled.
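Downstream processors usually branch on the kind of change. The helpers below are a plain-Python sketch that assumes the source emits the raw change event document, with the `operationType`, `ns`, `documentKey` and `updateDescription` fields described in the MongoDB change-events reference. The function names are hypothetical, and the exact event shape emitted by your BSPump version should be checked.

```python
from typing import Any, Dict, Optional, Tuple


def summarize_change(event: Dict[str, Any]) -> Tuple[str, Optional[str], Optional[str], Any]:
    """Return (operationType, database, collection, document _id) of a change event."""
    operation = event.get("operationType", "unknown")
    namespace = event.get("ns") or {}
    document_key = event.get("documentKey") or {}
    # dropDatabase events carry no collection; invalidate events carry no "ns" at all
    return operation, namespace.get("db"), namespace.get("coll"), document_key.get("_id")


def updated_fields(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return the fields set by an "update" event, or an empty dict for other events."""
    if event.get("operationType") != "update":
        return {}
    description = event.get("updateDescription") or {}
    return dict(description.get("updatedFields") or {})


# Example "update" event, shaped after the MongoDB change-events reference
sample = {
    "operationType": "update",
    "ns": {"db": "mydb", "coll": "users"},
    "documentKey": {"_id": 42},
    "updateDescription": {"updatedFields": {"status": "active"}, "removedFields": []},
}

if __name__ == "__main__":
    print(summarize_change(sample))  # ('update', 'mydb', 'users', 42)
    print(updated_fields(sample))    # {'status': 'active'}
```

Returning `None` for missing parts keeps the helpers safe for every event type listed above, including `invalidate`.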
- __init__(app, pipeline, connection, id=None, config=None)
Sets the initial ID, Pipeline and Task.
Parameters
- app : Application
Name of an Application (see https://asab.readthedocs.io/en/latest/asab/application.html).
- pipeline : address of a pipeline
Name of a Pipeline.
- connection : Connection
The MongoDB connection to listen on.
- id : str, default None
ID of the source.
- config : compatible config type, default None
Optional configuration.
- class bspump.mongodb.MongoDBConnection(app, id=None, config=None)
Bases: Connection
Examples of configurations:
[connection:Mongo]
host=localhost
port=27017

[connection:Mongo]
host=mongodb://localhost:27017

[connection:Mongo]
host=mongodb://host1,host2/?replicaSet=my-replicaset-name
- ConfigDefaults: dict = {'connect_timeout': '', 'database': 'database', 'heartbeat_frequency': 10000, 'host': 'localhost', 'max_idle_time': '', 'max_pool_size': 100, 'min_pool_size': 0, 'password': '', 'port': 27017, 'server_selection_timeout': '', 'socket_timeout': '', 'username': '', 'wait_queue_multiple': '', 'wait_queue_timeout': ''}
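Most of these options have a natural counterpart among PyMongo/Motor client keyword arguments, and an empty string means "not set". The sketch below shows one way such a section could be turned into client keyword arguments. The option-to-keyword mapping is an assumption for illustration, not BSPump's code. `wait_queue_multiple` is left out because PyMongo 4 no longer accepts `waitQueueMultiple`, and `database` is not a client argument.

```python
from typing import Any, Dict, Optional

# Copy of MongoDBConnection.ConfigDefaults as listed above
CONFIG_DEFAULTS: Dict[str, Any] = {
    "connect_timeout": "", "database": "database", "heartbeat_frequency": 10000,
    "host": "localhost", "max_idle_time": "", "max_pool_size": 100,
    "min_pool_size": 0, "password": "", "port": 27017,
    "server_selection_timeout": "", "socket_timeout": "", "username": "",
    "wait_queue_multiple": "", "wait_queue_timeout": "",
}

# Hypothetical mapping from option names to PyMongo/Motor client keyword arguments
NUMERIC_OPTIONS = {
    "max_pool_size": "maxPoolSize",
    "min_pool_size": "minPoolSize",
    "max_idle_time": "maxIdleTimeMS",
    "connect_timeout": "connectTimeoutMS",
    "socket_timeout": "socketTimeoutMS",
    "server_selection_timeout": "serverSelectionTimeoutMS",
    "wait_queue_timeout": "waitQueueTimeoutMS",
    "heartbeat_frequency": "heartbeatFrequencyMS",
}


def build_client_kwargs(overrides: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Merge overrides into the defaults and drop every option left empty."""
    config = {**CONFIG_DEFAULTS, **(overrides or {})}
    # "host" may be a plain host name or a full mongodb:// URI
    kwargs: Dict[str, Any] = {"host": config["host"], "port": int(config["port"])}
    for option, kwarg in NUMERIC_OPTIONS.items():
        value = config[option]
        if value != "":  # empty string: keep the driver default
            kwargs[kwarg] = int(value)  # values read from .conf files arrive as strings
    for option in ("username", "password"):
        if config[option] != "":
            kwargs[option] = config[option]
    return kwargs


if __name__ == "__main__":
    print(build_client_kwargs({"host": "mongodb://host1,host2/?replicaSet=my-replicaset-name"}))
```

The result could then be passed as `pymongo.MongoClient(**build_client_kwargs())`; whether BSPump builds its client exactly this way is not stated in this reference.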
- __init__(app, id=None, config=None)
Parameters
- app : Application
Specification of an Application.
- id : str, default None
ID of the connection.
- config : JSON or other compatible format, default None
Holds the settings used to create the connection (see ConfigDefaults).
- class bspump.mongodb.MongoDBLookup(app, connection, id=None, config=None, cache=None)
Bases: MappingLookup, AsyncLookupMixin
A lookup linked with MongoDB. It provides a mapping (dictionary-like) interface to pipelines, feeds lookup data from MongoDB using a query, and keeps a simple cache to reduce the number of database hits.
Configuration options:
- database - Mongo database name
- collection - Mongo collection name
- key - field name to match
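The caching idea can be illustrated without a database. `CachedKeyLookup` below is a hypothetical stand-in, not MongoDBLookup itself: it builds a `{key: value}` filter from the configured key field, awaits an injected fetch coroutine (in practice something like a Motor `find_one`), and memoizes the result so that repeated lookups of the same value skip the database.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

Document = Optional[Dict[str, Any]]


class CachedKeyLookup:
    """Dictionary-like async lookup with a naive in-memory cache (illustrative only)."""

    def __init__(self, key: str, fetch: Callable[[Dict[str, Any]], Awaitable[Document]]):
        self.Key = key       # document field to match, like the "key" option
        self._fetch = fetch  # coroutine running a find_one-style query
        self._cache: Dict[Any, Document] = {}

    async def get(self, value: Any) -> Document:
        if value in self._cache:
            return self._cache[value]  # cache hit: no database round trip
        document = await self._fetch({self.Key: value})  # MongoDB-style filter
        self._cache[value] = document  # misses (None) are cached as well
        return document


async def demo() -> Tuple[Document, Document, Document, List[Dict[str, Any]]]:
    calls: List[Dict[str, Any]] = []
    products = {"p1": {"product_id": "p1", "name": "Widget"}}

    async def fake_find_one(query: Dict[str, Any]) -> Document:
        calls.append(query)  # stands in for a real collection.find_one(query)
        return products.get(query["product_id"])

    lookup = CachedKeyLookup("product_id", fake_find_one)
    first = await lookup.get("p1")
    second = await lookup.get("p1")  # served from the cache
    missing = await lookup.get("p2")
    return first, second, missing, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Caching misses means a document inserted later stays invisible until the cache is cleared; whether the real lookup caches misses is not documented here.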
Example:
The MongoDBLookup can then be located and used inside a custom enricher (the user_info field name is only illustrative):
import bspump

class AsyncEnricher(bspump.Generator):

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        svc = app.get_service("bspump.PumpService")
        self.Lookup = svc.locate_lookup("MyMongoLookup")

    async def generate(self, context, event, depth):
        if "user" not in event:
            return None

        info = await self.Lookup.get(event["user"])
        if info is not None:
            event["user_info"] = info  # hypothetical field holding the looked-up document

        # Inject the enriched event into the next depth of the pipeline
        self.Pipeline.inject(context, event, depth)
- class bspump.mongodb.MongoDBSource(app, pipeline, connection, query_parms=None, id=None, config=None)
Bases: TriggerSource
MongoDB database source.
- __init__(app, pipeline, connection, query_parms=None, id=None, config=None)
Creates a new instance.
Parameters:
- app (Application): application
- pipeline (Pipeline): pipeline
- connection (Connection): connection to the database
- query_parms (dict): query parameters (filter, projection, number of records)
- id (str): ID of the source
- config (dict): configuration
- class bspump.mongodb.MongoDBSink(app, pipeline, connection, id=None, config=None)
Bases: Sink
MongoDBSink is a sink processor that forwards the event to the MongoDB database specified by a MongoDBConnection object.
MongoDBSink expects either a dictionary or a list of dictionaries as input.
Example code can be found in the examples section, in bspump-mongo-sink.py.
While the connection defines which MongoDB database is used, the particular collection within that database must be specified in the sink itself, by overriding its ConfigDefaults when instantiating the class.
- __init__(app, pipeline, connection, id=None, config=None)
Initializes the sink with the given parameters.
- process(context, event)
Forwards the event to MongoDB.
Parameters
- context :
Additional information passed to the method.
- event : dict or list
The event to store: a dictionary or a list of dictionaries, usually decoded from JSON.
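Because the sink accepts either a single dictionary or a list of dictionaries, a bulk writer has to normalize both shapes into one list of documents (for example before an `insert_many` call). The helper below is a minimal, hypothetical sketch of that step, not code taken from BSPump.

```python
from typing import Any, Dict, List, Union

Event = Union[Dict[str, Any], List[Dict[str, Any]]]


def normalize_documents(event: Event) -> List[Dict[str, Any]]:
    """Turn a sink input (one dict or a list of dicts) into a list of documents."""
    if isinstance(event, dict):
        return [event]
    if isinstance(event, list):
        bad = [item for item in event if not isinstance(item, dict)]
        if bad:
            raise TypeError(f"expected only dicts in the list, got {type(bad[0]).__name__}")
        return list(event)  # shallow copy so the caller's list is not modified later
    raise TypeError(f"expected dict or list of dicts, got {type(event).__name__}")


if __name__ == "__main__":
    print(normalize_documents({"status": "ok"}))  # [{'status': 'ok'}]
```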
MongoDBConnection
Connection to MongoDB.
import bspump.mongodb
connection = bspump.mongodb.MongoDBConnection(app, "MongoDBConnection")
Configuration:
[connection:MongoDBConnection]
host=mongodb://localhost:27017
database=mydb
Options:
- host - MongoDB host name or connection URI
- database - Default database name
URI Examples:
# Local
host=mongodb://localhost:27017
# With authentication
host=mongodb://user:password@localhost:27017/mydb?authSource=admin
# Replica set
host=mongodb://host1:27017,host2:27017,host3:27017/mydb?replicaSet=rs0
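The authentication example embeds the password directly in the URI. If credentials contain characters such as `@`, `:` or `/`, they must be percent-encoded first; PyMongo's documentation recommends `urllib.parse.quote_plus` for this. `build_mongo_uri` is a hypothetical helper (not part of BSPump) that assembles URIs of the three shapes shown above.

```python
from typing import Optional, Sequence
from urllib.parse import quote_plus, urlencode


def build_mongo_uri(
    hosts: Sequence[str],
    database: str = "",
    username: Optional[str] = None,
    password: Optional[str] = None,
    **options: str,
) -> str:
    """Assemble a mongodb:// URI, percent-encoding the credentials."""
    credentials = ""
    if username is not None:
        credentials = quote_plus(username)
        if password is not None:
            credentials += ":" + quote_plus(password)
        credentials += "@"
    query = "?" + urlencode(options) if options else ""
    # The "/" separator is required whenever a database or options follow the hosts
    path = "/" + database if database or query else ""
    return f"mongodb://{credentials}{','.join(hosts)}{path}{query}"


if __name__ == "__main__":
    print(build_mongo_uri(["localhost:27017"], "mydb", "user", "p@ss:word", authSource="admin"))
    # mongodb://user:p%40ss%3Aword@localhost:27017/mydb?authSource=admin
```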
MongoDBSource
Reads documents from MongoDB.
source = bspump.mongodb.MongoDBSource(
app, pipeline,
connection="MongoDBConnection"
)
Configuration:
[pipeline:MyPipeline:MongoDBSource]
collection=events
query={"status": "pending"}
batch_size=100
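Configuration files deliver every value as a string, so a filter such as `query={"status": "pending"}` has to be decoded from JSON before it can be used as a MongoDB filter. A minimal sketch of that step follows; `parse_query_option` is a hypothetical name, and how MongoDBSource itself reads the option is an assumption not covered by this reference.

```python
import json
from typing import Any, Dict


def parse_query_option(raw: str) -> Dict[str, Any]:
    """Decode a JSON filter read from a configuration file."""
    if not raw.strip():
        return {}  # empty option: match every document
    query = json.loads(raw)  # raises ValueError (JSONDecodeError) on malformed JSON
    if not isinstance(query, dict):
        raise ValueError('query must be a JSON object, e.g. {"status": "pending"}')
    return query


if __name__ == "__main__":
    print(parse_query_option('{"status": "pending"}'))  # {'status': 'pending'}
```

Operators such as `$gt` pass through unchanged, because the decoded dictionary already has the shape a MongoDB filter expects.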
MongoDBSink
Writes documents to MongoDB.
sink = bspump.mongodb.MongoDBSink(
app, pipeline,
connection="MongoDBConnection"
)
Configuration:
[pipeline:MyPipeline:MongoDBSink]
collection=processed_events
mode=insert
Modes:
- insert - Insert new documents
- upsert - Update or insert
- replace - Replace existing documents
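In PyMongo terms the three modes roughly correspond to `insert_one`, `update_one(..., upsert=True)` and `replace_one`. The sketch below only builds a description of the call instead of executing it, so it runs without a database; the mapping and the `plan_write` name are assumptions about how the modes behave, not BSPump's implementation.

```python
from typing import Any, Dict, List


def plan_write(mode: str, document: Dict[str, Any], key_fields: List[str]) -> Dict[str, Any]:
    """Describe the single-document PyMongo call that a sink write mode maps to."""
    if mode == "insert":
        return {"op": "insert_one", "document": document}
    # Filter built from the key fields, e.g. {"_id": ...}
    selector = {field: document[field] for field in key_fields}
    if mode == "upsert":
        # Only non-key fields go into $set; on insert MongoDB copies the filter's equality fields
        changes = {k: v for k, v in document.items() if k not in selector}
        return {"op": "update_one", "filter": selector, "update": {"$set": changes}, "upsert": True}
    if mode == "replace":
        return {"op": "replace_one", "filter": selector, "replacement": document, "upsert": False}
    raise ValueError(f"unknown mode: {mode!r}")


if __name__ == "__main__":
    print(plan_write("upsert", {"_id": 7, "status": "done"}, ["_id"]))
```

With a real PyMongo collection, an upsert plan would be executed as `collection.update_one(plan["filter"], plan["update"], upsert=True)`.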
MongoDBLookup
Lookup table backed by MongoDB.
lookup = bspump.mongodb.MongoDBLookup(
    app,
    connection="MongoDBConnection",
    # id is passed by keyword: the second positional parameter is connection
    id="ProductLookup",
    config={
        "collection": "products",
        "key": "product_id"
    }
)
Upsert Example
import bspump

class UpsertProcessor(bspump.Processor):
    def process(self, context, event):
        # Set the upsert key
        context["mongodb_upsert_key"] = {"_id": event["_id"]}
        return event
Example Pipeline
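import json

import bspump
import bspump.kafka
import bspump.mongodb


class DocumentProcessor(bspump.Processor):
    # Hypothetical processor: assumes each Kafka message value is JSON bytes
    # and decodes it into the dict that MongoDBSink expects
    def process(self, context, event):
        return json.loads(event)


class KafkaToMongoPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(app, self, connection="KafkaConnection"),
            DocumentProcessor(app, self),
            bspump.mongodb.MongoDBSink(app, self, connection="MongoDBConnection"),
        )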