Use Flux tasks with Kapacitor

Use Kapacitor 1.6+ to run Flux tasks against InfluxDB and other data sources. Leverage the full library of Flux functionality to build powerful data processing and monitoring tasks in Kapacitor.

Before you start

Before you get started with Flux tasks, consider:

  • Kapacitor Flux tasks cannot use Kapacitor topics or event handlers. You can only send alerts from within a Flux script using Flux notification endpoints.
  • Flux tasks are scheduled and executed by the Flux task engine built into Kapacitor 1.6+. This engine is separate from the Kapacitor TICKscript task engine.
  • Flux tasks are configured with the Flux task option inside of a Flux task script. This includes the task name and execution schedule.
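
Because Flux tasks cannot use Kapacitor topics or event handlers, alerting has to happen inside the script itself. The following is a minimal sketch of that pattern using the Flux slack package, assuming the package is available in your Kapacitor build. The task name, database, measurement, field, threshold, channel, and webhook URL are all hypothetical placeholders, and the bucket name uses the InfluxDB 1.x database/retention-policy form.

```flux
import "slack"

// The task option sets the task name and schedule (values are placeholders).
option task = {name: "example-alert-task", every: 5m}

// Assumed connection details; replace with your own.
host = "http://localhost:8086"
token = ""

// slack.endpoint() returns a function that sends one message per input row.
// The webhook URL below is a placeholder, not a real endpoint.
endpoint = slack.endpoint(url: "https://hooks.slack.com/services/EXAMPLE-WEBHOOK-URL")

from(bucket: "example-db/example-rp", host: host, token: token)
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "example-measurement" and r._field == "example-field")
    |> last()
    // Hypothetical threshold; assumes the field stores float values.
    |> filter(fn: (r) => r._value > 90.0)
    |> endpoint(
        mapFn: (r) => ({
            channel: "#example-alerts",
            text: "example-field is ${string(v: r._value)}",
            color: "danger",
        }),
    )()
```

Rows that pass the threshold filter each produce one Slack message; if no rows pass, nothing is sent.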

The setup steps depend on which version of InfluxDB you’re using. For InfluxDB 1.x:

  1. Set up a Flux task database in InfluxDB
  2. Configure Kapacitor Flux tasks
  3. Create a Flux task

Set up a Flux task database in InfluxDB

(Optional but encouraged)

When Kapacitor executes a Flux task, it can store information about the task execution (run) in an InfluxDB database. To store this data, do the following:

  1. Create a new database in InfluxDB to store Flux task run and log data.

    CREATE DATABASE kapacitorfluxtasks
    
  2. To limit the amount of Kapacitor Flux task log data stored on disk, either update the default autogen retention policy (RP) with a finite retention period or create a new retention policy with a finite retention period.

    -- Update an existing retention policy
    -- Syntax
    ALTER RETENTION POLICY <rp-name> ON <db-name> DURATION <new-retention-duration>

    -- Example
    ALTER RETENTION POLICY autogen ON kapacitorfluxtasks DURATION 3d

    -- Create a new retention policy (InfluxQL requires a REPLICATION clause)
    -- Syntax
    CREATE RETENTION POLICY <rp-name> ON <db-name> DURATION <retention-duration> REPLICATION <n>

    -- Example
    CREATE RETENTION POLICY threedays ON kapacitorfluxtasks DURATION 3d REPLICATION 1

Configure Kapacitor Flux tasks

Update or add the following settings under [fluxtask] in your kapacitor.conf:

  • enabled: true
  • task-run-influxdb: Name of the InfluxDB configuration in your kapacitor.conf to use to store Flux task data. To disable Flux task logging, set to "none".
  • task-run-bucket: InfluxDB database to store Flux task data and logs in. We recommend leaving this empty. By default, data is written to the kapacitor_fluxtask_logs database. To write task log data to another database, specify it by name only ("db-name"); the "db-name/rp" form, which includes a retention policy, is not supported. If the specified database does not already exist in InfluxDB, Kapacitor attempts to create it. If authentication is turned on, permissions to CREATE DATABASE are required. For more information, see Authentication and authorization in InfluxDB.
  • Provide one of the following:
    • task-run-org: Leave as an empty string ("")
    • task-run-orgid: Leave as an empty string ("")
  • task-run-measurement: InfluxDB measurement to store task run and log data in. Default is "runs".

Kapacitor Flux task configuration example

# ...

[fluxtask]
  enabled = true
  task-run-influxdb = "default"
  task-run-bucket = "kapacitor_fluxtask_logs"
  task-run-org = ""
  task-run-orgid = ""
  task-run-measurement = "runs"

# ...

For more information about Kapacitor [fluxtask] configuration options, see Configure Kapacitor.
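
Once task logging is enabled, the stored run data can be inspected with an ordinary Flux query. The sketch below assumes the default kapacitor_fluxtask_logs database, its autogen retention policy, the default "runs" measurement, and a local InfluxDB 1.x host; adjust each of these if your configuration differs.

```flux
// Assumed connection details; replace with your own.
host = "http://localhost:8086"
token = ""

// Bucket is "<database>/<retention-policy>"; "autogen" is an assumption here.
from(bucket: "kapacitor_fluxtask_logs/autogen", host: host, token: token)
    |> range(start: -1h)
    // "runs" is the default task-run-measurement.
    |> filter(fn: (r) => r._measurement == "runs")
```

This returns whatever Kapacitor recorded for task runs in the last hour; the exact tags and fields in the result depend on your Kapacitor version.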

Create a Flux task

  1. Create a Flux task script. Include the task option in your script to configure the Kapacitor Flux task. For more information about writing Flux tasks, see:

    Provide InfluxDB connection credentials

    The from() and to() functions require your InfluxDB host and token.

    Bucket name syntax

    When querying or writing to InfluxDB 1.x with Flux, use the database-name/retention-policy-name pattern to specify your bucket.

    example-task.flux
    option task = {
      name: "example-task-name",
      every: 1h,
      offset: 10m
    }
    
    host = "http://localhost:8086"
    token = ""
    
    from(bucket: "example-db/example-rp", host: host, token: token)
      |> range(start: -task.every)
      |> filter(fn: (r) => r._measurement == "example-measurement")
      |> aggregateWindow(every: 10m, fn: mean)
      |> to(bucket: "example-db/example-rp-downsampled", host: host, token: token)
    
  2. Use the kapacitor flux task create command to add your Flux script as a Kapacitor Flux task.

    kapacitor flux task create --file /path/to/example-task.flux
    

For more details about creating Kapacitor Flux tasks, see Create a Kapacitor Flux task.

Consider using InfluxDB tasks

If you’re using InfluxDB Cloud or InfluxDB OSS 2.x, consider using native InfluxDB tasks for data processing.

  1. Set up Kapacitor for InfluxDB Cloud or 2.x
  2. Configure Kapacitor Flux tasks for InfluxDB Cloud or 2.x
  3. Create a Flux task

Set up Kapacitor for InfluxDB Cloud or 2.x

Configure Kapacitor to connect to InfluxDB Cloud or InfluxDB OSS 2.x. For detailed instructions, see the following:

Configure Kapacitor Flux tasks for InfluxDB Cloud or 2.x

Update or add the following settings under [fluxtask] in your kapacitor.conf:

  • enabled: true
  • task-run-influxdb: Name of the InfluxDB configuration in your kapacitor.conf to use to store Flux task data. To disable Flux task logging, set to "none".
  • task-run-bucket: InfluxDB bucket to store Flux task data and logs in. We recommend leaving this empty. By default, data is written to the kapacitor_fluxtask_logs bucket. To specify another bucket to write task log data to, use the _tasks system bucket or create a new bucket. If the specified bucket does not already exist in InfluxDB, Kapacitor attempts to create it with POST /api/v2/buckets, in which case your API token must have permissions to create buckets in InfluxDB. For more information, see Manage API tokens.
  • Provide one of the following:
    • task-run-org: InfluxDB organization name.
    • task-run-orgid: InfluxDB organization ID.
  • task-run-measurement: InfluxDB measurement to store task run and log data in. Default is "runs".
# ...

[fluxtask]
  enabled = true
  task-run-influxdb = "InfluxDB"
  task-run-bucket = "kapacitor_fluxtask_logs"
  task-run-org = "example-org"
  task-run-measurement = "runs"

# ...

Create a Flux task

  1. Create a Flux task script. Include the task option in your script to configure the Kapacitor Flux task. For more information about writing Flux tasks, see:

    Provide InfluxDB connection credentials

    The from() and to() functions require your InfluxDB host and token.

    example-task.flux
    option task = {
      name: "example-task-name",
      every: 1h,
      offset: 10m
    }
    
    host = "http://localhost:8086"
    token = ""
    
    from(bucket: "example-bucket", host: host, token: token)
      |> range(start: -task.every)
      |> filter(fn: (r) => r._measurement == "example-measurement")
      |> aggregateWindow(every: 10m, fn: mean)
      |> to(bucket: "example-bucket-downsampled", host: host, token: token)
    
  2. Use the kapacitor flux task create command to add your Flux script as a Kapacitor Flux task.

    kapacitor flux task create --file /path/to/example-task.flux
    

For more details about creating Kapacitor Flux tasks, see Create a Kapacitor Flux task.
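
InfluxDB Cloud and InfluxDB OSS 2.x scope buckets to an organization, and both from() and to() accept an org (or orgID) parameter alongside host and token. If queries against your instance fail without an organization, a variant like the following may help. This is a sketch only: the organization name and token value are hypothetical placeholders, and whether org is required depends on your setup.

```flux
option task = {name: "example-task-name", every: 1h, offset: 10m}

// Assumed connection details; replace with your own.
host = "http://localhost:8086"
org = "example-org" // hypothetical organization name
token = "example-token" // hypothetical API token

from(bucket: "example-bucket", host: host, org: org, token: token)
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> aggregateWindow(every: 10m, fn: mean)
    |> to(bucket: "example-bucket-downsampled", host: host, org: org, token: token)
```

Use orgID in place of org if you prefer to identify the organization by ID; provide one or the other, not both.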

