Migrate data from InfluxDB Cloud to InfluxDB Clustered

To migrate data from InfluxDB Cloud (TSM) to an InfluxDB cluster powered by the v3 storage engine, query the data from your TSM-powered buckets in time-based batches and write the queried data to a database in your InfluxDB cluster. Batching is necessary because migrating all of your data in a single query will likely exceed your InfluxDB Cloud organization's limits and adjustable quotas.

This guide provides instructions for setting up an InfluxDB task that queries data from an InfluxDB Cloud TSM-powered bucket in time-based batches and writes each batch to an InfluxDB Clustered (InfluxDB 3) database.

All query requests are subject to your InfluxDB Cloud organization’s rate limits and adjustable quotas.

Before you migrate

Before you migrate from InfluxDB Cloud (TSM) to InfluxDB Clustered, note that some schema design practices supported by the TSM storage engine are not supported by the InfluxDB 3 storage engine. Specifically, InfluxDB 3 enforces the following schema restrictions:

  • You can’t use duplicate names for tags and fields.
  • By default, measurements can contain up to 250 columns where each column represents time, a field, or a tag.

For more information, see Schema restrictions.

If your schema does not adhere to these restrictions, you must update your schema before migrating to InfluxDB Clustered.
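As a rough pre-migration check, the two restrictions above can be tested against a list of tag keys and field keys for each measurement. The following Python sketch is illustrative only: the function name `check_schema` and its inputs are hypothetical, and it assumes you have already collected the tag and field key names for a measurement by some other means.

```python
def check_schema(tag_keys, field_keys, max_columns=250):
    """Return a list of schema problems for one measurement (empty if none)."""
    tags, fields = set(tag_keys), set(field_keys)
    problems = []

    # Restriction 1: a name can't be used as both a tag and a field.
    duplicates = sorted(tags & fields)
    if duplicates:
        problems.append("names used as both tag and field: " + ", ".join(duplicates))

    # Restriction 2: time, tags, and fields each count as one column.
    total_columns = 1 + len(tags | fields)
    if total_columns > max_columns:
        problems.append(f"{total_columns} columns exceeds the limit of {max_columns}")

    return problems


# Example: "temp" is used as both a tag and a field.
print(check_schema(["host", "region", "temp"], ["temp", "humidity"]))
```

An empty list means the measurement passes both checks under these assumptions; the default of 250 reflects the default column limit stated above.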

Fix duplicate tag and field names

Fix measurements with more than 250 total columns

Set up the migration

The migration process requires two buckets in your source InfluxDB organization: one bucket that stores the data you're migrating and a second bucket that stores migration metadata. If you're using the InfluxDB Cloud Free Plan and have more than one bucket to migrate, you will exceed your plan's bucket limit. To migrate more than one bucket, upgrade to the Usage-based plan.

  1. In the InfluxDB cluster you’re migrating data to:

    1. Create a database to migrate data to.
    2. Create a database token with write access to the database you want to migrate to.
  2. In the InfluxDB Cloud (TSM) organization you’re migrating data from:

    1. Add the InfluxDB Clustered token (created in step 1b) as a secret using the key influxdb3_clustered_TOKEN. See Add secrets for more information.

    2. Create a bucket to store temporary migration metadata.

    3. Create a task using the provided migration task. Update the necessary migration configuration options.

    4. (Optional) Set up migration monitoring.

    5. Save the task.

      Newly created tasks are enabled by default, so the data migration begins when you save the task.
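If you prefer to script the secret creation in step 2 rather than use the UI, the request might look like the following Python sketch. It assumes the InfluxDB v2 API secrets endpoint (`PATCH /api/v2/orgs/{orgID}/secrets`); verify the endpoint against the API reference for your InfluxDB Cloud region before relying on it. The URL, organization ID, and token values are placeholders, and `build_secret_request` is a hypothetical helper. The sketch only builds the request and does not send it.

```python
import json
import urllib.request


def build_secret_request(cloud_url, org_id, api_token, cluster_token):
    """Build (but do not send) a request that stores the cluster token as a secret."""
    # The secret key must match the key the migration task reads with secrets.get().
    body = json.dumps({"influxdb3_clustered_TOKEN": cluster_token}).encode()
    return urllib.request.Request(
        url=f"{cloud_url.rstrip('/')}/api/v2/orgs/{org_id}/secrets",
        data=body,
        method="PATCH",
        headers={
            "Authorization": f"Token {api_token}",
            "Content-Type": "application/json",
        },
    )


# Placeholder values; replace them with your own before sending.
req = build_secret_request(
    "https://cloud.example.com", "ORG_ID", "CLOUD_API_TOKEN", "CLUSTER_TOKEN"
)
print(req.get_method(), req.full_url)
# To send the request: urllib.request.urlopen(req)
```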

After the migration is complete, each subsequent migration task execution will fail with the following error:

error exhausting result iterator: error calling function "die" @41:9-41:86:
Batch range is beyond the migration range. Migration is complete.

Migration task

Configure the migration

  1. Specify how often you want the task to run using the task.every option. See Determine your task interval.

  2. Define the following properties in the migration record:

    migration
    • start: Earliest time to include in the migration. See Determine your migration start time.
    • stop: Latest time to include in the migration.
    • batchInterval: Duration of each time-based batch. See Determine your batch interval.
    • batchBucket: InfluxDB Cloud (TSM) bucket to store migration batch metadata in.
    • sourceBucket: InfluxDB Cloud (TSM) bucket to migrate data from.
    • destinationHost: InfluxDB cluster URL to migrate data to.
    • destinationOrg: Provide an empty string (ignored by InfluxDB Clustered).
    • destinationToken: InfluxDB Clustered token. To keep the API token secure, store it as a secret in InfluxDB Cloud (TSM).
    • destinationDatabase: InfluxDB Clustered database to migrate data to.
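To get a feel for how these settings interact, the following Python sketch estimates how many batches a migration needs and roughly how long it takes to complete. It assumes each task run migrates exactly one batch and that every run succeeds; `estimate_migration` is a hypothetical helper, not part of InfluxDB.

```python
import math
from datetime import datetime, timedelta, timezone


def estimate_migration(start, stop, batch_interval, task_every):
    """Return (number of batches, approximate wall-clock time to finish)."""
    # Each batch covers one batch_interval of the migration time range.
    batches = math.ceil((stop - start) / batch_interval)
    # Assumption: one batch per task run, with no failed or skipped runs.
    return batches, batches * task_every


start = datetime(2023, 1, 1, tzinfo=timezone.utc)
stop = datetime(2023, 2, 1, tzinfo=timezone.utc)
batches, elapsed = estimate_migration(
    start, stop, batch_interval=timedelta(hours=1), task_every=timedelta(minutes=5)
)
print(batches, elapsed)  # 744 2 days, 14:00:00
```

With the example values used in the migration script (one month of data, 1h batches, a 5m task interval), the estimate is 744 runs over about two and a half days. A larger batchInterval shortens the migration but makes each query heavier.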

Migration Flux script

import "array"
import "experimental"
import "date"
import "influxdata/influxdb/secrets"

// Configure the task
option task = {every: 5m, name: "Migrate data from TSM to v3"}

// Configure the migration
migration = {
    start: 2023-01-01T00:00:00Z,
    stop: 2023-02-01T00:00:00Z,
    batchInterval: 1h,
    batchBucket: "migration",
    sourceBucket: "example-cloud-bucket",
    destinationHost: "https://cluster-host.com",
    destinationOrg: "",
    destinationToken: secrets.get(key: "influxdb3_clustered_TOKEN"),
    destinationDatabase: "example-destination-database",
}

// batchRange dynamically returns a record with start and stop properties for
// the current batch. It queries migration metadata stored in the
// `migration.batchBucket` to determine the stop time of the previous batch.
// It uses the previous stop time as the new start time for the current batch
// and adds the `migration.batchInterval` to determine the current batch stop time.
batchRange = () => {
    _lastBatchStop =
        (from(bucket: migration.batchBucket)
            |> range(start: migration.start)
            |> filter(fn: (r) => r._field == "batch_stop")
            |> filter(fn: (r) => r.dstOrg == migration.destinationOrg)
            |> filter(fn: (r) => r.dstBucket == migration.destinationDatabase)
            |> last()
            |> findRecord(fn: (key) => true, idx: 0))._value
    _batchStart =
        if exists _lastBatchStop then
            time(v: _lastBatchStop)
        else
            migration.start

    return {start: _batchStart, stop: date.add(d: migration.batchInterval, to: _batchStart)}
}

// Define a static record with batch start and stop time properties
batch = batchRange()

// Check to see if the current batch start time is beyond the migration.stop
// time and exit with an error if it is.
finished =
    if batch.start >= migration.stop then
        die(msg: "Batch range is beyond the migration range. Migration is complete.")
    else
        "Migration in progress"

// Query all data from the specified source bucket within the batch-defined time
// range. To limit migrated data by measurement, tag, or field, add a `filter()`
// function after `range()` with the appropriate predicate fn.
data = () =>
    from(bucket: migration.sourceBucket)
        |> range(start: batch.start, stop: batch.stop)

// rowCount is a stream of tables that contains the number of rows returned in
// the batch and is used to generate batch metadata.
rowCount =
    data()
        |> count()
        |> group(columns: ["_start", "_stop"])
        |> sum()

// emptyRange is a stream of tables that acts as filler data if the batch is
// empty. This is used to generate batch metadata for empty batches and is
// necessary to correctly increment the time range for the next batch.
emptyRange = array.from(rows: [{_start: batch.start, _stop: batch.stop, _value: 0}])

// metadata returns a stream of tables representing batch metadata.
metadata = () => {
    _input =
        if exists (rowCount |> findRecord(fn: (key) => true, idx: 0))._value then
            rowCount
        else
            emptyRange

    return
        _input
            |> map(
                fn: (r) =>
                    ({
                        _time: now(),
                        _measurement: "batches",
                        srcBucket: migration.sourceBucket,
                        dstOrg: migration.destinationOrg,
                        dstBucket: migration.destinationDatabase,
                        batch_start: string(v: batch.start),
                        batch_stop: string(v: batch.stop),
                        rows: r._value,
                        percent_complete:
                            float(v: int(v: r._stop) - int(v: migration.start)) / float(
                                    v: int(v: migration.stop) - int(v: migration.start),
                                ) * 100.0,
                    }),
            )
            |> group(columns: ["_measurement", "dstOrg", "srcBucket", "dstBucket"])
}

// Write the queried data to the specified InfluxDB Clustered database.
data()
    |> to(
        host: migration.destinationHost,
        org: migration.destinationOrg,
        token: migration.destinationToken,
        bucket: migration.destinationDatabase
    )

// Generate and store batch metadata in the migration.batchBucket.
metadata()
    |> experimental.to(bucket: migration.batchBucket)
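The batching logic in the script above can be easier to follow outside of Flux. The following Python sketch mirrors what `batchRange`, the `finished` check, and the `percent_complete` calculation do, as read from the script. It is an illustration, not a replacement for the task: `next_batch` is a hypothetical function, and `last_batch_stop` stands in for the value the script reads from the metadata bucket.

```python
from datetime import datetime, timedelta, timezone


def next_batch(last_batch_stop, start, stop, batch_interval):
    """Return (batch_start, batch_stop, percent_complete) for the next run."""
    # The first run has no metadata yet, so it begins at the migration start.
    batch_start = last_batch_stop if last_batch_stop is not None else start
    if batch_start >= stop:
        raise RuntimeError(
            "Batch range is beyond the migration range. Migration is complete."
        )
    # Like the Flux script, this does not clamp batch_stop to the migration
    # stop, so the final batch can extend past it.
    batch_stop = batch_start + batch_interval
    percent_complete = (batch_stop - start) / (stop - start) * 100.0
    return batch_start, batch_stop, percent_complete


# Simulate task runs over a three-hour migration with one-hour batches.
start = datetime(2023, 1, 1, tzinfo=timezone.utc)
stop = start + timedelta(hours=3)
last = None
while True:
    try:
        b_start, b_stop, pct = next_batch(last, start, stop, timedelta(hours=1))
    except RuntimeError as err:
        print(err)
        break
    print(b_start.isoformat(), b_stop.isoformat(), round(pct, 1))
    last = b_stop  # the task stores this as batch_stop metadata
```

The simulation shows why the task keeps failing after the last batch: every later run starts at or beyond the migration stop time and exits with the completion error.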

Configuration help

Determine your task interval

Determine your migration start time

Determine your batch interval

Troubleshoot migration task failures

If the migration task fails, view your task logs to identify the specific error. Below are common causes of migration task failures.

Exceeded rate limits

If your data migration causes you to exceed your InfluxDB Cloud organization’s limits and quotas, the task will return an error similar to:

too many requests

Possible solutions:

  • Update the migration.batchInterval setting in your migration task to use a smaller interval. Each batch will then query less data.

Invalid API token

If the API token you add as the influxdb3_clustered_TOKEN secret doesn't have write access to your InfluxDB Clustered database, the task will return an error similar to:

unauthorized access

Possible solutions:

  • Ensure the API token has write access to your InfluxDB Clustered database.
  • Generate a new InfluxDB Clustered token with write access to the database you want to migrate to. Then, update the influxdb3_clustered_TOKEN secret in your InfluxDB Cloud (TSM) instance with the new token.

Query timeout

The InfluxDB Cloud query timeout is 90 seconds. If it takes longer than this to return the data from the batch interval, the query will time out and the task will fail.

Possible solutions:

  • Update the migration.batchInterval setting in your migration task to use a smaller interval. Each batch will then query less data and take less time to return results.

Batch size is too large

If your batch size is too large, the task returns an error similar to the following:

internal error: error calling function "metadata" @97:1-97:11: error calling function "findRecord" @67:32-67:69: wrong number of fields

Possible solutions:

  • Update the migration.batchInterval setting in your migration task to use a smaller interval and retrieve less data per batch.
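When reviewing many task log entries, a small lookup can map the error text to the causes described above. The following Python sketch is a convenience only: it matches the message fragments quoted in this guide, the actual error text may vary, and `diagnose` is a hypothetical helper.

```python
# Message fragments quoted in this guide, mapped to the matching cause.
KNOWN_MESSAGES = {
    "too many requests": "Exceeded rate limits: use a smaller migration.batchInterval.",
    "unauthorized access": "Invalid API token: check write access or update the secret.",
    "wrong number of fields": "Batch size is too large: use a smaller migration.batchInterval.",
    "migration is complete": "Not a failure: the migration is complete.",
}


def diagnose(log_message):
    """Return a likely cause for a task log message, or None if unrecognized."""
    lowered = log_message.lower()
    for fragment, cause in KNOWN_MESSAGES.items():
        if fragment in lowered:
            return cause
    return None


print(diagnose('error calling function "findRecord" @67:32-67:69: wrong number of fields'))
```

Query timeouts are not included because this guide does not quote a specific error message for them.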
