bspump.trigger

Triggers for controlling when TriggerSources produce events.

class bspump.trigger.Trigger(app, max_triggered=None, id=None)[source]

Bases: ABC

Abstract base class for all trigger types in BitSwan.

Triggers are components that fire events at specific times or under specific conditions. When a trigger fires, it notifies all attached sources to execute their cycle() method.

Key features:

- Manages a set of attached sources
- Handles firing triggers (notifying sources)
- Supports pause/resume functionality
- Tracks trigger state and timing
- Supports maximum triggered source limits

Usage:

trigger = SomeTrigger(app)
source = MyTriggerSource(app, pipeline)
source.on(trigger)  # Attach source to trigger
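The attach-and-fire contract described above can be modelled in a few lines of plain Python. This is only a sketch: ToyTrigger and ToySource are hypothetical stand-ins written for illustration, not the bspump classes, and the real Trigger also tracks timing and the max_triggered limit.

```python
class ToyTrigger:
    """Illustrative stand-in for a trigger; not the bspump class."""

    def __init__(self):
        self.sources = set()
        self.paused = False

    def add(self, source):
        self.sources.add(source)

    def remove(self, source):
        self.sources.discard(source)

    def pause(self, pause=True):
        self.paused = pause

    def fire(self):
        # A paused trigger stays silent; otherwise every attached source is notified.
        if self.paused:
            return
        for source in self.sources:
            source.run_cycle()


class ToySource:
    """Illustrative stand-in for a trigger source; it only counts its cycles."""

    def __init__(self):
        self.cycles = 0

    def on(self, trigger):
        trigger.add(self)
        return self

    def run_cycle(self):
        self.cycles += 1


toy_trigger = ToyTrigger()
toy_source = ToySource().on(toy_trigger)

toy_trigger.fire()        # the source runs one cycle
toy_trigger.pause()
toy_trigger.fire()        # ignored while paused
toy_trigger.pause(False)
toy_trigger.fire()        # the source runs a second cycle
```

After the three fire() calls the source has run two cycles, because the call made while paused is dropped.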

__init__(app, max_triggered=None, id=None)[source]
add(source)[source]
remove(source)[source]
fire()[source]
done(trigger_source)[source]

Called by TriggerSource when cycle is completed.

pause(pause=True)[source]
class bspump.trigger.OpportunisticTrigger(app, id=None, run_immediately=True, chilldown_period=5)[source]

Bases: Trigger

This trigger tries to trigger the pump as frequently as possible. It fires immediately when possible, after each source reports a completed cycle, and periodically every chilldown period (5 seconds by default; see chilldown_period).

__init__(app, id=None, run_immediately=True, chilldown_period=5)[source]
pause(pause=True)[source]
on_tick(event_type='simulated')[source]
done(trigger_source)[source]

Called by TriggerSource when cycle is completed.

classmethod construct(app, definition)[source]
Parameters:

definition (dict)

class bspump.trigger.RunOnceTrigger(app, id=None)[source]

Bases: Trigger

RunOnceTrigger is an obsolete trigger that should not be used.

The following issues should be taken into consideration and need to be refactored in the future:

1) RunOnceTrigger issues the application stop.
2) The self.fire() event is triggered before the pipeline is_ready().

__init__(app, id=None)[source]
done(trigger_source)[source]

Called by TriggerSource when cycle is completed.

class bspump.trigger.PubSubTrigger(app, message_types, pubsub=None, id=None)[source]

Bases: Trigger

__init__(app, message_types, pubsub=None, id=None)[source]
async on_message(message_type)[source]
classmethod construct(app, definition)[source]
Parameters:

definition (dict)

class bspump.trigger.PeriodicTrigger(app, interval=None, id=None)[source]

Bases: Trigger

__init__(app, interval=None, id=None)[source]

Interval is in seconds, can be a float or int.

async on_timer()[source]
classmethod construct(app, definition)[source]
Parameters:

definition (dict)

class bspump.trigger.CronTrigger(app, cron_string, init_time, id=None)[source]

Bases: Trigger

Trigger that uses crontab syntax for specifying execution frequency.

CronTrigger allows you to schedule pipeline execution using standard cron expressions. It calculates the next execution time based on the cron string and fires when that time is reached.

Cron syntax:

* * * * *
| | | | |
| | | | +-- Day of week (0-7, 0 and 7 are Sunday)
| | | +---- Month (1-12)
| | +------ Day of month (1-31)
| +-------- Hour (0-23)
+---------- Minute (0-59)

Examples

"0 9 * * *"     # Daily at 9:00 AM
"*/15 * * * *"  # Every 15 minutes
"0 0 1 * *"     # First day of every month at midnight
"0 9 * * 1"     # Every Monday at 9:00 AM

Warning: Always use timezone-aware datetime objects for init_time to avoid timezone-related issues.
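As a quick illustration of the warning above, the standard library distinguishes timezone-aware from naive datetime objects. The variable names are only for illustration; choosing UTC is an assumption, and any tzinfo that matches your deployment works the same way.

```python
from datetime import datetime, timezone

# Timezone-aware: carries an explicit UTC offset, so it is unambiguous.
init_time = datetime.now(timezone.utc)

# Naive: no tzinfo attached, so its meaning depends on the host's local settings.
naive_time = datetime.now()

print(init_time.tzinfo)   # UTC
print(naive_time.tzinfo)  # None
```

Passing a value like init_time rather than naive_time keeps the schedule independent of the machine's local timezone configuration.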

__init__(app, cron_string, init_time, id=None)[source]
async on_timer(event_type='simulated')[source]

Method that is called on every tick of the application.

pause(pause=True)[source]

Pauses the trigger

get_new_time(cron_string, time)[source]

Calculates the next trigger time from the cron string and the given time.
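To make the idea of "next trigger time" concrete, here is a minimal standard-library sketch that finds the first minute after a given time that matches a cron string. It is not how CronTrigger is implemented: next_fire_time and _expand are hypothetical helpers, only the `*`, `*/n`, `a-b`, single-number and comma-list forms are handled, and all five fields must match together (classic cron instead matches either day field when both are restricted).

```python
from datetime import datetime, timedelta, timezone


def _expand(field, low, high):
    """Expand one cron field into the set of integers it allows."""
    allowed = set()
    for part in field.split(","):
        step = 1
        if "/" in part:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = low, high
        elif "-" in part:
            start_text, end_text = part.split("-")
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(part)
        allowed.update(range(start, end + 1, step))
    return allowed


def next_fire_time(cron_string, after):
    """Return the first whole minute strictly after `after` that matches."""
    minute, hour, day, month, weekday = cron_string.split()
    minutes = _expand(minute, 0, 59)
    hours = _expand(hour, 0, 23)
    days = _expand(day, 1, 31)
    months = _expand(month, 1, 12)
    # Cron counts Sunday as 0 or 7; fold 7 onto 0.
    weekdays = {value % 7 for value in _expand(weekday, 0, 7)}

    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    # Brute-force search minute by minute, bounded to roughly four years.
    for _ in range(4 * 366 * 24 * 60):
        cron_weekday = (candidate.weekday() + 1) % 7  # Python uses Monday=0
        if (candidate.minute in minutes and candidate.hour in hours
                and candidate.day in days and candidate.month in months
                and cron_weekday in weekdays):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError("no matching time found for " + cron_string)


start = datetime(2024, 1, 1, 9, 7, tzinfo=timezone.utc)  # a Monday
every_15_minutes = next_fire_time("*/15 * * * *", start)
next_monday_9am = next_fire_time("0 9 * * 1", start)
first_of_month = next_fire_time("0 0 1 * *", start)
```

The minute-by-minute search is slow but easy to verify; a production scheduler would normally rely on a dedicated cron library instead.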

classmethod construct(app, definition)[source]
Parameters:

definition (dict)

CronTrigger

Schedule-based triggers using cron expressions.

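from datetime import datetime, timezone

from bspump.trigger import CronTrigger

# init_time is a required argument; pass a timezone-aware datetime
trigger = CronTrigger(app, "*/5 * * * *", datetime.now(timezone.utc))  # Every 5 minutes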

Cron Expression Format:

┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 7, 0 and 7 are Sunday)
│ │ │ │ │
* * * * *

Examples:

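from datetime import datetime, timezone

init_time = datetime.now(timezone.utc)  # required, timezone-aware

CronTrigger(app, "*/10 * * * *", init_time)     # Every 10 minutes
CronTrigger(app, "0 * * * *", init_time)        # Every hour
CronTrigger(app, "0 0 * * *", init_time)        # Daily at midnight
CronTrigger(app, "0 9 * * 1", init_time)        # Monday at 9 AM
CronTrigger(app, "0 0 1 * *", init_time)        # First of each month
CronTrigger(app, "0 18 * * 1-5", init_time)     # Weekdays at 6 PM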

PubSubTrigger

Event-driven triggers using internal pub/sub.

from bspump.trigger import PubSubTrigger

trigger = PubSubTrigger(app, "my.event.topic")

# Fire the trigger from elsewhere; on_message() accepts only the message type
app.PubSub.publish("my.event.topic")
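The publish/subscribe flow behind this trigger can be pictured with a tiny in-process hub. This is a sketch under stated assumptions: ToyPubSub is a hypothetical class written for illustration, not the application's PubSub, and the callback here simply records which message type arrived instead of firing a real trigger.

```python
from collections import defaultdict


class ToyPubSub:
    """Minimal in-process publish/subscribe hub (illustrative only)."""

    def __init__(self):
        self.subscribers = defaultdict(list)

    def subscribe(self, message_type, callback):
        self.subscribers[message_type].append(callback)

    def publish(self, message_type):
        # Every callback subscribed to this message type is called with it.
        for callback in self.subscribers[message_type]:
            callback(message_type)


received = []  # stands in for "the trigger fired"
pubsub = ToyPubSub()
for message_type in ("my.event.topic", "force.run"):
    pubsub.subscribe(message_type, received.append)

pubsub.publish("force.run")        # subscribed: the callback runs
pubsub.publish("unrelated.topic")  # nobody subscribed: nothing happens
```

Only the subscribed message type reaches the callback, which is the behaviour a pub/sub-driven trigger depends on.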

PeriodicTrigger

Simple interval-based triggers.

from bspump.trigger import PeriodicTrigger

trigger = PeriodicTrigger(app, 10)  # Every 10 seconds
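For intuition, interval-based firing can be sketched with plain asyncio. This is not PeriodicTrigger's implementation: run_periodically is a hypothetical helper, and the count argument exists only so the example terminates. The interval is in seconds and may be a float or an int, as in the class above.

```python
import asyncio


async def run_periodically(interval, fire, count):
    """Call fire() `count` times, waiting `interval` seconds before each call."""
    for _ in range(count):
        await asyncio.sleep(interval)
        fire()


fired = []
# A short float interval keeps the demonstration fast.
asyncio.run(run_periodically(0.01, lambda: fired.append("tick"), count=3))
```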

OpportunisticTrigger

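Fires as frequently as possible: immediately when it can, after each completed source cycle, and periodically every chilldown period (5 seconds by default).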

from bspump.trigger import OpportunisticTrigger

trigger = OpportunisticTrigger(app)
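The firing rules can be sketched as a small deterministic model in which time is passed in explicitly. ToyOpportunisticTrigger is a hypothetical stand-in, not the bspump class, and the exact conditions the real trigger checks (pause state, readiness, event loop scheduling) are assumptions left out here.

```python
class ToyOpportunisticTrigger:
    """Illustrative model of opportunistic firing with a chilldown period."""

    def __init__(self, run_immediately=True, chilldown_period=5, now=0.0):
        self.chilldown_period = chilldown_period
        self.fired_at = []  # timestamps at which the toy trigger fired
        if run_immediately:
            self._fire(now)

    def _fire(self, now):
        self.fired_at.append(now)

    def done(self, now):
        # A source reported a completed cycle: fire again right away.
        self._fire(now)

    def on_tick(self, now):
        # Periodic fallback: fire once a full chilldown period has passed.
        if not self.fired_at or now - self.fired_at[-1] >= self.chilldown_period:
            self._fire(now)


toy = ToyOpportunisticTrigger(chilldown_period=5)  # fires at t=0
toy.on_tick(now=1.0)  # too soon after the last firing, nothing happens
toy.done(now=2.0)     # cycle completed, fires at t=2
toy.on_tick(now=4.0)  # only 2 s since the last firing, nothing happens
toy.on_tick(now=7.0)  # 5 s since the last firing, fires at t=7
```

In this model the trigger fires at t=0, t=2 and t=7, which matches the three rules: immediately, after a completed cycle, and after a quiet chilldown period.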

Using Triggers

Attach triggers to a TriggerSource:

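from datetime import datetime, timezone

from bspump.abc.source import TriggerSource
from bspump.trigger import CronTrigger

class MySource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        # Wait until the pipeline can accept events, then emit one
        await self.Pipeline.ready()
        event = {"timestamp": datetime.now().isoformat()}
        await self.Pipeline.process(event)

# CronTrigger requires init_time; pass a timezone-aware datetime
source = MySource(app, pipeline).on(
    CronTrigger(app, "*/5 * * * *", datetime.now(timezone.utc))
)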

Multiple Triggers

A source can have multiple triggers:

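from datetime import datetime, timezone

# on() returns the source, so calls can be chained
source = (
    MySource(app, pipeline)
    .on(CronTrigger(app, "0 * * * *", datetime.now(timezone.utc)))  # Every hour
    .on(PubSubTrigger(app, "force.run"))                            # On-demand
)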

The source’s cycle() method runs when any trigger fires.

Custom Triggers

Create a custom trigger by subclassing Trigger and calling fire() when your condition occurs:

from bspump.trigger import Trigger

class WebhookTrigger(Trigger):
    def __init__(self, app, path):
        super().__init__(app)
        self.path = path
        # Set up webhook handler

    async def handle_webhook(self, request):
        # fire() is a regular method, not a coroutine, so it is not awaited
        self.fire()