Kafka event handler

Apache Kafka is a distributed streaming platform designed for building real-time data pipelines and streaming apps. Configure Kapacitor to send alert messages to a Kafka cluster.


Configuration as well as default option values for the Kafka event handler are set in your kapacitor.conf. Below is an example configuration:

[[kafka]]
  enabled = true
  id = "localhost"
  brokers = []
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = false
  ssl-ca = ""
  ssl-cert = ""
  ssl-key = ""
  insecure-skip-verify = false
  # Optional SASL configuration
  sasl-username = "xxxxx"
  sasl-password = "xxxxxxxx"
  sasl-mechanism = ""
  sasl-version = ""
  # Use if sasl-mechanism is GSSAPI. GSSAPI is for organizations using Kerberos.
  sasl-gssapi-service-name = ""
  sasl-gssapi-auth-type = "KRB5_USER_AUTH"
  sasl-gssapi-disable-pafxfast = false
  sasl-gssapi-kerberos-config-path = "/"
  sasl-gssapi-key-tab-path = ""
  sasl-gssapi-realm = "realm"
  # Use if sasl-mechanism is `OAUTHBEARER` (experimental).
  sasl-access-token = ""

To configure multiple Kafka clients, add one [[kafka]] section per client in the TOML file. The id acts as a unique identifier for each configured Kafka client.


enabled

Set to true to enable the Kafka event handler.

id

A unique identifier for the Kafka cluster.

brokers

List of Kafka broker addresses using the host:port format.

timeout

Timeout on network operations with the Kafka brokers. If set to 0, a default of 10s is used.

batch-size

The number of messages batched before being sent to Kafka. If set to 0, a default of 100 is used.

batch-timeout

The maximum amount of time to wait before flushing an incomplete batch. If set to 0, a default of 1s is used.
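
The way the batch size and batch timeout interact can be sketched in Python. This is an illustrative model only, not Kapacitor's implementation: the Batcher class, its method names, and the injectable clock are hypothetical. Only the defaults (100 messages, 1s) and the "0 means use the default" rule come from the option descriptions.

```python
import time

DEFAULT_BATCH_SIZE = 100      # used when batch-size is 0
DEFAULT_BATCH_TIMEOUT = 1.0   # seconds; used when batch-timeout is 0


class Batcher:
    """Hypothetical model: flush when the batch is full or has waited too long."""

    def __init__(self, batch_size=0, batch_timeout=0.0, clock=time.monotonic):
        # A value of 0 falls back to the documented default.
        self.batch_size = batch_size or DEFAULT_BATCH_SIZE
        self.batch_timeout = batch_timeout or DEFAULT_BATCH_TIMEOUT
        self.clock = clock
        self.pending = []
        self.first_added_at = None
        self.flushed = []  # each entry stands for one batch sent to Kafka

    def add(self, message):
        if not self.pending:
            # Start the timeout window with the first message of a batch.
            self.first_added_at = self.clock()
        self.pending.append(message)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def tick(self):
        """Call periodically; flushes an incomplete batch that waited too long."""
        if self.pending and self.clock() - self.first_added_at >= self.batch_timeout:
            self.flush()

    def flush(self):
        if self.pending:
            self.flushed.append(self.pending)
            self.pending = []
            self.first_added_at = None
```

In this model, a full batch is sent immediately, while a partial batch is sent only once the timeout has elapsed since its first message arrived.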


use-ssl

Enable SSL communication. Must be true for other SSL options to take effect.

ssl-ca

Path to certificate authority file.

ssl-cert

Path to host certificate file.

ssl-key

Path to certificate private key file.

insecure-skip-verify

Use SSL but skip chain and host verification. (Required if using a self-signed certificate.)
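
As a rough analogy for how the SSL options relate to each other, the sketch below builds a TLS context with Python's standard ssl module. It is not Kapacitor's code: build_tls_context and its parameter names are hypothetical, chosen to mirror the option names, and the fallback to the system trust store when no CA path is given is an assumption of this sketch.

```python
import ssl


def build_tls_context(use_ssl=False, ssl_ca="", ssl_cert="", ssl_key="",
                      insecure_skip_verify=False):
    """Hypothetical helper mirroring the use-ssl and ssl-* options."""
    if not use_ssl:
        # Without use-ssl, the other SSL options have no effect.
        return None

    # With an empty ssl_ca, Python uses the system trust store
    # (an assumption of this sketch, not a statement about Kapacitor).
    context = ssl.create_default_context(cafile=ssl_ca or None)

    if ssl_cert and ssl_key:
        # Present the host certificate and its private key to the broker.
        context.load_cert_chain(certfile=ssl_cert, keyfile=ssl_key)

    if insecure_skip_verify:
        # Traffic is still encrypted, but the broker's certificate chain
        # and host name are not checked.
        context.check_hostname = False  # must be disabled before CERT_NONE
        context.verify_mode = ssl.CERT_NONE

    return context
```

Skipping verification keeps the connection encrypted but removes protection against an impostor broker, which is why it is typically reserved for self-signed certificates in test environments.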

(Optional) SASL configuration


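sasl-username

Username to use for SASL authentication.

sasl-password

Password to use for SASL authentication.

sasl-mechanism

SASL mechanism type. Options include GSSAPI, OAUTHBEARER, PLAIN.

sasl-version

SASL protocol version.

sasl-gssapi-service-name

The service name for GSSAPI.

sasl-gssapi-auth-type

The authorization type for GSSAPI.

sasl-gssapi-disable-pafxfast

Set to true to disable Kerberos PA-FX-FAST negotiation; otherwise set to false.

sasl-gssapi-kerberos-config-path

Path to the Kerberos config file.

sasl-gssapi-key-tab-path

Path to the Kerberos key tab.

sasl-gssapi-realm

Default Kerberos realm.

sasl-access-token

Used if the SASL mechanism is OAUTHBEARER (experimental).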


The following Kafka event handler options can be set in a handler file or when using .kafka() in a TICKscript.

  Name                  Type     Description
  cluster               string   Name of the Kafka cluster.
  topic                 string   Kafka topic. In TICKscripts, this is set using .kafkaTopic().
  template              string   Message template.
  disablePartitionById  boolean  Disable partitioning Kafka messages by message ID.
  partitionAlgorithm    string   Algorithm used to assign message IDs to Kafka partitions: crc32 (default), murmur2, or fnv-1a.

Kafka message partitioning

In Kapacitor 1.6+, messages with the same ID are sent to the same Kafka partition. Previously, messages were sent to the Kafka partition with the least amount of data, regardless of the message ID. Messages with no ID are spread randomly between partitions. This aligns the Kapacitor concept of message IDs with the Kafka concept of message keys.

To revert to the previous behavior, use the disablePartitionById option.

When partitioning by ID, use the partitionHashAlgorithm option to specify how message IDs are assigned to Kafka partitions. Kapacitor supports the following partitioning algorithms:

  • crc32: (default) aligns with librdkafka and confluent-kafka-go
  • murmur2: aligns with canonical Java partitioning logic
  • fnv-1a: aligns with Shopify’s sarama project
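
The general idea behind partitioning by ID, hashing the message ID and reducing it to a partition index, can be sketched as follows. This is a simplified illustration, not Kapacitor's implementation: pick_partition and the helper names are hypothetical, and real clients (librdkafka, the Java client, sarama) each apply their own sign handling and reduction step, so the partition numbers produced here need not match theirs. murmur2 is omitted for brevity.

```python
import zlib


def crc32_hash(key: bytes) -> int:
    """CRC-32 of the key as an unsigned 32-bit integer."""
    return zlib.crc32(key) & 0xFFFFFFFF


def fnv1a_hash(key: bytes) -> int:
    """32-bit FNV-1a hash of the key."""
    h = 0x811C9DC5  # FNV offset basis
    for byte in key:
        h ^= byte
        h = (h * 0x01000193) & 0xFFFFFFFF  # FNV prime, wrapped to 32 bits
    return h


HASHES = {"crc32": crc32_hash, "fnv-1a": fnv1a_hash}


def pick_partition(message_id: str, num_partitions: int,
                   algorithm: str = "crc32") -> int:
    """Hypothetical: map a message ID to a partition index."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    hashed = HASHES[algorithm](message_id.encode("utf-8"))
    return hashed % num_partitions


# The same ID always maps to the same partition for a given algorithm.
first = pick_partition("cpu:host=server01", 6)
second = pick_partition("cpu:host=server01", 6)
```

Because the mapping depends only on the ID and the partition count, consumers that rely on per-ID ordering keep that ordering as long as the number of partitions does not change.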

Example: handler file

id: kafka-event-handler
topic: kapacitor-topic-name
kind: kafka
options:
  cluster: kafka-cluster
  topic: kafka-topic-name
  template: kafka-template-name
  disablePartitionById: false
  partitionAlgorithm: crc32

Example: TICKscript

  // ...

Using the Kafka Event Handler

With the Kafka event handler enabled in your kapacitor.conf, you can send alerts to a Kafka cluster in one of two ways: use the .kafka() attribute in your TICKscripts, or define a Kafka handler that subscribes to a topic and sends published alerts to Kafka.

The examples below use the following Kafka configuration defined in the kapacitor.conf:

Kafka settings in kapacitor.conf

[[kafka]]
  enabled = true
  id = "infra-monitoring"
  brokers = ["", ""]
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = true
  ssl-ca = "/etc/ssl/certs/ca.crt"
  ssl-cert = "/etc/ssl/certs/cert.crt"
  ssl-key = "/etc/ssl/certs/cert-key.key"
  insecure-skip-verify = true

Send alerts to a Kafka cluster from a TICKscript

The following TICKscript uses the .kafka() event handler to send the message, “Hey, check your CPU”, whenever idle CPU usage drops below 10%. It publishes the messages to the cpu-alerts topic in the infra-monitoring Kafka cluster defined in the kapacitor.conf.


    .crit(lambda: "usage_idle" < 10)
    .message('Hey, check your CPU')
    .kafka()
      .kafkaTopic('cpu-alerts')

Send alerts to a Kafka cluster from a defined handler

The following setup sends an alert to the cpu topic with the message, “Hey, check your CPU”. A Kafka handler is added that subscribes to the cpu topic and publishes all alert messages to the cpu-alerts topic associated with the infra-monitoring Kafka cluster defined in the kapacitor.conf.

Create a TICKscript that publishes alert messages to a topic. The TICKscript below sends an alert message to the cpu topic any time CPU idle usage drops below 10% (or CPU usage is above 90%).

    .crit(lambda: "usage_idle" < 10)
    .message('Hey, check your CPU')
    .topic('cpu')

Add and enable the TICKscript:

kapacitor define cpu_alert -tick cpu_alert.tick
kapacitor enable cpu_alert

Create a handler file that subscribes to the cpu topic and uses the Kafka event handler to send alerts to the cpu-alerts topic in Kafka.

id: kafka-cpu-alert
topic: cpu
kind: kafka
options:
  topic: 'cpu-alerts'

Add the handler:

kapacitor define-topic-handler kafka_cpu_handler.yaml
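
The flow in this setup, where a task publishes to a Kapacitor topic and a separately defined handler forwards each alert to Kafka, can be modeled with a small publish/subscribe sketch. Everything here is hypothetical and for illustration only: TopicBus and make_kafka_handler are not Kapacitor APIs, and the "Kafka" side simply records what would be sent instead of contacting a broker.

```python
from collections import defaultdict


class TopicBus:
    """Hypothetical stand-in for Kapacitor alert topics and their handlers."""

    def __init__(self):
        self.handlers = defaultdict(list)  # topic name -> handler callables

    def define_topic_handler(self, topic, handler):
        self.handlers[topic].append(handler)

    def publish(self, topic, message):
        # Every handler subscribed to the topic receives the alert.
        for handler in self.handlers[topic]:
            handler(message)


def make_kafka_handler(kafka_topic, sent):
    """Record (kafka_topic, message) pairs instead of calling a real Kafka client."""
    def handler(message):
        sent.append((kafka_topic, message))
    return handler


sent = []
bus = TopicBus()
# Mirrors the handler file: subscribe to "cpu", forward to "cpu-alerts".
bus.define_topic_handler("cpu", make_kafka_handler("cpu-alerts", sent))
# Mirrors the TICKscript publishing an alert to the "cpu" topic.
bus.publish("cpu", "Hey, check your CPU")
```

The TICKscript only needs to know the Kapacitor topic name; where alerts ultimately go is decided by the handlers attached to that topic, so handlers can be added or changed without redefining the task.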

Using SASL with Kapacitor

To use an authentication method other than SSL, configure Kapacitor to use SASL, for example to authenticate directly against Kafka with a username and password. Several SASL configuration options are available; username and password is the most common setup, as shown in the following example:

[[kafka]]
  enabled = true
  id = "infra-monitoring"
  brokers = ["", ""]
  timeout = "10s"
  batch-size = 100
  batch-timeout = "1s"
  use-ssl = true
  ssl-ca = "/etc/ssl/certs/ca.crt"
  ssl-cert = "/etc/ssl/certs/cert.crt"
  ssl-key = "/etc/ssl/certs/cert-key.key"
  insecure-skip-verify = true
  sasl-username = "kafka"
  sasl-password = "kafkapassword"
