Stream (stream)

Track per-key changes and emit deltas, elapsed times, and optional aggregates.

Tags: Stateful, Transform, json

Minimal example

actions:
- stream:
    watch: ""
JSON
{
  "actions": [
    {
      "stream": {
        "watch": ""
      }
    }
  ]
}

Contents

Aggregations

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| aggregations | Stream Aggregation Spec[] | | Optional incremental aggregates maintained per key. |

Behaviour

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| delta | boolean (bool) | | Emit the difference between the current and last value. Default: false |
| only-changes | boolean (bool) | | Only emit records when the watched field changes. Default: false |
| reset-on-document-end | boolean (bool) | | Reset state when a document end marker is observed. Default: false |
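
As a rough sketch of how these flags might be combined, the configuration below watches a hypothetical `temperature` field and emits a record only when its value changes, carrying the difference from the last value. The option names come from the table above; the field name is an assumption made for illustration.

```yaml
actions:
- stream:
    watch: temperature    # hypothetical field name
    delta: true           # emit the difference between the current and last value
    only-changes: true    # skip records where the watched field is unchanged
```

Both flags default to false, so each behaviour has to be enabled explicitly.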

General

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| description | string | | Describe this step. |
| condition | lua-expression (string) | | Only run this action when the Lua condition evaluates to true. Examples: 2 * count() |

Input

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| watch | field (string) | | Field whose value should be tracked. Examples: data_field |
| group-by | field (string) | | Group tracking state by this field’s value. Examples: data_field |
| input-time | field (string) | | Source field providing event time for elapsed calculations. Examples: data_field |
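
The input options could plausibly be combined as in the sketch below, which keeps separate tracking state for each sensor and takes event time from the record itself. The field names `temperature`, `sensor_id`, and `timestamp` are hypothetical and stand in for whatever fields the incoming records actually contain.

```yaml
actions:
- stream:
    watch: temperature      # hypothetical: the value to track
    group-by: sensor_id     # hypothetical: one tracking state per sensor
    input-time: timestamp   # hypothetical: event-time field used for elapsed calculations
```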

Missing Data

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| fill-strategy | Fill Strategy | | Strategy applied when the watched value is missing. Allowed values: forward-fill, default-value |

Output

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| marker | string | | Optional marker added to generated events (when not enriching). |
| output-field | field (string) | | Field name used for the emitted delta. Examples: data_field |
| elapsed-field | field (string) | | Field name capturing elapsed milliseconds. Examples: data_field |
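
To illustrate the output options, the sketch below names the fields that receive the delta and the elapsed time. The names `temperature`, `temperature_delta`, and `elapsed_ms` are hypothetical, and pairing `output-field` with `delta: true` is an assumption based on the field descriptions rather than a documented requirement.

```yaml
actions:
- stream:
    watch: temperature                # hypothetical field name
    delta: true                       # assumed to be needed for a delta to be emitted
    output-field: temperature_delta   # hypothetical: receives the emitted delta
    elapsed-field: elapsed_ms         # hypothetical: receives elapsed milliseconds
```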

Recency

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| recency | Recency Output | | Emit a boolean indicating whether the key changed within a threshold. |

Resources

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| max-keys | number (integer) | | Maximum number of concurrent keys tracked. Examples: 42, 1.2e-10 |
| eviction | Stream Eviction Policy | | Eviction policy applied when max-keys is exceeded. Allowed values: drop-new, lru, ttl |

Windowing

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| window-size | number (integer) | | Maximum number of recent events to keep per key when computing aggregates. Examples: 42, 1.2e-10 |

Schema

Fill Strategy Options

| Option | Name | Type | Description |
| --- | --- | --- | --- |
| forward-fill | Forward Fill | map | Carry forward the previous value when the field is missing. |
| default-value | Default Value | object | Substitute a default JSON value when the field is missing. |

Stream Eviction Policy Options

| Option | Name | Type | Description |
| --- | --- | --- | --- |
| drop-new | Drop New | map | Drop new keys when the limit is reached. |
| lru | Lru | map | Evict the least-recently updated key. |
| ttl | Ttl | object | Evict keys that have been idle longer than the configured duration. |

Stream Aggregation Spec Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| field | field (string) | | Source field used for the aggregation. Examples: data_field |
| op | Stream Aggregation Operation | | Aggregation operation. Allowed values: avg, mean, min, max, first, last, sum, stddev, range, earliest, … |
| r-as | field (string) | | Optional alias for the output field. Examples: data_field |
| window-size | number (integer) | | Override window size for this aggregation (falls back to stream-level window). Examples: 42, 1.2e-10 |
| percentile | number (integer) | | Percentile to compute when op = percentile (0-100). Examples: 42, 1.2e-10 |
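
A minimal sketch of an aggregation list, assuming each entry in `aggregations` is a mapping with the fields listed above. The field name `latency_ms` and the numeric values are hypothetical. The second entry overrides the stream-level window for that aggregation only.

```yaml
actions:
- stream:
    watch: latency_ms          # hypothetical field name
    window-size: 100           # stream-level window: recent events kept per key
    aggregations:
    - field: latency_ms
      op: avg                  # uses the stream-level window of 100
    - field: latency_ms
      op: percentile
      percentile: 95           # only meaningful when op = percentile
      window-size: 20          # per-aggregation override
```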

Fill Strategy - Default Value Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| value | map (object) | | |

Recency Output Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| threshold-ms | duration (integer) | | Threshold in milliseconds for considering a key “recent”. |
| output-field | field (string) | | Field where the recency flag is written. Examples: data_field |
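
As an illustration, a recency block might look like the sketch below, assuming `recency` is a nested mapping with the two fields above. The names `status`, `device_id`, and `is_recent`, and the 60-second threshold, are hypothetical.

```yaml
actions:
- stream:
    watch: status              # hypothetical field name
    group-by: device_id        # hypothetical: one key per device
    recency:
      threshold-ms: 60000      # 60 seconds, expressed in milliseconds
      output-field: is_recent  # hypothetical: receives the boolean recency flag
```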

Stream Eviction Policy - Ttl Fields

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| max-idle | string | | |

Stream Aggregation Spec - Stream Aggregation Operation Options

| Value | Name | Description |
| --- | --- | --- |
| avg | avg | Avg |
| mean | mean | Mean |
| min | min | Min |
| max | max | Max |
| first | first | First |
| last | last | Last |
| sum | sum | Sum |
| stddev | stddev | Stddev |
| range | range | Range |
| earliest | earliest | Earliest |
| latest | latest | Latest |
| count | count | Count |
| distinct-count | distinct-count | Distinct Count |
| median | median | Median |
| percentile | percentile | Percentile |
| variance | variance | Variance |
| variance-population | variance-population | Variance Population |
| stddev-population | stddev-population | Stddev Population |
| sum-squares | sum-squares | Sum Squares |
| mode | mode | Mode |
| list | list | List |
| values | values | Values |