Documentation

Get started processing data

Now that you know the basics of querying data from InfluxDB, let’s go beyond a basic query and begin to process the queried data. “Processing” data could mean transforming, aggregating, downsampling, or alerting on data. This tutorial covers the following data processing use cases:

Most data processing operations require manually editing Flux queries. If you’re using the InfluxDB Data Explorer, switch to the Script Editor instead of using the Query Builder.

Remap or assign values in your data

Use the map() function to iterate over each row in your data and update the values in that row. map() is one of the most useful functions in Flux and will help you accomplish many of they data processing operations you need to perform.

Learn more about how map() works

from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "hum")
    |> map(fn: (r) => ({r with _value: r._value / 100.0}))

Map examples

Perform mathematical operations

Conditionally assign a state

Alert on data

Group data

Use the group() function to regroup your data by specific column values in preparation for further processing.

from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> group(columns: ["room", "_field"])

Understanding data grouping and why it matters is important, but may be too much for this “getting started” tutorial. For more information about how data is grouped and why it matters, see the Flux data model documentation.

By default, from() returns data queried from InfluxDB grouped by series (measurement, tags, and field key). Each table in the returned stream of tables represents a group. Each table contains the same values for the columns that data is grouped by. This grouping is important as you aggregate data.

Group examples

Group data by specific columns

Ungroup data

Aggregate or select specific data

Use Flux aggregate or selector functions to return aggregate or selected values from each input table.

from(bucket: "get-started")
    |> range(start: 2022-01-01T08:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
    |> mean()

Aggregate over time

If you want to query aggregate values over time, this is a form of downsampling.

Aggregate functions

Aggregate functions drop columns that are not in the group key and return a single row for each input table with the aggregate value of that table.

Aggregate examples

Calculate the average temperature for each room

Calculate the overall average temperature of all rooms

Count the number of points reported per room across all fields

Assign a new aggregate timestamp

_time is generally not part of the group key and will be dropped when using aggregate functions. To assign a new timestamp to aggregate points, duplicate the _start or _stop column, which represent the query bounds, as the new _time column.

from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> mean()
    |> duplicate(column: "_stop", as: "_time")

Selector functions

Selector functions return one or more columns from each input table and retain all columns and their values.

Selector examples

Return the first temperature from each room

Return the last temperature from each room

Return the maximum temperature from each room

Pivot data into a relational schema

If coming from relational SQL or SQL-like query languages, such as InfluxQL, the data model that Flux uses is different than what you’re used to. Flux returns multiple tables where each table contains a different field. A “relational” schema structures each field as a column in each row.

Use the pivot() function to pivot data into a “relational” schema based on timestamps.

from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "co" or r._field == "hum" or r._field == "temp")
    |> filter(fn: (r) => r.room == "Kitchen")
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

View input and pivoted output

Downsample data

Downsampling data is a strategy that improve performance at query time and also optimizes long-term data storage. Simply put, downsampling reduces the number of points returned by a query without losing the general trends in the data.

For more information about downsampling data, see Downsample data.

The most common way to downsample data is by time intervals or “windows.” For example, you may want to query the last hour of data and return the average value for every five minute window.

Use aggregateWindow() to downsample data by specified time intervals:

  • Use the every parameter to specify the duration of each window.
  • Use the fn parameter to specify what aggregate or selector function to apply to each window.
  • (Optional) Use the timeSrc parameter to specify which column value to use to create the new aggregate timestamp for each window. The default is _stop.
from(bucket: "get-started")
    |> range(start: 2022-01-01T14:00:00Z, stop: 2022-01-01T20:00:01Z)
    |> filter(fn: (r) => r._measurement == "home")
    |> filter(fn: (r) => r._field == "temp")
    |> aggregateWindow(every: 2h, fn: mean)

View input and downsampled output

Automate processing with InfluxDB tasks

InfluxDB tasks are scheduled queries that can perform any of the data processing operations described above. Generally tasks then use the to() function to write the processed result back to InfluxDB.

For more information about creating and configuring tasks, see Get started with InfluxDB tasks.

Example downsampling task

option task = {
    name: "Example task"
    every: 1d,
}

from(bucket: "get-started-downsampled")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "home")
    |> aggregateWindow(every: 2h, fn: mean)

Was this page helpful?

Thank you for your feedback!


The future of Flux

Flux is going into maintenance mode. You can continue using it as you currently are without any changes to your code.

Read more

InfluxDB 3 Open Source Now in Public Alpha

InfluxDB 3 Open Source is now available for alpha testing, licensed under MIT or Apache 2 licensing.

We are releasing two products as part of the alpha.

InfluxDB 3 Core, is our new open source product. It is a recent-data engine for time series and event data. InfluxDB 3 Enterprise is a commercial version that builds on Core’s foundation, adding historical query capability, read replicas, high availability, scalability, and fine-grained security.

For more information on how to get started, check out:

InfluxDB Cloud powered by TSM