InfluxDB to Iceberg plugin
The InfluxDB to Iceberg Plugin enables data transfer from InfluxDB 3 Enterprise to Apache Iceberg tables. Transfer time series data to Iceberg for long-term storage, analytics, or integration with data lake architectures. The plugin supports both scheduled batch transfers of historical data and on-demand transfers via HTTP API.
Configuration
Plugin parameters may be specified as key-value pairs in the --trigger-arguments flag (CLI) or in the trigger_arguments field (API) when creating a trigger. Some plugins support TOML configuration files, which can be specified using the plugin’s config_file_path parameter.
If a plugin supports multiple trigger specifications, some parameters may depend on the trigger specification that you use.
Plugin metadata
This plugin includes a JSON metadata schema in its docstring that defines supported trigger types and configuration parameters. This metadata enables the InfluxDB 3 Explorer UI to display and configure the plugin.
Scheduler trigger parameters
Required parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| measurement | string | required | Source measurement containing data to transfer |
| window | string | required | Time window for data transfer. Format: `<number><unit>` (for example, `1h`, `30d`) |
| catalog_configs | string | required | Base64-encoded JSON string containing the Iceberg catalog configuration |
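The `catalog_configs` value is the catalog configuration JSON (the same options PyIceberg accepts), base64-encoded so it can be passed as a trigger argument. As a minimal sketch of producing that value (the catalog URI here is a placeholder, use the options your catalog needs):

```python
import base64
import json

# Placeholder catalog settings; see the PyIceberg catalog documentation
# for the options your catalog type requires.
catalog = {"uri": "http://nessie:9000"}

# Base64-encode the JSON string and pass the result as catalog_configs
encoded = base64.b64encode(json.dumps(catalog).encode("utf-8")).decode("ascii")
print(encoded)  # pass this string as the catalog_configs trigger argument
```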
Optional parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| included_fields | string | all fields/tags | Dot-separated list of fields and tags to include (for example, `usage_user.host`) |
| excluded_fields | string | none | Dot-separated list of fields and tags to exclude |
| namespace | string | `default` | Iceberg namespace for the target table |
| table_name | string | measurement name | Iceberg table name |
| auto_update_schema | string | `false` | Automatically update the Iceberg table schema when data doesn't match the existing schema |
TOML configuration
| Parameter | Type | Default | Description |
|---|---|---|---|
| config_file_path | string | none | TOML config file path, relative to `PLUGIN_DIR` (required for TOML configuration) |
To use a TOML configuration file, set the `PLUGIN_DIR` environment variable and specify the `config_file_path` in the trigger arguments. This is in addition to the `--plugin-dir` flag used when starting InfluxDB 3 Enterprise.
Example TOML configuration
influxdb_to_iceberg_config_scheduler.toml
For more information on using TOML configuration files, see the Using TOML Configuration Files section in the influxdb3_plugins/README.md.
HTTP trigger parameters
Request body structure
| Parameter | Type | Required | Description |
|---|---|---|---|
| measurement | string | Yes | Source measurement containing data to transfer |
| catalog_configs | object | Yes | Iceberg catalog configuration dictionary. See the PyIceberg catalog documentation |
| included_fields | array | No | List of field and tag names to include in replication |
| excluded_fields | array | No | List of field and tag names to exclude from replication |
| namespace | string | No | Target Iceberg namespace (default: `default`) |
| table_name | string | No | Target Iceberg table name (default: measurement name) |
| batch_size | string | No | Batch size duration for processing (default: `1d`). Format: `<number><unit>` |
| backfill_start | string | No | ISO 8601 datetime with timezone for the backfill start |
| backfill_end | string | No | ISO 8601 datetime with timezone for the backfill end |
| auto_update_schema | boolean | No | Automatically update the Iceberg table schema when data doesn't match the existing schema (default: `false`) |
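The curl example later on this page shows this request from the shell; as an illustration, here is a minimal Python sketch that builds the request body and posts it to the plugin's HTTP endpoint. The URL, token, and catalog settings are placeholders for your environment:

```python
import json
import urllib.request

# Placeholder values: adjust the URL, token, and catalog settings for your setup.
body = {
    "measurement": "temperature",
    "catalog_configs": {"type": "sql", "uri": "sqlite:///path/to/catalog.db"},
    "included_fields": ["temp_celsius", "humidity"],
    "namespace": "weather",
    "table_name": "temperature_history",
    "batch_size": "12h",
    "backfill_start": "2024-01-01T00:00:00+00:00",
    "backfill_end": "2024-01-07T00:00:00+00:00",
}

req = urllib.request.Request(
    "http://localhost:8181/api/v3/engine/replicate",
    data=json.dumps(body).encode("utf-8"),
    headers={"Authorization": "Bearer YOUR_TOKEN", "Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(resp.read().decode())
```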
Schema management
- Automatically creates Iceberg table schema from the first batch of data
- Maps pandas data types to Iceberg types:
  - `int64` → `IntegerType`
  - `float64` → `FloatType`
  - `datetime64[us]` → `TimestampType`
  - `object` → `StringType`
- Fields with no null values are marked as `required`
- The `time` column is converted to `datetime64[us]` for Iceberg compatibility
- Tables are created in the format `<namespace>.<table_name>`
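As an illustration of the type mapping and the required/optional rule described above (a sketch of the behavior, not the plugin's actual source), the equivalent PyIceberg types look roughly like this:

```python
import pandas as pd
from pyiceberg.types import FloatType, IntegerType, NestedField, StringType, TimestampType

# Illustrative mapping from pandas dtypes to Iceberg types (mirrors the list above)
PANDAS_TO_ICEBERG = {
    "int64": IntegerType(),
    "float64": FloatType(),
    "datetime64[us]": TimestampType(),
    "object": StringType(),
}

def infer_fields(df: pd.DataFrame) -> list[NestedField]:
    """Build Iceberg fields from a DataFrame; columns with no nulls become required."""
    fields = []
    for field_id, (name, dtype) in enumerate(df.dtypes.items(), start=1):
        fields.append(
            NestedField(
                field_id=field_id,
                name=name,
                field_type=PANDAS_TO_ICEBERG.get(str(dtype), StringType()),
                required=bool(df[name].notna().all()),
            )
        )
    return fields
```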
Automatic schema updates
When `auto_update_schema=true`:
- New fields: Automatically added to Iceberg table schema as optional (nullable) columns
- Missing fields: Added to DataFrame with null values based on existing schema types
- Schema evolution: Ensures data compatibility between InfluxDB and Iceberg without manual intervention
- Backward compatibility: Existing data remains valid as new columns are always optional
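For reference, this is roughly what that evolution looks like in PyIceberg terms when a new field appears (a sketch only; the plugin performs this internally, and the catalog URI, namespace, table, and column names here are hypothetical):

```python
from pyiceberg.catalog import load_catalog
from pyiceberg.types import DoubleType

# Hypothetical catalog and table names
catalog = load_catalog("default", **{"uri": "http://nessie:9000"})
table = catalog.load_table("monitoring.cpu_metrics")

# New columns are added as optional (nullable), so existing rows remain valid
with table.update_schema() as update:
    update.add_column("usage_iowait", DoubleType(), required=False)
```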
Software requirements
- InfluxDB 3 Enterprise: with the Processing Engine enabled
- Python packages:
  - `pandas` (for data manipulation)
  - `pyarrow` (for Parquet support)
  - `pyiceberg[catalog-options]` (for Iceberg integration)
Installation steps
1. Start InfluxDB 3 Enterprise with the Processing Engine enabled (`--plugin-dir /path/to/plugins`):

   ```bash
   influxdb3 serve \
     --node-id node0 \
     --object-store file \
     --data-dir ~/.influxdb3 \
     --plugin-dir ~/.plugins
   ```

2. Install the required Python packages:

   ```bash
   influxdb3 install package pandas
   influxdb3 install package pyarrow
   influxdb3 install package "pyiceberg[s3fs,hive,sql-sqlite]"
   ```
Note: Include the appropriate PyIceberg extras based on your catalog type:
- `[s3fs]` for S3 storage
- `[hive]` for Hive metastore
- `[sql-sqlite]` for SQL catalog with SQLite
- See the PyIceberg documentation for all options
Schema requirement
The plugin assumes that the table schema is already defined in the database, as it relies on this schema to retrieve field and tag names required for processing.
Trigger setup
Scheduled data transfer
Periodically transfer data from InfluxDB 3 Enterprise to Iceberg:
```bash
influxdb3 create trigger \
  --database mydb \
  --path "gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py" \
  --trigger-spec "every:1h" \
  --trigger-arguments 'measurement=cpu,window=1h,catalog_configs="eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0=",namespace=monitoring,table_name=cpu_metrics' \
  hourly_iceberg_transfer
```

HTTP API endpoint
Create an on-demand transfer endpoint:
```bash
influxdb3 create trigger \
  --database mydb \
  --path "gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py" \
  --trigger-spec "request:replicate" \
  iceberg_http_transfer
```

Enable the trigger:

```bash
influxdb3 enable trigger --database mydb iceberg_http_transfer
```

The endpoint is registered at `/api/v3/engine/replicate`.
Example usage
Example 1: Basic scheduled transfer
Transfer CPU metrics to Iceberg every hour:
```bash
# Create trigger with base64-encoded catalog config
# Original JSON: {"uri": "http://nessie:9000"}
# Base64: eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0=
influxdb3 create trigger \
  --database metrics \
  --path "gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py" \
  --trigger-spec "every:1h" \
  --trigger-arguments 'measurement=cpu,window=24h,catalog_configs="eyJ1cmkiOiAiaHR0cDovL25lc3NpZTo5MDAwIn0="' \
  cpu_to_iceberg

# Write test data
influxdb3 write \
  --database metrics \
  "cpu,host=server1 usage_user=45.2,usage_system=12.1"

# After the trigger runs, data is available in the Iceberg table "default.cpu"
```

Expected output
- Creates Iceberg table `default.cpu` with a schema matching the measurement
- Transfers all CPU data from the last 24 hours
- Appends new data on each hourly run
Example 2: HTTP backfill with field filtering
Backfill specific fields from historical data:
```bash
# Create and enable the HTTP trigger
influxdb3 create trigger \
  --database metrics \
  --path "gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py" \
  --trigger-spec "request:replicate" \
  iceberg_backfill

influxdb3 enable trigger --database metrics iceberg_backfill

# Request backfill via HTTP
curl -X POST http://localhost:8181/api/v3/engine/replicate \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -d '{
    "measurement": "temperature",
    "catalog_configs": {
      "type": "sql",
      "uri": "sqlite:///path/to/catalog.db"
    },
    "included_fields": ["temp_celsius", "humidity", "sensor_id"],
    "namespace": "weather",
    "table_name": "temperature_history",
    "batch_size": "12h",
    "backfill_start": "2024-01-01T00:00:00+00:00",
    "backfill_end": "2024-01-07T00:00:00+00:00"
  }'
```

Expected output
- Creates Iceberg table `weather.temperature_history`
- Transfers only the `temp_celsius` and `humidity` fields
- Processes data in 12-hour batches for the specified week
- Returns status of the backfill operation
Example 3: S3-backed Iceberg catalog
Transfer data to Iceberg tables stored in S3:
```bash
# Create the catalog config JSON
cat > catalog_config.json << EOF
{
  "type": "sql",
  "uri": "sqlite:///iceberg/catalog.db",
  "warehouse": "s3://my-bucket/iceberg-warehouse/",
  "s3.endpoint": "http://minio:9000",
  "s3.access-key-id": "minioadmin",
  "s3.secret-access-key": "minioadmin",
  "s3.path-style-access": true
}
EOF

# Encode to base64
CATALOG_CONFIG=$(base64 < catalog_config.json)

# Create the trigger
influxdb3 create trigger \
  --database metrics \
  --path "gh:influxdata/influxdb_to_iceberg/influxdb_to_iceberg.py" \
  --trigger-spec "every:30m" \
  --trigger-arguments "measurement=sensor_data,window=1h,catalog_configs=\"$CATALOG_CONFIG\",namespace=iot,table_name=sensors" \
  s3_iceberg_transfer
```

Code overview
Files
- `influxdb_to_iceberg.py`: The main plugin code containing handlers for scheduled and HTTP triggers
- `influxdb_to_iceberg_config_scheduler.toml`: Example TOML configuration file for scheduled triggers
Logging
Logs are stored in the trigger’s database in the system.processing_engine_logs table. To view logs:
```bash
influxdb3 query --database YOUR_DATABASE "SELECT * FROM system.processing_engine_logs WHERE trigger_name = 'your_trigger_name'"
```

Log columns:
- event_time: Timestamp of the log event
- trigger_name: Name of the trigger that generated the log
- log_level: Severity level (INFO, WARN, ERROR)
- log_text: Message describing the action or error
Main functions
process_scheduled_call(influxdb3_local, call_time, args)
Handles scheduled data transfers. Queries data within the specified window and appends to Iceberg tables.
Key operations:
- Parses configuration and decodes catalog settings
- Queries source measurement with optional field filtering
- Creates Iceberg table if needed
- Appends data to Iceberg table
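As a rough illustration of that flow (a simplified sketch, not the plugin's actual implementation; it assumes the Processing Engine's standard `influxdb3_local.query()` and `influxdb3_local.info()` helpers):

```python
import base64
import json

# Simplified sketch of a scheduler handler performing the steps above;
# this is not the plugin's actual source.
def process_scheduled_call(influxdb3_local, call_time, args):
    # Parse configuration and decode the catalog settings
    measurement = args["measurement"]
    catalog_configs = json.loads(base64.b64decode(args["catalog_configs"]))

    # Query the source measurement for the configured window
    # (window parsing and field filtering are omitted; "1 hour" is hardcoded here)
    rows = influxdb3_local.query(
        f"SELECT * FROM {measurement} WHERE time >= now() - INTERVAL '1 hour'"
    )
    influxdb3_local.info(f"Transferring {len(rows)} rows from {measurement} to Iceberg")

    # ... create the Iceberg table if needed, then append the batch ...
```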
process_http_request(influxdb3_local, request_body, args)
Handles on-demand data transfers via HTTP. Supports backfill operations with configurable batch sizes.
Key operations:
- Validates request body parameters
- Determines backfill time range
- Processes data in batches
- Returns transfer status
Troubleshooting
Common issues
Issue: “Failed to decode catalog_configs” error
Solution: Ensure the catalog configuration is properly base64-encoded:
```bash
# Create JSON file
echo '{"uri": "http://nessie:9000"}' > config.json

# Encode to base64
base64 config.json
```

Issue: “Failed to create Iceberg table” error
Solution:
- Verify catalog configuration is correct
- Check warehouse path permissions
- Ensure required PyIceberg extras are installed:
  ```bash
  influxdb3 install package "pyiceberg[s3fs]"
  ```
Issue: No data in Iceberg table after transfer
Solution:
- Check if source measurement contains data:
  ```bash
  influxdb3 query --database mydb "SELECT COUNT(*) FROM measurement"
  ```

- Verify the time window covers the data:

  ```bash
  influxdb3 query --database mydb "SELECT MIN(time), MAX(time) FROM measurement"
  ```

- Check the logs for errors:

  ```bash
  influxdb3 query --database YOUR_DATABASE "SELECT * FROM system.processing_engine_logs WHERE log_level = 'ERROR'"
  ```
Issue: “Incompatible change: cannot add required column” error
Solution: This occurs when trying to add a required (non-nullable) column to an existing table. With `auto_update_schema=true`, new columns are automatically added as optional. If you encounter this error:
- Ensure `auto_update_schema=true` is set in your configuration
- Check that you're using the latest version of the plugin
Debugging tips
- Test catalog connectivity:

  ```python
  from pyiceberg.catalog import load_catalog

  # catalog_configs is your catalog configuration dictionary (example value shown)
  catalog_configs = {"uri": "http://nessie:9000"}
  catalog = load_catalog("my_catalog", **catalog_configs)
  print(catalog.list_namespaces())
  ```

- Verify field names:

  ```bash
  influxdb3 query --database mydb "SHOW FIELD KEYS FROM measurement"
  ```

- Use smaller windows for initial testing:

  ```bash
  --trigger-arguments 'window=5m,...'
  ```
Performance considerations
- File sizing: Each scheduled run creates new Parquet files. Use appropriate window sizes to balance file count and size
- Batch processing: For HTTP transfers, adjust `batch_size` based on available memory
- Field and tag filtering: Use `included_fields` to reduce data volume when only specific fields and tags are needed
- Catalog choice: SQL catalogs (SQLite) are simpler, but REST catalogs scale better
Report an issue
For plugin issues, see the Plugins repository issues page.
Find support for InfluxDB 3 Enterprise
The InfluxDB Discord server is the best place to find support for InfluxDB 3 Core and InfluxDB 3 Enterprise. For other InfluxDB versions, see the Support and feedback options.