kafka.to() function
kafka.to() sends data to Apache Kafka brokers.
Function type signature
(
    <-tables: stream[A],
    brokers: [string],
    topic: string,
    ?balancer: string,
    ?name: string,
    ?nameColumn: string,
    ?tagColumns: [string],
    ?timeColumn: string,
    ?valueColumns: [string],
) => stream[A] where A: Record
Parameters
brokers
(Required) List of Kafka brokers to send data to.
topic
(Required) Kafka topic to send data to.
balancer
Kafka load balancing strategy. Default is hash.
The load balancing strategy determines how messages are routed to partitions available on a Kafka cluster. The following strategies are available:
- hash: Uses a hash of the group key to determine which Kafka partition to route messages to. This ensures that all messages generated from rows in the same table are routed to the same partition.
- round-robin: Equally distributes messages across all available partitions.
- least-bytes: Routes messages to the partition that has received the least amount of data.
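As a rough illustration of choosing a strategy, the sketch below sets balancer to round-robin so that messages are spread evenly across partitions instead of being routed by a hash of the group key. The broker address, topic, and metric name are placeholders reused from the example on this page, not values you should expect to work as-is.

```flux
import "kafka"
import "sampledata"

// Placeholder broker address, topic, and metric name; replace with your own.
sampledata.int()
    |> kafka.to(
        brokers: ["http://127.0.0.1:9092"],
        topic: "example-topic",
        name: "example-metric-name",
        tagColumns: ["tag"],
        // Distribute messages equally across all available partitions.
        balancer: "round-robin",
    )
```

If messages from the same table need to stay on one partition, omitting balancer keeps the default hash strategy.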
name
Kafka metric name. Default is the value of the nameColumn.
nameColumn
Column to use as the Kafka metric name. Default is _measurement.
timeColumn
Time column. Default is _time.
tagColumns
List of tag columns in input data.
valueColumns
List of value columns in input data. Default is ["_value"].
tables
Input data. Default is piped-forward data (<-).
Examples
Send data to Kafka
import "kafka"
import "sampledata"

sampledata.int()
    |> kafka.to(
        brokers: ["http://127.0.0.1:9092"],
        topic: "example-topic",
        name: "example-metric-name",
        tagColumns: ["tag"],
    )
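Send data with custom column mappings

The column parameters described above can be combined when input data does not use the default _measurement, _time, and _value columns. The following is a minimal sketch under stated assumptions: the input rows, the column names (metric, ts, host, usage, idle), and the broker and topic values are all hypothetical and chosen only for illustration.

```flux
import "array"
import "kafka"

// Hypothetical input rows with non-default column names.
data =
    array.from(
        rows: [
            {ts: 2021-01-01T00:00:00Z, metric: "cpu", host: "server-a", usage: 12.5, idle: 87.5},
            {ts: 2021-01-01T00:00:10Z, metric: "cpu", host: "server-a", usage: 15.0, idle: 85.0},
        ],
    )

data
    |> kafka.to(
        // Placeholder broker address and topic; replace with your own.
        brokers: ["http://127.0.0.1:9092"],
        topic: "example-topic",
        // Read the metric name from the "metric" column instead of _measurement.
        nameColumn: "metric",
        // Read timestamps from the "ts" column instead of _time.
        timeColumn: "ts",
        tagColumns: ["host"],
        // Send both "usage" and "idle" as values instead of only _value.
        valueColumns: ["usage", "idle"],
    )
```

Because name is not set here, the metric name comes from the column given in nameColumn, as described for the name parameter.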