Documentation

Downsample data with client libraries

Query and downsample time series data stored in InfluxDB and write the downsampled data back to InfluxDB.

This guide uses Python and the InfluxDB v3 Python client library, but you can use your runtime of choice and any of the available InfluxDB v3 client libraries. This guide also assumes you have already setup your Python project and virtual environment.

Install dependencies

Use pip to install the following dependencies:

  • influxdb_client_3
  • pandas
pip install influxdb3-python pandas

Prepare InfluxDB buckets

The downsampling process involves two InfluxDB buckets. Each bucket has a retention period that specifies how long data persists in the database before it expires and is deleted. By using two buckets, you can store unmodified, high-resolution data in a bucket with a shorter retention period and then downsampled, low-resolution data in a bucket with a longer retention period.

Ensure you have a bucket for each of the following:

  • One to query unmodified data from
  • The other to write downsampled data to

For information about creating buckets, see Create a bucket.

Create InfluxDB clients

Use the InfluxDBClient3 function in the influxdb_client_3 module to instantiate two InfluxDB clients:

  • One configured to connect to your InfluxDB bucket with unmodified data.
  • The other configured to connect to the InfluxDB bucket that you want to write downsampled data to.

Provide the following credentials for each client:

  • host: InfluxDB Cloud Serverless region URL (without the protocol)
  • org: InfluxDB organization name
  • token: InfluxDB API token with read and write permissions on the buckets you want to query and write to.
  • database: InfluxDB bucket name
from influxdb_client_3 import InfluxDBClient3
import pandas

# Instantiate an InfluxDBClient3 client configured for your unmodified bucket
influxdb_raw = InfluxDBClient3(
    host='cloud2.influxdata.com',
    token='
API_TOKEN
'
,
database='
RAW_BUCKET_NAME
'
) # Instantiate an InfluxDBClient3 client configured for your downsampled database. # When writing, the org= argument is required by the client (but ignored by InfluxDB). influxdb_downsampled = InfluxDBClient3( host='cloud2.influxdata.com', token='
API_TOKEN
'
,
database='
DOWNSAMPLED_BUCKET_NAME
'
,
org='' )

Query InfluxDB

Define a query that performs time-based aggregations

The most common method used to downsample time series data is to perform aggregate or selector operations on intervals of time. For example, return the average value for each hour in the queried time range.

Use either SQL or InfluxQL to downsample data by applying aggregate or selector functions to time intervals.

  1. In the SELECT clause:

  2. Include a GROUP BY clause that groups by intervals returned from the DATE_BIN function in your SELECT clause and any other queried tags. The example below uses GROUP BY 1 to group by the first column in the SELECT clause.

  3. Include an ORDER BY clause that sorts data by time.

For more information, see Aggregate data with SQL - Downsample data by applying interval-based aggregates.

SELECT
  DATE_BIN(INTERVAL '1 hour', time) AS time,
  room,
  AVG(temp) AS temp,
  AVG(hum) AS hum,
  AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY time
  1. In the SELECT clause, apply an aggregate or selector function to queried fields.

  2. Include a GROUP BY clause that groups by time() at a specified interval.

SELECT
  MEAN(temp) AS temp,
  MEAN(hum) AS hum,
  MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)

Execute the query

  1. Assign the query string to a variable.

  2. Use the query method of your instantiated client to query raw data from InfluxDB. Provide the following arguments.

    • query: Query string to execute
    • language: sql or influxql
  3. Use the to_pandas method to convert the returned Arrow table to a Pandas DataFrame.

# ...

query = '''
SELECT
  DATE_BIN(INTERVAL '1 hour', time) AS time,
  room,
  AVG(temp) AS temp,
  AVG(hum) AS hum,
  AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''

table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...

query = '''
SELECT
  MEAN(temp) AS temp,
  MEAN(hum) AS hum,
  MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''

table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
\

