Source code for bspump.mongodb.changestreamsource

import asyncio
import logging
import pymongo

from ..abc.source import Source

#

L = logging.getLogger(__name__)

#


[docs] class MongoDBChangeStreamSource(Source): """ `MongoDBChangeStreamSource` listens to the specified `Database` and (optionally) `Collection` (if not configured, the events are aggregated from all collections). The output are `update`, `insert`, `delete`, `invalidate`, `dropDatabase`, `drop`, `rename`, `replace` events. Examples of events you can find here: https://docs.mongodb.com/manual/reference/change-events/ WARNING! Make sure, that version of MongoDB is >= 4.0.0 and replica set is enabled. """ ConfigDefaults = { "database": "", "collection": "", "full_document": "", }
[docs] def __init__(self, app, pipeline, connection, id=None, config=None): super().__init__(app, pipeline, id=id, config=config) self.Connection = pipeline.locate_connection(app, connection) self.Database = self.Config["database"] self.Collection = self.Config["collection"] self.FullDocument = self.Config["full_document"] if self.FullDocument == "": self.FullDocument = None if self.Collection == "": self.Collection = None
[docs] async def main(self): running = True await self.Pipeline.ready() db = self.Connection.Client[self.Database] if self.Collection is None: if self.FullDocument is None: stream = db.watch() else: stream = db.watch(full_document=self.FullDocument) else: if self.FullDocument is None: stream = db[self.Collection].watch() else: stream = db[self.Collection].watch(full_document=self.FullDocument) while True: if not running: await stream.close() break try: event = await stream.try_next() if event is not None: await self.process(event, context={}) except asyncio.CancelledError as e: L.warning( f"Mongo change stream task canceled or timed out: {e}. Restarting connection." ) return await self.main() except pymongo.errors.OperationFailure as e: L.warning( f"Recoverable error encountered while reading changestream: {e}. Restarting connection." ) return await self.main()