Documentation

Migrate continuous queries to tasks

InfluxDB Cloud replaces 1.x continuous queries (CQs) with InfluxDB tasks. To migrate continuous queries to InfluxDB Cloud tasks, do the following:

  1. Output all InfluxDB 1.x continuous queries
  2. Convert continuous queries to Flux queries
  3. Create new InfluxDB tasks

Output all InfluxDB 1.x continuous queries

  1. Use the InfluxDB 1.x influx interactive shell to run SHOW CONTINUOUS QUERIES:

    $ influx
    Connected to http://localhost:8086 version 
    InfluxDB shell version: 
    > SHOW CONTINUOUS QUERIES
    
    • Copy
    • Fill window
  2. Copy and save the displayed continuous queries.

Convert continuous queries to Flux queries

To migrate InfluxDB 1.x continuous queries to InfluxDB Cloud tasks, convert the InfluxQL query syntax to Flux. The majority of continuous queries are simple downsampling queries and can be converted quickly using the aggregateWindow() function. For example:

Example continuous query
CREATE CONTINUOUS QUERY "downsample-daily" ON "my-db"
BEGIN
  SELECT mean("example-field")
  INTO "my-db"."example-rp"."average-example-measurement"
  FROM "example-measurement"
  GROUP BY time(1h)
END
  • Copy
  • Fill window
Equivalent Flux task
option task = {name: "downsample-daily", every: 1d}

from(bucket: "my-db/")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> filter(fn: (r) => r._field == "example-field")
    |> aggregateWindow(every: 1h, fn: mean)
    |> set(key: "_measurement", value: "average-example-measurement")
    |> to(org: "example-org", bucket: "my-db/example-rp")
  • Copy
  • Fill window

Convert InfluxQL continuous queries to Flux

Review the following statements and clauses to see how to convert your CQs to Flux:

ON clause

The ON clause defines the database to query. In InfluxDB Cloud, database and retention policy combinations are mapped to specific buckets (for more information, see Database and retention policy mapping).

Use the from() function to specify the bucket to query:

InfluxQL
CREATE CONTINUOUS QUERY "downsample-daily" ON "my-db"
-- ...
  • Copy
  • Fill window
Flux
from(bucket: "my-db/")
// ...
  • Copy
  • Fill window

SELECT statement

The SELECT statement queries data by field, tag, and time from a specific measurement. SELECT statements can take many different forms and converting them to Flux depends on your use case. For information about Flux and InfluxQL function parity, see Flux vs InfluxQL. See other resources available to help.

INTO clause

The INTO clause defines the measurement to write results to. INTO also supports fully-qualified measurements that include the database and retention policy. In InfluxDB Cloud, database and retention policy combinations are mapped to specific buckets (for more information, see Database and retention policy mapping).

To write to a measurement different than the measurement queried, use set() or map() to change the measurement name. Use the to() function to specify the bucket to write results to.

InfluxQL
-- ...
INTO "example-db"."example-rp"."example-measurement"
-- ...
  • Copy
  • Fill window
Flux
// ...
    |> set(key: "_measurement", value: "example-measurement")
    |> to(bucket: "example-db/example-rp")
  • Copy
  • Fill window
// ...
    |> map(fn: (r) => ({ r with _measurement: "example-measurement"}))
    |> to(bucket: "example-db/example-rp")
  • Copy
  • Fill window
Write pivoted data to InfluxDB

InfluxDB 1.x query results include a column for each field. InfluxDB Cloud does not do this by default, but it is possible with pivot() or schema.fieldsAsCols().

If you use to() to write pivoted data back to InfluxDB Cloud, each field column is stored as a tag. To write pivoted fields back to InfluxDB as fields, import the experimental package and use the experimental.to() function.

InfluxQL
CREATE CONTINUOUS QUERY "downsample-daily" ON "my-db"
BEGIN
  SELECT mean("example-field-1"), mean("example-field-2")
  INTO "example-db"."example-rp"."example-measurement"
  FROM "example-measurement"
  GROUP BY time(1h)
END
  • Copy
  • Fill window
Flux
// ...

from(bucket: "my-db/")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "example-measurement")
    |> filter(fn: (r) => r._field == "example-field-1" or r._field == "example-field-2")
    |> aggregateWindow(every: task.every, fn: mean)
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
    |> experimental.to(bucket: "example-db/example-rp")
  • Copy
  • Fill window

FROM clause

The from clause defines the measurement to query. Use the filter() function to specify the measurement to query.

