PostgreSQL Integration

BSPump provides PostgreSQL integration through bspump.postgresql.

Installation

pip install asyncpg

Components

  • PostgreSQLConnection: Shared connection pool

  • PostgreSQLSource: Queries data from PostgreSQL

  • PostgreSQLSink: Writes data to PostgreSQL

  • PostgreSQLLookup: Lookup table from PostgreSQL

PostgreSQLConnection

import bspump.postgresql

# Create the connection under the id "PostgreSQLConnection"; this is the name
# used by the [connection:PostgreSQLConnection] configuration section.
connection = bspump.postgresql.PostgreSQLConnection(app, "PostgreSQLConnection")

Configuration:

[connection:PostgreSQLConnection]
host=localhost
port=5432
database=mydb
user=postgres
password=${POSTGRES_PASSWORD}

# Connection pool settings
min_size=1
max_size=10

PostgreSQLSource

Query data from PostgreSQL tables.

import bspump.postgresql

# Build a PostgreSQL source for `pipeline`; `connection` refers to the
# connection by its id ("PostgreSQLConnection").
source = bspump.postgresql.PostgreSQLSource(
    app, pipeline,
    connection="PostgreSQLConnection"
)

Configuration:

[pipeline:MyPipeline:PostgreSQLSource]
query=SELECT * FROM events WHERE processed = false
batch_size=100

PostgreSQLSink

Write events to PostgreSQL.

import bspump.postgresql

# Build a PostgreSQL sink for `pipeline`; `connection` refers to the
# connection by its id ("PostgreSQLConnection").
sink = bspump.postgresql.PostgreSQLSink(
    app, pipeline,
    connection="PostgreSQLConnection"
)

Configuration:

[pipeline:MyPipeline:PostgreSQLSink]
table=processed_events
# Columns to insert (optional, defaults to all event keys)
columns=id,data,created_at

PostgreSQLLookup

Use PostgreSQL data for event enrichment.

import bspump.postgresql

# Create a lookup with the id "UserLookup"; processors locate it by this id.
lookup = bspump.postgresql.PostgreSQLLookup(
    app, "UserLookup",
    connection="PostgreSQLConnection",
    config={
        # Query that supplies the lookup's rows.
        "query": "SELECT id, name, email FROM users",
        # NOTE(review): presumably the column whose value is the lookup key - confirm.
        "key": "id"
    }
)

Usage in a processor:

class EnrichProcessor(bspump.Processor):
    """Processor that adds "user_name" to events using the "UserLookup" lookup."""

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        # The lookup is resolved once, by id, through the pump service.
        pump_service = app.get_service("bspump.PumpService")
        self.user_lookup = pump_service.locate_lookup("UserLookup")

    def process(self, context, event):
        """Return the event, enriched with "user_name" when the lookup has a match.

        The lookup is queried with the event's "user_id" value; when the
        result is falsy the event is returned unchanged.
        """
        record = self.user_lookup.get(event.get("user_id"))
        if not record:
            return event
        event["user_name"] = record["name"]
        return event

Complete Example

from bspump.jupyter import *
import bspump.postgresql
import bspump.kafka

@register_connection
def pg_connection(app):
    """Build the PostgreSQL connection identified as "PostgreSQLConnection"."""
    connection_id = "PostgreSQLConnection"
    return bspump.postgresql.PostgreSQLConnection(app, connection_id)

@register_connection
def kafka_connection(app):
    """Build the Kafka connection identified as "KafkaConnection"."""
    connection_id = "KafkaConnection"
    return bspump.kafka.KafkaConnection(app, connection_id)

# Declare the pipeline: `source` and `sink` are factories that take
# (app, pipeline) and return the component to use.
auto_pipeline(
    # Source side: a KafkaSource using the "KafkaConnection" connection.
    source=lambda app, pipeline: bspump.kafka.KafkaSource(
        app, pipeline, connection="KafkaConnection"
    ),
    # Sink side: a PostgreSQLSink using the "PostgreSQLConnection" connection.
    sink=lambda app, pipeline: bspump.postgresql.PostgreSQLSink(
        app, pipeline, connection="PostgreSQLConnection"
    ),
    name="KafkaToPostgresPipeline",
)

# Transform Kafka message to database row
import json
from datetime import datetime  # needed for datetime.now() below

# `event` is the incoming message; it is decoded as UTF-8 and parsed as JSON.
data = json.loads(event.decode("utf-8"))

# Replace the event with the row to insert: the message id, the parsed
# message re-serialized as JSON text, and the current local time.
event = {
    "id": data["id"],
    "payload": json.dumps(data),
    "created_at": datetime.now()
}

Raw Query Execution

Execute custom queries in processors:

class QueryProcessor(bspump.Processor):
    """Processor that attaches matching rows of the `related` table to each event."""

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        # The shared connection is resolved once, by id, through the pump service.
        pump_service = app.get_service("bspump.PumpService")
        self.connection = pump_service.locate_connection("PostgreSQLConnection")

    # NOTE(review): this method is a coroutine - confirm that the pipeline
    # awaits processors declared with `async def`.
    async def process(self, context, event):
        """Store the rows whose parent_id equals event["id"] under event["related"].

        Each fetched row is converted to a dict; the event is then returned.
        """
        query = "SELECT * FROM related WHERE parent_id = $1"
        async with self.connection.acquire() as pg:
            # The id is passed as a query argument ($1), not formatted into the SQL text.
            rows = await pg.fetch(query, event["id"])
            event["related"] = list(map(dict, rows))
        return event

Configuration Reference

Connection Options

| Option   | Default   | Description              |
|----------|-----------|--------------------------|
| host     | localhost | Database host            |
| port     | 5432      | Database port            |
| database | postgres  | Database name            |
| user     | postgres  | Username                 |
| password | (empty)   | Password                 |
| min_size | 1         | Minimum pool connections |
| max_size | 10        | Maximum pool connections |