Stream (stream)
Track per-key changes and emit deltas, elapsed times, and optional aggregates.
Stateful Transform json
Minimal example

YAML

    actions:
      - stream:
          watch: ""

JSON

    {
      "actions": [
        { "stream": { "watch": "" } }
      ]
    }
Aggregations
| Field | Type | Required | Description |
|---|---|---|---|
| aggregations | Stream Aggregation Spec[] | | Optional incremental aggregates maintained per key. |
Behaviour
| Field | Type | Required | Description |
|---|---|---|---|
| delta ✓ | boolean (bool) | | Emit the difference between the current and last value. Default: false |
| only-changes ✓ | boolean (bool) | | Only emit records when the watched field changes. Default: false |
| reset-on-document-end ✓ | boolean (bool) | | Reset state when a document end marker is observed. Default: false |
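As a minimal sketch of how the behaviour flags might be combined, the fragment below enables `delta` and `only-changes` together. The option names come from the table above; the watched field `temperature` is a hypothetical name chosen for illustration.

```yaml
actions:
  - stream:
      watch: temperature     # hypothetical field name
      delta: true            # emit the difference between the current and last value
      only-changes: true     # emit a record only when temperature changes
```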
General
| Field | Type | Required | Description |
|---|---|---|---|
| description | string | | Describe this step. |
| condition | lua-expression (string) | | Only run this action when the Lua condition evaluates to true. Examples: 2 * count() |
Input
| Field | Type | Required | Description |
|---|---|---|---|
| watch | field (string) | ✅ | Field whose value should be tracked. Examples: data_field |
| group-by | field (string) | | Group tracking state by this field’s value. Examples: data_field |
| input-time | field (string) | | Source field providing event time for elapsed calculations. Examples: data_field |
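The input options can be sketched as follows: one watched field, with state kept separately per key and event time taken from the record itself. The field names `temperature`, `sensor_id`, and `timestamp` are hypothetical; only the option names are taken from the table above.

```yaml
actions:
  - stream:
      watch: temperature     # hypothetical: the value being tracked
      group-by: sensor_id    # hypothetical: one tracking state per sensor
      input-time: timestamp  # hypothetical: event-time field used for elapsed calculations
```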
Missing Data
| Field | Type | Required | Description |
|---|---|---|---|
| fill-strategy | Fill Strategy | | Strategy applied when the watched value is missing. Allowed values: forward-fill, default-value |
Output
| Field | Type | Required | Description |
|---|---|---|---|
| marker | string | | Optional marker added to generated events (when not enriching). |
| output-field | field (string) | | Field name used for the emitted delta. Examples: data_field |
| elapsed-field | field (string) | | Field name capturing elapsed milliseconds. Examples: data_field |
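A possible configuration naming the output fields is sketched below. It assumes `delta` is enabled so that there is a delta to write; the names `temperature`, `temperature_delta`, and `elapsed_ms` are hypothetical.

```yaml
actions:
  - stream:
      watch: temperature               # hypothetical field name
      delta: true
      output-field: temperature_delta  # hypothetical: receives the emitted delta
      elapsed-field: elapsed_ms        # hypothetical: receives elapsed milliseconds
```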
Recency
| Field | Type | Required | Description |
|---|---|---|---|
| recency | Recency Output | | Emit a boolean indicating whether the key changed within a threshold. |
Resources
| Field | Type | Required | Description |
|---|---|---|---|
| max-keys | number (integer) | | Maximum number of concurrent keys tracked. Examples: 42 |
| eviction | Stream Eviction Policy | | Eviction policy applied when max-keys is exceeded. Allowed values: drop-new, lru, ttl |
Windowing
| Field | Type | Required | Description |
|---|---|---|---|
| window-size | number (integer) | | Maximum number of recent events to keep per key when computing aggregates. Examples: 42 |
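To illustrate how a stream-level window might bound an aggregate, the sketch below keeps the 10 most recent events per key and averages them. The field names are hypothetical, and the window value of 10 is an arbitrary choice.

```yaml
actions:
  - stream:
      watch: temperature       # hypothetical field name
      group-by: sensor_id      # hypothetical field name
      window-size: 10          # keep at most 10 recent events per key
      aggregations:
        - field: temperature
          op: avg              # average over the retained window
```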
Schema
- Fill Strategy Options
- Stream Eviction Policy Options
- Stream Aggregation Spec Fields
- Fill Strategy - Default Value Fields
- Recency Output Fields
- Stream Eviction Policy - Ttl Fields
- Stream Aggregation Spec - Stream Aggregation Operation Options
Fill Strategy Options
| Option | Name | Type | Description |
|---|---|---|---|
| forward-fill | Forward Fill | map | Carry forward the previous value when the field is missing. |
| default-value | Default Value | object | Substitute a default JSON value when the field is missing. |
Stream Eviction Policy Options
| Option | Name | Type | Description |
|---|---|---|---|
| drop-new | Drop New | map | Drop new keys when the limit is reached. |
| lru | Lru | map | Evict the least-recently updated key. |
| ttl | Ttl | object | Evict keys that have been idle longer than the configured duration. |
Stream Aggregation Spec Fields
| Field | Type | Required | Description |
|---|---|---|---|
| field | field (string) | ✅ | Source field used for the aggregation. Examples: data_field |
| op | Stream Aggregation Operation | | Aggregation operation. Allowed values: avg, mean, min, max, first, last, sum, stddev, range, earliest, … |
| r-as | field (string) | | Optional alias for the output field. Examples: data_field |
| window-size | number (integer) | | Override window size for this aggregation (falls back to stream-level window). Examples: 42 |
| percentile | number (integer) | | Percentile to compute when op = percentile (0-100). Examples: 42 |
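The aggregation spec fields above could be combined as in the following sketch, which computes a 95th percentile over a per-aggregation window and a running maximum over the stream-level window. The field names and aliases (`latency_ms`, `latency_p95`, `latency_max`) are hypothetical, and treating each spec as a flat mapping is an assumption based on the table.

```yaml
actions:
  - stream:
      watch: latency_ms        # hypothetical field name
      window-size: 100         # stream-level default window
      aggregations:
        - field: latency_ms
          op: percentile
          percentile: 95       # only meaningful when op is percentile
          r-as: latency_p95    # hypothetical alias for the output field
          window-size: 50      # overrides the stream-level window for this aggregate
        - field: latency_ms
          op: max
          r-as: latency_max    # hypothetical alias; uses the stream-level window
```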
Fill Strategy - Default Value Fields
| Field | Type | Required | Description |
|---|---|---|---|
| value | map (object) | ✅ | |
Recency Output Fields
| Field | Type | Required | Description |
|---|---|---|---|
| threshold-ms | duration (integer) | ✅ | Threshold in milliseconds for considering a key “recent”. |
| output-field | field (string) | | Field where the recency flag is written. Examples: data_field |
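A recency block might look like the sketch below, which flags keys that changed within the last five seconds. The field names `status`, `device_id`, and `recently_changed` are hypothetical, and nesting the two fields directly under `recency` is an assumption drawn from the table.

```yaml
actions:
  - stream:
      watch: status                     # hypothetical field name
      group-by: device_id               # hypothetical field name
      recency:
        threshold-ms: 5000              # a key counts as recent for 5 seconds
        output-field: recently_changed  # hypothetical: receives the boolean flag
```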
Stream Eviction Policy - Ttl Fields
| Field | Type | Required | Description |
|---|---|---|---|
| max-idle | string | ✅ | |
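As a tentative sketch, a TTL eviction policy could be paired with `max-keys` as shown below. Two things here are assumptions not confirmed by the tables: that the policy is written as a `ttl` key nested under `eviction`, and that `max-idle` accepts a duration string such as `"10m"`. The field names are hypothetical as well.

```yaml
actions:
  - stream:
      watch: status          # hypothetical field name
      group-by: device_id    # hypothetical field name
      max-keys: 10000        # cap on concurrently tracked keys
      eviction:
        ttl:                 # assumed nesting for the ttl option
          max-idle: "10m"    # assumed duration format; the source only says "string"
```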
Stream Aggregation Spec - Stream Aggregation Operation Options
| Value | Name | Description |
|---|---|---|
| avg | avg | Avg |
| mean | mean | Mean |
| min | min | Min |
| max | max | Max |
| first | first | First |
| last | last | Last |
| sum | sum | Sum |
| stddev | stddev | Stddev |
| range | range | Range |
| earliest | earliest | Earliest |
| latest | latest | Latest |
| count | count | Count |
| distinct-count | distinct-count | Distinct Count |
| median | median | Median |
| percentile | percentile | Percentile |
| variance | variance | Variance |
| variance-population | variance-population | Variance Population |
| stddev-population | stddev-population | Stddev Population |
| sum-squares | sum-squares | Sum Squares |
| mode | mode | Mode |
| list | list | List |
| values | values | Values |