Basic transformation plugin

The Basic Transformation Plugin enables real-time and scheduled transformation of time series data in InfluxDB 3. Transform field and tag names, convert values between units, and apply custom string replacements to standardize or clean your data. The plugin supports both scheduled batch processing of historical data and real-time transformation as data is written.

Configuration

Required parameters

Parameter          | Type   | Default          | Description
-------------------|--------|------------------|------------
measurement        | string | required         | Source measurement containing data to transform
target_measurement | string | required         | Destination measurement for transformed data
target_database    | string | current database | Database for storing transformed data
dry_run            | string | "false"          | When "true", logs transformations without writing

Transformation parameters

Parameter              | Type   | Default | Description
-----------------------|--------|---------|------------
names_transformations  | string | none    | Field/tag name transformation rules. Format: 'field1:"transform1 transform2".field2:"transform3"'
values_transformations | string | none    | Field value transformation rules. Format: 'field1:"transform1".field2:"transform2"'
custom_replacements    | string | none    | Custom string replacements. Format: 'rule_name:"find=replace"'
custom_regex           | string | none    | Regex patterns for field matching. Format: 'pattern_name:"temp%"'
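For reference, the following argument string combines these formats; the field names temp and status and the rule name status_map are placeholders, not names the plugin defines:

--trigger-arguments 'names_transformations=temp:"snake",values_transformations=temp:"convert_degC_to_degF".status:"status_map",custom_replacements=status_map:"OK=online.FAIL=offline"'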

Data selection parameters

Parameter       | Type   | Default                   | Description
----------------|--------|---------------------------|------------
window          | string | required (scheduled only) | Historical data window. Format: <number><unit> (for example, "30d", "1h")
included_fields | string | all fields                | Dot-separated list of fields to include (for example, "temp.humidity")
excluded_fields | string | none                      | Dot-separated list of fields to exclude
filters         | string | none                      | Query filters. Format: 'field:"operator value"'
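For example, the following scheduled-trigger argument string limits processing to two fields from the last 6 hours; the measurement, field names, and filter value are illustrative, and the filters entry follows the 'field:"operator value"' format above:

--trigger-arguments 'measurement=sensor_data,window=6h,target_measurement=sensor_data_clean,included_fields=temp.humidity,filters=temp:"> 0"'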

TOML configuration

Parameter        | Type   | Default | Description
-----------------|--------|---------|------------
config_file_path | string | none    | TOML config file path relative to PLUGIN_DIR (required for TOML configuration)

To use a TOML configuration file, set the PLUGIN_DIR environment variable and specify the config_file_path in the trigger arguments. This is in addition to the --plugin-dir flag when starting InfluxDB 3.
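One way to set the variable, assuming a shell environment, is to provide it when starting the server so it is visible to the Processing Engine; the paths below are illustrative:

PLUGIN_DIR=~/.plugins influxdb3 serve \
  --node-id node0 \
  --object-store file \
  --data-dir ~/.influxdb3 \
  --plugin-dir ~/.plugins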

Example TOML configurations

influxdb3 create trigger \
  --database mydb \
  --plugin-filename basic_transformation.py \
  --trigger-spec "every:1d" \
  --trigger-arguments config_file_path=basic_transformation_config_scheduler.toml \
  basic_transform_trigger
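The referenced file contains key-value pairs that mirror the trigger arguments. The following is a minimal sketch for a scheduled trigger, not a verbatim copy of the bundled example file; the exact keys there may differ:

# Illustrative sketch; keys mirror the trigger arguments described above
measurement = "temperature"
target_measurement = "temperature_normalized"
window = "24h"
names_transformations = 'temp:"snake"'
values_transformations = 'temp:"convert_degC_to_degF"'
dry_run = "false"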

For more information on using TOML configuration files, see the Using TOML Configuration Files section in the influxdb3_plugins/README.md.

Schema requirements

The plugin assumes that the table schema is already defined in the database, as it relies on this schema to retrieve field and tag names required for processing.

Requires existing schema

By design, the plugin returns an error if the schema doesn’t exist or doesn’t contain the expected columns.
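Before creating a trigger, you can confirm that the source measurement and its columns already exist; the database and measurement names below are illustrative:

influxdb3 query \
  --database mydb \
  "SELECT * FROM temperature LIMIT 1"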

Installation steps

  1. Start InfluxDB 3 Enterprise with the Processing Engine enabled (--plugin-dir /path/to/plugins)

    influxdb3 serve \
      --node-id node0 \
      --object-store file \
      --data-dir ~/.influxdb3 \
      --plugin-dir ~/.plugins
  2. Install required Python packages:

    • pint (for unit conversions)
    influxdb3 install package pint

Trigger setup

Scheduled transformation

Run transformations periodically on historical data:

influxdb3 create trigger \
  --database mydb \
  --plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
  --trigger-spec "every:1h" \
  --trigger-arguments 'measurement=temperature,window=24h,target_measurement=temperature_normalized,names_transformations=temp:"snake",values_transformations=temp:"convert_degC_to_degF"' \
  hourly_temp_transform

Real-time transformation

Transform data as it’s written:

influxdb3 create trigger \
  --database mydb \
  --plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=sensor_data,target_measurement=sensor_data_clean,names_transformations=.*:"snake alnum_underscore_only"' \
  realtime_clean

Example usage

Example 1: Temperature unit conversion

Convert temperature readings from Celsius to Fahrenheit while standardizing field names:

# Create the trigger
influxdb3 create trigger \
  --database weather \
  --plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
  --trigger-spec "every:30m" \
  --trigger-arguments 'measurement=raw_temps,window=1h,target_measurement=temps_fahrenheit,names_transformations=Temperature:"snake",values_transformations=temperature:"convert_degC_to_degF"' \
  temp_converter

# Write test data
influxdb3 write \
  --database weather \
  "raw_temps,location=office Temperature=22.5"

# Query transformed data (after trigger runs)
influxdb3 query \
  --database weather \
  "SELECT * FROM temps_fahrenheit"

Expected output

location | temperature | time
---------|-------------|-----
office   | 72.5        | 2024-01-01T00:00:00Z

Transformation details:

  • Before: Temperature=22.5 (Celsius)
  • After: temperature=72.5 (Fahrenheit, field name converted to snake_case)

Example 2: Field name standardization

Clean and standardize field names from various sensors:

# Create trigger with multiple transformations
influxdb3 create trigger \
  --database sensors \
  --plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
  --trigger-spec "all_tables" \
  --trigger-arguments 'measurement=raw_sensors,target_measurement=clean_sensors,names_transformations=.*:"snake alnum_underscore_only collapse_underscore trim_underscore"' \
  field_cleaner

# Write data with inconsistent field names
influxdb3 write \
  --database sensors \
  "raw_sensors,device=sensor1 \"Room Temperature\"=20.1,\"__Humidity_%\"=45.2"

# Query cleaned data
influxdb3 query \
  --database sensors \
  "SELECT * FROM clean_sensors"

Expected output

device  | room_temperature | humidity | time
--------|------------------|----------|-----
sensor1 | 20.1             | 45.2     | 2024-01-01T00:00:00Z

Transformation details:

  • Before: "Room Temperature"=20.1, "__Humidity_%"=45.2
  • After: room_temperature=20.1, humidity=45.2 (field names standardized)

Example 3: Custom string replacements

Replace specific strings in field values:

# Create trigger with custom replacements
influxdb3 create trigger \
  --database inventory \
  --plugin-filename gh:influxdata/basic_transformation/basic_transformation.py \
  --trigger-spec "every:1d" \
  --trigger-arguments 'measurement=products,window=7d,target_measurement=products_updated,values_transformations=status:"status_replace",custom_replacements=status_replace:"In Stock=available.Out of Stock=unavailable"' \
  status_updater
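To see the replacements in action, write a sample point and query the target measurement after the trigger runs; the tag key and values below are illustrative:

# Write test data
influxdb3 write \
  --database inventory \
  "products,sku=widget1 status=\"In Stock\""

# Query transformed data (after the trigger runs)
influxdb3 query \
  --database inventory \
  "SELECT * FROM products_updated"

With the status_replace rule above, a status value of "In Stock" is rewritten as available.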

Code overview

Files

  • basic_transformation.py: The main plugin code containing handlers for scheduled tasks and data write transformations
  • basic_transformation_config_data_writes.toml: Example TOML configuration file for data write triggers
  • basic_transformation_config_scheduler.toml: Example TOML configuration file for scheduled triggers

Logging

Logs are stored in the _internal database (or the database where the trigger is created) in the system.processing_engine_logs table. To view logs:

influxdb3 query --database _internal "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"

Log columns:

  • event_time: Timestamp of the log event
  • trigger_name: Name of the trigger that generated the log
  • log_level: Severity level (INFO, WARN, ERROR)
  • log_text: Message describing the action or error
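For example, to see only errors for a specific trigger (the trigger name below is illustrative):

influxdb3 query \
  --database _internal \
  "SELECT event_time, log_text FROM system.processing_engine_logs WHERE trigger_name = 'temp_converter' AND log_level = 'ERROR'"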

Main functions

process_scheduled_call(influxdb3_local, call_time, args)

Handles scheduled transformation tasks. Queries historical data within the specified window and applies transformations.

Key operations:

  1. Parses configuration from arguments
  2. Queries source measurement with filters
  3. Applies name and value transformations
  4. Writes transformed data to target measurement

process_writes(influxdb3_local, table_batches, args)

Handles real-time transformation during data writes. Processes incoming data batches and applies transformations before writing.

Key operations:

  1. Filters relevant table batches
  2. Applies transformations to each row
  3. Writes to target measurement immediately

apply_transformations(value, transformations)

Core transformation engine that applies a chain of transformations to a value.

Supported transformations:

  • String operations: lower, upper, snake
  • Space handling: space_to_underscore, remove_space
  • Character filtering: alnum_underscore_only
  • Underscore management: collapse_underscore, trim_underscore
  • Unit conversions: convert_<from>_to_<to>
  • Custom replacements: User-defined string substitutions
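Transformations in a quoted chain are applied left to right. For example, this name rule (the field name is illustrative) combines several string operations, matching the chain used in Example 2:

names_transformations=pressure_reading:"snake alnum_underscore_only collapse_underscore trim_underscore"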

Troubleshooting

Common issues

Issue: Transformations not applying

Solution: Check that field names match exactly (case-sensitive). Use regex patterns for flexible matching:

--trigger-arguments 'custom_regex=temp_fields:"temp%",values_transformations=temp_fields:"convert_degC_to_degF"'

Issue: “Permission denied” errors in logs

Solution: Ensure the plugin file has execute permissions:

chmod +x ~/.plugins/basic_transformation.py

Issue: Unit conversion failing

Solution: Verify unit names are valid pint units. Common units:

  • Temperature: degC, degF, degK
  • Length: meter, foot, inch
  • Time: second, minute, hour

Issue: No data in target measurement

Solution:

  1. Check dry_run is not set to “true”
  2. Verify source measurement contains data
  3. Check logs for errors:
    influxdb3 query \
      --database _internal \
      "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"

Debugging tips

  1. Enable dry run to test transformations:

    --trigger-arguments 'dry_run=true,...'
  2. Use specific time windows for testing:

    --trigger-arguments 'window=1h,...'
  3. Check field names in source data:

    influxdb3 query --database mydb "SHOW FIELD KEYS FROM measurement"

Performance considerations

  • Field name caching reduces query overhead (1-hour cache)
  • Batch processing for scheduled tasks improves throughput
  • Retry mechanism (3 attempts) handles transient write failures
  • Use filters to process only relevant data

Report an issue

For plugin issues, see the Plugins repository issues page.

Find support for InfluxDB 3 Enterprise

The InfluxDB Discord server is the best place to find support for InfluxDB 3 Core and InfluxDB 3 Enterprise. For other InfluxDB versions, see the Support and feedback options.

