Apache StreamPipes
Layer: IoT Layer
Purpose
Apache StreamPipes is the IIoT data processing engine for FlexGalaxy.AI, wrapped by ThingIO for account-scoped data processing, dashboard visualization, and application integration. StreamPipes handles complex stream analytics that go beyond ThingsBoard’s built-in rule engine — pattern detection, anomaly analysis, machine learning inference, and multi-source data correlation.
StreamPipes complements ThingsBoard. ThingsBoard owns device connectivity and basic alerting (surfaced via DeviceAdmin); StreamPipes owns advanced analytics (surfaced via ThingIO).
Responsibility Split
| Concern | Owner | Why |
|---|---|---|
| Threshold alerting (“battery < 20%”) | ThingsBoard rule engine | Simple, low-latency, no external dependency |
| Pattern detection (“battery drain rate increasing”) | StreamPipes | Requires windowed computation |
| Multi-device correlation (“congestion at zone B”) | StreamPipes | Requires cross-device aggregation |
| ML model inference on telemetry | StreamPipes | ML runtime integration |
| Anomaly detection | StreamPipes | Statistical analysis over time windows |
| Event routing to Kafka | ThingsBoard rule engine | Native integration, low overhead |
Integration with ThingsBoard
Devices ──MQTT──► ThingsBoard ──Rule Engine──► Kafka ──► StreamPipes
                       │                                      │
                       │                                      ▼
                       │                              Analytics results
                       │                                      │
                       ▼                                      ▼
                Time-series DB                      Kafka: sp.analytics.*
                 (raw storage)                                │
                                                              ▼
                                                      Platform services
                                                    (alerts, dashboards)
Data Flow
1. Devices send telemetry to ThingsBoard via MQTT/CoAP/HTTP.
2. ThingsBoard stores raw telemetry in its time-series database.
3. The ThingsBoard Rule Engine publishes to Kafka topics (tb.telemetry.*).
4. StreamPipes adapters consume from Kafka topics.
5. StreamPipes pipelines process data (filter, transform, analyze).
6. Results are published back to Kafka (sp.analytics.*) for platform consumption.
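The consume, process, and publish portion of this flow can be sketched in plain Python. This is only an illustration: `InMemoryBus` is a hypothetical stand-in for Kafka, `run_pipeline` and `flag_fast_drain` are invented names, and the `drain_pct_per_hour` field is an assumed payload shape, not the platform's actual message schema. The sketch also ignores cross-cluster replication.

```python
from collections import defaultdict
from fnmatch import fnmatchcase


class InMemoryBus:
    """Hypothetical stand-in for Kafka: topic name -> list of messages."""

    def __init__(self):
        self.topics = defaultdict(list)

    def publish(self, topic, message):
        self.topics[topic].append(message)

    def consume(self, pattern):
        """Return (topic, message) pairs for topics matching a glob pattern."""
        return [
            (topic, message)
            for topic in sorted(self.topics)
            if fnmatchcase(topic, pattern)
            for message in self.topics[topic]
        ]


def run_pipeline(bus, process):
    """Consume tb.telemetry.*, apply `process`, publish to sp.analytics.*."""
    for topic, message in bus.consume("tb.telemetry.*"):
        result = process(topic, message)
        if result is not None:
            kind, payload = result
            bus.publish(f"sp.analytics.{kind}", payload)


def flag_fast_drain(topic, message):
    """Illustrative processor: the 10 %/hour cut-off is an assumption."""
    if topic.endswith(".battery") and message["drain_pct_per_hour"] > 10:
        return "anomaly", message
    return None


bus = InMemoryBus()
bus.publish("tb.telemetry.battery", {"device": "r1", "drain_pct_per_hour": 12.0})
bus.publish("tb.telemetry.battery", {"device": "r2", "drain_pct_per_hour": 3.0})
run_pipeline(bus, flag_fast_drain)
```

After the run, only the reading from `r1` appears under `sp.analytics.anomaly`; the raw telemetry topic is left untouched, mirroring the separation between ThingsBoard-owned input topics and StreamPipes-owned result topics.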
Kafka Topics
| Direction | Topic | Content |
|---|---|---|
| TB → SP | | Device position updates |
| TB → SP | | Battery level readings |
| TB → SP | | Generic sensor data |
| TB → SP | | Task progress events |
| SP → Platform | | Detected anomalies |
| SP → Platform | | Pattern detection results |
| SP → Platform | | Coverage analysis results |
| SP → Platform | | Zone congestion alerts |
Pipeline Examples
Battery Degradation Detection
[Kafka Adapter]           [Trend Detector]         [Alert Sink]
tb.telemetry.battery ──►  Window: 1 hour      ──►  sp.analytics.anomaly
                          Detect: drain rate
                          increasing > 2x
                          normal
Detects devices whose battery drain rate is accelerating beyond normal patterns, indicating potential hardware degradation or excessive workload.
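A minimal sketch of the trend check, assuming each window holds time-ordered `(timestamp_seconds, battery_percent)` readings and that a per-device "normal" drain rate is already known. The function names and the way the baseline is supplied are assumptions; the source only specifies a 1-hour window and a 2x factor.

```python
def drain_rate(readings):
    """Average drain in percentage points per hour across one window.

    readings: time-ordered list of (timestamp_seconds, battery_percent).
    """
    (t0, b0), (t1, b1) = readings[0], readings[-1]
    hours = (t1 - t0) / 3600.0
    if hours <= 0:
        raise ValueError("window must span a positive duration")
    return (b0 - b1) / hours


def is_degrading(readings, normal_rate, factor=2.0):
    """True when the window's drain rate exceeds factor x the normal rate."""
    return drain_rate(readings) > factor * normal_rate


# One-hour window: the battery falls from 90% to 78%, i.e. 12 points/hour.
window = [(0, 90.0), (1800, 84.0), (3600, 78.0)]
```

With a normal rate of 5 points per hour, `is_degrading(window, 5.0)` is true because 12 exceeds 10; with a normal rate of 8 it is false because 12 does not exceed 16.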
Zone Congestion Analysis
[Kafka Adapter]            [Geo Aggregation]      [Threshold Filter]      [Alert Sink]
tb.telemetry.position ──►  Group by: zone    ──►  Count > threshold  ──►  sp.analytics.congestion
                           Window: 5 min          per zone type
Counts devices per zone within a sliding time window. When device density exceeds zone capacity thresholds, triggers congestion alerts that the Planner can use for rerouting.
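The windowed count can be sketched as follows. This is an illustration under stated assumptions: position events are taken to be `(timestamp_seconds, device_id, zone)` tuples with the zone already resolved, and `capacity` maps each zone directly to its threshold (the source describes thresholds per zone type, which would add one lookup). The name `congested_zones` is hypothetical.

```python
from collections import defaultdict


def congested_zones(events, now, window_s, capacity):
    """Return {zone: distinct device count} for zones over capacity.

    events:   iterable of (timestamp_seconds, device_id, zone)
    now:      end of the sliding window, in seconds
    window_s: window length in seconds (300 for a 5-minute window)
    capacity: zone -> maximum number of devices before congestion
    """
    devices = defaultdict(set)
    for ts, device_id, zone in events:
        # Keep only events inside the half-open window (now - window_s, now].
        if now - window_s < ts <= now:
            devices[zone].add(device_id)
    return {
        zone: len(ids)
        for zone, ids in devices.items()
        if zone in capacity and len(ids) > capacity[zone]
    }


events = [
    (90, "r4", "B"),   # too old for the window ending at 400
    (110, "r1", "B"),
    (120, "r2", "B"),
    (130, "r3", "B"),
    (140, "r1", "B"),  # repeat report from r1, counted once
    (150, "r5", "A"),
]
alerts = congested_zones(events, now=400, window_s=300, capacity={"A": 2, "B": 2})
```

Counting distinct device IDs rather than raw events keeps a chatty device from inflating the density. A device that crosses zones within the window is counted in both zones in this sketch.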
Cleaning Coverage Verification
[Kafka Adapter]            [Area Calculator]      [Gap Detector]        [Result Sink]
tb.telemetry.position ──►  Calculate swept   ──►  Compare to zone  ──►  sp.analytics.coverage
(cleaning robots only)     area per zone          total area
                           Window: per shift      Flag uncovered
                                                  regions
For ClearJanitor: computes actual cleaning coverage from robot trajectories and identifies uncovered areas within each zone.
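One way to approximate the swept-area comparison is to rasterize the zone into grid cells and mark the cells a robot passed through. The source does not say how swept area is computed, so the grid approach, the `coverage` function, and the `cell_size` parameter are all assumptions made for illustration.

```python
def coverage(zone_cells, trajectory, cell_size=1.0):
    """Estimate cleaning coverage of one zone on a square grid.

    zone_cells: set of (col, row) cells that make up the zone
    trajectory: iterable of (x, y) robot positions, same units as cell_size
    Returns (fraction_covered, uncovered_cells).
    """
    visited = {(int(x // cell_size), int(y // cell_size)) for x, y in trajectory}
    swept = visited & zone_cells          # ignore positions outside the zone
    uncovered = zone_cells - swept
    fraction = len(swept) / len(zone_cells) if zone_cells else 0.0
    return fraction, uncovered


zone = {(0, 0), (1, 0), (0, 1), (1, 1)}                  # a 2 x 2 zone
path = [(0.5, 0.5), (1.5, 0.5), (1.2, 0.1), (5.0, 5.0)]  # last point is outside
fraction, gaps = coverage(zone, path)
```

Here the robot touches the two bottom cells, so half the zone is covered and the two top cells are flagged. A finer `cell_size` trades memory for accuracy; a real implementation would also account for the robot's cleaning width between sampled positions.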
Pipeline Builder
StreamPipes provides a visual pipeline builder that platform operators can use to create custom analytics pipelines without writing code. Pipelines are assembled from three kinds of elements:
- Adapters: data sources (Kafka topics, HTTP endpoints, OPC-UA)
- Processing elements: filters, transforms, aggregations, ML inference
- Data sinks: Kafka, databases, notifications, dashboards
StreamPipes ships with 100+ prebuilt pipeline elements. Custom elements can be added as Docker containers following the StreamPipes SDK.
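The adapter, processing element, and sink roles can be modelled conceptually in a few lines of Python. This is not the StreamPipes SDK or its API; `run`, `keep_high`, and `add_doubled` are hypothetical names used only to show how events move through a chain and how a filter drops them.

```python
def run(source, processors, sink):
    """Push each event from the source through the processors into the sink.

    A processor returns a (possibly modified) event, or None to drop it.
    """
    for event in source:
        for processor in processors:
            event = processor(event)
            if event is None:
                break            # a filter dropped the event
        else:
            sink(event)          # reached only if no processor dropped it


def keep_high(event):
    """Filter: pass events whose value is at least 5."""
    return event if event["v"] >= 5 else None


def add_doubled(event):
    """Transform: return a copy with a derived field."""
    return {**event, "doubled": event["v"] * 2}


results = []
run([{"v": 1}, {"v": 5}, {"v": 9}], [keep_high, add_doubled], results.append)
```

The first event is filtered out; the other two reach the sink with the derived field attached.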
Deployment
StreamPipes runs on the shared EKS cluster:
| Component | Purpose |
|---|---|
| Backend | Core API, pipeline management, user management |
| UI | Web-based pipeline builder and monitoring |
| Extensions (all-jvm) | Prebuilt adapters, processors, and sinks |
| Consul | Service discovery for pipeline elements |
| CouchDB | Pipeline and user data storage |
| InfluxDB | Internal metrics and pipeline results |
Resource Requirements
| Scale | Pipelines | CPU | Memory |
|---|---|---|---|
| Small (dev) | < 10 | 2 cores | 4 GB |
| Medium | 10–50 | 4 cores | 8 GB |
| Large | 50+ | 8+ cores | 16+ GB |
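For capacity planning scripts, the sizing table can be encoded as a small lookup. The table lists exactly 50 pipelines under both Medium and Large; this sketch assumes 50 falls in Medium, which is a choice made here rather than something the source states. The name `sizing_tier` is hypothetical, and the Large figures are minimums.

```python
def sizing_tier(pipelines):
    """Map a pipeline count to (scale, cpu_cores, memory_gb) per the table."""
    if pipelines < 0:
        raise ValueError("pipeline count cannot be negative")
    if pipelines < 10:
        return ("Small (dev)", 2, 4)
    if pipelines <= 50:          # assumption: exactly 50 counts as Medium
        return ("Medium", 4, 8)
    return ("Large", 8, 16)      # minimum values; the table says 8+ and 16+
```

For example, `sizing_tier(12)` returns the Medium tier with 4 cores and 8 GB.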
StreamPipes uses a dedicated MSK cluster (Kafka Cluster B), separate from ThingsBoard’s MSK cluster (Kafka Cluster A). Cross-cluster replication delivers telemetry topics from Cluster A to Cluster B. See the IoT Layer overview for the cluster topology.
Boundaries
StreamPipes handles:
- Complex stream analytics beyond basic thresholds
- Multi-source data correlation and aggregation
- Machine learning model inference on telemetry data
- Visual pipeline builder for custom analytics
- Pattern detection and anomaly analysis
StreamPipes does not handle:
- Device connectivity → ThingsBoard (via DeviceAdmin)
- Raw telemetry storage → ThingsBoard
- Simple threshold alerts → ThingsBoard rule engine (via DeviceAdmin)
- OTA updates → hawkBit (via OTAForge)
- Device provisioning → DeviceAdmin
- Dashboard account scoping → ThingIO (wraps StreamPipes with access control)