Source code for bspump.trigger.crontrig

from croniter import croniter
from datetime import datetime
from .trigger import Trigger


[docs] class CronTrigger(Trigger): """ Trigger that uses crontab syntax for specifying execution frequency. CronTrigger allows you to schedule pipeline execution using standard cron expressions. It calculates the next execution time based on the cron string and fires when that time is reached. Cron syntax: * * * * * | | | | | | | | | +-- Day of week (0-7, 0 and 7 are Sunday) | | | +---- Month (1-12) | | +------ Day of month (1-31) | +-------- Hour (0-23) +---------- Minute (0-59) Examples: "0 9 * * *" # Daily at 9:00 AM "*/15 * * * *" # Every 15 minutes "0 0 1 * *" # First day of every month at midnight "0 9 * * 1" # Every Monday at 9:00 AM Warning: Always use timezone-aware datetime objects for init_time to avoid timezone-related issues. """
[docs] def __init__(self, app, cron_string, init_time, id=None): super().__init__(app, id=id) self.cron_string = cron_string self.init_time = init_time self.next_trigger_time = self.get_new_time(cron_string, init_time) app.PubSub.subscribe("Application.tick!", self.on_timer)
[docs] async def on_timer(self, event_type="simulated"): """ Method that is called on every tick of the application. """ if datetime.now() > self.next_trigger_time: # get new time for next trigger self.next_trigger_time = self.get_new_time( self.cron_string, self.next_trigger_time ) self.fire() else: return
[docs] def pause(self, pause=True): """ Pauses the trigger """ super().pause(pause) if not pause: self.Loop.call_soon(self.on_tick)
[docs] def get_new_time(self, cron_string, time): """ Calculates new time for next trigger """ iterable = croniter(cron_string, time) return iterable.get_next(datetime)
[docs] @classmethod def construct(cls, app, definition: dict): id = definition.get("id") interval = definition.get("args", {}).get("cron_string") if interval is None: raise RuntimeError("CronTrigger needs interval to be defined") init_time = datetime.now() return cls(app, id=id, cron_string=interval, init_time=init_time)