Connection¶
Connections are shared, reusable connections to external systems. They manage connection pooling, reconnection, and configuration for databases, message queues, and other services.
Why Use Connections?¶
Reusability: Share a single connection across multiple pipelines
Resource Efficiency: Connection pooling reduces overhead
Centralized Configuration: Configure once, use everywhere
Lifecycle Management: Automatic connection and reconnection handling
Creating a Connection¶
Connections are registered with the application:
import bspump
import bspump.kafka
app = bspump.BSPumpApplication()
svc = app.get_service("bspump.PumpService")
# Create and register a Kafka connection
svc.add_connection(
bspump.kafka.KafkaConnection(app, "KafkaConnection")
)
Using Connections in Pipelines¶
Reference connections by their ID:
class MyPipeline(bspump.Pipeline):
def __init__(self, app, pipeline_id):
super().__init__(app, pipeline_id)
self.build(
bspump.kafka.KafkaSource(
app, self,
connection="KafkaConnection" # Reference by ID
),
bspump.kafka.KafkaSink(
app, self,
connection="KafkaConnection" # Same connection
),
)
Jupyter Connection Registration¶
In Jupyter notebooks, use the @register_connection decorator:
from bspump.jupyter import *
import bspump.kafka
@register_connection
def kafka_connection(app):
return bspump.kafka.KafkaConnection(app, "KafkaConnection")
Built-in Connections¶
Kafka
import bspump.kafka
connection = bspump.kafka.KafkaConnection(app, "KafkaConnection")
PostgreSQL
import bspump.postgresql
connection = bspump.postgresql.PostgreSQLConnection(app, "PostgreSQLConnection")
MongoDB
import bspump.mongodb
connection = bspump.mongodb.MongoDBConnection(app, "MongoDBConnection")
Elasticsearch
import bspump.elasticsearch
connection = bspump.elasticsearch.ElasticSearchConnection(app, "ESConnection")
MQTT
import bspump.mqtt
connection = bspump.mqtt.MQTTConnection(app, "MQTTConnection")
Connection Configuration¶
Connections are configured via the pipelines.conf file:
[connection:KafkaConnection]
bootstrap_servers=kafka:9092
security_protocol=PLAINTEXT
[connection:PostgreSQLConnection]
host=localhost
port=5432
database=mydb
user=postgres
password=${POSTGRES_PASSWORD}
Environment variables can be referenced using ${VAR_NAME} syntax.
See pipelines.conf for detailed configuration options.
Connection Lifecycle¶
Connections handle their lifecycle automatically:
Initialization: Connection is created and configured
Connection: Establishes connection to the external system
Ready: Connection is available for use
Reconnection: Automatic reconnection on failure
Shutdown: Graceful disconnection on application stop
Custom Connections¶
Create custom connections by extending the base class:
import bspump
class MyConnection(bspump.Connection):
def __init__(self, app, connection_id, config=None):
super().__init__(app, connection_id, config=config)
self.client = None
async def connect(self):
self.client = await create_client(
host=self.Config.get("host"),
port=self.Config.getint("port")
)
async def disconnect(self):
if self.client:
await self.client.close()