Scheduled Tasks

This pattern runs periodic jobs in a BSPump pipeline. A CronTrigger fires on a cron schedule and wakes a TriggerSource, whose cycle() method emits events into the pipeline.

Use Cases

  • Periodic data aggregation and reporting

  • Scheduled cleanup tasks

  • Regular health checks

  • Batch processing at specific times

  • Daily/weekly/monthly jobs

Architecture

CronTrigger
(Schedule: "0 * * * *")
      │
      ▼
┌───────────────────┐
│  TriggerSource    │
│  cycle() method   │
└───────────────────┘
      │
      ▼
┌───────────────────┐
│   Processors      │
└───────────────────┘
      │
      ▼
┌───────────────────┐
│      Sink         │
└───────────────────┘
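The toy model below reproduces the same flow with plain asyncio. A trigger wakes the source, cycle() builds an event, processors transform it, and a sink collects it. It is a sketch, not bspump code. ToySource, ToyPipeline and fire() are hypothetical names, and the trigger is fired manually rather than by a cron schedule.

```python
import asyncio
from datetime import datetime


class ToyPipeline:
    """Hypothetical stand-in for a pipeline: processors run in order, then the sink."""

    def __init__(self, processors, sink):
        self.processors = processors
        self.sink = sink

    async def ready(self):
        # A real pipeline may block here until downstream is ready; the toy never blocks.
        return True

    async def process(self, event):
        for processor in self.processors:
            event = processor(event)
        self.sink.append(event)


class ToySource:
    """Hypothetical stand-in for a trigger source: runs cycle() each time it is fired."""

    def __init__(self, pipeline):
        self.pipeline = pipeline
        self.runs = 0

    async def fire(self):
        # In bspump a trigger schedules the cycle; here we call it directly.
        await self.cycle()

    async def cycle(self):
        await self.pipeline.ready()
        self.runs += 1
        event = {
            "type": "scheduled_job",
            "run": self.runs,
            "triggered_at": datetime.now().isoformat(),
        }
        await self.pipeline.process(event)


def add_label(event):
    # Example processor: annotate the event
    event["label"] = f"run #{event['run']}"
    return event


async def main():
    sink = []
    source = ToySource(ToyPipeline([add_label], sink))
    for _ in range(3):  # simulate three trigger firings
        await source.fire()
    return sink


events = asyncio.run(main())
print([e["label"] for e in events])  # ['run #1', 'run #2', 'run #3']
```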

Jupyter Implementation

from bspump.jupyter import *  # provides auto_pipeline()
import bspump.common
from bspump.abc.source import TriggerSource
from bspump.trigger import CronTrigger
from datetime import datetime

class ScheduledSource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        await self.Pipeline.ready()
        event = {
            "type": "scheduled_job",
            "triggered_at": datetime.now().isoformat(),
            "job_name": "hourly_report"
        }
        await self.Pipeline.process(event)

auto_pipeline(
    source=lambda app, pipeline: ScheduledSource(app, pipeline).on(
        CronTrigger(app, "0 * * * *")  # Every hour
    ),
    sink=lambda app, pipeline: bspump.common.PPrintSink(app, pipeline),
    name="ScheduledPipeline",
)

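To attach the job's results, enrich the event inside ScheduledSource.cycle() before passing it to self.Pipeline.process(). Here generate_hourly_report() is a placeholder for your own coroutine:

# Inside cycle(), after building the event
event["report_data"] = await generate_hourly_report()
event["completed_at"] = datetime.now().isoformat()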
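The sketch below is a self-contained version of that enrichment step and runs outside bspump. Here generate_hourly_report() is a hypothetical stub that returns fixed numbers. In a real job it would query your data store.

```python
import asyncio
from datetime import datetime


async def generate_hourly_report():
    # Hypothetical stub: a real implementation would await a database or API call.
    await asyncio.sleep(0)
    return {"events": 120, "errors": 0}


async def build_event():
    event = {
        "type": "scheduled_job",
        "triggered_at": datetime.now().isoformat(),
        "job_name": "hourly_report",
    }
    # Enrich the event before it is handed to the pipeline
    event["report_data"] = await generate_hourly_report()
    event["completed_at"] = datetime.now().isoformat()
    return event


event = asyncio.run(build_event())
print(event["report_data"])  # {'events': 120, 'errors': 0}
```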

Standalone Application

import bspump
import bspump.common
from bspump.abc.source import TriggerSource
from bspump.trigger import CronTrigger
from datetime import datetime

class DailyReportSource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        await self.Pipeline.ready()

        # Generate daily report
        report = await self.generate_report()
        await self.Pipeline.process(report)

    async def generate_report(self):
        return {
            "type": "daily_report",
            "date": datetime.now().strftime("%Y-%m-%d"),
            "metrics": {
                "total_events": 12345,
                "errors": 5,
                "success_rate": 99.96
            }
        }

class ReportProcessor(bspump.Processor):
    def process(self, context, event):
        # Format report for output
        event["formatted"] = f"Daily Report for {event['date']}"
        return event

class ScheduledPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            DailyReportSource(app, self).on(
                CronTrigger(app, "0 0 * * *")  # Daily at midnight
            ),
            ReportProcessor(app, self),
            bspump.common.PPrintSink(app, self),
        )

if __name__ == "__main__":
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    svc.add_pipeline(ScheduledPipeline(app, "ScheduledPipeline"))
    app.run()
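generate_report() hardcodes its metrics for illustration. If the counts come from a real source, success_rate can be computed from them rather than stored. The helper below is a hypothetical addition. It reproduces the 99.96 figure from 12345 events and 5 errors.

```python
def success_rate(total_events: int, errors: int) -> float:
    """Percentage of events that did not error, rounded to two decimals."""
    if total_events == 0:
        return 100.0  # Assumption: an empty period counts as fully successful
    return round((total_events - errors) / total_events * 100, 2)


metrics = {"total_events": 12345, "errors": 5}
metrics["success_rate"] = success_rate(metrics["total_events"], metrics["errors"])
print(metrics["success_rate"])  # 99.96
```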

Multiple Triggers

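Combine multiple triggers so the same job runs both on a schedule and on demand. In this example MySource stands for any TriggerSource subclass. The code runs where app and pipeline are in scope, for example inside a Pipeline's __init__:

from bspump.trigger import CronTrigger, PubSubTrigger

# Chain on() calls to register each trigger with the source
source = MySource(app, pipeline).on(
    CronTrigger(app, "0 * * * *")  # Regular schedule: every hour
).on(
    PubSubTrigger(app, "run.scheduled.job")  # On-demand trigger via pub/sub
)

# Trigger an on-demand run from elsewhere in the application
app.PubSub.publish("run.scheduled.job")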
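Conceptually, each trigger wakes the same source, so a scheduled run and an on-demand run execute identical cycle() code. The plain-asyncio model below uses a shared asyncio.Event to show this. It is a hypothetical sketch, not bspump's trigger implementation. The names cron_like_trigger and on_demand are illustrative.

```python
import asyncio


async def run_source(wake: asyncio.Event, runs: list, stop_after: int):
    # The source waits for any trigger, clears the flag, then runs one cycle.
    while len(runs) < stop_after:
        await wake.wait()
        wake.clear()
        runs.append("cycle")


async def cron_like_trigger(wake: asyncio.Event, interval: float, times: int):
    # Stand-in for a schedule: fires at a fixed interval.
    for _ in range(times):
        await asyncio.sleep(interval)
        wake.set()


def on_demand(wake: asyncio.Event):
    # Stand-in for a pub/sub message handler.
    wake.set()


async def main():
    wake = asyncio.Event()
    runs = []
    source = asyncio.create_task(run_source(wake, runs, stop_after=3))
    on_demand(wake)                          # one manual run
    await asyncio.sleep(0)                   # let the source pick it up
    await cron_like_trigger(wake, 0.01, 2)   # two scheduled runs
    await asyncio.wait_for(source, timeout=1)
    return runs


print(len(asyncio.run(main())))  # 3
```

An Event is a flag, not a counter. In this model, two firings that arrive while a cycle is still running therefore collapse into a single follow-up run.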

Batch Processing with Scheduled Source

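Process multiple items in a single cycle. The example assumes an asyncpg-style connection pool (one providing acquire() and fetch()) that is passed to the source when it is constructed:

class BatchProcessingSource(TriggerSource):
    def __init__(self, app, pipeline, db_pool, id=None, config=None):
        super().__init__(app, pipeline, id=id, config=config)
        # Connection pool (assumed asyncpg-style) used by fetch_pending_items()
        self.db_connection = db_pool

    async def cycle(self, *args, **kwargs):
        # Fetch batch of items to process
        items = await self.fetch_pending_items()

        for item in items:
            # Wait until the pipeline can accept the next event
            await self.Pipeline.ready()
            await self.Pipeline.process(item)

    async def fetch_pending_items(self):
        # Query database for pending items
        async with self.db_connection.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM pending_items WHERE status = 'pending'"
            )
        # Convert asyncpg Record objects to plain, mutable dicts for processors
        return [dict(row) for row in rows]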
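A scheduled batch may run again after a crash or an on-demand trigger. Marking each item as done makes a repeat cycle harmless. The in-memory sketch below illustrates this. The items list and its status field are hypothetical stand-ins for the pending_items table. A real implementation would update the database row instead.

```python
import asyncio

# Hypothetical in-memory stand-in for the pending_items table
items = [
    {"id": 1, "status": "pending"},
    {"id": 2, "status": "pending"},
    {"id": 3, "status": "done"},
]


async def fetch_pending_items():
    return [item for item in items if item["status"] == "pending"]


async def cycle(processed: list):
    for item in await fetch_pending_items():
        processed.append(item["id"])  # stand-in for Pipeline.process(item)
        item["status"] = "done"       # mark it so a re-run skips it


async def main():
    first, second = [], []
    await cycle(first)
    await cycle(second)               # simulate the trigger firing again
    return first, second


first_run, second_run = asyncio.run(main())
print(first_run, second_run)  # [1, 2] []
```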

Cron Expression Examples

# Every 10 minutes
CronTrigger(app, "*/10 * * * *")

# Every hour at minute 0
CronTrigger(app, "0 * * * *")

# Every day at midnight
CronTrigger(app, "0 0 * * *")

# Every Monday at 9 AM
CronTrigger(app, "0 9 * * 1")

# First of every month at midnight
CronTrigger(app, "0 0 1 * *")

# Weekdays at 6 PM
CronTrigger(app, "0 18 * * 1-5")
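You can check what an expression means before deploying it. The pure-Python matcher below evaluates the five standard fields (minute, hour, day of month, month, day of week) and brute-forces the next fire time. It is a simplified sketch, not the parser CronTrigger uses. It supports *, numbers, ranges, lists and / steps, but not names such as MON or JAN. It applies the classic cron rule: when both day of month and day of week are restricted, a date matching either one qualifies.

```python
from datetime import datetime, timedelta


def parse_field(field: str, lo: int, hi: int) -> set:
    """Expand one cron field (e.g. '*/10', '1-5', '0,30') into a set of ints."""
    values = set()
    for part in field.split(","):
        step, has_step = 1, "/" in part
        if has_step:
            part, step_text = part.split("/")
            step = int(step_text)
        if part == "*":
            start, end = lo, hi
        elif "-" in part:
            first, last = part.split("-")
            start, end = int(first), int(last)
        else:
            start = int(part)
            end = hi if has_step else start  # 'n/step' runs from n to the field maximum
        values.update(range(start, end + 1, step))
    return values


def cron_matches(expr: str, dt: datetime) -> bool:
    minute, hour, dom, month, dow = expr.split()
    if dt.minute not in parse_field(minute, 0, 59):
        return False
    if dt.hour not in parse_field(hour, 0, 23):
        return False
    if dt.month not in parse_field(month, 1, 12):
        return False
    cron_dow = (dt.weekday() + 1) % 7  # Python: Monday=0; cron: Sunday=0, Monday=1
    dows = parse_field(dow, 0, 7)
    dow_ok = cron_dow in dows or (cron_dow == 0 and 7 in dows)  # 7 also means Sunday
    dom_ok = dt.day in parse_field(dom, 1, 31)
    if dom != "*" and dow != "*":
        return dom_ok or dow_ok  # classic cron OR rule
    return dom_ok and dow_ok


def next_fire(expr: str, after: datetime) -> datetime:
    """Brute-force the first matching minute strictly after `after`."""
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(366 * 24 * 60):  # search at most about a year
        if cron_matches(expr, candidate):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError(f"no match within a year for {expr!r}")


print(next_fire("0 * * * *", datetime(2024, 1, 1, 10, 15, 30)))  # 2024-01-01 11:00:00
print(next_fire("0 9 * * 1", datetime(2024, 1, 1, 9, 0)))        # 2024-01-08 09:00:00
```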

Configuration

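Options for the pipeline go in a configuration section named after its pipeline ID:

[pipeline:ScheduledPipeline]
# Pipeline-level options for ScheduledPipeline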

Best Practices

  1. Idempotent operations: Design jobs to be safely re-run

  2. Track execution: Log start/end times and results

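  3. Handle long-running jobs: Await I/O-bound work so the event loop stays responsive, and move CPU-heavy work off the loop (for example, into an executor)

  4. Avoid overlapping: Ensure a run finishes before the next trigger fires, or detect and skip overlapping runs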

  5. Monitor failures: Alert on job failures
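The tracking, overlap and failure-monitoring practices can be combined in a small wrapper. The wrapper records start and end times, logs failures, and skips a run if the previous one is still in progress. The sketch below uses plain asyncio and logging. run_exclusive and slow_job are hypothetical names. Skipping, rather than queueing, an overlapping run is a design choice you may want to change.

```python
import asyncio
import logging
import time

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("scheduled")


async def run_exclusive(lock: asyncio.Lock, job, name: str) -> bool:
    """Run `job` unless a previous run still holds `lock`; return True if it ran."""
    if lock.locked():
        log.warning("%s: previous run still in progress, skipping", name)
        return False
    async with lock:
        started = time.monotonic()
        log.info("%s: started", name)
        try:
            await job()
        except Exception:
            log.exception("%s: failed", name)  # hook alerting in here
            raise
        finally:
            log.info("%s: finished in %.3fs", name, time.monotonic() - started)
    return True


async def slow_job():
    await asyncio.sleep(0.05)  # stand-in for real work


async def main():
    lock = asyncio.Lock()
    # Two triggers fire almost together; the second run should be skipped.
    first = asyncio.create_task(run_exclusive(lock, slow_job, "hourly_report"))
    await asyncio.sleep(0)  # let the first run acquire the lock
    second = await run_exclusive(lock, slow_job, "hourly_report")
    return await first, second


print(asyncio.run(main()))  # (True, False)
```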