Write the downsampled data back to InfluxDB

  1. For InfluxQL query results, delete (drop) the iox::measurement column before writing data back to InfluxDB. You’ll avoid measurement name conflicts when querying your downsampled data later.

  2. Use the sort_values method to sort data in the Pandas DataFrame by time to ensure writing back to InfluxDB is as performant as possible.

  3. Use the write method of your instantiated downsampled client to write the query results back to your InfluxDB bucket for downsampled data. Include the following arguments:

    • record: Pandas DataFrame containing downsampled data
    • data_frame_measurement_name: Destination measurement name
    • data_frame_timestamp_column: Column containing timestamps for each point
    • data_frame_tag_columns: List of tag columns

    Columns not listed in the data_frame_tag_columns or data_frame_timestamp_column arguments are written to InfluxDB as fields.

# ...

data_frame = data_frame.sort_values(by="time")

influxdb_downsampled.write(
    record=data_frame,
    data_frame_measurement_name="home_ds",
    data_frame_timestamp_column="time",
    data_frame_tag_columns=['room']
)

Full downsampling script

from influxdb_client_3 import InfluxDBClient3
import pandas

influxdb_raw = InfluxDBClient3(
    host='cloud2.influxdata.com',
    token='
API_TOKEN
'
,
database='
RAW_BUCKET_NAME
'
) # When writing, the org= argument is required by the client (but ignored by InfluxDB). influxdb_downsampled = InfluxDBClient3( host='cloud2.influxdata.com', token='
API_TOKEN
'
,
database='
DOWNSAMPLED_BUCKET_NAME
'
,
org='' ) query = ''' SELECT DATE_BIN(INTERVAL '1 hour', time) AS time, room, AVG(temp) AS temp, AVG(hum) AS hum, AVG(co) AS co FROM home --In WHERE, time refers to <source_table>.time WHERE time >= now() - INTERVAL '24 hours' --1 refers to the DATE_BIN column GROUP BY 1, room ORDER BY 1 ''' table = influxdb_raw.query(query=query, language="sql") data_frame = table.to_pandas() data_frame = data_frame.sort_values(by="time") influxdb_downsampled.write( record=data_frame, data_frame_measurement_name="home_ds", data_frame_timestamp_column="time", data_frame_tag_columns=['room'] )
from influxdb_client_3 import InfluxDBClient3
import pandas

influxdb_raw = InfluxDBClient3(
    host='cloud2.influxdata.com',
    org='
ORG_NAME
'
,
token='
API_TOKEN
'
,
database='
RAW_BUCKET_NAME
'
) # When writing, the org= argument is required by the client (but ignored by InfluxDB). influxdb_downsampled = InfluxDBClient3( host='cloud2.influxdata.com', token='
API_TOKEN
'
,
database='
DOWNSAMPLED_BUCKET_NAME
'
,
org='' ) query = ''' SELECT MEAN(temp) AS temp, MEAN(hum) AS hum, MEAN(co) AS co FROM home WHERE time >= now() - 24h GROUP BY time(1h) ''' # To prevent naming conflicts when querying downsampled data, # drop the iox::measurement column before writing the data # with the new measurement. data_frame = data_frame.drop(columns=['iox::measurement']) table = influxdb_raw.query(query=query, language="influxql") data_frame = table.to_pandas() data_frame = data_frame.sort_values(by="time") influxdb_downsampled.write( record=data_frame, data_frame_measurement_name="home_ds", data_frame_timestamp_column="time", data_frame_tag_columns=['room'] )

Was this page helpful?

Thank you for your feedback!


The future of Flux

Flux is going into maintenance mode. You can continue using it as you currently are without any changes to your code.

Read more

InfluxDB v3 enhancements and InfluxDB Clustered is now generally available

New capabilities, including faster query performance and management tooling advance the InfluxDB v3 product line. InfluxDB Clustered is now generally available.

InfluxDB v3 performance and features

The InfluxDB v3 product line has seen significant enhancements in query performance and has made new management tooling available. These enhancements include an operational dashboard to monitor the health of your InfluxDB cluster, single sign-on (SSO) support in InfluxDB Cloud Dedicated, and new management APIs for tokens and databases.

Learn about the new v3 enhancements


InfluxDB Clustered general availability

InfluxDB Clustered is now generally available and gives you the power of InfluxDB v3 in your self-managed stack.

Talk to us about InfluxDB Clustered

InfluxDB Cloud Serverless