MongoDB Integration
BSPump provides MongoDB integration through bspump.mongodb.
Installation
pip install motor
Components
MongoDBConnection: Shared connection to MongoDB
MongoDBSource: Reads documents from MongoDB collections
MongoDBSink: Writes documents to MongoDB collections
MongoDBLookup: Lookup table from MongoDB
MongoDBConnection
import bspump.mongodb
connection = bspump.mongodb.MongoDBConnection(app, "MongoDBConnection")
Configuration:
[connection:MongoDBConnection]
uri=mongodb://localhost:27017
database=mydb
# Authentication (optional)
# uri=mongodb://user:password@localhost:27017/mydb?authSource=admin
# Replica set (optional)
# uri=mongodb://host1:27017,host2:27017,host3:27017/mydb?replicaSet=rs0
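When credentials are embedded in the URI as in the authentication example above, MongoDB connection strings require reserved characters such as `@`, `:` and `/` in the username or password to be percent-encoded. The following is a minimal sketch using only the Python standard library; the helper name `build_mongo_uri` and the sample credentials are hypothetical and not part of BSPump.

```python
from urllib.parse import quote_plus


def build_mongo_uri(user, password, hosts, database, **options):
    """Build a MongoDB URI with percent-encoded credentials (hypothetical helper)."""
    credentials = "{}:{}".format(quote_plus(user), quote_plus(password))
    uri = "mongodb://{}@{}/{}".format(credentials, ",".join(hosts), database)
    # Append URI options such as authSource or replicaSet, if any were given.
    query = "&".join("{}={}".format(k, v) for k, v in options.items())
    return uri + ("?" + query if query else "")


single = build_mongo_uri(
    "user", "p@ss:word", ["localhost:27017"], "mydb", authSource="admin"
)
replica = build_mongo_uri(
    "user", "secret", ["host1:27017", "host2:27017"], "mydb", replicaSet="rs0"
)
print(single)
print(replica)
```

The resulting string can then be used as the `uri` value of the connection section.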
MongoDBSource
Read documents from a MongoDB collection.
import bspump.mongodb
source = bspump.mongodb.MongoDBSource(
    app, pipeline,
    connection="MongoDBConnection"
)
Configuration:
[pipeline:MyPipeline:MongoDBSource]
collection=events
query={"status": "pending"}
batch_size=100
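Because `query` is written as a JSON document inside the configuration file, a stray quote or trailing comma is easy to miss. The sketch below checks the section with the standard library before the pipeline starts. It assumes the value is plain JSON and that `batch_size` is an integer; how BSPump itself parses these options is not shown here, and `SAMPLE_CONFIG` is only illustrative.

```python
import configparser
import json

SAMPLE_CONFIG = """
[pipeline:MyPipeline:MongoDBSource]
collection=events
query={"status": "pending"}
batch_size=100
"""

parser = configparser.ConfigParser()
parser.read_string(SAMPLE_CONFIG)
section = parser["pipeline:MyPipeline:MongoDBSource"]

# json.loads raises json.JSONDecodeError if the filter is not valid JSON.
query = json.loads(section["query"])
# getint raises ValueError if batch_size is not an integer.
batch_size = section.getint("batch_size")

print(section["collection"], query, batch_size)
```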
MongoDBSink
Write documents to a MongoDB collection.
import bspump.mongodb
sink = bspump.mongodb.MongoDBSink(
    app, pipeline,
    connection="MongoDBConnection"
)
Configuration:
[pipeline:MyPipeline:MongoDBSink]
collection=processed_events
# Insert mode: insert, upsert, replace
mode=insert
MongoDBLookup
Use MongoDB for event enrichment.
import bspump.mongodb
lookup = bspump.mongodb.MongoDBLookup(
    app, "ProductLookup",
    connection="MongoDBConnection",
    config={
        "collection": "products",
        "key": "product_id"
    }
)
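Conceptually, enrichment means using the configured key (`product_id` here) to find a matching record and attaching it to the event. The sketch below illustrates the idea with an in-memory dict standing in for the `products` collection. The `enrich` helper, the `product` field name and the sample records are hypothetical, and this is not how `MongoDBLookup` is implemented.

```python
# In-memory stand-in for the "products" collection, indexed by the "key" field.
products = {
    "p-1": {"product_id": "p-1", "name": "Keyboard", "price": 49.0},
    "p-2": {"product_id": "p-2", "name": "Mouse", "price": 19.0},
}


def enrich(event, lookup, key="product_id"):
    """Return a copy of event with the matching lookup record attached."""
    enriched = dict(event)
    # A missing key or an unknown product yields None instead of raising.
    enriched["product"] = lookup.get(event.get(key))
    return enriched


order = enrich({"order_id": 7, "product_id": "p-2"}, products)
unknown = enrich({"order_id": 8, "product_id": "p-9"}, products)
print(order)
print(unknown)
```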
Complete Example
from bspump.jupyter import *
import bspump.mongodb
import bspump.kafka
@register_connection
def mongo_connection(app):
    return bspump.mongodb.MongoDBConnection(app, "MongoDBConnection")

@register_connection
def kafka_connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")
auto_pipeline(
    source=lambda app, pipeline: bspump.kafka.KafkaSource(
        app, pipeline, connection="KafkaConnection"
    ),
    sink=lambda app, pipeline: bspump.mongodb.MongoDBSink(
        app, pipeline, connection="MongoDBConnection"
    ),
    name="KafkaToMongoPipeline",
)
# Transform event to MongoDB document
import json
from datetime import datetime

# `event` arrives as raw bytes; decode it and parse the JSON payload
data = json.loads(event.decode("utf-8"))
event = {
    "_id": data.get("id"),
    "data": data,
    "created_at": datetime.now()
}
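The transform step can also be written as a plain function, which makes it easy to test without Kafka or MongoDB. This is a minimal sketch: the function name `to_mongo_document` and the optional `now` parameter are hypothetical additions for testability, not part of BSPump.

```python
import json
from datetime import datetime


def to_mongo_document(raw_event, now=None):
    """Convert a raw JSON message (bytes) into a MongoDB-ready document."""
    data = json.loads(raw_event.decode("utf-8"))
    return {
        "_id": data.get("id"),  # None when the message carries no "id"
        "data": data,
        "created_at": now or datetime.now(),
    }


doc = to_mongo_document(b'{"id": "evt-1", "status": "pending"}')
no_id = to_mongo_document(b'{"status": "pending"}')
print(doc["_id"], doc["data"])
```

Note that a message without an `id` field produces `_id` set to `None`, so it may be worth rejecting or handling such events explicitly before they reach the sink.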
Upsert Operations
Update existing documents or insert new ones:
import bspump

class UpsertProcessor(bspump.Processor):
    def process(self, context, event):
        # Set upsert key for the sink
        context["mongodb_upsert_key"] = {"_id": event["_id"]}
        return event
Configuration:
[pipeline:MyPipeline:MongoDBSink]
collection=events
mode=upsert
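Assuming `mode=upsert` follows the usual MongoDB upsert semantics (update the document that matches the key, insert it if nothing matches), the behaviour can be illustrated without a database. The dict-backed `upsert` function below is a hypothetical stand-in, not BSPump or MongoDB code.

```python
def upsert(collection, key, document):
    """Update the document stored under key, or insert it if absent."""
    existed = key in collection
    # setdefault creates an empty document on first sight of the key;
    # update then merges the new fields into it.
    collection.setdefault(key, {}).update(document)
    return "updated" if existed else "inserted"


store = {}
first = upsert(store, "evt-1", {"_id": "evt-1", "status": "pending"})
second = upsert(store, "evt-1", {"_id": "evt-1", "status": "done"})
print(first, second, store)
```

Writing the same `_id` twice leaves a single document holding the latest values, which is what makes upserts safe to replay.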
Aggregation Pipeline
Use MongoDB aggregation in a source:
[pipeline:MyPipeline:MongoDBSource]
collection=events
pipeline=[{"$match": {"status": "active"}}, {"$group": {"_id": "$type", "count": {"$sum": 1}}}]
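To make the result shape concrete, the sketch below reproduces the same two stages in plain Python: `$match` keeps documents whose `status` is `active`, and `$group` counts them per `type`. The sample events and the function name `count_active_by_type` are hypothetical. The first lines also confirm that the configured value is valid JSON.

```python
import json
from collections import Counter

# The pipeline value from the configuration, parsed as JSON.
pipeline = json.loads(
    '[{"$match": {"status": "active"}},'
    ' {"$group": {"_id": "$type", "count": {"$sum": 1}}}]'
)

events = [
    {"type": "login", "status": "active"},
    {"type": "login", "status": "active"},
    {"type": "purchase", "status": "active"},
    {"type": "login", "status": "inactive"},
]


def count_active_by_type(docs):
    """Plain-Python equivalent of the $match + $group stages above."""
    active = (d for d in docs if d.get("status") == "active")
    counts = Counter(d.get("type") for d in active)
    return [{"_id": t, "count": n} for t, n in counts.items()]


result = count_active_by_type(events)
print(result)
```

Each event emitted by the source would then be one group document with an `_id` (the event type) and a `count`. MongoDB does not guarantee the order of `$group` output, so downstream processors should not rely on it.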
Configuration Reference
Connection Options
| Option | Default | Description |
|---|---|---|
| uri | mongodb://localhost:27017 | MongoDB connection URI |
| database | test | Default database name |
Sink Options
| Option | Default | Description |
|---|---|---|
| collection | (required) | Target collection |
| mode | insert | insert, upsert, or replace |