Source code for bspump.abc.source

import logging
import asyncio

import bspump.asab as asab


# Module-level logger named after this module; the classes below use it for
# their warning, error and exception reports.
L = logging.getLogger(__name__)


class Source(asab.Configurable):
    """
    Base class of a source: the component that emits events into a
    :meth:`Pipeline <bspump.Pipeline()>`.

    A concrete source overrides :meth:`main`. :meth:`start` wraps that
    coroutine in an asyncio task stored in ``self.Task``; :meth:`stop`
    cancels and awaits that task.
    """

    def __init__(self, app, pipeline, id=None, config=None):
        """
        Set the initial ID, :meth:`Pipeline <bspump.Pipeline()>` and Task.

        **Parameters**

        app : Application
            The `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_
            object; used here to look up the ``bspump.MQTTService`` service.

        pipeline : :meth:`Pipeline <bspump.Pipeline()>`
            The pipeline this source belongs to. Its ``Id`` is read here.

        id : str, default None
            ID of this source. When ``None``, the class name is used.

        config : compatible config type, default None
            Configuration passed through to ``asab.Configurable``.
        """
        # Resolve the effective ID once so that the name handed to the parent
        # constructor and ``self.Id`` can never diverge.
        source_id = id if id is not None else self.__class__.__name__

        # The first argument is "pipeline:<pipeline id>:<source id>"
        # (presumably the configuration section name -- confirm against asab).
        super().__init__(
            "pipeline:{}:{}".format(pipeline.Id, source_id),
            config=config,
        )

        self.Id = source_id
        self.Pipeline = pipeline

        # Holds the task wrapping `main()` while the source is started
        self.Task = None

        # Falsy when the service is unavailable; `process()` checks for that
        self.MQTTService = app.get_service("bspump.MQTTService")

        # Number of events passed through `process()`
        self.EventCount = 0
        # Remaining number of events that `process()` also publishes via MQTT
        self.EventsToPublish = 0

    async def process(self, event, context=None):
        """
        Emit an event into the :meth:`Pipeline <bspump.Pipeline()>`.

        Increments ``EventCount``. While ``EventsToPublish`` is positive and
        the MQTT service is available, the event is also handed to
        ``MQTTService.publish_event()`` and ``EventsToPublish`` is decremented.
        Finally the event is awaited through ``self.Pipeline.process()``.

        **Parameters**

        event : any
            The event to be emitted into the pipeline.

        context : default None
            Additional information passed along with the event.

        Any exception raised by ``Pipeline.process()`` propagates to the
        caller.

        :hint: The source should catch this exception and fail gracefully.
        """
        # TODO: Remove this method completely, each source should call pipeline.process() method directly
        self.EventCount += 1

        if self.MQTTService and self.EventsToPublish > 0:
            self.MQTTService.publish_event(
                self.Pipeline.Id, self, event, self.EventsToPublish
            )
            self.EventsToPublish -= 1

        await self.Pipeline.process(event, context=context)

    def start(self, loop):
        """
        Start the source by scheduling :meth:`main` as a task on ``loop``.

        Does nothing when a task is already registered in ``self.Task``.

        **Parameters**

        loop : event loop
            Object providing ``create_task()``; the new task is stored in
            ``self.Task``.
        """
        if self.Task is not None:
            return

        async def _main():
            """
            Run ``self.main()`` and handle the end of its lifecycle.

            Cancellation ends the task silently. Any other ``Exception`` is
            reported to the pipeline through ``set_error()`` and logged.
            """
            # This is to properly handle a lifecycle of the main method
            try:
                await self.main()
            except asyncio.CancelledError:
                pass
            except Exception as e:
                self.Pipeline.set_error(None, None, e)
                L.exception("Exception in the source '{}'".format(self.Id))

        self.Task = loop.create_task(_main())

    async def stop(self):
        """
        Stop the source by cancelling and awaiting ``self.Task``.

        Returns immediately when the source was never started. Otherwise the
        task is cancelled (unless already done) and awaited; a
        ``CancelledError`` raised by that await is logged as a warning and
        not propagated. ``self.Task`` is reset to ``None`` at the end.

        :return: None
        """
        if self.Task is None:
            return  # Source is not started

        if not self.Task.done():
            self.Task.cancel()

        try:
            await self.Task
        except asyncio.CancelledError:
            # Use the public `asyncio.CancelledError` name, like the rest of
            # this module; the `asyncio.exceptions` submodule exists only on
            # Python 3.8+.
            # NOTE(review): this also absorbs a cancellation delivered to the
            # caller of stop() while it waits here -- confirm it is intended.
            L.warning("Task cancelled: {}".format(self))

        if not self.Task.done():
            L.error("Source '{}' refused to stop: {}".format(self.Id, self.Task))

        self.Task = None

    def restart(self, loop):
        """
        Start the source again after its previous task has finished.

        When a finished task is registered, its ``result()`` is retrieved
        (which re-raises whatever the task ended with, if anything) and the
        reference is cleared. :meth:`start` is then called; it is a no-op
        when a task is still registered.

        **Parameters**

        loop : event loop
            Passed to :meth:`start`.
        """
        if self.Task is not None:
            if self.Task.done():
                self.Task.result()
                self.Task = None

        self.start(loop)

    async def main(self):
        """
        Body of the source; must be overridden by a subclass.

        The base implementation raises ``NotImplementedError``. When run
        through :meth:`start`, that exception is caught by the wrapper
        coroutine and reported to the pipeline.
        """
        raise NotImplementedError()

    async def stopped(self):
        """
        Block until the surrounding task is cancelled.

        It is a helper that simplifies the implementation of sources: it
        sleeps in 60-second steps and returns normally once a
        ``CancelledError`` is delivered.

        Example:

        .. code:: python

            async def main(self):
                #... initialize resources here
                await self.stopped()
                #... finalize resources here
        """
        try:
            while True:
                await asyncio.sleep(60)
        except asyncio.CancelledError:
            pass

    def locate_address(self):
        """
        Build the address of this source within its
        :meth:`Pipeline <bspump.Pipeline()>`.

        :return: String in the form ``"<pipeline id>.*<source id>"``.
        """
        return "{}.*{}".format(self.Pipeline.Id, self.Id)

    def rest_get(self):
        """
        Describe this source as a plain dictionary.

        :return: Dictionary with keys ``"Id"`` (source ID) and ``"Class"``
            (class name).
        """
        return {"Id": self.Id, "Class": self.__class__.__name__}

    def __repr__(self):
        return "%s(%r)" % (self.__class__.__name__, self.locate_address())

    @classmethod
    def construct(cls, app, pipeline, definition: dict):
        """
        Create a source from a declarative definition (for example, one
        loaded from a JSON file).

        **Parameters**

        app : Application
            The `Application <https://asab.readthedocs.io/en/latest/asab/application.html#>`_
            object.

        pipeline : :meth:`Pipeline <bspump.Pipeline()>`
            The pipeline the new source belongs to.

        definition : dict
            Recognised keys: ``"id"`` (source ID), ``"config"``
            (configuration) and ``"args"`` (extra keyword arguments for the
            constructor). Missing keys default to ``None``.

        :return: ``cls(app, pipeline, id=..., config=..., **args)``
        """
        newid = definition.get("id")
        config = definition.get("config")
        args = definition.get("args")

        if args is None:
            return cls(app, pipeline, id=newid, config=config)

        return cls(app, pipeline, id=newid, config=config, **args)
