Apache StreamPipes

Layer: IoT Layer

Purpose

Apache StreamPipes is the IIoT data processing engine for FlexGalaxy.AI, wrapped by ThingIO for account-scoped data processing, dashboard visualization, and application integration. StreamPipes handles complex stream analytics that go beyond ThingsBoard’s built-in rule engine — pattern detection, anomaly analysis, machine learning inference, and multi-source data correlation.

StreamPipes complements ThingsBoard. ThingsBoard owns device connectivity and basic alerting (surfaced via DeviceAdmin); StreamPipes owns advanced analytics (surfaced via ThingIO).

Responsibility Split

Concern                                              Owner                    Why
---------------------------------------------------  -----------------------  -------------------------------------------
Threshold alerting (“battery < 20%”)                 ThingsBoard rule engine  Simple, low-latency, no external dependency
Pattern detection (“battery drain rate increasing”)  StreamPipes              Requires windowed computation
Multi-device correlation (“congestion at zone B”)    StreamPipes              Requires cross-device aggregation
ML model inference on telemetry                      StreamPipes              ML runtime integration
Anomaly detection                                    StreamPipes              Statistical analysis over time windows
Event routing to Kafka                               ThingsBoard rule engine  Native integration, low overhead
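The Why column turns on one distinction: a threshold rule judges each message on its own, while the StreamPipes-owned concerns must carry state across messages. A minimal sketch of that contrast, assuming a rolling z-score over a count-based window as the anomaly statistic (the source does not name one); `threshold_alert` and `RollingZScore` are hypothetical names, not ThingsBoard or StreamPipes APIs.

```python
import math
from collections import deque
from typing import Optional


def threshold_alert(battery_pct: float, limit: float = 20.0) -> bool:
    """Stateless rule: each reading is judged alone, so no memory is needed."""
    return battery_pct < limit


class RollingZScore:
    """Stateful rule: flags a value far from the mean of the last `size` readings.

    Hypothetical sketch; it must remember past samples, which is why this kind
    of check belongs in a stream processor rather than a per-message rule.
    """

    def __init__(self, size: int = 30, z_limit: float = 3.0) -> None:
        self.window: deque = deque(maxlen=size)  # oldest samples drop out automatically
        self.z_limit = z_limit

    def update(self, value: float) -> Optional[float]:
        """Return the z-score if `value` is anomalous, else None."""
        result = None
        if len(self.window) >= 2:
            mean = sum(self.window) / len(self.window)
            var = sum((x - mean) ** 2 for x in self.window) / (len(self.window) - 1)
            std = math.sqrt(var)
            if std > 0:
                z = (value - mean) / std
                if abs(z) > self.z_limit:
                    result = z
        self.window.append(value)  # the value joins the window after being scored
        return result
```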

Integration with ThingsBoard

Devices ──MQTT──► ThingsBoard ──Rule Engine──► Kafka ──► StreamPipes
                       │                                      │
                       │                                      ▼
                       │                              Analytics results
                       │                                      │
                       ▼                                      ▼
                  Time-series DB                     Kafka: sp.analytics.*
                  (raw storage)                              │
                                                             ▼
                                                    Platform services
                                                    (alerts, dashboards)

Data Flow

  1. Devices send telemetry to ThingsBoard via MQTT/CoAP/HTTP

  2. ThingsBoard stores raw telemetry in its time-series database

  3. ThingsBoard Rule Engine publishes telemetry to Kafka topics (tb.telemetry.*) on Kafka Cluster A

  4. Cross-cluster replication copies these topics to Kafka Cluster B, where StreamPipes adapters consume them

  5. StreamPipes pipelines process data (filter, transform, analyze)

  6. Results are published back to Kafka (sp.analytics.*) for platform consumption
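The flow above can be traced end to end with a small in-memory model. This is a sketch only: `InMemoryBroker` is a hypothetical stand-in for Kafka, and `rule_engine_forward` and `run_pipeline` are illustrative names, not ThingsBoard or StreamPipes APIs. It models steps 3 to 6; device ingestion, time-series storage, and cross-cluster replication are omitted.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Optional


class InMemoryBroker:
    """Hypothetical stand-in for Kafka: topic name -> pending messages."""

    def __init__(self) -> None:
        self.topics: Dict[str, List[dict]] = defaultdict(list)

    def publish(self, topic: str, message: dict) -> None:
        self.topics[topic].append(message)

    def drain(self, topic: str) -> List[dict]:
        # Return and clear pending messages (a real consumer tracks offsets instead).
        messages = self.topics[topic]
        self.topics[topic] = []
        return messages


def rule_engine_forward(broker: InMemoryBroker, telemetry_type: str, message: dict) -> None:
    """Step 3: the rule engine forwards telemetry to tb.telemetry.<type>."""
    broker.publish(f"tb.telemetry.{telemetry_type}", message)


def run_pipeline(
    broker: InMemoryBroker,
    source: str,
    sink: str,
    process: Callable[[dict], Optional[dict]],
) -> int:
    """Steps 4-6: consume a tb.* topic, process each event, publish results to an sp.* topic.

    Returns the number of results published.
    """
    published = 0
    for message in broker.drain(source):
        result = process(message)
        if result is not None:  # a processor may drop an event (filtering)
            broker.publish(sink, result)
            published += 1
    return published
```

`process` stands in for a whole StreamPipes pipeline; returning `None` models a filter that drops the event.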

Kafka Topics

Direction      Topic                    Content
-------------  -----------------------  -------------------------
TB → SP        tb.telemetry.position    Device position updates
TB → SP        tb.telemetry.battery     Battery level readings
TB → SP        tb.telemetry.sensor      Generic sensor data
TB → SP        tb.telemetry.task        Task progress events
SP → Platform  sp.analytics.anomaly     Detected anomalies
SP → Platform  sp.analytics.pattern     Pattern detection results
SP → Platform  sp.analytics.coverage    Coverage analysis results
SP → Platform  sp.analytics.congestion  Zone congestion alerts
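Keeping the topic table in one machine-readable place lets adapter subscriptions and sink targets be derived from it and checked against the prefix convention (tb.telemetry.* inbound, sp.analytics.* outbound). A hypothetical sketch; `TOPICS`, `topics_for`, and `naming_violations` are illustrative names, not part of StreamPipes or ThingsBoard.

```python
from enum import Enum
from typing import Dict, List, Tuple


class Direction(Enum):
    TB_TO_SP = "TB -> SP"
    SP_TO_PLATFORM = "SP -> Platform"


# Registry transcribed from the Kafka Topics table: topic -> (direction, content).
TOPICS: Dict[str, Tuple[Direction, str]] = {
    "tb.telemetry.position": (Direction.TB_TO_SP, "Device position updates"),
    "tb.telemetry.battery": (Direction.TB_TO_SP, "Battery level readings"),
    "tb.telemetry.sensor": (Direction.TB_TO_SP, "Generic sensor data"),
    "tb.telemetry.task": (Direction.TB_TO_SP, "Task progress events"),
    "sp.analytics.anomaly": (Direction.SP_TO_PLATFORM, "Detected anomalies"),
    "sp.analytics.pattern": (Direction.SP_TO_PLATFORM, "Pattern detection results"),
    "sp.analytics.coverage": (Direction.SP_TO_PLATFORM, "Coverage analysis results"),
    "sp.analytics.congestion": (Direction.SP_TO_PLATFORM, "Zone congestion alerts"),
}

# Naming convention implied by the table: the prefix encodes the direction.
PREFIX = {Direction.TB_TO_SP: "tb.telemetry.", Direction.SP_TO_PLATFORM: "sp.analytics."}


def topics_for(direction: Direction) -> List[str]:
    """Topics flowing in one direction, e.g. what StreamPipes adapters subscribe to."""
    return sorted(name for name, (d, _) in TOPICS.items() if d is direction)


def naming_violations(registry: Dict[str, Tuple[Direction, str]]) -> List[str]:
    """Topics whose name prefix does not match their declared direction."""
    return [name for name, (d, _) in registry.items() if not name.startswith(PREFIX[d])]
```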

Pipeline Examples

Battery Degradation Detection

[Kafka Adapter]          [Trend Detector]       [Alert Sink]
tb.telemetry.battery ──► Window: 1 hour     ──► sp.analytics.anomaly
                         Detect: drain rate
                         increasing > 2x
                         normal

Detects devices whose battery drain rate is accelerating beyond normal patterns, indicating potential hardware degradation or excessive workload.
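The trend detector can be sketched as a per-device sliding window. Assumptions not stated in the source: "normal" is a per-device baseline drain rate supplied by the caller, the rate is measured between the oldest and newest reading in the window, and readings arrive in timestamp order. `DrainRateDetector` is a hypothetical name, not a StreamPipes processor.

```python
from collections import deque
from typing import Deque, Optional, Tuple

WINDOW_S = 3600.0  # 1-hour window, as in the pipeline diagram
FACTOR = 2.0       # alert when drain rate exceeds 2x normal


class DrainRateDetector:
    """Per-device detector for battery drain accelerating past its baseline."""

    def __init__(self, baseline_pct_per_h: float) -> None:
        # Assumed input: the device's normal drain rate in percent per hour.
        self.baseline = baseline_pct_per_h
        self.samples: Deque[Tuple[float, float]] = deque()  # (timestamp_s, level_pct)

    def add(self, ts: float, level_pct: float) -> Optional[float]:
        """Add a reading; return the window's drain rate (%/h) if it is anomalous."""
        self.samples.append((ts, level_pct))
        # Evict readings older than the window (the newest reading always stays).
        while ts - self.samples[0][0] > WINDOW_S:
            self.samples.popleft()
        (t0, l0), (t1, l1) = self.samples[0], self.samples[-1]
        if t1 <= t0:  # need two readings spanning some time
            return None
        rate = (l0 - l1) / ((t1 - t0) / 3600.0)  # positive while discharging
        return rate if rate > FACTOR * self.baseline else None
```

One detector instance is kept per device; a negative rate (charging) never triggers an alert.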

Zone Congestion Analysis

[Kafka Adapter]          [Geo Aggregation]      [Threshold Filter]    [Alert Sink]
tb.telemetry.position ──► Group by: zone    ──► Count > threshold ──► sp.analytics.congestion
                          Window: 5 min          per zone type

Counts devices per zone within a sliding time window. When device density in a zone exceeds the capacity threshold for its zone type, the pipeline emits a congestion alert that the Planner can use for rerouting.
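A minimal sketch of the geo aggregation and threshold filter stages. It assumes each position event already carries a zone id and zone type (mapping coordinates to zones is out of scope), events arrive in timestamp order, and the per-zone-type thresholds are hypothetical configuration. `ZoneCongestion` is an illustrative name. Each device is counted only in its most recent zone within the window, so a robot passing from zone A to zone B is not double-counted.

```python
from collections import Counter, deque
from typing import Deque, Dict, List, Tuple

WINDOW_S = 300.0  # 5-minute window, as in the pipeline diagram


class ZoneCongestion:
    """Sliding-window device count per zone, compared to a per-zone-type threshold."""

    def __init__(self, thresholds: Dict[str, int]) -> None:
        self.thresholds = thresholds  # zone_type -> max devices before alerting
        # (timestamp_s, device_id, zone_id, zone_type), oldest first
        self.events: Deque[Tuple[float, str, str, str]] = deque()

    def add(self, ts: float, device: str, zone: str, zone_type: str) -> List[dict]:
        """Add one position event; return congestion alerts for the current window."""
        self.events.append((ts, device, zone, zone_type))
        while ts - self.events[0][0] > WINDOW_S:
            self.events.popleft()
        latest: Dict[str, Tuple[str, str]] = {}
        for _, dev, z, zt in self.events:  # later events overwrite earlier ones
            latest[dev] = (z, zt)
        counts = Counter(latest.values())
        alerts = []
        for (z, zt), n in sorted(counts.items()):
            limit = self.thresholds.get(zt)
            if limit is not None and n > limit:
                alerts.append({"zone": z, "zone_type": zt, "count": n, "threshold": limit})
        return alerts
```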

Cleaning Coverage Verification

[Kafka Adapter]          [Area Calculator]      [Gap Detector]        [Result Sink]
tb.telemetry.position ──► Calculate swept   ──► Compare to zone   ──► sp.analytics.coverage
(cleaning robots only)    area per zone          total area
                          Window: per shift       Flag uncovered
                                                  regions

For ClearJanitor: computes actual cleaning coverage from robot trajectories and identifies uncovered areas within each zone.
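The area calculator and gap detector can be approximated on a grid. This sketch assumes planar coordinates in the same units as `cell` and `radius`, axis-aligned rectangular zones, and a robot that cleans a disc of fixed radius around its position; none of these are specified by the source, and `swept_cells`, `zone_cells`, and `coverage` are hypothetical names. The path is one robot's trajectory for a single shift, already filtered to cleaning robots.

```python
import math
from typing import List, Set, Tuple

Point = Tuple[float, float]
Cell = Tuple[int, int]


def swept_cells(path: List[Point], radius: float, cell: float) -> Set[Cell]:
    """Grid cells whose centre lies within `radius` of the robot's path (approximate)."""
    if not path:
        return set()
    # Densify the path so consecutive points are at most one cell apart.
    dense: List[Point] = [path[0]]
    for (xa, ya), (xb, yb) in zip(path, path[1:]):
        steps = max(1, math.ceil(math.dist((xa, ya), (xb, yb)) / cell))
        dense += [(xa + (xb - xa) * k / steps, ya + (yb - ya) * k / steps)
                  for k in range(1, steps + 1)]
    reach = math.ceil(radius / cell)  # how many cells the disc can span
    covered: Set[Cell] = set()
    for x, y in dense:
        cx, cy = math.floor(x / cell), math.floor(y / cell)
        for i in range(cx - reach, cx + reach + 1):
            for j in range(cy - reach, cy + reach + 1):
                centre = ((i + 0.5) * cell, (j + 0.5) * cell)
                if math.dist(centre, (x, y)) <= radius:
                    covered.add((i, j))
    return covered


def zone_cells(x0: float, y0: float, x1: float, y1: float, cell: float) -> Set[Cell]:
    """Grid cells whose centre lies inside the rectangle [x0, x1] x [y0, y1]."""
    return {(i, j)
            for i in range(math.floor(x0 / cell), math.ceil(x1 / cell))
            for j in range(math.floor(y0 / cell), math.ceil(y1 / cell))
            if x0 <= (i + 0.5) * cell <= x1 and y0 <= (j + 0.5) * cell <= y1}


def coverage(zone: Set[Cell], covered: Set[Cell]) -> Tuple[float, Set[Cell]]:
    """Fraction of the zone's cells that were swept, plus the uncovered cells."""
    uncovered = zone - covered
    ratio = 1.0 - len(uncovered) / len(zone) if zone else 1.0
    return ratio, uncovered
```

For several robots in the same zone, take the union of their `swept_cells` sets before calling `coverage`. The cell size trades accuracy against memory.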

Pipeline Builder

StreamPipes provides a visual pipeline builder accessible to platform operators. Operators can use it to create custom analytics pipelines without writing code:

  • Adapters — data sources (Kafka topics, HTTP endpoints, OPC-UA)

  • Processing elements — filters, transforms, aggregations, ML inference

  • Data sinks — Kafka, databases, notifications, dashboards

StreamPipes ships with 100+ prebuilt pipeline elements. Custom elements can be built with the StreamPipes SDK and deployed as Docker containers.

Deployment

StreamPipes runs on the shared EKS cluster:

Component             Purpose
--------------------  ----------------------------------------------
Backend               Core API, pipeline management, user management
UI                    Web-based pipeline builder and monitoring
Extensions (all-jvm)  Prebuilt adapters, processors, and sinks
Consul                Service discovery for pipeline elements
CouchDB               Pipeline and user data storage
InfluxDB              Internal metrics and pipeline results

Resource Requirements

Scale        Pipelines  CPU       Memory
-----------  ---------  --------  ------
Small (dev)  < 10       2 cores   4 GB
Medium       10–50      4 cores   8 GB
Large        50+        8+ cores  16+ GB
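The sizing table can be encoded as a lookup for provisioning scripts. A hypothetical sketch; `Tier` and `recommend_tier` are illustrative names. The Medium ("10–50") and Large ("50+") rows overlap at 50, so this sketch assigns exactly 50 pipelines to Medium; the Large figures are minimums.

```python
from typing import NamedTuple


class Tier(NamedTuple):
    name: str
    cpu_cores: int  # minimum for Large ("8+")
    memory_gb: int  # minimum for Large ("16+")


def recommend_tier(pipelines: int) -> Tier:
    """Map an expected pipeline count to a row of the Resource Requirements table."""
    if pipelines < 0:
        raise ValueError("pipeline count must be non-negative")
    if pipelines < 10:
        return Tier("Small (dev)", 2, 4)
    if pipelines <= 50:  # boundary choice: 50 counts as Medium
        return Tier("Medium", 4, 8)
    return Tier("Large", 8, 16)
```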

StreamPipes uses a dedicated MSK cluster (Kafka Cluster B), separate from ThingsBoard’s MSK cluster (Kafka Cluster A). Cross-cluster replication delivers telemetry topics from Cluster A to Cluster B. See the IoT Layer overview for the cluster topology.

Boundaries

StreamPipes handles:

  • Complex stream analytics beyond basic thresholds

  • Multi-source data correlation and aggregation

  • Machine learning model inference on telemetry data

  • Visual pipeline builder for custom analytics

  • Pattern detection and anomaly analysis

StreamPipes does not handle:

  • Device connectivity → ThingsBoard (via DeviceAdmin)

  • Raw telemetry storage → ThingsBoard

  • Simple threshold alerts → ThingsBoard rule engine (via DeviceAdmin)

  • OTA updates → hawkBit (via OTAForge)

  • Device provisioning → DeviceAdmin

  • Dashboard account scoping → ThingIO (wraps StreamPipes with access control)