HTTP Integration

BSPump provides HTTP capabilities through webhook sources and HTTP client sinks.

Components

  • WebHookSource: Receives HTTP POST requests (bspump.http.web)

  • HTTPClientSink: Sends HTTP requests (bspump.http.client)

WebHookSource

Receives incoming HTTP webhooks.

import bspump.http.web

# Build a webhook source with its endpoint path and listening port passed
# inline via `config`.
# `app` and `pipeline` are not defined in this snippet; they are assumed to be
# the application and the pipeline this source belongs to.
source = bspump.http.web.WebHookSource(
    app, pipeline,
    config={"path": "/webhook", "port": 8080}
)

Configuration:

[pipeline:MyPipeline:WebHookSource]
path=/webhook
port=8080
host=0.0.0.0

Multiple Endpoints

Create multiple webhook endpoints:

class Pipeline1(bspump.Pipeline):
    """Pipeline built from a WebHookSource on /api/v1/events (port 8080) and a MySink."""

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)

        # Construct the components first (source, then sink), then register
        # them with the pipeline in that order.
        source_config = {"path": "/api/v1/events", "port": 8080}
        webhook_source = bspump.http.web.WebHookSource(app, self, config=source_config)
        sink = MySink(app, self)

        self.build(webhook_source, sink)

class Pipeline2(bspump.Pipeline):
    """Pipeline built from a WebHookSource on /api/v1/notifications (port 8080) and a MySink."""

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)

        # Construct the components first (source, then sink), then register
        # them with the pipeline in that order.
        source_config = {"path": "/api/v1/notifications", "port": 8080}
        webhook_source = bspump.http.web.WebHookSource(app, self, config=source_config)
        sink = MySink(app, self)

        self.build(webhook_source, sink)

HTTPClientSink

Sends events to HTTP endpoints.

import bspump.http.client

# Build an HTTP client sink; only the target `url` is passed inline via
# `config` here.
# `app` and `pipeline` are not defined in this snippet; they are assumed to be
# the application and the pipeline this sink belongs to.
sink = bspump.http.client.HTTPClientSink(
    app, pipeline,
    config={"url": "https://api.example.com/events"}
)

Configuration:

[pipeline:MyPipeline:HTTPClientSink]
url=https://api.example.com/events
method=POST
timeout=30

# Headers (optional)
headers={"Content-Type": "application/json", "Authorization": "Bearer ${API_TOKEN}"}

Webhook Validation

Validate incoming webhooks by checking their HMAC-SHA256 signature against a shared secret:

import hashlib
import hmac
import os

class WebhookValidator(bspump.Processor):
    """Drop webhook events whose HMAC-SHA256 signature does not verify.

    The shared secret is read from the WEBHOOK_SECRET environment variable.
    The signature is read from the "X-Signature" entry of context["headers"]
    and is expected to be the hex digest of the event payload.
    """

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        # None when the variable is unset; verify_signature() then rejects
        # every request instead of crashing on None.encode().
        self.secret = os.environ.get("WEBHOOK_SECRET")

    def process(self, context, event):
        """Return the event unchanged if its signature is valid, otherwise None."""
        signature = context.get("headers", {}).get("X-Signature")
        if not self.verify_signature(event, signature):
            return None  # Drop invalid requests
        return event

    def verify_signature(self, payload, signature):
        """Check `signature` against the HMAC-SHA256 hex digest of `payload`.

        Args:
            payload: The raw request body as bytes-like data, or a str
                (which is encoded as UTF-8 before hashing).
            signature: The hex digest supplied by the sender, or None when
                the header is missing.

        Returns:
            True only when a secret is configured and the signature matches;
            False for a missing secret, a missing or non-str signature, or
            a mismatch.
        """
        # Fail closed: without a secret or a signature nothing can be verified.
        if not self.secret or not isinstance(signature, str):
            return False

        # hmac.new() only accepts bytes-like messages.
        if isinstance(payload, str):
            payload = payload.encode("utf-8")

        expected = hmac.new(
            self.secret.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()

        try:
            # Constant-time comparison to avoid leaking the digest via timing.
            return hmac.compare_digest(expected, signature)
        except TypeError:
            # compare_digest() rejects str arguments containing non-ASCII
            # characters; such a signature can never match a hex digest.
            return False

Complete Webhook Example

from bspump.jupyter import *
import bspump.http.web
import bspump.kafka
from datetime import datetime
import json

@register_connection
def kafka_connection(app):
    """Create the Kafka connection named "KafkaConnection".

    The KafkaSink below refers to the connection by this same name.
    """
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")

# NOTE(review): register_connection and auto_pipeline are not imported by name;
# presumably they come from the star import of bspump.jupyter - confirm.
auto_pipeline(
    source=lambda app, pipeline: bspump.http.web.WebHookSource(
        app, pipeline,
        config={"path": "/webhook", "port": 8080}
    ),
    sink=lambda app, pipeline: bspump.kafka.KafkaSink(
        app, pipeline, connection="KafkaConnection"
    ),
    name="WebhookPipeline",
)

# NOTE(review): `event` is not defined in this snippet; it is assumed to be the
# raw request body (bytes) supplied by the pipeline - confirm.

# Parse incoming JSON
event = json.loads(event.decode("utf-8"))

# Add metadata (datetime.now() is naive local time, so the string has no UTC offset)
event["received_at"] = datetime.now().isoformat()

# Serialize
event = json.dumps(event).encode("utf-8")

HTTP Client for Data Fetching

Use aiohttp for fetching data in processors:

import urllib.parse

import bspump
import aiohttp

class EnrichmentProcessor(bspump.Processor):
    """Attach user data fetched over HTTP to each event under the "user" key.

    The user is looked up by the event's "user_id" field. Events without a
    "user_id" are passed through unchanged.
    """

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        # Created lazily on the first lookup in process().
        # NOTE(review): nothing in this snippet closes the session; it should
        # be closed when the application shuts down.
        self.session = None

    # NOTE(review): process() is a coroutine here - confirm that the pipeline
    # awaits the result of Processor.process().
    async def process(self, context, event):
        """Return the event, enriched with the JSON body of the user lookup."""
        user_id = event.get("user_id")
        if user_id is None:
            # Nothing to look up; avoids requesting ".../users/None".
            return event

        if self.session is None:
            self.session = aiohttp.ClientSession()

        # Quote the ID so characters such as "/", "?" or "#" coming from the
        # event cannot change the request path or add a query string.
        quoted_id = urllib.parse.quote(str(user_id), safe="")
        async with self.session.get(f"https://api.example.com/users/{quoted_id}") as resp:
            user_data = await resp.json()
            event["user"] = user_data

        return event

Configuration Reference

WebHookSource Options

| Option | Default | Description                       |
|--------|---------|-----------------------------------|
| path   | /       | URL path for the webhook endpoint |
| port   | 8080    | HTTP port to listen on            |
| host   | 0.0.0.0 | Host to bind to                   |

HTTPClientSink Options

| Option  | Default    | Description                |
|---------|------------|----------------------------|
| url     | (required) | Target URL                 |
| method  | POST       | HTTP method                |
| timeout | 30         | Request timeout in seconds |