pipelines.conf¶
The pipelines.conf file is the primary configuration file for BSPump
applications. It configures connections, pipelines, and components.
File Location¶
BSPump looks for pipelines.conf in:
Current working directory
Path specified via the -c command-line argument
/etc/bspump/pipelines.conf
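The lookup can be pictured as a first-match search over candidate paths. The sketch below is only an illustration: the helper names `find_config` and `default_candidates` are hypothetical, and the precedence shown (explicit `-c` path, then the working directory, then `/etc/bspump`) is an assumption rather than documented BSPump behavior.

```python
from pathlib import Path
from typing import List, Optional, Sequence


def find_config(candidates: Sequence[Path]) -> Optional[Path]:
    """Return the first candidate that exists as a regular file, or None."""
    for path in candidates:
        if path.is_file():
            return path
    return None


def default_candidates(cli_path: Optional[str] = None) -> List[Path]:
    """Build the candidate list for pipelines.conf."""
    # Assumed precedence: explicit -c path, then cwd, then /etc/bspump.
    paths: List[Path] = []
    if cli_path:
        paths.append(Path(cli_path))
    paths.append(Path.cwd() / "pipelines.conf")
    paths.append(Path("/etc/bspump/pipelines.conf"))
    return paths


if __name__ == "__main__":
    print(find_config(default_candidates()))
```

Returning `None` rather than raising leaves it to the caller to decide whether a missing file is fatal.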
Basic Structure¶
# Connection configuration
[connection:KafkaConnection]
bootstrap_servers=kafka:9092
# Pipeline configuration
[pipeline:MyPipeline]
max_concurrent=10
# Component configuration
[pipeline:MyPipeline:KafkaSource]
topic=input-events
[pipeline:MyPipeline:KafkaSink]
topic=output-events
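The section names above follow a colon-separated scheme: a kind, an owner ID, and for components a third part. As a rough illustration, the following sketch reads the same structure with Python's standard `configparser` and splits each section name. The `classify` helper is hypothetical and is not part of BSPump.

```python
import configparser

SAMPLE = """
[connection:KafkaConnection]
bootstrap_servers=kafka:9092

[pipeline:MyPipeline]
max_concurrent=10

[pipeline:MyPipeline:KafkaSource]
topic=input-events

[pipeline:MyPipeline:KafkaSink]
topic=output-events
"""


def classify(section: str) -> tuple:
    """Split a section name into (kind, owner, component)."""
    parts = section.split(":")
    kind = parts[0]
    owner = parts[1] if len(parts) > 1 else None
    component = parts[2] if len(parts) > 2 else None
    return kind, owner, component


# interpolation=None keeps '%' and '${...}' in values as literal text.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(SAMPLE)

for name in parser.sections():
    print(classify(name), dict(parser[name]))
```

Disabling interpolation matters here because values such as `events-%Y-%m-%d` would otherwise be rejected by `configparser`'s default `%`-based interpolation.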
Connection Configuration¶
Connections are configured with [connection:ID] sections.
Kafka
[connection:KafkaConnection]
bootstrap_servers=kafka1:9092,kafka2:9092
security_protocol=PLAINTEXT
group_id=my-consumer-group
# SSL
# security_protocol=SSL
# ssl_cafile=/path/to/ca.pem
# ssl_certfile=/path/to/cert.pem
# ssl_keyfile=/path/to/key.pem
# SASL
# security_protocol=SASL_SSL
# sasl_mechanism=PLAIN
# sasl_plain_username=${KAFKA_USER}
# sasl_plain_password=${KAFKA_PASSWORD}
PostgreSQL
[connection:PostgreSQLConnection]
host=localhost
port=5432
database=mydb
user=postgres
password=${POSTGRES_PASSWORD}
min_size=1
max_size=10
MongoDB
[connection:MongoDBConnection]
uri=mongodb://localhost:27017
database=mydb
Elasticsearch
[connection:ElasticSearchConnection]
url=http://localhost:9200
MQTT
[connection:MQTTConnection]
host=localhost
port=1883
username=${MQTT_USER}
password=${MQTT_PASSWORD}
Pipeline Configuration¶
Pipelines are configured with [pipeline:Name] sections.
[pipeline:ProcessingPipeline]
# Maximum concurrent events
max_concurrent=100
# Pipeline timeout in seconds
timeout=60
Source Configuration¶
Sources are configured with [pipeline:Name:SourceId] sections.
Kafka Source
[pipeline:MyPipeline:KafkaSource]
topic=input-events
# Or multiple topics
# topics=topic1,topic2
auto_offset_reset=earliest
max_poll_records=500
max_poll_interval_ms=300000
WebHook Source
[pipeline:MyPipeline:WebHookSource]
path=/webhook
port=8080
host=0.0.0.0
File Source
[pipeline:MyPipeline:FileLineSource]
path=/data/input.txt
encoding=utf-8
Processor Configuration¶
Processors are configured with [pipeline:Name:ProcessorId] sections.
[pipeline:MyPipeline:FilterProcessor]
threshold=100
enabled=true
Sink Configuration¶
Sinks are configured with [pipeline:Name:SinkId] sections.
Kafka Sink
[pipeline:MyPipeline:KafkaSink]
topic=output-events
acks=all
batch_size=16384
linger_ms=0
Elasticsearch Sink
[pipeline:MyPipeline:ElasticSearchSink]
index=events-%Y-%m-%d
bulk_size=500
bulk_timeout=5.0
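The `%Y-%m-%d` placeholders in the index name look like strftime directives. Assuming the sink expands them that way, a pattern resolves to one index per day, as the sketch below shows. The `resolve_index` helper is hypothetical, and whether the sink uses the event time or the wall clock is not stated here.

```python
from datetime import datetime, timezone


def resolve_index(pattern: str, when: datetime) -> str:
    """Expand strftime directives in an index pattern for a given timestamp."""
    return when.strftime(pattern)


ts = datetime(2024, 3, 7, 12, 30, tzinfo=timezone.utc)
print(resolve_index("events-%Y-%m-%d", ts))  # events-2024-03-07
```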
PostgreSQL Sink
[pipeline:MyPipeline:PostgreSQLSink]
table=events
columns=id,data,created_at
File Sink
[pipeline:MyPipeline:FileLineSink]
path=/data/output.txt
mode=a
Lookup Configuration¶
Lookups are configured with [lookup:ID] sections.
[lookup:UserLookup]
path=/data/users.json
reload_interval=3600
Environment Variables¶
Reference environment variables with ${VAR} syntax:
[connection:PostgreSQLConnection]
password=${POSTGRES_PASSWORD}
With defaults:
[connection:PostgreSQLConnection]
host=${POSTGRES_HOST:-localhost}
port=${POSTGRES_PORT:-5432}
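To make the two forms concrete, here is a minimal sketch of how `${VAR}` and `${VAR:-default}` could be resolved. It follows shell semantics, where `:-` falls back when the variable is unset or empty. The `expand` function is hypothetical and is not BSPump's actual resolver. Raising an error for a missing variable without a default is a choice made for this sketch.

```python
import os
import re

# Matches ${NAME} and ${NAME:-default}.
_VAR = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}")


def expand(value: str, env=None) -> str:
    """Replace ${NAME} and ${NAME:-default} references in a config value."""
    env = os.environ if env is None else env

    def _sub(match):
        name, default = match.group(1), match.group(2)
        current = env.get(name)
        if default is not None:
            # ':-' uses the default when the variable is unset or empty.
            return current if current else default
        if current is None:
            raise KeyError(f"environment variable {name} is not set")
        return current

    return _VAR.sub(_sub, value)


print(expand("${POSTGRES_HOST:-localhost}", {}))  # localhost
print(expand("${POSTGRES_HOST:-localhost}", {"POSTGRES_HOST": "db"}))  # db
```

Passing an explicit `env` mapping keeps the function easy to test without touching the real process environment.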
Complete Example¶
# pipelines.conf
# Kafka connection
[connection:KafkaConnection]
bootstrap_servers=${KAFKA_BOOTSTRAP_SERVERS:-kafka:9092}
group_id=processing-group
# PostgreSQL connection
[connection:PostgreSQLConnection]
host=${POSTGRES_HOST:-localhost}
port=5432
database=events
user=postgres
password=${POSTGRES_PASSWORD}
# Main processing pipeline
[pipeline:ProcessingPipeline]
max_concurrent=100
[pipeline:ProcessingPipeline:KafkaSource]
topic=raw-events
auto_offset_reset=earliest
[pipeline:ProcessingPipeline:KafkaSink]
topic=processed-events
acks=all
# Archival pipeline
[pipeline:ArchivalPipeline]
[pipeline:ArchivalPipeline:KafkaSource]
topic=processed-events
[pipeline:ArchivalPipeline:ElasticSearchSink]
index=events-%Y-%m-%d
bulk_size=1000
# User lookup
[lookup:UserLookup]
path=/data/users.json
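In the example above, the two pipelines are chained through the `processed-events` topic. As an illustration, the sketch below recovers that link from the section names. It assumes component IDs end in `Sink` or `Source`, as they do in this example. The `kafka_links` helper is hypothetical, and only the topic-related sections are reproduced.

```python
import configparser

CONFIG = """
[pipeline:ProcessingPipeline:KafkaSource]
topic=raw-events

[pipeline:ProcessingPipeline:KafkaSink]
topic=processed-events

[pipeline:ArchivalPipeline:KafkaSource]
topic=processed-events

[pipeline:ArchivalPipeline:ElasticSearchSink]
index=events-%Y-%m-%d
"""


def kafka_links(parser):
    """Return (producer_pipeline, topic, consumer_pipeline) triples."""
    produced, consumed = {}, {}
    for section in parser.sections():
        parts = section.split(":")
        if len(parts) != 3 or parts[0] != "pipeline":
            continue
        topic = parser.get(section, "topic", fallback=None)
        if topic is None:
            continue
        _, pipeline, component = parts
        # Assumption: IDs ending in 'Sink' write, IDs ending in 'Source' read.
        if component.endswith("Sink"):
            produced.setdefault(topic, []).append(pipeline)
        elif component.endswith("Source"):
            consumed.setdefault(topic, []).append(pipeline)
    return [
        (src, topic, dst)
        for topic, producers in produced.items()
        for src in producers
        for dst in consumed.get(topic, [])
    ]


# interpolation=None keeps the '%Y' in the index pattern literal.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(CONFIG)
print(kafka_links(parser))
# [('ProcessingPipeline', 'processed-events', 'ArchivalPipeline')]
```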
Best Practices¶
Use environment variables for secrets: Never commit passwords to the config file
Provide sensible defaults: Use ${VAR:-default} syntax
Separate concerns: Use multiple config files if needed
Document configuration: Add comments explaining options
Validate on startup: Check that required configuration exists
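A startup check for required settings might look like the sketch below. The `REQUIRED` mapping and the `missing_settings` helper are hypothetical. The section and option names are taken from the examples on this page, and a real application would list its own requirements.

```python
import configparser

# Hypothetical requirements: section name -> options that must be present.
REQUIRED = {
    "connection:KafkaConnection": ["bootstrap_servers"],
    "pipeline:ProcessingPipeline:KafkaSource": ["topic"],
}


def missing_settings(parser, required):
    """Return missing sections and 'section/option' entries that are absent or empty."""
    problems = []
    for section, options in required.items():
        if not parser.has_section(section):
            problems.append(section)
            continue
        for option in options:
            if not parser.get(section, option, fallback="").strip():
                problems.append(f"{section}/{option}")
    return problems


# interpolation=None keeps '%' and '${...}' in values as literal text.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string("[connection:KafkaConnection]\nbootstrap_servers=kafka:9092\n")

problems = missing_settings(parser, REQUIRED)
if problems:
    print("Missing configuration:", ", ".join(problems))
```

Collecting every problem before reporting lets all gaps be fixed in one pass instead of one failed start at a time.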