Downsample data with client libraries
Query and downsample time series data stored in InfluxDB and write the downsampled data back to InfluxDB.
This guide uses Python and the InfluxDB 3 Python client library, but you can use your runtime of choice and any of the available InfluxDB 3 client libraries. This guide also assumes you have already set up your Python project and virtual environment.
- Install dependencies
- Prepare InfluxDB buckets
- Create InfluxDB clients
- Query InfluxDB
- Write the downsampled data back to InfluxDB
- Full downsampling script
Install dependencies
Use pip to install the following dependencies:
- influxdb3-python (provides the influxdb_client_3 module)
- pandas
pip install influxdb3-python pandas
Prepare InfluxDB buckets
The downsampling process involves two InfluxDB buckets. Each bucket has a retention period that specifies how long data persists in the database before it expires and is deleted. By using two buckets, you can store unmodified, high-resolution data in a bucket with a shorter retention period and then downsampled, low-resolution data in a bucket with a longer retention period.
Ensure you have a bucket for each of the following:
- One to query unmodified data from
- The other to write downsampled data to
For information about creating buckets, see Create a bucket.
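To get a feel for the trade-off between the two buckets, the following sketch estimates how many points per series each bucket holds once its retention period is full. The sampling intervals and retention periods are hypothetical values chosen for illustration, not recommendations from this guide.

```python
from datetime import timedelta


def points_retained(sample_interval: timedelta, retention: timedelta) -> int:
    """Approximate number of points per series kept for a full retention period."""
    return retention // sample_interval


# Hypothetical settings; substitute your own intervals and retention periods.
raw_points = points_retained(timedelta(seconds=10), timedelta(days=30))
downsampled_points = points_retained(timedelta(hours=1), timedelta(days=365))

print(f"raw bucket: {raw_points} points per series")
print(f"downsampled bucket: {downsampled_points} points per series")
```

Under these assumptions, the downsampled bucket covers a year of history with far fewer points per series than the raw bucket holds for a single month.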
Create InfluxDB clients
Use the InfluxDBClient3 class in the influxdb_client_3 module to instantiate two InfluxDB clients:
- One configured to connect to your InfluxDB bucket with unmodified data.
- The other configured to connect to the InfluxDB bucket that you want to write downsampled data to.
Provide the following credentials for each client:
- host: InfluxDB Cloud Serverless region URL (without the protocol)
- org: InfluxDB organization name
- token: InfluxDB API token with read and write permissions on the buckets you want to query and write to.
- database: InfluxDB bucket name
from influxdb_client_3 import InfluxDBClient3
import pandas
# Instantiate an InfluxDBClient3 client configured for your unmodified bucket
influxdb_raw = InfluxDBClient3(
host='us-east-1-1.aws.cloud2.influxdata.com',
token='API_TOKEN',
database='RAW_BUCKET_NAME'
)
# Instantiate an InfluxDBClient3 client configured for your downsampled bucket.
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='us-east-1-1.aws.cloud2.influxdata.com',
token='API_TOKEN',
database='DOWNSAMPLED_BUCKET_NAME',
org=''
)
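If you prefer not to hard-code credentials, one possible approach is to read them from environment variables and strip the protocol from the host before passing the values to the client. This is a sketch only: the variable names INFLUX_HOST and INFLUX_TOKEN and both helper functions are hypothetical and not part of the client library.

```python
import os
from urllib.parse import urlparse


def host_without_protocol(url: str) -> str:
    """Return only the host portion of a URL, dropping a scheme such as https://."""
    parsed = urlparse(url if "://" in url else f"//{url}")
    return parsed.netloc


def client_settings(database: str, org=None, env=os.environ) -> dict:
    """Build keyword arguments for a client from (hypothetical) environment variables."""
    settings = {
        "host": host_without_protocol(env["INFLUX_HOST"]),
        "token": env["INFLUX_TOKEN"],
        "database": database,
    }
    if org is not None:
        settings["org"] = org
    return settings


# Example with a stand-in environment; in a real project, omit env to use os.environ.
example_env = {
    "INFLUX_HOST": "https://us-east-1-1.aws.cloud2.influxdata.com",
    "INFLUX_TOKEN": "API_TOKEN",
}
print(client_settings("DOWNSAMPLED_BUCKET_NAME", org="", env=example_env))
```

The resulting dictionary can be unpacked into the constructor, for example InfluxDBClient3(**client_settings('RAW_BUCKET_NAME')).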
Query InfluxDB
Define a query that performs time-based aggregations
The most common method used to downsample time series data is to perform aggregate or selector operations on intervals of time. For example, return the average value for each hour in the queried time range.
Use either SQL or InfluxQL to downsample data by applying aggregate or selector functions to time intervals.
To downsample with SQL:
- In the SELECT clause:
  - Use DATE_BIN to assign each row to an interval based on the row's timestamp and update the time column with the assigned interval timestamp. You can also use DATE_BIN_GAPFILL to fill any gaps created by intervals with no data (see Fill gaps in data with SQL).
  - Apply an aggregate or selector function to each queried field.
- Include a GROUP BY clause that groups by intervals returned from the DATE_BIN function in your SELECT clause and any other queried tags. The example below uses GROUP BY 1 to group by the first column in the SELECT clause.
- Include an ORDER BY clause that sorts data by time.
For more information, see Aggregate data with SQL - Downsample data by applying interval-based aggregates.
SELECT
DATE_BIN(INTERVAL '1 hour', time) AS time,
room,
AVG(temp) AS temp,
AVG(hum) AS hum,
AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY time
To downsample with InfluxQL:
- In the SELECT clause, apply an aggregate or selector function to queried fields.
- Include a GROUP BY clause that groups by time() at a specified interval.
SELECT
MEAN(temp) AS temp,
MEAN(hum) AS hum,
MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
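To make the interval-based aggregation concrete, the following pure-Python sketch mimics what the SQL query above computes: it floors each timestamp to the start of its interval, then averages a field per interval and room. It assumes intervals are counted from the Unix epoch; the function names and the sample rows are made up for illustration, and the sketch is not how InfluxDB executes the query.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from statistics import mean

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def date_bin(interval: timedelta, ts: datetime) -> datetime:
    """Floor a timestamp to the start of its interval, counted from the Unix epoch."""
    return ts - (ts - EPOCH) % interval


def average_by_interval(rows, interval):
    """Average temp per (interval start, room), sorted by time and then room."""
    groups = defaultdict(list)
    for row in rows:
        key = (date_bin(interval, row["time"]), row["room"])
        groups[key].append(row["temp"])
    return [
        {"time": start, "room": room, "temp": mean(values)}
        for (start, room), values in sorted(groups.items())
    ]


# Made-up sample rows standing in for the home measurement.
rows = [
    {"time": datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc), "room": "Kitchen", "temp": 21.0},
    {"time": datetime(2024, 1, 1, 8, 30, tzinfo=timezone.utc), "room": "Kitchen", "temp": 23.0},
    {"time": datetime(2024, 1, 1, 8, 45, tzinfo=timezone.utc), "room": "Living Room", "temp": 21.5},
    {"time": datetime(2024, 1, 1, 9, 15, tzinfo=timezone.utc), "room": "Kitchen", "temp": 22.5},
]

for result in average_by_interval(rows, timedelta(hours=1)):
    print(result["time"].isoformat(), result["room"], result["temp"])
```

The two Kitchen readings in the 08:00 hour collapse into a single row with their average, which is the reduction in resolution that downsampling is after.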
Execute the query
- Assign the query string to a variable.
- Use the query method of your instantiated client to query raw data from InfluxDB. Provide the following arguments:
  - query: Query string to execute
  - language: sql or influxql
- Use the to_pandas method to convert the returned Arrow table to a Pandas DataFrame.
# ...
query = '''
SELECT
DATE_BIN(INTERVAL '1 hour', time) AS time,
room,
AVG(temp) AS temp,
AVG(hum) AS hum,
AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
# ...
query = '''
SELECT
MEAN(temp) AS temp,
MEAN(hum) AS hum,
MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
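Because the SQL and InfluxQL examples differ only in the query string and the language argument, the two steps can be wrapped in a small helper. The sketch below is one possible way to do that; query_to_dataframe and SUPPORTED_LANGUAGES are hypothetical names, and the helper relies only on the query and to_pandas calls shown above.

```python
SUPPORTED_LANGUAGES = ("sql", "influxql")


def query_to_dataframe(client, query: str, language: str = "sql"):
    """Run a query with the given client and return the result as a pandas DataFrame."""
    if language not in SUPPORTED_LANGUAGES:
        raise ValueError(
            f"language must be one of {SUPPORTED_LANGUAGES}, got {language!r}"
        )
    # The client returns an Arrow table; convert it to a DataFrame for later steps.
    table = client.query(query=query, language=language)
    return table.to_pandas()
```

With the clients created earlier, a call would look like data_frame = query_to_dataframe(influxdb_raw, query, language="influxql"). Validating the language up front surfaces a typo before any request is sent.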
Write the downsampled data back to InfluxDB
- For InfluxQL query results, delete (drop) the iox::measurement column before writing data back to InfluxDB. This avoids measurement name conflicts when you query your downsampled data later.
- Use the sort_values method to sort data in the Pandas DataFrame by time to ensure writing back to InfluxDB is as performant as possible.
- Use the write method of your instantiated downsampled client to write the query results back to your InfluxDB bucket for downsampled data. Include the following arguments:
  - record: Pandas DataFrame containing downsampled data
  - data_frame_measurement_name: Destination measurement name
  - data_frame_timestamp_column: Column containing timestamps for each point
  - data_frame_tag_columns: List of tag columns
Columns not listed in the data_frame_tag_columns or data_frame_timestamp_column arguments are written to InfluxDB as fields.
# ...
data_frame = data_frame.sort_values(by="time")
influxdb_downsampled.write(
record=data_frame,
data_frame_measurement_name="home_ds",
data_frame_timestamp_column="time",
data_frame_tag_columns=['room']
)
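The preparation steps described in this section can be collected into one helper that works for both SQL and InfluxQL results. The following is a minimal sketch, assuming the time and room column names used in this guide; prepare_for_write is a hypothetical function, and the sample DataFrame is made up to show which columns end up as fields.

```python
import pandas as pd


def prepare_for_write(data_frame, timestamp_column="time", tag_columns=("room",)):
    """Drop the InfluxQL measurement column if present, sort by time, and list field columns."""
    cleaned = (
        data_frame
        # errors="ignore" makes this a no-op for SQL results, which lack the column.
        .drop(columns=["iox::measurement"], errors="ignore")
        .sort_values(by=timestamp_column)
        .reset_index(drop=True)
    )
    # Every column that is neither the timestamp nor a tag is written as a field.
    field_columns = [
        column
        for column in cleaned.columns
        if column != timestamp_column and column not in tag_columns
    ]
    return cleaned, field_columns


# Made-up sample shaped like an InfluxQL result, deliberately out of time order.
sample = pd.DataFrame(
    {
        "iox::measurement": ["home", "home"],
        "time": pd.to_datetime(["2024-01-01T09:00:00Z", "2024-01-01T08:00:00Z"]),
        "room": ["Kitchen", "Kitchen"],
        "temp": [22.5, 22.0],
    }
)
cleaned, field_columns = prepare_for_write(sample)
print(field_columns)  # ['temp']
```

The cleaned DataFrame can then be passed as the record argument of the write call shown above.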
Full downsampling script
# Full downsampling script using SQL
from influxdb_client_3 import InfluxDBClient3
import pandas
influxdb_raw = InfluxDBClient3(
host='us-east-1-1.aws.cloud2.influxdata.com',
token='API_TOKEN',
database='RAW_BUCKET_NAME'
)
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='us-east-1-1.aws.cloud2.influxdata.com',
token='API_TOKEN',
database='DOWNSAMPLED_BUCKET_NAME',
org=''
)
query = '''
SELECT
DATE_BIN(INTERVAL '1 hour', time) AS time,
room,
AVG(temp) AS temp,
AVG(hum) AS hum,
AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
data_frame = data_frame.sort_values(by="time")
influxdb_downsampled.write(
record=data_frame,
data_frame_measurement_name="home_ds",
data_frame_timestamp_column="time",
data_frame_tag_columns=['room']
)
# Full downsampling script using InfluxQL
from influxdb_client_3 import InfluxDBClient3
import pandas
influxdb_raw = InfluxDBClient3(
host='us-east-1-1.aws.cloud2.influxdata.com',
org='ORG_NAME',
token='API_TOKEN',
database='RAW_BUCKET_NAME'
)
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='us-east-1-1.aws.cloud2.influxdata.com',
token='API_TOKEN',
database='DOWNSAMPLED_BUCKET_NAME',
org=''
)
query = '''
SELECT
MEAN(temp) AS temp,
MEAN(hum) AS hum,
MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
# To prevent naming conflicts when querying downsampled data,
# drop the iox::measurement column before writing the data
# with the new measurement.
data_frame = data_frame.drop(columns=['iox::measurement'])
data_frame = data_frame.sort_values(by="time")
influxdb_downsampled.write(
record=data_frame,
data_frame_measurement_name="home_ds",
data_frame_timestamp_column="time",
data_frame_tag_columns=['room']
)