
Join data with Flux

Use the Flux join package to join two data sets based on common values using the following join methods:

Inner join

Left outer join

Right outer join

Full outer join

The join package lets you join data from different data sources, such as InfluxDB, SQL databases, CSV files, and others.

Use join functions to join your data

  1. Import the join package.

  2. Define the left and right data streams to join:

    • Each stream must have one or more columns with common values. Column labels do not need to match, but column values do.
    • Each stream should have identical group keys.

    For more information, see join data requirements.

  3. Use join.inner() to join the two streams together. Provide the following required parameters:

    • left: Stream of data representing the left side of the join.
    • right: Stream of data representing the right side of the join.
    • on: Join predicate. For example: (l, r) => l.column == r.column.
    • as: Join output function that returns a record with values from each input stream. For example: (l, r) => ({l with column1: r.column1, column2: r.column2}).
import "join"
import "sql"

left =
    from(bucket: "example-bucket-1")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "example-measurement")
        |> filter(fn: (r) => r._field == "example-field")

right =
    sql.from(
        driverName: "postgres",
        dataSourceName: "postgresql://username:password@localhost:5432",
        query: "SELECT * FROM example_table",
    )

join.inner(
    left: left,
    right: right,
    on: (l, r) => l.column == r.column,
    as: (l, r) => ({l with name: r.name, location: r.location}),
)

For more information and detailed examples, see Perform an inner join in the Flux documentation.
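
As a minimal, self-contained sketch of the steps above, the following query builds two small in-memory streams with array.from() and joins them on columns whose labels differ. The sample rows and the column names id, sensor_id, and location are hypothetical and only serve the illustration.

```flux
import "array"
import "join"

// Hypothetical sensor readings, keyed by the "id" column.
left =
    array.from(
        rows: [
            {_time: 2024-01-01T00:00:00Z, id: "s1", _value: 21.5},
            {_time: 2024-01-01T00:00:00Z, id: "s2", _value: 19.0},
            {_time: 2024-01-01T00:00:00Z, id: "s3", _value: 23.1},
        ],
    )

// Hypothetical metadata. The matching column is labeled "sensor_id", not "id".
right =
    array.from(
        rows: [
            {sensor_id: "s1", location: "lab"},
            {sensor_id: "s2", location: "office"},
        ],
    )

// Both streams come from array.from(), so they share the same (empty) group key.
join.inner(
    left: left,
    right: right,
    on: (l, r) => l.id == r.sensor_id,
    as: (l, r) => ({l with location: r.location}),
)
```

Because this is an inner join, only rows with a match on both sides are returned: the s3 row has no counterpart in the right stream and is dropped.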

  1. Import the join package.

  2. Define the left and right data streams to join:

    • Each stream must have one or more columns with common values. Column labels do not need to match, but column values do.
    • Each stream should have identical group keys.

    For more information, see join data requirements.

  3. Use join.left() to join the two streams together. Provide the following required parameters:

    • left: Stream of data representing the left side of the join.
    • right: Stream of data representing the right side of the join.
    • on: Join predicate. For example: (l, r) => l.column == r.column.
    • as: Join output function that returns a record with values from each input stream. For example: (l, r) => ({l with column1: r.column1, column2: r.column2}).
import "join"
import "sql"

left =
    from(bucket: "example-bucket-1")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "example-measurement")
        |> filter(fn: (r) => r._field == "example-field")

right =
    sql.from(
        driverName: "postgres",
        dataSourceName: "postgresql://username:password@localhost:5432",
        query: "SELECT * FROM example_table",
    )

join.left(
    left: left,
    right: right,
    on: (l, r) => l.column == r.column,
    as: (l, r) => ({l with name: r.name, location: r.location}),
)

For more information and detailed examples, see Perform a left outer join in the Flux documentation.
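
In a left outer join, rows from the left stream that have no match are kept, and the values taken from r are null for those rows. The sketch below, which assumes hypothetical in-memory data built with array.from(), shows one way to substitute a default in the as function using the exists operator. The "unknown" placeholder is an arbitrary choice for illustration.

```flux
import "array"
import "join"

// Hypothetical readings. "s3" has no metadata row in the right stream.
left =
    array.from(
        rows: [
            {_time: 2024-01-01T00:00:00Z, id: "s1", _value: 21.5},
            {_time: 2024-01-01T00:00:00Z, id: "s3", _value: 23.1},
        ],
    )

// Hypothetical metadata keyed by the same "id" column.
right = array.from(rows: [{id: "s1", location: "lab"}])

join.left(
    left: left,
    right: right,
    on: (l, r) => l.id == r.id,
    // r.location is null when the left row has no match, so fall back to a default.
    as: (l, r) => ({l with location: if exists r.location then r.location else "unknown"}),
)
```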

  1. Import the join package.

  2. Define the left and right data streams to join:

    • Each stream must have one or more columns with common values. Column labels do not need to match, but column values do.
    • Each stream should have identical group keys.

    For more information, see join data requirements.

  3. Use join.right() to join the two streams together. Provide the following required parameters:

    • left: Stream of data representing the left side of the join.
    • right: Stream of data representing the right side of the join.
    • on: Join predicate. For example: (l, r) => l.column == r.column.
    • as: Join output function that returns a record with values from each input stream. For example: (l, r) => ({l with column1: r.column1, column2: r.column2}).
import "join"
import "sql"

left =
    from(bucket: "example-bucket-1")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "example-measurement")
        |> filter(fn: (r) => r._field == "example-field")

right =
    sql.from(
        driverName: "postgres",
        dataSourceName: "postgresql://username:password@localhost:5432",
        query: "SELECT * FROM example_table",
    )

join.right(
    left: left,
    right: right,
    on: (l, r) => l.column == r.column,
    as: (l, r) => ({l with name: r.name, location: r.location}),
)

For more information and detailed examples, see Perform a right outer join in the Flux documentation.

  1. Import the join package.

  2. Define the left and right data streams to join:

    • Each stream must have one or more columns with common values. Column labels do not need to match, but column values do.
    • Each stream should have identical group keys.

    For more information, see join data requirements.

  3. Use join.full() to join the two streams together. Provide the following required parameters:

    • left: Stream of data representing the left side of the join.
    • right: Stream of data representing the right side of the join.
    • on: Join predicate. For example: (l, r) => l.column == r.column.
    • as: Join output function that returns a record with values from each input stream. For example: (l, r) => ({l with column1: r.column1, column2: r.column2}).

Full outer joins must account for non-group-key columns in both l and r records being null. Use conditional logic to check which record contains non-null values for columns not in the group key. For more information, see Account for missing, non-group-key values.

import "join"
import "sql"

left =
    from(bucket: "example-bucket-1")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "example-measurement")
        |> filter(fn: (r) => r._field == "example-field")

right =
    sql.from(
        driverName: "postgres",
        dataSourceName: "postgresql://username:password@localhost:5432",
        query: "SELECT * FROM example_table",
    )

join.full(
    left: left,
    right: right,
    on: (l, r) => l.id == r.id,
    as: (l, r) => {
        id = if exists l.id then l.id else r.id
        
        return {name: l.name, location: r.location, id: id}
    },
)

For more information and detailed examples, see Perform a full outer join in the Flux documentation.
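
To see why the conditional logic matters, the following sketch joins two hypothetical in-memory streams where each side has a row the other lacks. The data and column names are assumptions made for the example. Since id is not part of the group key here, it can be null in either record, so the as function picks whichever side has it.

```flux
import "array"
import "join"

// Hypothetical left stream: has "a" and "b".
left =
    array.from(
        rows: [
            {id: "a", name: "alpha"},
            {id: "b", name: "beta"},
        ],
    )

// Hypothetical right stream: has "b" and "c".
right =
    array.from(
        rows: [
            {id: "b", location: "office"},
            {id: "c", location: "warehouse"},
        ],
    )

join.full(
    left: left,
    right: right,
    on: (l, r) => l.id == r.id,
    as: (l, r) => {
        // Take id from whichever record actually has a value.
        id = if exists l.id then l.id else r.id

        return {id: id, name: l.name, location: r.location}
    },
)
```

The result should contain one row per distinct id, with name left null for the row that exists only on the right and location left null for the row that exists only on the left.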

  1. Import the join package.

  2. Define the left and right data streams to join:

    • Each stream must have a _time column.
    • Each stream must have one or more columns with common values. Column labels do not need to match, but column values do.
    • Each stream should have identical group keys.

    For more information, see join data requirements.

  3. Use join.time() to join the two streams together based on time values. Provide the following parameters:

    • left: (Required) Stream of data representing the left side of the join.
    • right: (Required) Stream of data representing the right side of the join.
    • as: (Required) Join output function that returns a record with values from each input stream. For example: (l, r) => ({r with column1: l.column1, column2: l.column2}).
    • method: Join method to use. Default is inner.
import "join"
import "sql"

left =
    from(bucket: "example-bucket-1")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "example-m1")
        |> filter(fn: (r) => r._field == "example-f1")

right =
    from(bucket: "example-bucket-2")
        |> range(start: -1h)
        |> filter(fn: (r) => r._measurement == "example-m2")
        |> filter(fn: (r) => r._field == "example-f2")

join.time(method: "left", left: left, right: right, as: (l, r) => ({l with f2: r._value}))

For more information and detailed examples, see Join on time in the Flux documentation.
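
The following sketch shows join.time() with its default method on hypothetical in-memory data. The timestamps, values, and the output column names f1 and f2 are assumptions chosen for the example.

```flux
import "array"
import "join"

// Hypothetical left stream with three points.
left =
    array.from(
        rows: [
            {_time: 2024-01-01T00:00:00Z, _value: 1.0},
            {_time: 2024-01-01T00:01:00Z, _value: 2.0},
            {_time: 2024-01-01T00:02:00Z, _value: 3.0},
        ],
    )

// Hypothetical right stream that is missing the 00:01 point.
right =
    array.from(
        rows: [
            {_time: 2024-01-01T00:00:00Z, _value: 10.0},
            {_time: 2024-01-01T00:02:00Z, _value: 30.0},
        ],
    )

// No "on" predicate is needed: rows are matched on _time.
// "method" is omitted, so the default inner join applies.
join.time(
    left: left,
    right: right,
    as: (l, r) => ({_time: l._time, f1: l._value, f2: r._value}),
)
```

With the default inner method, only the timestamps present in both streams (00:00 and 00:02) appear in the output. Passing method: "left" would keep the 00:01 row with a null f2.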


When to use union and pivot instead of join functions

We recommend using the join package to join streams that have mostly different schemas or that come from two separate data sources. If you’re joining two datasets queried from InfluxDB, using union() and pivot() to combine the data will likely be more performant.

For example, if you need to query fields from different InfluxDB buckets and align field values in each row based on time:

f1 =
    from(bucket: "example-bucket-1")
        |> range(start: -1h)
        |> filter(fn: (r) => r._field == "f1")
        |> drop(columns: ["_measurement"])

f2 =
    from(bucket: "example-bucket-2")
        |> range(start: -1h)
        |> filter(fn: (r) => r._field == "f2")
        |> drop(columns: ["_measurement"])

union(tables: [f1, f2])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
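
To try the same pattern without an InfluxDB bucket, here is a sketch that substitutes hypothetical in-memory rows built with array.from() for the two queries. The sample values are made up for illustration.

```flux
import "array"

// Hypothetical stand-in for the f1 query result.
f1 =
    array.from(
        rows: [
            {_time: 2024-01-01T00:00:00Z, _field: "f1", _value: 1.0},
            {_time: 2024-01-01T00:01:00Z, _field: "f1", _value: 2.0},
        ],
    )

// Hypothetical stand-in for the f2 query result.
f2 =
    array.from(
        rows: [
            {_time: 2024-01-01T00:00:00Z, _field: "f2", _value: 10.0},
            {_time: 2024-01-01T00:01:00Z, _field: "f2", _value: 20.0},
        ],
    )

// union() stacks the rows; pivot() turns each _field value into its own column,
// producing one row per _time with f1 and f2 side by side.
union(tables: [f1, f2])
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```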