class TriggerSource(Source):
    """
    A specialized source that responds to trigger events.

    TriggerSource is designed to work with BitSwan's trigger system. It
    waits on an ``asyncio.Event`` and runs :meth:`cycle` each time that
    event is set, which allows event-driven data processing in pipelines.

    Key features:

    - Waits for trigger events using ``asyncio.Event``
    - Executes :meth:`cycle` when triggered
    - Keeps a set of attached triggers and notifies each after a cycle
    - Publishes cycle begin / end / canceled messages on the pipeline PubSub

    Usage:

    .. code:: python

        class MyTriggerSource(TriggerSource):
            async def cycle(self, *args, **kwargs):
                # Your trigger logic here
                await self.Pipeline.process({"data": "triggered"})

        # Attach to any trigger type
        source = MyTriggerSource(app, pipeline)
        source.on(some_trigger)
    """

    def __init__(self, app, pipeline, id=None, config=None):
        """
        Initialise the source and its (initially cleared) trigger event.

        **Parameters**

        app : Application
            Application object; stored as ``self.App`` and its ``Loop`` as
            ``self.Loop``.

        pipeline : :meth:`Pipeline <bspump.Pipeline()>`
            Pipeline this source belongs to.

        id : str, default None
            ID of the source, forwarded to the parent constructor.

        config : default None
            Configuration, forwarded to the parent constructor.
        """
        super().__init__(app, pipeline, id=id, config=config)

        self.App = app
        self.Loop = app.Loop

        event = asyncio.Event()
        event.clear()
        self.TriggerEvent = event

        # Triggers this source has been attached to via `on()`
        self.Triggers = set()

    def time(self):
        """
        Return the current time as reported by the application.

        :return: ``self.App.time()``

        :hint: You can find more information about `UTC Time <https://asab.readthedocs.io/en/latest/asab/application.html#utc-time>`_ in the ASAB documentation
        """
        now = self.App.time()
        return now

    def on(self, trigger):
        """
        Attach this source to a trigger.

        The source registers itself on the trigger (``trigger.add(self)``)
        and remembers the trigger in ``self.Triggers``.

        **Parameters**

        trigger : trigger object
            Object providing ``add()``; ``main()`` later calls its ``done()``.

        :return: ``self``, so calls can be chained.
        """
        trigger.add(self)
        self.Triggers.add(trigger)
        return self

    async def main(self, *args, **kwags):
        """
        Run trigger-driven cycles until cancelled.

        Each iteration waits for the pipeline to be ready, waits for the
        trigger event, publishes ``bspump.pipeline.cycle_begin!``, runs
        :meth:`cycle`, publishes ``bspump.pipeline.cycle_end!``, clears the
        trigger event and calls ``done(self)`` on every attached trigger.

        Cancellation while waiting for the trigger ends the method quietly.
        Cancellation during a cycle is logged, announced with
        ``bspump.pipeline.cycle_canceled!`` and ends the method. Any other
        exception from the cycle is handed to ``Pipeline.set_error()`` and
        the loop continues.

        **Parameters**

        *args, **kwags
            Forwarded unchanged to :meth:`cycle`.
        """

        def announce(topic):
            # Publish a lifecycle message carrying the current pipeline
            self.Pipeline.PubSub.publish(topic, pipeline=self.Pipeline)

        while True:
            # The pipeline must be ready before a cycle can run
            await self.Pipeline.ready()

            # Sleep until some trigger fires
            try:
                await self.TriggerEvent.wait()
            except asyncio.CancelledError:
                return

            announce("bspump.pipeline.cycle_begin!")

            # Run exactly one cycle
            try:
                await self.cycle(*args, **kwags)
            except asyncio.CancelledError:
                # This happens when Ctrl-C is pressed
                message = "Pipeline '{}' processing was cancelled".format(
                    self.Pipeline.Id
                )
                L.warning(message)
                announce("bspump.pipeline.cycle_canceled!")
                return
            except BaseException as error:
                self.Pipeline.set_error(None, None, error)

            announce("bspump.pipeline.cycle_end!")

            # Re-arm the event and tell every trigger the cycle is over
            self.TriggerEvent.clear()
            for trigger in self.Triggers:
                trigger.done(self)

    async def cycle(self, *args, **kwags):
        """
        One unit of work executed per trigger; must be overridden.

        **Parameters**

        *args, **kwags
            Whatever was passed to :meth:`main`.

        The base implementation raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    def rest_get(self):
        """
        Describe this source as a dictionary.

        :return: The parent's description extended with ``"triggered"``,
            the current state of the trigger event.
        """
        description = super().rest_get()
        description["triggered"] = self.TriggerEvent.is_set()
        return description