Source code for bspump.mongodb.source
"""Module for connecting to Mongo database"""
import asyncio
import logging
from bspump.abc.source import TriggerSource
#
L = logging.getLogger(__name__)
#
[docs]
class MongoDBSource(TriggerSource):
"""MongoDB database source"""
ConfigDefaults = {
"output_queue_max_size": 2,
"database": "",
"collection": "",
}
[docs]
def __init__(
self, app, pipeline, connection, query_parms=None, id=None, config=None
):
"""
Create new instance
Parameters:
app (Application): application
pipeline (PipeLone): pipeline
connection (Connection): Connection to the database
query_parms (Dictionary): Query parameters (filter,projection,number of records)
id (int): Id
config (Dictionary): Connection configuration
"""
super().__init__(app, pipeline, id=id, config=config)
self.Pipeline = pipeline
self._output_queue = asyncio.Queue()
self._output_queue_max_size = int(self.Config["output_queue_max_size"])
self.QueryParms = query_parms
self.Connection = pipeline.locate_connection(app, connection)
self.Database = self.Config["database"]
self.Collection = self.Config["collection"]
[docs]
async def cycle(self):
db = self.Connection.Client[self.Database]
# We check the queue size and remove throttling if the size is smaller than its defined max size.
if self._output_queue.qsize() == self._output_queue_max_size - 1:
self.Pipeline.throttle(self, False)
coll = db[self.Collection]
await self.Pipeline.ready()
# query params
q_filter = self.QueryParms.get("filter", None)
q_projection = self.QueryParms.get("projection", None)
q_limit = self.QueryParms.get("limit", 0)
cur = coll.find(q_filter, q_projection, 0, int(q_limit))
async for recs in cur:
pass
await self.process(recs, context={})