InfluxQL
-- ...
FROM "example-measurement"
-- ...
  • Copy
  • Fill window
Flux
// ...
    |> filter(fn: (r) => r._measurement == "example-measurement")
  • Copy
  • Fill window

AS clause

The AS clause changes the name of the field when writing data back to InfluxDB. Use set() or map() to change the field name.

InfluxQL
-- ...
AS newfield
-- ...
  • Copy
  • Fill window
Flux
// ...
    |> set(key: "_field", value: "newfield")
  • Copy
  • Fill window
// ...
    |> map(fn: (r) => ({ r with _field: "newfield"}))
  • Copy
  • Fill window

WHERE clause

The WHERE clause uses predicate logic to filter results based on fields, tags, or timestamps. Use the filter() function and Flux comparison operators to filter results based on fields and tags. Use the range() function to filter results based on timestamps.

InfluxQL
-- ...
WHERE "example-tag" = "foo" AND time > now() - 7d
  • Copy
  • Fill window
Flux
// ...
    |> range(start: -7d)
    |> filter(fn: (r) => r["example-tag"] == "foo")
  • Copy
  • Fill window

GROUP BY clause

The InfluxQL GROUP BY clause groups data by specific tags or by time (typically to calculate an aggregate value for windows of time).

Group by tags

Use the group() function to modify the group key and change how data is grouped.

InfluxQL
-- ...
GROUP BY "location"
  • Copy
  • Fill window
Flux
// ...
    |> group(columns: ["location"])
  • Copy
  • Fill window
Group by time

Use the aggregateWindow() function to group data into time windows and perform an aggregation on each window. In CQs, the interval specified in the GROUP BY time() clause determines the CQ execution interval. Use the GROUP BY time() interval to set the every task option.

InfluxQL
-- ...
SELECT MEAN("example-field")
FROM "example-measurement"
GROUP BY time(1h)
  • Copy
  • Fill window
Flux
option task = {name: "task-name", every: 1h}

// ...
    |> filter(fn: (r) => r._measurement == "example-measurement" and r._field == "example-field")
    |> aggregateWindow(every: task.every, fn: mean)
  • Copy
  • Fill window

RESAMPLE clause

The CQ RESAMPLE clause uses data from the last specified duration to calculate a new aggregate point. The EVERY interval in RESAMPLE defines how often the CQ runs. The FOR interval defines the total time range queried by the CQ.

To accomplish this same functionality in a Flux task, set the start parameter in the range() function to the negative FOR duration. Define the task execution interval in the task options. For example:

InfluxQL
CREATE CONTINUOUS QUERY "resample-example" ON "my-db"
RESAMPLE EVERY 1m FOR 30m
BEGIN
  SELECT exponential_moving_average(mean("example-field"), 30)
  INTO "resample-average-example-measurement"
  FROM "example-measurement"
  WHERE region = 'example-region'
  GROUP BY time(1m)
END
  • Copy
  • Fill window
Flux
option task = {name: "resample-example", every: 1m}

from(bucket: "my-db/")
    |> range(start: -30m)
    |> filter(fn: (r) => r._measurement == "example-measurement" and r._field == "example-field" and r.region == "example-region")
    |> aggregateWindow(every: 1m, fn: mean)
    |> exponentialMovingAverage(n: 30)
    |> set(key: "_measurement", value: "resample-average-example-measurement")
    |> to(bucket: "my-db/")
  • Copy
  • Fill window

Create new InfluxDB tasks

After converting your continuous query to Flux, use the Flux query to create a new task.

Other helpful resources

The following resources are available and may be helpful when converting continuous queries to Flux tasks.

Documentation
Community

Was this page helpful?

Thank you for your feedback!


The future of Flux

Flux is going into maintenance mode. You can continue using it as you currently are without any changes to your code.

Read more

InfluxDB 3 Core and Enterprise are now in Beta

InfluxDB 3 Core and Enterprise are now available for beta testing, available under MIT or Apache 2 license.

InfluxDB 3 Core is a high-speed, recent-data engine that collects and processes data in real-time, while persisting it to local disk or object storage. InfluxDB 3 Enterprise is a commercial product that builds on Core’s foundation, adding high availability, read replicas, enhanced security, and data compaction for faster queries. A free tier of InfluxDB 3 Enterprise will also be available for at-home, non-commercial use for hobbyists to get the full historical time series database set of capabilities.

For more information, check out:

InfluxDB Cloud powered by TSM