Source¶
Sources are the entry points for data into a pipeline. They either generate events internally or receive them from external systems.
Source Types¶
BSPump provides two base source classes:
- Source: base class for sources that continuously produce events (like Kafka consumers).
- TriggerSource: base class for sources that produce events on a trigger (like cron jobs).
Basic Source¶
A simple source that produces events:
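import bspump

class MySource(bspump.Source):
    async def main(self):
        while True:
            # get_event_from_somewhere() is a placeholder for your own input logic
            event = await self.get_event_from_somewhere()
            # Wait until the pipeline can accept another event, then hand it over
            await self.Pipeline.ready()
            await self.Pipeline.process(event)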
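The ready/process handshake in the loop above can be tried without BSPump installed. The following is a minimal sketch in plain asyncio: `FakePipeline`, `CountingSource`, and `run_demo` are hypothetical stand-ins invented for illustration, not BSPump classes, and the real `Pipeline.ready()` may behave differently.

```python
import asyncio


class FakePipeline:
    """Hypothetical stand-in for a pipeline (not the BSPump class)."""

    def __init__(self):
        self.processed = []
        self._ready = asyncio.Event()
        self._ready.set()  # start in the "ready" state

    async def ready(self):
        # Block until the pipeline can accept another event.
        await self._ready.wait()

    async def process(self, event):
        # Record the event; a real pipeline would run its processors here.
        self.processed.append(event)


class CountingSource:
    """Hypothetical source that emits a fixed number of events."""

    def __init__(self, pipeline, limit):
        self.Pipeline = pipeline
        self.limit = limit

    async def main(self):
        for n in range(self.limit):
            # Same order as in the example above: wait, then hand over.
            await self.Pipeline.ready()
            await self.Pipeline.process({"n": n})


async def run_demo(limit=3):
    # Build the stand-in pipeline inside the running event loop.
    pipeline = FakePipeline()
    await CountingSource(pipeline, limit).main()
    return pipeline.processed
```

Calling `asyncio.run(run_demo(3))` returns the three events in the order the source produced them. The point of the sketch is only the ordering of `ready()` before `process()` for every event.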
TriggerSource¶
For sources that should run on a schedule or trigger:
from datetime import datetime

import bspump
from bspump.abc.source import TriggerSource
from bspump.trigger import CronTrigger

class ScheduledSource(TriggerSource):
    async def cycle(self, *args, **kwargs):
        # Called each time the trigger fires
        await self.Pipeline.ready()
        event = {"timestamp": datetime.now().isoformat()}
        await self.Pipeline.process(event)

# Use with a trigger (app and pipeline are assumed to exist already)
source = ScheduledSource(app, pipeline).on(
    CronTrigger(app, "*/5 * * * *")  # Every 5 minutes
)
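To illustrate the relationship between a trigger and `cycle()`, here is a minimal sketch in plain asyncio that runs without BSPump. `FakePipeline`, `FakeTriggerSource`, and `fire` are hypothetical names made up for this illustration; `fire` only imitates a trigger by calling `cycle()` a fixed number of times and does not parse cron expressions.

```python
import asyncio
from datetime import datetime


class FakePipeline:
    """Hypothetical stand-in for a pipeline (not the BSPump class)."""

    def __init__(self):
        self.processed = []

    async def ready(self):
        # This stand-in is always ready.
        return None

    async def process(self, event):
        self.processed.append(event)


class FakeTriggerSource:
    """Hypothetical stand-in for a trigger-driven source."""

    def __init__(self, pipeline):
        self.Pipeline = pipeline

    async def cycle(self):
        # One unit of work per trigger firing.
        await self.Pipeline.ready()
        event = {"timestamp": datetime.now().isoformat()}
        await self.Pipeline.process(event)


async def fire(source, times, interval=0.0):
    # Plays the role of a trigger: one cycle() call per firing.
    for _ in range(times):
        await source.cycle()
        await asyncio.sleep(interval)


async def run_demo(times=2):
    pipeline = FakePipeline()
    await fire(FakeTriggerSource(pipeline), times)
    return pipeline.processed
```

Each firing produces exactly one event here, which mirrors the scheduled source above: the source stays idle between firings instead of looping continuously.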
Built-in Sources¶
BSPump includes many built-in sources:
Kafka
import bspump.kafka

source = bspump.kafka.KafkaSource(
    app, pipeline,
    connection="KafkaConnection"
)
HTTP Webhook
import bspump.http.web

source = bspump.http.web.WebHookSource(
    app, pipeline,
    config={"path": "/webhook", "port": 8080}
)
File Sources
import bspump.file

# Line-by-line file reading
source = bspump.file.FileLineSource(app, pipeline, config={
    "path": "/data/input.txt"
})

# CSV file reading
source = bspump.file.FileCSVSource(app, pipeline, config={
    "path": "/data/input.csv"
})
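The difference between line-by-line and CSV reading can be sketched with the standard library alone. This is an illustration under an assumption: it supposes a line source emits one event per line and a CSV source emits one dictionary per row, which this page does not state explicitly. `line_events` and `csv_events` are hypothetical helpers, not BSPump functions.

```python
import csv
import io


def line_events(stream):
    # Assumed behaviour: one event per line, trailing newline removed.
    for line in stream:
        yield line.rstrip("\n")


def csv_events(stream):
    # Assumed behaviour: one dict per row, keyed by the header row.
    yield from csv.DictReader(stream)


# In-memory streams stand in for /data/input.txt and /data/input.csv.
text_stream = io.StringIO("first line\nsecond line\n")
csv_stream = io.StringIO("id,name\n1,alpha\n2,beta\n")

lines = list(line_events(text_stream))
rows = list(csv_events(csv_stream))
```

With these inputs, `lines` holds two strings and `rows` holds two dictionaries whose values are strings, since `csv.DictReader` does not convert types.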
See Integrations for the full list of available sources.
Source Configuration¶
Sources can be configured via:
- Constructor parameters
- Configuration file (pipelines.conf):
[pipeline:MyPipeline:MySource]
path=/data/input.txt
batch_size=100
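The section name in the example appears to follow the pattern `pipeline:<pipeline id>:<source id>`; that reading is inferred from the example rather than stated. As a rough sketch of how such a section maps to key/value options, the snippet below parses the same text with Python's standard `configparser`. `load_source_config` is a hypothetical helper, and how BSPump itself loads the file is not shown here.

```python
import configparser

# Same content as the pipelines.conf example above.
CONFIG_TEXT = """
[pipeline:MyPipeline:MySource]
path=/data/input.txt
batch_size=100
"""


def load_source_config(text, pipeline_id, source_id):
    # Hypothetical helper: look up the section for one source.
    parser = configparser.ConfigParser()
    parser.read_string(text)
    section = "pipeline:{}:{}".format(pipeline_id, source_id)
    return parser[section]


section = load_source_config(CONFIG_TEXT, "MyPipeline", "MySource")
path = section["path"]                   # values are read as strings
batch_size = section.getint("batch_size")  # explicit conversion to int
```

Values in an INI-style file are strings, so numeric options such as `batch_size` need an explicit conversion, done here with `getint`.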
Context¶
Sources can attach context to events, which is passed through the pipeline:
import time

async def main(self):
    event = await self.get_event_from_somewhere()  # placeholder, as in the basic example
    context = {"source": "my_source", "timestamp": time.time()}
    await self.Pipeline.process(event, context=context)
The context can be accessed in processors:
def process(self, context, event):
    source = context.get("source")
    return event
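To make the flow of context concrete, here is a minimal sketch that passes one context dictionary through a chain of processors. `TagProcessor` and `run_pipeline` are hypothetical stand-ins written for this illustration; only the `process(self, context, event)` signature is taken from the example above.

```python
import time


class TagProcessor:
    """Hypothetical processor that copies a context field into the event."""

    def process(self, context, event):
        event = dict(event)  # copy so the caller's event is not mutated
        event["origin"] = context.get("source")
        return event


def run_pipeline(processors, event, context=None):
    # Hypothetical stand-in: every processor sees the same context object.
    context = {} if context is None else context
    for processor in processors:
        event = processor.process(context, event)
    return event


context = {"source": "my_source", "timestamp": time.time()}
result = run_pipeline([TagProcessor()], {"value": 1}, context=context)
```

Using `context.get("source")` rather than `context["source"]` keeps the processor usable for events that arrive without that key, in which case `origin` is `None`.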