Custom Event Source¶
This pattern demonstrates building custom sources for polling external systems or handling custom data ingestion scenarios.
Use Cases¶
Polling REST APIs for new data
Integrating with proprietary systems
Custom file watchers
Database change data capture
Basic Custom Source¶
A minimal custom source:
import bspump
import asyncio
class MyCustomSource(bspump.Source):
async def main(self):
while True:
# Fetch data from external system
events = await self.fetch_events()
for event in events:
await self.Pipeline.ready()
await self.Pipeline.process(event)
# Wait before next poll
await asyncio.sleep(10)
async def fetch_events(self):
# Implement your data fetching logic
return [{"id": 1, "data": "example"}]
Polling REST API¶
A source that polls a REST API:
import bspump
import aiohttp
import asyncio
class RESTPollingSource(bspump.Source):
def __init__(self, app, pipeline, id=None, config=None):
super().__init__(app, pipeline, id, config)
self.url = self.Config.get("url")
self.poll_interval = self.Config.getint("poll_interval", 60)
self.session = None
self.last_id = None
async def main(self):
async with aiohttp.ClientSession() as self.session:
while True:
await self.poll()
await asyncio.sleep(self.poll_interval)
async def poll(self):
params = {}
if self.last_id:
params["since_id"] = self.last_id
async with self.session.get(self.url, params=params) as response:
data = await response.json()
for item in data.get("items", []):
await self.Pipeline.ready()
await self.Pipeline.process(item)
self.last_id = item.get("id")
Using Thread Pools¶
For blocking operations, use a thread pool:
import bspump
import asyncio
from concurrent.futures import ThreadPoolExecutor
class ThreadedSource(bspump.Source):
def __init__(self, app, pipeline, id=None, config=None):
super().__init__(app, pipeline, id, config)
self.executor = ThreadPoolExecutor(max_workers=4)
async def main(self):
loop = asyncio.get_event_loop()
while True:
# Run blocking operation in thread pool
events = await loop.run_in_executor(
self.executor,
self.blocking_fetch
)
for event in events:
await self.Pipeline.ready()
await self.Pipeline.process(event)
await asyncio.sleep(10)
def blocking_fetch(self):
# Blocking I/O operations here
import requests
response = requests.get("https://api.example.com/data")
return response.json()
TriggerSource for Scheduled Polling¶
Use TriggerSource for cron-scheduled polling:
from bspump.abc.source import TriggerSource
from bspump.trigger import CronTrigger
class ScheduledPollingSource(TriggerSource):
async def cycle(self, *args, **kwargs):
# Called on each trigger
events = await self.fetch_events()
for event in events:
await self.Pipeline.ready()
await self.Pipeline.process(event)
async def fetch_events(self):
async with aiohttp.ClientSession() as session:
async with session.get(self.url) as response:
return await response.json()
# Usage
source = ScheduledPollingSource(app, pipeline).on(
CronTrigger(app, "*/5 * * * *") # Every 5 minutes
)
Jupyter Implementation¶
from bspump.jupyter import *
from bspump.abc.source import TriggerSource
from bspump.trigger import CronTrigger
import aiohttp
class APIPollingSource(TriggerSource):
async def cycle(self, *args, **kwargs):
await self.Pipeline.ready()
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com/events") as resp:
data = await resp.json()
for event in data:
await self.Pipeline.process(event)
auto_pipeline(
source=lambda app, pipeline: APIPollingSource(app, pipeline).on(
CronTrigger(app, "*/5 * * * *")
),
sink=lambda app, pipeline: bspump.common.PPrintSink(app, pipeline),
name="PollingPipeline",
)
Configuration¶
[pipeline:PollingPipeline:RESTPollingSource]
url=https://api.example.com/events
poll_interval=60
Best Practices¶
Handle rate limits: Respect API rate limits with backoff
Track state: Remember last processed ID for incremental polling
Use connection pooling: Reuse HTTP sessions
Handle failures: Implement retry logic with exponential backoff
Log progress: Track metrics for monitoring