
Downsampler plugin

The Downsampler Plugin enables time-based data aggregation and downsampling in InfluxDB 3. Reduce data volume by aggregating measurements over specified time intervals using functions like avg, sum, min, max, derivative, or median. The plugin supports both scheduled batch processing of historical data and on-demand downsampling through HTTP requests. Each downsampled record includes metadata about the original data points it compresses.

Configuration

Required parameters

Parameter          | Type   | Default                   | Description
-------------------|--------|---------------------------|------------
source_measurement | string | required                  | Source measurement containing data to downsample
target_measurement | string | required                  | Destination measurement for downsampled data
window             | string | required (scheduled only) | Time window for each downsampling job. Format: <number><unit> (for example, "1h", "1d")
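For a scheduled trigger, a minimal argument string covering only the required parameters might look like the following (measurement names and window value are placeholders):

--trigger-arguments 'source_measurement=home,target_measurement=home_1h,window=1h'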

Aggregation parameters

Parameter       | Type   | Default    | Description
----------------|--------|------------|------------
interval        | string | "10min"    | Time interval for downsampling. Format: <number><unit> (for example, "10min", "2h", "1d")
calculations    | string | "avg"      | Aggregation functions. Single function or dot-separated field:aggregation pairs
specific_fields | string | all fields | Dot-separated list of fields to downsample (for example, "co.temperature")
excluded_fields | string | none       | Dot-separated list of fields to exclude from downsampling
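For example, to average every field except a few you don't need (field names here are hypothetical), you might pass:

--trigger-arguments 'interval=30min,calculations=avg,excluded_fields=debug_flag.build_id'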

Filtering parameters

Parameter  | Type   | Default | Description
-----------|--------|---------|------------
tag_values | string | none    | Tag filters. Format: tag:value1@value2@value3 for multiple values
offset     | string | "0"     | Time offset to apply to the window
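For example, to downsample only rows from specific hosts and shift the processed window back (host names are placeholders, and this sketch assumes offset accepts the same <number><unit> format as interval):

--trigger-arguments 'tag_values=host:server1@server2,offset=10min'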

Advanced parameters

Parameter       | Type    | Default   | Description
----------------|---------|-----------|------------
target_database | string  | "default" | Database for storing downsampled data
max_retries     | integer | 5         | Maximum number of retries for write operations
batch_size      | string  | "30d"     | Time interval for batch processing (HTTP mode only)

TOML configuration

Parameter        | Type   | Default | Description
-----------------|--------|---------|------------
config_file_path | string | none    | TOML config file path relative to PLUGIN_DIR (required for TOML configuration)

To use a TOML configuration file, set the PLUGIN_DIR environment variable and specify config_file_path in the trigger arguments. PLUGIN_DIR must be set in addition to the --plugin-dir flag used when starting InfluxDB 3.
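For example, a TOML-configured trigger might be created like this (database and trigger names are placeholders; the sketch assumes the config file sits in ~/.plugins):

# PLUGIN_DIR must be set in the environment of the influxdb3 serve process
export PLUGIN_DIR=~/.plugins

# config_file_path is resolved relative to PLUGIN_DIR
influxdb3 create trigger \
  --database mydb \
  --plugin-filename gh:influxdata/downsampler/downsampler.py \
  --trigger-spec "every:1h" \
  --trigger-arguments 'config_file_path=downsampling_config_scheduler.toml' \
  downsample_from_toml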

Example TOML configuration

See downsampling_config_scheduler.toml, the example configuration file shipped with the plugin.

For more information on using TOML configuration files, see the Using TOML Configuration Files section in the influxdb3_plugins/README.md.

Schema management

Each downsampled record includes three additional metadata columns:

  • record_count: the number of original points compressed into this single downsampled row
  • time_from: the minimum timestamp among the original points in the interval
  • time_to: the maximum timestamp among the original points in the interval
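For example, to inspect the metadata columns alongside an aggregated value (using the database and measurement from Example 1 below), you could run:

influxdb3 query \
  --database system_metrics \
  "SELECT record_count, time_from, time_to, usage_user FROM cpu_hourly ORDER BY time DESC LIMIT 5"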

Installation steps

  1. Start InfluxDB 3 Core with the Processing Engine enabled (--plugin-dir /path/to/plugins)

    influxdb3 serve \
      --node-id node0 \
      --object-store file \
      --data-dir ~/.influxdb3 \
      --plugin-dir ~/.plugins
  2. No additional Python packages are required for this plugin.

Trigger setup

Scheduled downsampling

Run downsampling periodically on historical data:

influxdb3 create trigger \
  --database mydb \
  --plugin-filename gh:influxdata/downsampler/downsampler.py \
  --trigger-spec "every:1h" \
  --trigger-arguments 'source_measurement=cpu_metrics,target_measurement=cpu_hourly,interval=1h,window=6h,calculations=avg,specific_fields=usage_user.usage_system' \
  cpu_hourly_downsample

On-demand downsampling

Trigger downsampling via HTTP requests:

influxdb3 create trigger \
  --database mydb \
  --plugin-filename gh:influxdata/downsampler/downsampler.py \
  --trigger-spec "request:downsample" \
  downsample_api
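The request:downsample spec exposes the plugin at /api/v3/engine/downsample (used in Example 3 below). A minimal invocation might look like the following; the token and measurement names are placeholders:

curl -X POST http://localhost:8181/api/v3/engine/downsample \
  --header "Authorization: Bearer YOUR_TOKEN" \
  --data '{"source_measurement": "cpu_metrics", "target_measurement": "cpu_daily", "interval": "1d"}'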

Example usage

Example 1: CPU metrics hourly aggregation

Downsample CPU usage data from 1-minute intervals to hourly averages:

# Create the trigger
influxdb3 create trigger \
  --database system_metrics \
  --plugin-filename gh:influxdata/downsampler/downsampler.py \
  --trigger-spec "every:1h" \
  --trigger-arguments 'source_measurement=cpu,target_measurement=cpu_hourly,interval=1h,window=6h,calculations=avg,specific_fields=usage_user.usage_system.usage_idle' \
  cpu_hourly_downsample

# Write test data
influxdb3 write \
  --database system_metrics \
  "cpu,host=server1 usage_user=45.2,usage_system=12.1,usage_idle=42.7"

# Query downsampled data (after trigger runs)
influxdb3 query \
  --database system_metrics \
  "SELECT * FROM cpu_hourly WHERE time >= now() - 1d"

Expected output

host    | usage_user | usage_system | usage_idle | record_count | time_from            | time_to              | time
--------|------------|--------------|------------|--------------|----------------------|----------------------|-----
server1 | 44.8       | 11.9         | 43.3       | 60           | 2024-01-01T00:00:00Z | 2024-01-01T00:59:59Z | 2024-01-01T01:00:00Z

Aggregation details:

  • Before: 60 individual CPU measurements over 1 hour
  • After: 1 aggregated measurement with averages and metadata
  • Metadata shows original record count and time range

Example 2: Multi-field aggregation with different functions

Apply different aggregation functions to different fields:

# Create trigger with field-specific aggregations
influxdb3 create trigger \
  --database sensors \
  --plugin-filename gh:influxdata/downsampler/downsampler.py \
  --trigger-spec "every:10min" \
  --trigger-arguments 'source_measurement=environment,target_measurement=environment_10min,interval=10min,window=30min,calculations=temperature:avg.humidity:avg.pressure:max' \
  env_multi_agg

# Write data with various sensor readings
influxdb3 write \
  --database sensors \
  "environment,location=office temperature=22.5,humidity=45.2,pressure=1013.25"

# Query aggregated data
influxdb3 query \
  --database sensors \
  "SELECT * FROM environment_10min WHERE time >= now() - 1h"

Expected output

location | temperature | humidity | pressure | record_count | time
---------|-------------|----------|----------|--------------|-----
office   | 22.3        | 44.8     | 1015.1   | 10           | 2024-01-01T00:10:00Z

Example 3: HTTP API downsampling with backfill

Use HTTP API for on-demand downsampling with historical data:

# Send HTTP request for backfill downsampling
curl -X POST http://localhost:8181/api/v3/engine/downsample \
  --header "Authorization: Bearer YOUR_TOKEN" \
  --data '{
    "source_measurement": "metrics",
    "target_measurement": "metrics_daily",
    "target_database": "analytics",
    "interval": "1d",
    "batch_size": "7d",
    "calculations": [["cpu_usage", "avg"], ["memory_usage", "max"], ["disk_usage", "avg"]],
    "backfill_start": "2024-01-01T00:00:00Z",
    "backfill_end": "2024-01-31T00:00:00Z",
    "max_retries": 3
  }'

Code overview

Files

  • downsampler.py: The main plugin code containing handlers for scheduled and HTTP-triggered downsampling
  • downsampling_config_scheduler.toml: Example TOML configuration file for scheduled triggers

Logging

Logs are stored in the _internal database (or the database where the trigger is created) in the system.processing_engine_logs table. To view logs:

influxdb3 query --database _internal "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"

Log columns:

  • event_time: Timestamp of the log event (with nanosecond precision)
  • trigger_name: Name of the trigger that generated the log
  • log_level: Severity level (INFO, WARN, ERROR)
  • log_text: Message describing the action or error, including a unique task_id for traceability

Main functions

process_scheduled_call(influxdb3_local, call_time, args)

Handles scheduled downsampling tasks. Queries historical data within the specified window and applies aggregation functions.

Key operations:

  1. Parses configuration from arguments or TOML file
  2. Queries source measurement with optional tag filters
  3. Applies time-based aggregation with specified functions
  4. Writes downsampled data with metadata columns

process_http_request(influxdb3_local, request_body, args)

Handles HTTP-triggered on-demand downsampling. Processes batch downsampling with configurable time ranges for backfill scenarios.

Key operations:

  1. Parses JSON request body parameters
  2. Processes data in configurable time batches
  3. Applies aggregation functions to historical data
  4. Returns processing statistics and results

aggregate_data(data, interval, calculations)

Core aggregation engine that applies statistical functions to time-series data.

Supported aggregation functions:

  • avg: Average value
  • sum: Sum of values
  • min: Minimum value
  • max: Maximum value
  • derivative: Rate of change
  • median: Median value
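These names are the values accepted by the calculations parameter, either as a single function or as field:aggregation pairs. For example, to take the median of one field and the rate of change of another (field names here are hypothetical):

--trigger-arguments 'calculations=temperature:median.request_count:derivative'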

Troubleshooting

Common issues

Issue: No data in target measurement

Solution: Check that the source measurement exists and contains data in the specified time window:

influxdb3 query --database mydb "SELECT COUNT(*) FROM source_measurement WHERE time >= now() - 1h"

Issue: Aggregation function not working

Solution: Verify field names and aggregation syntax. Use SHOW FIELD KEYS to check available fields:

influxdb3 query --database mydb "SHOW FIELD KEYS FROM source_measurement"

Issue: Tag filters not applied

Solution: Check tag value format. Use @ separator for multiple values:

--trigger-arguments 'tag_values=host:server1@server2@server3'

Issue: HTTP endpoint not accessible

Solution: Verify the trigger was created with correct request specification:

influxdb3 list triggers --database mydb

Debugging tips

  1. Check execution logs with task ID filtering:

    influxdb3 query --database _internal \
      "SELECT * FROM system.processing_engine_logs WHERE log_text LIKE '%task_id%' ORDER BY event_time DESC LIMIT 10"
  2. Test with smaller time windows for debugging:

    --trigger-arguments 'window=5min,interval=1min'
  3. Verify field types before aggregation:

    influxdb3 query --database mydb "SELECT * FROM source_measurement LIMIT 1"

Performance considerations

  • Batch processing: Use appropriate batch_size for HTTP requests to balance memory usage and performance
  • Field filtering: Use specific_fields to process only necessary data
  • Retry logic: Configure max_retries based on network reliability
  • Metadata overhead: Metadata columns add ~20% storage overhead but provide valuable debugging information
  • Index optimization: Tag filters are more efficient than field filters for large datasets

Report an issue

For plugin issues, see the Plugins repository issues page.

Find support for InfluxDB 3 Core

The InfluxDB Discord server is the best place to find support for InfluxDB 3 Core and InfluxDB 3 Enterprise. For other InfluxDB versions, see the Support and feedback options.

