Connection

Connections are shared, reusable objects that link pipelines to external systems. They manage connection pooling, reconnection, and configuration for databases, message queues, and other services.

Why Use Connections?

  • Reusability: Share a single connection across multiple pipelines

  • Resource Efficiency: Connection pooling reduces overhead

  • Centralized Configuration: Configure once, use everywhere

  • Lifecycle Management: Automatic connection and reconnection handling

Creating a Connection

Connections are registered with the application's PumpService:

import bspump
import bspump.kafka

app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")

# Create and register a Kafka connection
svc.add_connection(
    bspump.kafka.KafkaConnection(app, "KafkaConnection")
)

Using Connections in Pipelines

Sources and sinks reference a registered connection by its ID, the string passed when the connection was created:

class MyPipeline(bspump.Pipeline):
    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)
        self.build(
            bspump.kafka.KafkaSource(
                app, self,
                connection="KafkaConnection"  # Reference by ID
            ),
            bspump.kafka.KafkaSink(
                app, self,
                connection="KafkaConnection"  # Same connection
            ),
        )
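The ID-based lookup can be illustrated without BSPump installed. The following is a minimal stand-alone sketch, not BSPump's implementation: ConnectionRegistry, DummyConnection, locate_connection and the id attribute are hypothetical stand-ins chosen for illustration. It shows why two components that name the same ID end up sharing one connection object.

```python
class ConnectionRegistry:
    """Hypothetical stand-in for the service that holds registered connections."""

    def __init__(self):
        self._connections = {}

    def add_connection(self, connection):
        # Reject duplicate IDs so that a lookup is never ambiguous
        if connection.id in self._connections:
            raise ValueError(f"Connection '{connection.id}' is already registered")
        self._connections[connection.id] = connection
        return connection

    def locate_connection(self, connection_id):
        # Return the single shared object registered under this ID
        try:
            return self._connections[connection_id]
        except KeyError:
            raise KeyError(f"Connection '{connection_id}' is not registered") from None


class DummyConnection:
    """Hypothetical connection that only carries its ID."""

    def __init__(self, connection_id):
        self.id = connection_id


registry = ConnectionRegistry()
registry.add_connection(DummyConnection("KafkaConnection"))

# A source and a sink that name the same ID receive the same object
source_conn = registry.locate_connection("KafkaConnection")
sink_conn = registry.locate_connection("KafkaConnection")
print(source_conn is sink_conn)
```

Because both lookups return the same object, any pooling or reconnection state lives in one place rather than once per source or sink.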

Jupyter Connection Registration

In Jupyter notebooks, use the @register_connection decorator:

from bspump.jupyter import *
import bspump.kafka

@register_connection
def kafka_connection(app):
    return bspump.kafka.KafkaConnection(app, "KafkaConnection")

Built-in Connections

Kafka

import bspump.kafka

connection = bspump.kafka.KafkaConnection(app, "KafkaConnection")

PostgreSQL

import bspump.postgresql

connection = bspump.postgresql.PostgreSQLConnection(app, "PostgreSQLConnection")

MongoDB

import bspump.mongodb

connection = bspump.mongodb.MongoDBConnection(app, "MongoDBConnection")

Elasticsearch

import bspump.elasticsearch

connection = bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")

MQTT

import bspump.mqtt

connection = bspump.mqtt.MQTTConnection(app, "MQTTConnection")

Connection Configuration

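Connections are configured in the pipelines.conf file. Each connection reads the section named connection:<ID>, where <ID> is the ID the connection was created with: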

[connection:KafkaConnection]
bootstrap_servers=kafka:9092
security_protocol=PLAINTEXT

[connection:PostgreSQLConnection]
host=localhost
port=5432
database=mydb
user=postgres
password=${POSTGRES_PASSWORD}

Environment variables can be referenced using ${VAR_NAME} syntax.

See pipelines.conf for detailed configuration options.
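To make the ${VAR_NAME} syntax concrete, here is a small sketch that parses a [connection:...] section with Python's standard library and expands the variable references. It only illustrates how such a reference resolves. It is not how BSPump loads its configuration, and load_connections and CONF_TEXT are hypothetical names introduced for this example.

```python
import configparser
import os
import string

CONF_TEXT = """
[connection:PostgreSQLConnection]
host=localhost
port=5432
database=mydb
user=postgres
password=${POSTGRES_PASSWORD}
"""


def load_connections(text, environ=None):
    """Return {connection_id: {option: value}} with ${VAR} references expanded."""
    environ = os.environ if environ is None else environ
    # interpolation=None keeps configparser from treating '%' specially
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)

    connections = {}
    for section in parser.sections():
        prefix, _, connection_id = section.partition(":")
        if prefix != "connection" or not connection_id:
            continue  # not a connection section
        connections[connection_id] = {
            # safe_substitute leaves unknown variables untouched
            key: string.Template(value).safe_substitute(environ)
            for key, value in parser[section].items()
        }
    return connections


# An explicit mapping is passed here so the example does not depend on the shell
config = load_connections(CONF_TEXT, environ={"POSTGRES_PASSWORD": "s3cret"})
print(config["PostgreSQLConnection"]["password"])
```

Keeping the password in an environment variable means the configuration file can be committed or shared without exposing the secret.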

Connection Lifecycle

Connections handle their lifecycle automatically:

  1. Initialization: Connection is created and configured

  2. Connection: Establishes connection to the external system

  3. Ready: Connection is available for use

  4. Reconnection: Automatic reconnection on failure

  5. Shutdown: Graceful disconnection on application stop
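The five stages above can be sketched as a small state machine. This is an illustrative model only: LifecycleSketch, State, on_failure and the retry parameters are assumptions made for the example and do not correspond to BSPump classes or settings.

```python
import asyncio
import enum


class State(enum.Enum):
    INITIALIZED = "initialized"
    CONNECTING = "connecting"
    READY = "ready"
    RECONNECTING = "reconnecting"
    CLOSED = "closed"


class LifecycleSketch:
    """Hypothetical connection wrapper that records its state transitions."""

    def __init__(self, open_fn, max_attempts=3, retry_delay=0.01):
        self._open_fn = open_fn            # coroutine function that opens the link
        self._max_attempts = max_attempts
        self._retry_delay = retry_delay
        self.state = State.INITIALIZED     # stage 1: created and configured
        self.history = [self.state]

    def _set(self, state):
        self.state = state
        self.history.append(state)

    async def start(self):
        self._set(State.CONNECTING)        # stage 2: establish the connection
        await self._open_fn()
        self._set(State.READY)             # stage 3: available for use

    async def on_failure(self):
        self._set(State.RECONNECTING)      # stage 4: retry after a failure
        for _ in range(self._max_attempts):
            try:
                await self._open_fn()
            except ConnectionError:
                await asyncio.sleep(self._retry_delay)
            else:
                self._set(State.READY)
                return
        self._set(State.CLOSED)
        raise ConnectionError(f"gave up after {self._max_attempts} attempts")

    async def stop(self):
        self._set(State.CLOSED)            # stage 5: graceful shutdown


async def main():
    calls = 0

    async def flaky_open():
        # Succeeds on the first and third call, fails on the second
        nonlocal calls
        calls += 1
        if calls == 2:
            raise ConnectionError("simulated outage")

    conn = LifecycleSketch(flaky_open)
    await conn.start()
    await conn.on_failure()   # simulate a dropped link
    await conn.stop()
    return [state.name for state in conn.history]


history = asyncio.run(main())
print(history)
```

The recorded history makes the order of transitions easy to inspect: a failure moves the connection from READY to RECONNECTING and back to READY once a retry succeeds.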

Custom Connections

Create custom connections by extending the bspump.Connection base class:

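import bspump

class MyConnection(bspump.Connection):
    def __init__(self, app, connection_id, config=None):
        super().__init__(app, connection_id, config=config)
        self.client = None

    async def connect(self):
        # create_client() is a placeholder for the async factory of your
        # client library; replace it with the real call.
        # host and port are read from this connection's configuration section.
        self.client = await create_client(
            host=self.Config.get("host"),
            port=self.Config.getint("port")
        )

    async def disconnect(self):
        # Close the client only if connect() created one
        if self.client is not None:
            await self.client.close()
            self.client = None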