Custom Event Source

This pattern demonstrates building custom sources for polling external systems or handling custom data ingestion scenarios.

Use Cases

  • Polling REST APIs for new data

  • Integrating with proprietary systems

  • Custom file watchers

  • Database change data capture

Basic Custom Source

A minimal custom source:

import bspump
import asyncio

class MyCustomSource(bspump.Source):
    async def main(self):
        while True:
            # Fetch data from external system
            events = await self.fetch_events()

            for event in events:
                await self.Pipeline.ready()
                await self.Pipeline.process(event)

            # Wait before next poll
            await asyncio.sleep(10)

    async def fetch_events(self):
        # Implement your data fetching logic
        return [{"id": 1, "data": "example"}]

Polling REST API

A source that polls a REST API:

import bspump
import aiohttp
import asyncio

class RESTPollingSource(bspump.Source):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        self.url = self.Config.get("url")
        self.poll_interval = self.Config.getint("poll_interval", 60)
        self.session = None
        self.last_id = None

    async def main(self):
        async with aiohttp.ClientSession() as self.session:
            while True:
                await self.poll()
                await asyncio.sleep(self.poll_interval)

    async def poll(self):
        params = {}
        if self.last_id:
            params["since_id"] = self.last_id

        async with self.session.get(self.url, params=params) as response:
            data = await response.json()

            for item in data.get("items", []):
                await self.Pipeline.ready()
                await self.Pipeline.process(item)
                self.last_id = item.get("id")

Using Thread Pools

For blocking operations, use a thread pool:

import bspump
import asyncio
from concurrent.futures import ThreadPoolExecutor

class ThreadedSource(bspump.Source):
    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        self.executor = ThreadPoolExecutor(max_workers=4)

    async def main(self):
        loop = asyncio.get_event_loop()
        while True:
            # Run blocking operation in thread pool
            events = await loop.run_in_executor(
                self.executor,
                self.blocking_fetch
            )

            for event in events:
                await self.Pipeline.ready()
                await self.Pipeline.process(event)

            await asyncio.sleep(10)

    def blocking_fetch(self):
        # Blocking I/O operations here
        import requests
        response = requests.get("https://api.example.com/data")
        return response.json()

TriggerSource for Scheduled Polling

Use TriggerSource for cron-scheduled polling:

from bspump.abc.source import TriggerSource
from bspump.trigger import CronTrigger

class ScheduledPollingSource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        # Called on each trigger
        events = await self.fetch_events()

        for event in events:
            await self.Pipeline.ready()
            await self.Pipeline.process(event)

    async def fetch_events(self):
        async with aiohttp.ClientSession() as session:
            async with session.get(self.url) as response:
                return await response.json()

# Usage
source = ScheduledPollingSource(app, pipeline).on(
    CronTrigger(app, "*/5 * * * *")  # Every 5 minutes
)

Jupyter Implementation

from bspump.jupyter import *
from bspump.abc.source import TriggerSource
from bspump.trigger import CronTrigger
import aiohttp

class APIPollingSource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        await self.Pipeline.ready()

        async with aiohttp.ClientSession() as session:
            async with session.get("https://api.example.com/events") as resp:
                data = await resp.json()

        for event in data:
            await self.Pipeline.process(event)

auto_pipeline(
    source=lambda app, pipeline: APIPollingSource(app, pipeline).on(
        CronTrigger(app, "*/5 * * * *")
    ),
    sink=lambda app, pipeline: bspump.common.PPrintSink(app, pipeline),
    name="PollingPipeline",
)

Configuration

[pipeline:PollingPipeline:RESTPollingSource]
url=https://api.example.com/events
poll_interval=60

Best Practices

  1. Handle rate limits: Respect API rate limits with backoff

  2. Track state: Remember last processed ID for incremental polling

  3. Use connection pooling: Reuse HTTP sessions

  4. Handle failures: Implement retry logic with exponential backoff

  5. Log progress: Track metrics for monitoring