Scheduled Tasks¶
This pattern shows how to run tasks on a schedule by attaching a CronTrigger to a TriggerSource, so that the source's cycle() method runs at the times given by a cron expression.
Use Cases¶
Periodic data aggregation and reporting
Scheduled cleanup tasks
Regular health checks
Batch processing at specific times
Daily/weekly/monthly jobs
Architecture¶
      CronTrigger
(Schedule: "0 * * * *")
           │
           ▼
 ┌───────────────────┐
 │   TriggerSource   │
 │  cycle() method   │
 └───────────────────┘
           │
           ▼
 ┌───────────────────┐
 │    Processors     │
 └───────────────────┘
           │
           ▼
 ┌───────────────────┐
 │       Sink        │
 └───────────────────┘
Jupyter Implementation¶
from bspump.jupyter import *
from bspump.abc.source import TriggerSource
from bspump.trigger import CronTrigger
from datetime import datetime
class ScheduledSource(TriggerSource):
    """Source that emits one ``scheduled_job`` event each time ``cycle()`` runs."""

    async def cycle(self, *args, **kwargs):
        """Wait until the pipeline is ready, then emit a single job event.

        The event carries the job type, the time at which this cycle ran
        (naive local time as an ISO 8601 string) and the job name.
        """
        await self.Pipeline.ready()

        # Timestamp is taken only after the pipeline reports ready.
        fired_at = datetime.now().isoformat()
        payload = {
            "type": "scheduled_job",
            "triggered_at": fired_at,
            "job_name": "hourly_report",
        }
        await self.Pipeline.process(payload)
# Call auto_pipeline with a source factory, a sink factory and the pipeline name.
# The source factory builds a ScheduledSource and attaches a CronTrigger to it;
# the sink factory builds a PPrintSink.
# NOTE(review): `bspump` must be in scope for `bspump.common.PPrintSink`;
# presumably `from bspump.jupyter import *` provides it -- confirm.
auto_pipeline(
source=lambda app, pipeline: ScheduledSource(app, pipeline).on(
CronTrigger(app, "0 * * * *") # Every hour
),
sink=lambda app, pipeline: bspump.common.PPrintSink(app, pipeline),
name="ScheduledPipeline",
)
Process the scheduled event:
# Generate report data.
# NOTE(review): generate_hourly_report() is not defined in this example and must
# be supplied by the reader; `event` is presumably the event emitted by
# ScheduledSource and made available to this notebook cell -- confirm.
event["report_data"] = await generate_hourly_report()
# Record when processing finished (naive local time as an ISO 8601 string).
event["completed_at"] = datetime.now().isoformat()
Standalone Application¶
import bspump
import bspump.common
from bspump.abc.source import TriggerSource
from bspump.trigger import CronTrigger
from datetime import datetime
class DailyReportSource(TriggerSource):
    """Source that emits one daily report event each time ``cycle()`` runs."""

    async def cycle(self, *args, **kwargs):
        """Wait for the pipeline, build the report and pass it to the pipeline."""
        await self.Pipeline.ready()
        # Generate daily report
        daily_report = await self.generate_report()
        await self.Pipeline.process(daily_report)

    async def generate_report(self):
        """Return the report event as a dict.

        The metrics are hard-coded sample values; only ``date`` is computed
        (current local date formatted as ``YYYY-MM-DD``).
        """
        today = datetime.now().strftime("%Y-%m-%d")
        sample_metrics = {
            "total_events": 12345,
            "errors": 5,
            "success_rate": 99.96,
        }
        return {
            "type": "daily_report",
            "date": today,
            "metrics": sample_metrics,
        }
class ReportProcessor(bspump.Processor):
    """Processor that adds a human-readable title to a report event."""

    def process(self, context, event):
        """Store a title under ``event["formatted"]`` and return the event.

        The event is modified in place and must contain a ``"date"`` key.
        ``context`` is not used.
        """
        # Format report for output
        title = f"Daily Report for {event['date']}"
        event["formatted"] = title
        return event
class ScheduledPipeline(bspump.Pipeline):
    """Pipeline wiring: DailyReportSource -> ReportProcessor -> PPrintSink."""

    def __init__(self, app, pipeline_id):
        """Build the pipeline with a source driven by a daily cron schedule."""
        super().__init__(app, pipeline_id)

        # Components are created in the order they appear in the pipeline.
        source = DailyReportSource(app, self)
        source = source.on(CronTrigger(app, "0 0 * * *"))  # Daily at midnight
        processor = ReportProcessor(app, self)
        sink = bspump.common.PPrintSink(app, self)

        self.build(source, processor, sink)
if __name__ == "__main__":
    # Create the application, hand the pipeline to the pump service,
    # then run the application.
    app = bspump.BSPumpApplication()
    pump_service = app.get_service("bspump.PumpService")
    pipeline = ScheduledPipeline(app, "ScheduledPipeline")
    pump_service.add_pipeline(pipeline)
    app.run()
Multiple Triggers¶
Combine multiple triggers for flexibility:
from bspump.trigger import CronTrigger, PubSubTrigger
# TriggerSource.on() takes one trigger per call and returns the source,
# so the calls are chained to attach several triggers to the same source.
source = (
    MySource(app, pipeline)
    # Regular schedule
    .on(CronTrigger(app, "0 * * * *"))
    # On-demand trigger via pub/sub
    .on(PubSubTrigger(app, "run.scheduled.job"))
)
# Trigger on-demand: publish the same message type the PubSubTrigger was created with
app.PubSub.publish("run.scheduled.job")
Batch Processing with Scheduled Source¶
Process multiple items in a single cycle:
class BatchProcessingSource(TriggerSource):
    """Source that passes every pending item to the pipeline within one cycle."""

    async def cycle(self, *args, **kwargs):
        """Fetch the pending items and process them one at a time, in order."""
        await self.Pipeline.ready()
        # Fetch batch of items to process
        pending = await self.fetch_pending_items()
        for row in pending:
            await self.Pipeline.process(row)

    async def fetch_pending_items(self):
        """Return the rows of ``pending_items`` whose status is ``'pending'``.

        NOTE(review): ``self.db_connection`` is not assigned anywhere in this
        class; presumably it is a connection pool set up elsewhere -- confirm.
        """
        # Query database for pending items
        query = "SELECT * FROM pending_items WHERE status = 'pending'"
        async with self.db_connection.acquire() as connection:
            return await connection.fetch(query)
Cron Expression Examples¶
# Cron fields, left to right: minute, hour, day of month, month, day of week.
# Every 10 minutes
CronTrigger(app, "*/10 * * * *")
# Every hour at minute 0
CronTrigger(app, "0 * * * *")
# Every day at midnight
CronTrigger(app, "0 0 * * *")
# Every Monday at 9 AM (day of week 1)
CronTrigger(app, "0 9 * * 1")
# First of every month at midnight
CronTrigger(app, "0 0 1 * *")
# Weekdays at 6 PM (day of week 1-5, hour 18)
CronTrigger(app, "0 18 * * 1-5")
Configuration¶
[pipeline:ScheduledPipeline]
# Pipeline-level configuration
Best Practices¶
Idempotent operations: Design jobs so that they can be safely re-run
Track execution: Log start/end times and results
Handle long-running jobs: Use async I/O for lengthy operations so the event loop is not blocked
Avoid overlapping runs: Ensure each job completes before the next trigger fires
Monitor failures: Alert on job failures