Analyzers

BSPump provides analyzers for time-window aggregation, statistics, and anomaly detection over streaming data.

Overview

Analyzers are specialized processors that maintain state across events and can perform complex aggregations.

TimeWindowAnalyzer

Aggregate events over fixed time windows:

import bspump
from bspump.analyzer import TimeWindowAnalyzer

class HourlyCountAnalyzer(TimeWindowAnalyzer):
    """Count events per "type" and report the counts on every tick.

    The analyzer is configured with a 3600-second window and a 60-second
    resolution. Counts are kept in ``self.counts`` (event type -> number of
    events seen since the last tick) and are reset after each report.
    """

    def __init__(self, app, pipeline, id=None, config=None):
        one_hour = 3600  # window size, in seconds
        one_minute = 60  # resolution, in seconds
        super().__init__(
            app, pipeline, id, config,
            window_size=one_hour,
            resolution=one_minute
        )
        self.counts = {}

    def evaluate(self, context, event):
        """Increment the counter for the event's "type" and pass the event on."""
        kind = event.get("type")  # None when the event carries no "type"
        seen_so_far = self.counts.get(kind, 0)
        self.counts[kind] = seen_so_far + 1
        return event

    def on_tick(self, tick):
        """Print the counts gathered since the previous tick, then reset them.

        Presumably invoked by the base class once per resolution period --
        confirm against the TimeWindowAnalyzer implementation.
        """
        for kind in self.counts:
            count = self.counts[kind]
            print(f"{kind}: {count} events")
        self.counts.clear()

SessionAnalyzer

Track user sessions:

from bspump.analyzer import SessionAnalyzer

class UserSessionAnalyzer(SessionAnalyzer):
    """Collect the events of each user into a per-user session.

    Sessions are keyed by the event's "user_id" and are configured with a
    1800-second (30-minute) timeout.
    """

    def __init__(self, app, pipeline, id=None, config=None):
        thirty_minutes = 1800  # session timeout, in seconds
        super().__init__(
            app, pipeline, id, config,
            session_timeout=thirty_minutes
        )

    def evaluate(self, context, event):
        """Append the event to its user's session, opening one if needed."""
        user_id = event.get("user_id")
        session = self.get_session(user_id)

        if session is None:
            # First event seen for this user: start a fresh session record.
            initial_state = {
                "start_time": event.get("timestamp"),
                "events": []
            }
            session = self.create_session(user_id, initial_state)

        session["events"].append(event)
        return event

    def on_session_end(self, session_id, session_data):
        """Report how many events the finished session collected.

        Presumably invoked by the base class when a session times out --
        confirm against the SessionAnalyzer implementation.
        """
        # NOTE: despite its name, `duration` is the number of recorded
        # events, not an elapsed time.
        recorded_events = session_data["events"]
        duration = len(recorded_events)
        print(f"Session {session_id} ended with {duration} events")

Matrix Analyzer

Use matrices for multi-dimensional analysis:

import bspump
from bspump.matrix import Matrix

class GeoAnalyzer(bspump.Analyzer):
    """Count events in one-degree longitude/latitude buckets.

    The matrix has shape ``(LON_BUCKETS, LAT_BUCKETS)`` = (360, 180): the
    first axis is the longitude bucket, the second the latitude bucket.
    """

    # Number of one-degree buckets along each matrix axis.
    LON_BUCKETS = 360
    LAT_BUCKETS = 180

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        # 360x180 matrix: longitude buckets x latitude buckets
        self.matrix = Matrix(
            app, "GeoMatrix", (self.LON_BUCKETS, self.LAT_BUCKETS)
        )

    @staticmethod
    def _bucket(value, offset, size):
        """Shift ``value`` by ``offset`` and clamp it to ``0..size - 1``."""
        return min(max(int(value + offset), 0), size - 1)

    def evaluate(self, context, event):
        """Increment the bucket of the event's coordinates and pass it on.

        Missing "latitude"/"longitude" keys default to 0. The upper edges
        (latitude 90, longitude 180) and any out-of-range values are clamped
        to the nearest edge bucket, so the index never leaves the matrix.
        """
        lat = self._bucket(event.get("latitude", 0), 90, self.LAT_BUCKETS)
        lon = self._bucket(event.get("longitude", 0), 180, self.LON_BUCKETS)
        # Index order must match the matrix shape: (longitude, latitude).
        self.matrix[lon, lat] += 1
        return event

Anomaly Detection

Detect anomalies in event streams:

from bspump.anomaly import Anomaly

class RateAnomalyDetector(bspump.Analyzer):
    """Flag events whose "value" deviates strongly from a rolling window.

    The most recent ``window_size`` values (100) are kept in ``self.window``.
    Once the window is full, an event is marked anomalous when its value is
    more than three population standard deviations away from the window
    mean. The current value is part of the window it is compared against.
    """

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        self.window_size = 100
        self.window = []

    def evaluate(self, context, event):
        """Update the window with the event's value and tag anomalies.

        Anomalous events get ``"anomaly": True`` and an ``"anomaly_score"``
        (the deviation expressed in standard deviations). The event is always
        returned.
        """
        value = event.get("value", 0)
        self.window.append(value)

        # Keep only the newest `window_size` values.
        if len(self.window) > self.window_size:
            del self.window[0]

        sample_count = len(self.window)
        if sample_count < self.window_size:
            # Not enough history yet to judge the value.
            return event

        mean = sum(self.window) / sample_count
        variance = sum((x - mean) ** 2 for x in self.window) / sample_count
        std = variance ** 0.5
        deviation = abs(value - mean)

        if deviation > 3 * std:
            event["anomaly"] = True
            event["anomaly_score"] = deviation / std

        return event

Aggregation Pipeline

Build aggregation pipelines:

class AggregationPipeline(bspump.Pipeline):
    """Pipeline that reads from Kafka, runs three analyzers, and writes to Kafka.

    Stage order: KafkaSource -> HourlyCountAnalyzer -> UserSessionAnalyzer
    -> RateAnomalyDetector -> KafkaSink. Source and sink both use the
    connection named "KafkaConnection".
    """

    def __init__(self, app, pipeline_id):
        super().__init__(app, pipeline_id)

        # Stages are created in processing order, then handed to build().
        source = bspump.kafka.KafkaSource(app, self, connection="KafkaConnection")
        hourly_counts = HourlyCountAnalyzer(app, self)
        user_sessions = UserSessionAnalyzer(app, self)
        rate_anomalies = RateAnomalyDetector(app, self)
        sink = bspump.kafka.KafkaSink(app, self, connection="KafkaConnection")

        self.build(source, hourly_counts, user_sessions, rate_anomalies, sink)

State Persistence

Persist analyzer state for recovery:

from bspump.matrix import PersistentMatrix

class PersistentAnalyzer(bspump.Analyzer):
    """Analyzer whose state lives in a file-backed 1000x1000 matrix."""

    def __init__(self, app, pipeline, id=None, config=None):
        super().__init__(app, pipeline, id, config)
        # The matrix is backed by the file at /data/matrix.dat.
        backing_matrix = PersistentMatrix(
            app,
            "PersistentMatrix",
            path="/data/matrix.dat",
            shape=(1000, 1000),
        )
        self.matrix = backing_matrix

The matrix state is automatically saved and restored on restart.

Configuration

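Configure analyzers in pipelines.conf. Each section is named [pipeline:<pipeline id>:<processor id>], so substitute the IDs of your own pipeline and analyzers: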

[pipeline:AnalysisPipeline:TimeWindowAnalyzer]
window_size=3600
resolution=60

[pipeline:AnalysisPipeline:SessionAnalyzer]
session_timeout=1800