Webhook to Kafka
This pattern demonstrates receiving HTTP webhooks and forwarding them to Kafka for reliable, scalable event processing.
Use Cases
Receiving payment notifications from payment providers
Ingesting alerts from monitoring systems
Collecting data from third-party APIs with webhook support
Building event-driven integrations
Architecture
External Service
        │
        ▼
┌───────────────┐
│ WebHookSource │  HTTP POST /webhook
└───────────────┘
        │
        ▼
┌───────────────┐
│  Processors   │  Validate, transform
└───────────────┘
        │
        ▼
┌───────────────┐
│   KafkaSink   │  → Kafka topic
└───────────────┘
Jupyter Implementation
from bspump.jupyter import *
import bspump.kafka
import bspump.http.web


@register_connection
def kafka_connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")


auto_pipeline(
    source=lambda app, pipeline: bspump.http.web.WebHookSource(
        app, pipeline,
        config={"port": 8080, "path": "/webhook"}
    ),
    sink=lambda app, pipeline: bspump.kafka.KafkaSink(
        app, pipeline, connection="KafkaConnection"
    ),
    name="Webhook2KafkaPipeline",
)
In subsequent cells, process the incoming webhook data:
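import json
from datetime import datetime

# Parse and validate the webhook payload
try:
    event = json.loads(event.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
    event = None  # Drop payloads that are not valid UTF-8 JSON

# Add metadata, then serialize back to bytes for Kafka
if isinstance(event, dict):
    event["received_at"] = datetime.now().isoformat()
    event["source"] = "webhook"
    event = json.dumps(event).encode("utf-8")
else:
    event = None  # Drop payloads that are not JSON objects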
Standalone Application
import bspump
import bspump.kafka
import bspump.http.web
import bspump.common
import json
from datetime import datetime


class ValidateProcessor(bspump.Processor):
    def process(self, context, event):
        try:
            data = json.loads(event.decode("utf-8"))
            return data
        except (json.JSONDecodeError, UnicodeDecodeError):
            return None  # Drop invalid events


class EnrichProcessor(bspump.Processor):
    def process(self, context, event):
        # event is the parsed payload returned by ValidateProcessor
        event["received_at"] = datetime.now().isoformat()
        event["source"] = "webhook"
        return json.dumps(event).encode("utf-8")


class WebhookToKafkaPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.http.web.WebHookSource(app, self, config={
                "port": 8080,
                "path": "/webhook"
            }),
            ValidateProcessor(app, self),
            EnrichProcessor(app, self),
            bspump.kafka.KafkaSink(app, self, connection="KafkaConnection"),
        )


if __name__ == "__main__":
    app = bspump.BSPumpApplication()
    svc = app.get_service("bspump.PumpService")
    # Register the connection before the pipeline that refers to it by name
    svc.add_connection(
        bspump.kafka.KafkaConnection(app, "KafkaConnection")
    )
    svc.add_pipeline(WebhookToKafkaPipeline(app, "WebhookToKafkaPipeline"))
    app.run()
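The logic inside ValidateProcessor and EnrichProcessor can be checked without starting BSPump or Kafka if it is factored into plain functions. The following is a minimal sketch of that idea, not part of the BSPump API: the function names validate and enrich are hypothetical, the sketch assumes only JSON objects should be accepted, and it uses a UTC timestamp (the processors above use local time via datetime.now()).

```python
import json
from datetime import datetime, timezone
from typing import Optional


def validate(raw: bytes) -> Optional[dict]:
    """Return the decoded JSON object, or None if the payload should be dropped."""
    try:
        data = json.loads(raw.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None
    # Assumption: only JSON objects are accepted, so metadata keys can be added.
    return data if isinstance(data, dict) else None


def enrich(data: dict, now: Optional[datetime] = None) -> bytes:
    """Add the metadata fields and serialize back to UTF-8 JSON bytes."""
    # `now` is injectable so tests can use a fixed timestamp.
    now = now or datetime.now(timezone.utc)
    enriched = {**data, "received_at": now.isoformat(), "source": "webhook"}
    return json.dumps(enriched).encode("utf-8")


if __name__ == "__main__":
    fixed = datetime(2024, 1, 1, tzinfo=timezone.utc)
    payload = validate(b'{"id": 1}')
    if payload is not None:
        print(enrich(payload, now=fixed).decode("utf-8"))
    print(validate(b"not json"))  # invalid payloads yield None
```

A processor's process method could then delegate to these functions, which keeps the drop-or-forward decision in one place that is easy to unit test.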
Configuration
Configure via pipelines.conf:
[connection:KafkaConnection]
bootstrap_servers=kafka:9092
[pipeline:WebhookToKafkaPipeline:WebHookSource]
port=8080
path=/webhook
[pipeline:WebhookToKafkaPipeline:KafkaSink]
topic=incoming-webhooks
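Judging from the sections above, the section names appear to follow the pattern pipeline:<pipeline id>:<component id>, so a typo in either part would leave a component on its defaults. A quick sanity check of the file can be sketched with Python's standard configparser. This is only an illustration using the standard library, not a description of how BSPump itself loads configuration; the summarize helper is hypothetical.

```python
import configparser

# The same content as pipelines.conf above, inlined for the example.
CONF_TEXT = """
[connection:KafkaConnection]
bootstrap_servers=kafka:9092

[pipeline:WebhookToKafkaPipeline:WebHookSource]
port=8080
path=/webhook

[pipeline:WebhookToKafkaPipeline:KafkaSink]
topic=incoming-webhooks
"""


def summarize(conf_text: str) -> dict:
    """Parse INI text and return {section name: {option: value}}."""
    parser = configparser.ConfigParser()
    parser.read_string(conf_text)
    return {section: dict(parser[section]) for section in parser.sections()}


if __name__ == "__main__":
    for section, options in summarize(CONF_TEXT).items():
        print(section, options)
```

Note that the sections use the pipeline id WebhookToKafkaPipeline from the standalone application, while the Jupyter example names its pipeline Webhook2KafkaPipeline, so the section names would presumably need to be adjusted to match there.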
Best Practices
Validate incoming data: Always validate webhook payloads before processing
Add metadata: Include timestamps and source information
Use Kafka for durability: Kafka retains events, so consumers can replay them and recover after a failure
Implement authentication: Secure webhook endpoints with signatures/tokens
Handle duplicates: Design downstream consumers to be idempotent
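The authentication and duplicate-handling practices above can be sketched with the standard library alone. This is an illustrative sketch under stated assumptions, not BSPump functionality: it assumes the sender signs the raw request body with a shared secret using HMAC-SHA256 and sends the hex digest (providers differ in header name and encoding, so check the provider's documentation), and that each event carries a unique ID. The names sign, verify_signature, and SeenEvents are hypothetical.

```python
import hashlib
import hmac
from collections import OrderedDict


def sign(secret: bytes, body: bytes) -> str:
    """Compute the hex HMAC-SHA256 of the raw request body."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify_signature(secret: bytes, body: bytes, received_signature: str) -> bool:
    """Check the sender's signature using a constant-time comparison."""
    expected = sign(secret, body)
    # Compare as bytes so non-ASCII input is rejected instead of raising.
    return hmac.compare_digest(
        expected.encode("utf-8"), received_signature.encode("utf-8")
    )


class SeenEvents:
    """Bounded in-memory record of recently seen event IDs.

    Assumption: a per-process cache is enough for illustration; a real
    consumer would usually keep this state in a persistent store.
    """

    def __init__(self, max_size: int = 10_000):
        self._seen = OrderedDict()
        self._max_size = max_size

    def is_duplicate(self, event_id: str) -> bool:
        if event_id in self._seen:
            self._seen.move_to_end(event_id)  # keep recently seen IDs longest
            return True
        self._seen[event_id] = None
        if len(self._seen) > self._max_size:
            self._seen.popitem(last=False)  # evict the oldest ID
        return False


if __name__ == "__main__":
    secret, body = b"example-secret", b'{"id": "evt_1"}'
    print(verify_signature(secret, body, sign(secret, body)))
    seen = SeenEvents()
    print(seen.is_duplicate("evt_1"), seen.is_duplicate("evt_1"))
```

The signature must be verified against the raw bytes as received, before any JSON parsing or re-serialization, because re-encoding the payload can change the bytes and invalidate the digest.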