Kafka
Kafka (kafka)
Consume records from Kafka/Redpanda clusters.
Messaging binary json raw
Minimal example
YAML
input:
  kafka:
    bootstrap-servers: ""
JSON
{ "input": { "kafka": { "bootstrap-servers": "" } } }
Advanced
| Field | Type | Required | Description |
|---|---|---|---|
| global-options ✓ | map (string) | | Additional librdkafka consumer configuration. |
| topic-options ✓ | map (string) | | Topic-level librdkafka configuration overrides. |
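As an illustration, the fragment below passes a few librdkafka settings through both maps. It is a sketch that assumes the maps sit next to `bootstrap-servers` under `input.kafka`, as in the minimal example. The broker address and the chosen values are placeholders. `fetch.wait.max.ms` and `session.timeout.ms` are librdkafka consumer properties, and `auto.offset.reset` is a librdkafka topic-level property.

```yaml
input:
  kafka:
    bootstrap-servers: "localhost"    # placeholder broker
    global-options:
      fetch.wait.max.ms: "250"        # map values are strings
      session.timeout.ms: "30000"
    topic-options:
      auto.offset.reset: "earliest"   # applies per topic
```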
Authentication
| Field | Type | Required | Description |
|---|---|---|---|
| auth | Kafka Authentication | | Authentication and TLS material. |
Connection
| Field | Type | Required | Description |
|---|---|---|---|
| bootstrap-servers | hostname (string) | ✅ | Comma-separated bootstrap server list. Examples: example.com, localhost |
| client-id | string | | Optional client identifier override. |
| group-id | string | | Consumer group identifier (required for coordinated consumption). |
Cursor
| Field | Type | Required | Description |
|---|---|---|---|
| cursor | Kafka Cursor | | Cursor persistence configuration for incremental polling. |
Payload
| Field | Type | Required | Description |
|---|---|---|---|
| payload | Kafka Payload Format | | Controls how payload bytes should be interpreted. Allowed values: json, raw, avro-bridge |
Processing
| Field | Type | Required | Description |
|---|---|---|---|
| batch | Batch | | Optional batching semantics shared with other inputs. |
Reliability
| Field | Type | Required | Description |
|---|---|---|---|
| retry | Retry | | Retry configuration for transient failures. |
Schema
| Field | Type | Required | Description |
|---|---|---|---|
| avro | Schema Registry | | Schema registry configuration (required for Avro bridge). |
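Because the `avro-bridge` payload format requires a schema registry, the two settings are usually configured together. The sketch below assumes `payload` is written as a plain string value and that `avro` nests the Schema Registry fields directly. The topic name and registry URL are placeholders.

```yaml
input:
  kafka:
    bootstrap-servers: "localhost"
    topics:
      - orders                          # hypothetical topic
    payload: avro-bridge                # one of: json, raw, avro-bridge
    avro:
      url: "https://example.com/path"   # required when avro is set
      subject-suffix: "-value"
      use-latest: true                  # default is false
```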
Subscription
| Field | Type | Required | Description |
|---|---|---|---|
| topics | string[] | | Explicit list of topics to subscribe to. |
| topic-pattern | string | | Regex pattern for dynamic topic subscription. |
| partitions ✓ | map (string) | | Explicit partition list per topic. |
| assignments | Assignments[] | | Static assignments of topic/partition pairs. |
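The two documents below sketch the difference between group-based subscription and static assignment. Topic names, the group name, and the offsets are hypothetical. Whether the subscription fields may be combined in one input is not stated here, so they are shown separately.

```yaml
# Coordinated consumption: subscribe by topic name within a consumer group.
input:
  kafka:
    bootstrap-servers: "localhost"
    group-id: "orders-reader"
    topics:
      - orders
      - payments
---
# Static assignment: pin explicit topic/partition pairs.
input:
  kafka:
    bootstrap-servers: "localhost"
    assignments:
      - topic: orders
        partition: "0"     # partition and offset are typed as strings
        offset: "100"      # optional
      - topic: orders
        partition: "1"
```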
Trigger
| Field | Type | Required | Description |
|---|---|---|---|
| trigger | Trigger | | Run on the provided cadence; if omitted, this input runs exactly once. Allowed values: message, cron, interval |
Schema
- Trigger - Cron - Window - Window Start Options
- Trigger - Interval - Window - Window Start Options
- Trigger Options
- Kafka Authentication - Kafka SASL Fields
- Kafka Authentication - Kafka TLS Fields
- Kafka Authentication Fields
- Kafka Cursor Fields
- Batch Fields
- Retry Fields
- Schema Registry Fields
- Assignments Fields
- Trigger - Message Fields
- Trigger - Cron - Window - Window Start - Start Time Fields
- Trigger - Cron - Window Fields
- Trigger - Cron Fields
- Trigger - Interval - Window - Window Start - Start Time Fields
- Trigger - Interval - Window Fields
- Trigger - Interval Fields
- Global Options Table
- Topic Options Table
- Kafka Authentication - Kafka SASL - Extensions Table
- Kafka Authentication - Config Table
- Kafka Cursor - Per Partition Table
- Kafka Authentication - Kafka SASL - Kafka SASL Mechanism Options
- Kafka Cursor - Cursor Mode Options
- Kafka Payload Format Options
- Trigger - Message - Filter Kind Options
- Trigger - Message - Filter Source Options
- Trigger - Message - Filter Type Options
Trigger - Cron - Window - Window Start Options
| Option | Name | Type | Description |
|---|---|---|---|
start-time | Start Time | object | |
tracked | Tracked | string | Examples: /path/to/file, c:\users\joe\data\file.txt |
Trigger - Interval - Window - Window Start Options
| Option | Name | Type | Description |
|---|---|---|---|
start-time | Start Time | object | |
tracked | Tracked | string | Examples: /path/to/file, c:\users\joe\data\file.txt |
Trigger Options
| Option | Name | Type | Description |
|---|---|---|---|
message | Message | object | |
cron | Cron | object | |
interval | Interval | object |
Kafka Authentication - Kafka SASL Fields
| Field | Type | Required | Description |
|---|---|---|---|
mechanism | Kafka SASL Mechanism | ✅ | Allowed values: plain, scram-sha256, scram-sha512, o-auth-bearer |
username ✓ | string | ||
password ✓ | string | ||
token ✓ | string | ||
token-command ✓ | string | ||
extensions ✓ | map (string) |
Kafka Authentication - Kafka TLS Fields
| Field | Type | Required | Description |
|---|---|---|---|
| ca-certificate | string | | |
| client-certificate | string | | |
| client-key | string | | |
| insecure-skip-verify ✓ | boolean (bool) | | Default: false |
Kafka Authentication Fields
| Field | Type | Required | Description |
|---|---|---|---|
| sasl | Kafka SASL | | |
| tls | Kafka TLS | | |
| config ✓ | map (string) | | Additional librdkafka configuration forwarded verbatim. |
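A possible `auth` block combining SASL and TLS might look like the sketch below. The credentials are placeholders. The tables do not say whether `ca-certificate` expects a file path or inline PEM text, so the path used here is an assumption.

```yaml
input:
  kafka:
    bootstrap-servers: "example.com"
    auth:
      sasl:
        mechanism: scram-sha512       # required when sasl is present
        username: "reader"            # placeholder
        password: "change-me"         # placeholder
      tls:
        ca-certificate: "/path/to/ca.pem"   # assumed to be a path
        insecure-skip-verify: false         # default
```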
Kafka Cursor Fields
| Field | Type | Required | Description |
|---|---|---|---|
| mode | Cursor Mode | | Default: offset. Allowed values: offset, timestamp, custom |
| persist-to | string | | |
| initial-offset | string | | |
| initial-timestamp | string | | |
| per-partition | map (string) | | Per-topic/partition initial offsets in the form topic:partition -> offset. |
| custom-value | string | | |
| state-id | string | | Optional human-readable identifier used to stabilise cursor persistence across redeployments. |
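To make the `topic:partition -> offset` form concrete, here is a sketch of an offset-mode cursor with per-partition starting points. The topic name, offsets, and `state-id` value are hypothetical, and the keys are quoted only so that the colon is not read as YAML syntax.

```yaml
input:
  kafka:
    bootstrap-servers: "localhost"
    topics:
      - orders
    cursor:
      mode: offset                # default; alternatives: timestamp, custom
      state-id: "orders-reader"   # stable name across redeployments
      per-partition:
        "orders:0": "100"         # topic:partition -> offset
        "orders:1": "250"
```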
Batch Fields
| Field | Type | Required | Description |
|---|---|---|---|
| uuid-field | field (string) | | Field where generated uuid, the unique marker for the group, will be stored. Examples: data_field |
| invocation-time-field | field (string) | | Field where invocation time will be stored. Examples: data_field |
| completion-time-field | field (string) | | Field where completion (end of execution) time will be stored. Examples: data_field |
| begin-marker-field | field (string) | | Field used to mark first event in the group. Examples: data_field |
| end-marker-field | field (string) | | Field used to mark last event in the group. Examples: data_field |
| line-count-field | field (string) | | Field used to store the line count of the batch. Examples: data_field |
| line-num-field | field (string) | | Field used to store the line number of the batch. Examples: data_field |
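Every batch setting names an output field, so a configuration only lists the markers it wants written onto each event. In the sketch below the field names on the right are hypothetical.

```yaml
input:
  kafka:
    bootstrap-servers: "localhost"
    batch:
      uuid-field: batch_id            # shared by all events in the group
      begin-marker-field: is_first    # marks the first event of the group
      end-marker-field: is_last       # marks the last event of the group
      line-count-field: line_count
```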
Retry Fields
| Field | Type | Required | Description |
|---|---|---|---|
| timeout | time-interval (string) | ✅ | Timeout, e.g. 500ms or 2s (default is 30). Examples: 500ms, 2h |
| retries | number (integer) | | Number of retries. Examples: 42 |
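A minimal retry block could look like this sketch. The values are illustrative only. `timeout` is required whenever `retry` is present.

```yaml
input:
  kafka:
    bootstrap-servers: "localhost"
    retry:
      timeout: 2s    # time-interval string, e.g. 500ms or 2s
      retries: 3     # integer
```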
Schema Registry Fields
| Field | Type | Required | Description |
|---|---|---|---|
| url | url (string) | ✅ | Examples: https://example.com/path |
| username | string | | |
| password | string | | |
| access-token | string | | |
| subject-suffix | string | | Optional suffix appended to subject names (e.g. -value). |
| use-latest ✓ | boolean (bool) | | When true, fetch the latest compatible schema instead of an explicit version. Default: false |
Assignments Fields
| Field | Type | Required | Description |
|---|---|---|---|
topic | string | ✅ | |
partition ✓ | string | ✅ | |
offset | string |
Trigger - Message Fields
| Field | Type | Required | Description |
|---|---|---|---|
| limit ✓ | number (integer) | | The number of times to run the input. Examples: 42 |
| filter-kind | Filter Kind | | Specifies whether the message originated from the “system” or from the “user”. Allowed values: system, user, runtime-artifact-fetch, runtime-artifact-fetch-error, runtime-artifact-clear, runtime-artifact-clear-ack, runtime-artifact-fetch-reply, runtime-artifact-update, runtime-artifact-update-ack |
| filter-source | Filter Source[] | | Specifies which process generated the message: a “server”, a “worker” or a “job”. Allowed values: job, worker, server |
| filter-worker | string | | Specifies which worker to select. |
| filter-job | string | | Specifies the name of the job that the message came from. |
| filter-type | Filter Type[] | | Restricts matching to particular message types. Allowed values: worker-licensed, worker-unlicensed, variable, variable-deleted, begin-shutting-down-job, begin-shutting-down-server, begin-shutting-down-worker, broadcast-job-thread-state, broadcast-server-thread-state, broadcast-worker-thread-state, … |
| filter-tag | string | | Matches only messages that carry a tag with a particular value. This only matches against user-generated messages. |
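As a sketch of how the filters combine, the trigger below would run the input when a user-generated message with a given tag arrives from a particular job. It assumes the selected trigger option is written as a nested key under `trigger`. The job name and tag are hypothetical.

```yaml
input:
  kafka:
    bootstrap-servers: "localhost"
    trigger:
      message:
        limit: 1                    # run the input once
        filter-kind: user
        filter-source:
          - job
        filter-job: "upstream-job"  # hypothetical job name
        filter-tag: "reload"        # matched against user-generated messages only
```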
Trigger - Cron - Window - Window Start - Start Time Fields
| Field | Type | Required | Description |
|---|---|---|---|
| start-time | time-format (string) | ✅ | Allows the windowing to start at a specified time. Hint: %Y-%m-%d %H:%M:%S%.3f %z |
| highwatermark-file | path (string) | ✅ | File where the timestamp is stored so that processing can resume after the job has been restarted. Examples: /path/to/file, c:\users\joe\data\file.txt |
Trigger - Cron - Window Fields
| Field | Type | Required | Description |
|---|---|---|---|
| size | duration (string) | ✅ | Window size. |
| offset | duration (string) | | Window offset. Default: 0s |
| start | Window Start | | File where the timestamp is stored so that processing can resume after the job has been restarted. Allowed values: start-time, tracked |
Trigger - Cron Fields
| Field | Type | Required | Description |
|---|---|---|---|
| cron | cron-expression (string) | ✅ | The Cron pattern. |
| immediate ✓ | boolean (bool) | | Run as soon as invoked, instead of waiting for the specified cron interval. Default: false |
| random-offset | duration (string) | | Sets a random offset to the schedule, then sticks to it. Default: 0s |
| window | Window | | Optional window definition when the schedule should only read a bounded range. |
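The sketch below puts the cron fields together with a window that starts at an explicit time. Several details are assumptions: the five-field cron expression (some cron implementations expect a leading seconds field), the duration spellings, and the nesting of the `start-time` option under `start`. The timestamp follows the `%Y-%m-%d %H:%M:%S%.3f %z` hint.

```yaml
input:
  kafka:
    bootstrap-servers: "localhost"
    trigger:
      cron:
        cron: "*/5 * * * *"       # assumed five-field syntax
        immediate: true           # do not wait for the first tick
        window:
          size: 5m
          start:
            start-time:
              start-time: "2024-01-01 00:00:00.000 +0000"
              highwatermark-file: "/path/to/file"
```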
Trigger - Interval - Window - Window Start - Start Time Fields
| Field | Type | Required | Description |
|---|---|---|---|
| start-time | time-format (string) | ✅ | Allows the windowing to start at a specified time. Hint: %Y-%m-%d %H:%M:%S%.3f %z |
| highwatermark-file | path (string) | ✅ | File where the timestamp is stored so that processing can resume after the job has been restarted. Examples: /path/to/file, c:\users\joe\data\file.txt |
Trigger - Interval - Window Fields
| Field | Type | Required | Description |
|---|---|---|---|
| size | duration (string) | ✅ | Window size. |
| offset | duration (string) | | Window offset. Default: 0s |
| start | Window Start | | File where the timestamp is stored so that processing can resume after the job has been restarted. Allowed values: start-time, tracked |
Trigger - Interval Fields
| Field | Type | Required | Description |
|---|---|---|---|
| duration | duration (string) | ✅ | Duration to wait between events. |
| random-offset | duration (string) | | Sets a random offset to the schedule, then sticks to it. Default: 0s |
| window | Window | | Optional window definition when the interval should only cover a bounded range. |
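For comparison with a cron schedule, an interval trigger only needs a `duration`. The sketch below also adds a window whose start is tracked in a file. The duration spellings and the nesting of `tracked` under `start` are assumptions, and the path is a placeholder.

```yaml
input:
  kafka:
    bootstrap-servers: "localhost"
    trigger:
      interval:
        duration: 30s             # wait between runs
        random-offset: 5s         # chosen once, then kept
        window:
          size: 1m
          offset: 0s              # default
          start:
            tracked: "/path/to/file"
```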
Global Options Table
| Option | Value |
|---|---|
option.name | value |
Value format: templated-text.
Topic Options Table
| Topic option | Value |
|---|---|
option.name | value |
Value format: templated-text.
Kafka Authentication - Kafka SASL - Extensions Table
| Extension | Value |
|---|---|
key | value |
Value format: templated-text.
Kafka Authentication - Config Table
| Option | Value |
|---|---|
option.name | value |
Value format: templated-text.
Kafka Cursor - Per Partition Table
| Topic:Partition | Offset |
|---|---|
topic:partition | offset |
Kafka Authentication - Kafka SASL - Kafka SASL Mechanism Options
| Value | Name | Description |
|---|---|---|
plain | plain | Plain |
scram-sha256 | scram-sha256 | Scram Sha256 |
scram-sha512 | scram-sha512 | Scram Sha512 |
o-auth-bearer | o-auth-bearer | O Auth Bearer |
Kafka Cursor - Cursor Mode Options
| Value | Name | Description |
|---|---|---|
offset | offset | Offset |
timestamp | timestamp | Timestamp |
custom | custom | Custom |
Kafka Payload Format Options
| Value | Name | Description |
|---|---|---|
json | json | Json |
raw | raw | Raw |
avro-bridge | avro-bridge | Avro Bridge |
Trigger - Message - Filter Kind Options
| Value | Name | Description |
|---|---|---|
system | system | System |
user | user | User |
runtime-artifact-fetch | runtime-artifact-fetch | Runtime Artifact Fetch |
runtime-artifact-fetch-error | runtime-artifact-fetch-error | Runtime Artifact Fetch Error |
runtime-artifact-clear | runtime-artifact-clear | Runtime Artifact Clear |
runtime-artifact-clear-ack | runtime-artifact-clear-ack | Runtime Artifact Clear Ack |
runtime-artifact-fetch-reply | runtime-artifact-fetch-reply | Runtime Artifact Fetch Reply |
runtime-artifact-update | runtime-artifact-update | Runtime Artifact Update |
runtime-artifact-update-ack | runtime-artifact-update-ack | Runtime Artifact Update Ack |
Trigger - Message - Filter Source Options
| Value | Name | Description |
|---|---|---|
job | job | Job |
worker | worker | Worker |
server | server | Server |
Trigger - Message - Filter Type Options
| Value | Name | Description |
|---|---|---|
worker-licensed | worker-licensed | Worker Licensed |
worker-unlicensed | worker-unlicensed | Worker Unlicensed |
variable | variable | Variable |
variable-deleted | variable-deleted | Variable Deleted |
begin-shutting-down-job | begin-shutting-down-job | Begin Shutting Down Job |
begin-shutting-down-server | begin-shutting-down-server | Begin Shutting Down Server |
begin-shutting-down-worker | begin-shutting-down-worker | Begin Shutting Down Worker |
broadcast-job-thread-state | broadcast-job-thread-state | Broadcast Job Thread State |
broadcast-server-thread-state | broadcast-server-thread-state | Broadcast Server Thread State |
broadcast-worker-thread-state | broadcast-worker-thread-state | Broadcast Worker Thread State |
check-job-report-time | check-job-report-time | Check Job Report Time |
check-worker-report-time | check-worker-report-time | Check Worker Report Time |
de-register-job-thread-dependency | de-register-job-thread-dependency | De Register Job Thread Dependency |
de-register-server-thread-dependency | de-register-server-thread-dependency | De Register Server Thread Dependency |
de-register-worker-thread-dependency | de-register-worker-thread-dependency | De Register Worker Thread Dependency |
deployed-job-active | deployed-job-active | Deployed Job Active |
deployed-job-removed | deployed-job-removed | Deployed Job Removed |
deployed-job-should-be-running | deployed-job-should-be-running | Deployed Job Should Be Running |
deployment-phase | deployment-phase | Deployment Phase |
heart-beat | heart-beat | Heart Beat |
initialise-internal-state | initialise-internal-state | Initialise Internal State |
initialise-job-states | initialise-job-states | Initialise Job States |
job-batch-end | job-batch-end | Job Batch End |
job-backlog-update | job-backlog-update | Job Backlog Update |
job-checkpoint-update | job-checkpoint-update | Job Checkpoint Update |
job-deploy-ready | job-deploy-ready | Job Deploy Ready |
job-deployed | job-deployed | Job Deployed |
job-document-end | job-document-end | Job Document End |
job-document-start | job-document-start | Job Document Start |
job-errors | job-errors | Job Errors |
job-execution-anomaly | job-execution-anomaly | Job Execution Anomaly |
job-execution-status | job-execution-status | Job Execution Status |
job-emit-custom | job-emit-custom | Job Emit Custom |
job-finished | job-finished | Job Finished |
job-idle | job-idle | Job Idle |
job-initiated | job-initiated | Job Initiated |
job-is-processing | job-is-processing | Job Is Processing |
job-logs | job-logs | Job Logs |
job-metrics | job-metrics | Job Metrics |
job-notifications | job-notifications | Job Notifications |
job-removing | job-removing | Job Removing |
job-removed | job-removed | Job Removed |
job-remove-failed | job-remove-failed | Job Remove Failed |
job-remove-ready | job-remove-ready | Job Remove Ready |
job-replaced | job-replaced | Job Replaced |
job-required | job-required | Job Required |
job-run-ended | job-run-ended | Job Run Ended |
job-runtime-error | job-runtime-error | Job Runtime Error |
job-runtime-settings | job-runtime-settings | Job Runtime Settings |
job-run-started | job-run-started | Job Run Started |
job-running-docker | job-running-docker | Job Running Docker |
job-running-script | job-running-script | Job Running Script |
job-running-subprocess | job-running-subprocess | Job Running Subprocess |
job-running-system-d | job-running-system-d | Job Running System D |
job-settings | job-settings | Job Settings |
job-step-statistics | job-step-statistics | Job Step Statistics |
job-started | job-started | Job Started |
job-staged | job-staged | Job Staged |
job-state-transition | job-state-transition | Job State Transition |
job-shutting-down | job-shutting-down | Job Shutting Down |
job-stopping | job-stopping | Job Stopping |
job-stopped | job-stopped | Job Stopped |
job-timed-out | job-timed-out | Job Timed Out |
job-suspicious-silence | job-suspicious-silence | Job Suspicious Silence |
job-thread-state | job-thread-state | Job Thread State |
job-trace | job-trace | Job Trace |
job-trace-requires-samples | job-trace-requires-samples | Job Trace Requires Samples |
job-updated | job-updated | Job Updated |
job-worker-comms-error | job-worker-comms-error | Job Worker Comms Error |
job-unstaged | job-unstaged | Job Unstaged |
license-state-changed | license-state-changed | License State Changed |
license-validation-failed | license-validation-failed | License Validation Failed |
license-validation-ok | license-validation-ok | License Validation Ok |
license-volume-violation | license-volume-violation | License Volume Violation |
new-license | new-license | New License |
override-job-coordinated-shutdown | override-job-coordinated-shutdown | Override Job Coordinated Shutdown |
override-server-coordinated-shutdown | override-server-coordinated-shutdown | Override Server Coordinated Shutdown |
override-worker-coordinated-shutdown | override-worker-coordinated-shutdown | Override Worker Coordinated Shutdown |
register-job-thread-dependency | register-job-thread-dependency | Register Job Thread Dependency |
register-server-thread-dependency | register-server-thread-dependency | Register Server Thread Dependency |
register-worker-thread-dependency | register-worker-thread-dependency | Register Worker Thread Dependency |
run-job-failure | run-job-failure | Run Job Failure |
server-logs | server-logs | Server Logs |
server-metrics-batch | server-metrics-batch | Server Metrics Batch |
server-started | server-started | Server Started |
server-starting | server-starting | Server Starting |
server-stopping | server-stopping | Server Stopping |
server-thread-state | server-thread-state | Server Thread State |
server-worker-comms-error | server-worker-comms-error | Server Worker Comms Error |
shutdown-jobs | shutdown-jobs | Shutdown Jobs |
shutdown-worker | shutdown-worker | Shutdown Worker |
system-shutdown | system-shutdown | System Shutdown |
update-upstream-sync-for-job | update-upstream-sync-for-job | Update Upstream Sync For Job |
update-upstream-sync-for-worker | update-upstream-sync-for-worker | Update Upstream Sync For Worker |
update-variable | update-variable | Update Variable |
user-alert | user-alert | User Alert |
user-generated | user-generated | User Generated |
user-notification | user-notification | User Notification |
worker-command-for-job | worker-command-for-job | Worker Command For Job |
worker-auth-lease-ack | worker-auth-lease-ack | Worker Auth Lease Ack |
worker-connected | worker-connected | Worker Connected |
worker-created | worker-created | Worker Created |
worker-debug-heart-beat | worker-debug-heart-beat | Worker Debug Heart Beat |
worker-error | worker-error | Worker Error |
worker-first-seen | worker-first-seen | Worker First Seen |
worker-heart-beat | worker-heart-beat | Worker Heart Beat |
worker-logs | worker-logs | Worker Logs |
worker-metrics-batch | worker-metrics-batch | Worker Metrics Batch |
worker-offline | worker-offline | Worker Offline |
worker-requests-auth-lease | worker-requests-auth-lease | Worker Requests Auth Lease |
worker-server-comms-error | worker-server-comms-error | Worker Server Comms Error |
worker-settings | worker-settings | Worker Settings |
worker-shutdown | worker-shutdown | Worker Shutdown |
worker-shutting-down | worker-shutting-down | Worker Shutting Down |
worker-started | worker-started | Worker Started |
worker-state-uuid | worker-state-uuid | Worker State Uuid |
worker-stopping | worker-stopping | Worker Stopping |
worker-suspicious-silence | worker-suspicious-silence | Worker Suspicious Silence |
worker-system-information | worker-system-information | Worker System Information |
worker-thread-state | worker-thread-state | Worker Thread State |
worker-updated | worker-updated | Worker Updated |
worker-modified | worker-modified | Worker Modified |
worker-removed | worker-removed | Worker Removed |
context-changed | context-changed | Context Changed |
rerender-deployment | rerender-deployment | Rerender Deployment |
job-killed | job-killed | Job Killed |
message-serviced | message-serviced | Message Serviced |
failed-to-service-message | failed-to-service-message | Failed To Service Message |
worker-wants-initial-settings | worker-wants-initial-settings | Worker Wants Initial Settings |
worker-wants-initial-settings-reply | worker-wants-initial-settings-reply | Worker Wants Initial Settings Reply |
worker-wants-deployed-jobs | worker-wants-deployed-jobs | Worker Wants Deployed Jobs |
worker-wants-deployed-jobs-reply | worker-wants-deployed-jobs-reply | Worker Wants Deployed Jobs Reply |
worker-wants-job-configuration | worker-wants-job-configuration | Worker Wants Job Configuration |
worker-wants-job-configuration-reply | worker-wants-job-configuration-reply | Worker Wants Job Configuration Reply |
job-wants-dslir-key | job-wants-dslir-key | Job Wants Dslir Key |
job-wants-dslir-key-reply | job-wants-dslir-key-reply | Job Wants Dslir Key Reply |
worker-wants-dslir-key | worker-wants-dslir-key | Worker Wants Dslir Key |
worker-wants-dslir-key-reply | worker-wants-dslir-key-reply | Worker Wants Dslir Key Reply |
job-wants-variables | job-wants-variables | Job Wants Variables |
job-wants-variables-reply | job-wants-variables-reply | Job Wants Variables Reply |
job-wants-credentials | job-wants-credentials | Job Wants Credentials |
job-wants-credentials-reply | job-wants-credentials-reply | Job Wants Credentials Reply |
job-wants-credentials-error | job-wants-credentials-error | Job Wants Credentials Error |
job-wants-secret-variables-reply | job-wants-secret-variables-reply | Job Wants Secret Variables Reply |
job-credentials-invalidated | job-credentials-invalidated | Job Credentials Invalidated |
aggregator-health | aggregator-health | Aggregator Health |
worker-aggregator-health | worker-aggregator-health | Worker Aggregator Health |
worker-verification-token | worker-verification-token | Worker Verification Token |
worker-requests-verification-token | worker-requests-verification-token | Worker Requests Verification Token |
runtime-artifact-update | runtime-artifact-update | Runtime Artifact Update |
runtime-artifact-update-ack | runtime-artifact-update-ack | Runtime Artifact Update Ack |
runtime-artifact-clear | runtime-artifact-clear | Runtime Artifact Clear |
runtime-artifact-clear-ack | runtime-artifact-clear-ack | Runtime Artifact Clear Ack |
runtime-artifact-fetch | runtime-artifact-fetch | Runtime Artifact Fetch |
runtime-artifact-fetch-reply | runtime-artifact-fetch-reply | Runtime Artifact Fetch Reply |
runtime-artifact-fetch-error | runtime-artifact-fetch-error | Runtime Artifact Fetch Error |