InfluxDB subscriptions are local or remote endpoints to which all data written to InfluxDB is copied. Subscriptions are primarily used with Kapacitor, but any endpoint able to accept UDP, HTTP, or HTTPS connections can subscribe to InfluxDB and receive a copy of all data as it is written.
How subscriptions work
As data is written to InfluxDB, writes are duplicated to subscriber endpoints via HTTP, HTTPS, or UDP in line protocol. InfluxDB’s subscriber service creates multiple “writers” (goroutines) which send writes to the subscription endpoints.
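To make the HTTP case concrete, the following is a minimal sketch of an endpoint that could receive subscription data, not a production receiver. It assumes InfluxDB POSTs line protocol to a `/write` path (as the subscriber error logs later on this page suggest) and that a `204` response is accepted as success. The names `SubscriberHandler`, `make_server`, and `received_lines` are hypothetical and exist only for this illustration.

```python
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse

# Illustrative in-memory store of the line protocol records received so far.
received_lines = []


class SubscriberHandler(BaseHTTPRequestHandler):
    """Accepts POST /write requests whose body is line protocol text."""

    def do_POST(self):
        # Assumption: subscription writes arrive on the /write path.
        if urlparse(self.path).path != "/write":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length).decode("utf-8")
        # One line protocol record per non-empty line.
        received_lines.extend(line for line in body.splitlines() if line.strip())
        # Assumption: 204 No Content signals a successful write.
        self.send_response(204)
        self.end_headers()

    def log_message(self, format, *args):
        pass  # keep the example quiet; remove to see per-request logs


def make_server(host="127.0.0.1", port=0):
    """Build the server; port=0 lets the OS pick a free port."""
    return HTTPServer((host, port), SubscriberHandler)


# Usage (blocks until interrupted):
#   make_server(port=9092).serve_forever()
```

A subscription pointing at `http://<host>:9092` would then deliver each write to this handler, provided the assumptions above hold for your InfluxDB version.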
The number of writer goroutines is defined by the write-concurrency configuration.
As writes occur in InfluxDB, each subscription writer sends the written data to the specified subscription endpoints. However, with a high write-concurrency (multiple writers) and a high ingest rate, nanosecond differences in writer processes and the transport layer can result in writes being received out of order.
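The effect of concurrency on ordering can be illustrated with a small, deterministic model. This is a simplified sketch, not InfluxDB's actual scheduler: it assumes each writer handles one write at a time, that the next write goes to whichever writer is free first, and that `delays` holds made-up per-write transit times. The function name `arrival_order` is hypothetical.

```python
import heapq


def arrival_order(delays, write_concurrency):
    """Return write indices in the order they would reach the endpoint.

    delays[i] is the (hypothetical) time write i spends in the writer
    and transport layer. Writes are handed out in order to the first
    writer that becomes free.
    """
    # Each heap entry is the time at which a writer becomes free.
    free_at = [0.0] * write_concurrency
    heapq.heapify(free_at)

    arrivals = []
    for index, delay in enumerate(delays):
        start = heapq.heappop(free_at)      # earliest available writer
        done = start + delay                # when the endpoint receives it
        heapq.heappush(free_at, done)
        arrivals.append((done, index))

    # Sort by arrival time; ties keep the original write order.
    return [index for _, index in sorted(arrivals)]


# One slow write followed by three fast ones.
delays = [3, 1, 1, 1]
print(arrival_order(delays, 1))  # sequential: [0, 1, 2, 3]
print(arrival_order(delays, 2))  # concurrent: [1, 2, 0, 3]
```

With a single writer, the slow first write delays everything behind it but order is preserved. With two writers, later writes overtake the slow one, which is the out-of-order behavior described above.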
Important information about high write loads
While setting the subscriber write-concurrency to greater than 1 does increase your subscriber write throughput, it can result in out-of-order writes under high ingest rates. Setting write-concurrency to 1 ensures writes are passed to subscriber endpoints sequentially, but can create a bottleneck under high ingest rates.
What write-concurrency should be set to depends on your specific workload and need for in-order writes to your subscription endpoint.
InfluxQL subscription statements
Use the following InfluxQL statements to manage subscriptions: CREATE SUBSCRIPTION, SHOW SUBSCRIPTIONS, and DROP SUBSCRIPTION.
Create subscriptions using the CREATE SUBSCRIPTION InfluxQL statement.
Specify the subscription name, the database name and retention policy to subscribe to, and the URL of the host to which data written to InfluxDB should be copied.
-- Pattern:
CREATE SUBSCRIPTION "<subscription_name>" ON "<db_name>"."<retention_policy>" DESTINATIONS <ALL|ANY> "<subscription_endpoint_host>"

-- Examples:
-- Create a SUBSCRIPTION on database 'mydb' and retention policy 'autogen' that sends data to 'example.com:9090' via HTTP.
CREATE SUBSCRIPTION "sub0" ON "mydb"."autogen" DESTINATIONS ALL 'http://example.com:9090'

-- Create a SUBSCRIPTION on database 'mydb' and retention policy 'autogen' that round-robins the data to 'h1.example.com:9090' and 'h2.example.com:9090' via UDP.
CREATE SUBSCRIPTION "sub0" ON "mydb"."autogen" DESTINATIONS ANY 'udp://h1.example.com:9090', 'udp://h2.example.com:9090'
Sending subscription data to multiple hosts
The CREATE SUBSCRIPTION statement allows you to specify multiple hosts as endpoints for the subscription.
In the DESTINATIONS clause, you can pass multiple host strings separated by commas.
Using ALL or ANY in the DESTINATIONS clause determines how InfluxDB writes data to each endpoint:
ALL: Writes data to all specified hosts.
ANY: Round-robins writes between specified hosts.
Subscriptions with multiple hosts
-- Write all data to multiple hosts
CREATE SUBSCRIPTION "mysub" ON "mydb"."autogen" DESTINATIONS ALL 'http://host1.example.com:9090', 'http://host2.example.com:9090'

-- Round-robin writes between multiple hosts
CREATE SUBSCRIPTION "mysub" ON "mydb"."autogen" DESTINATIONS ANY 'http://host1.example.com:9090', 'http://host2.example.com:9090'
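The difference between ALL and ANY can be modeled in a few lines. This is an illustrative sketch of the two delivery modes only: the function `route_writes` is hypothetical, and the starting host and exact rotation InfluxDB uses for ANY are assumptions here.

```python
from itertools import cycle


def route_writes(writes, destinations, mode):
    """Map each destination to the list of writes it would receive."""
    if mode not in ("ALL", "ANY"):
        raise ValueError("mode must be 'ALL' or 'ANY'")
    if not destinations:
        raise ValueError("at least one destination is required")

    delivered = {dest: [] for dest in destinations}
    if mode == "ALL":
        # Every destination gets a copy of every write.
        for write in writes:
            for dest in destinations:
                delivered[dest].append(write)
    else:
        # ANY: each write goes to exactly one destination, in rotation.
        rotation = cycle(destinations)
        for write in writes:
            delivered[next(rotation)].append(write)
    return delivered


hosts = ["http://host1.example.com:9090", "http://host2.example.com:9090"]
writes = ["w1", "w2", "w3"]
print(route_writes(writes, hosts, "ALL"))
print(route_writes(writes, hosts, "ANY"))
```

Under ALL, both hosts receive `w1`, `w2`, and `w3`. Under ANY, the writes are split between the hosts, so no single host holds the complete data set.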
Subscriptions can use HTTP, HTTPS, or UDP transport protocols.
Which to use is determined by the protocol expected by the subscription endpoint.
If creating a Kapacitor subscription, this is defined by the subscription-protocol option in the [[influxdb]] section of your kapacitor.conf:
[[influxdb]]
  # ...
  subscription-protocol = "http"
  # ...
For information regarding HTTPS connections and secure communication between InfluxDB and Kapacitor, view the Kapacitor security documentation.
The SHOW SUBSCRIPTIONS InfluxQL statement returns a list of all subscriptions registered in InfluxDB.
name: _internal
retention_policy name                                           mode destinations
---------------- ----                                           ---- ------------
monitor          kapacitor-39545771-7b64-4692-ab8f-1796c07f3314 ANY  [http://localhost:9092]
Remove or drop subscriptions using the DROP SUBSCRIPTION InfluxQL statement.
-- Pattern:
DROP SUBSCRIPTION "<subscription_name>" ON "<db_name>"."<retention_policy>"

-- Example:
DROP SUBSCRIPTION "sub0" ON "mydb"."autogen"
Drop all subscriptions
In some cases, it may be necessary to remove all subscriptions.
Run the following bash script that utilizes the influx CLI, loops through all subscriptions, and removes them.
This script depends on the $INFLUXUSER and $INFLUXPASS environment variables.
If these are not set, export them as part of the script.
# Environment variable exports:
# Uncomment these if INFLUXUSER and INFLUXPASS are not already globally set.
# export INFLUXUSER=influxdb-username
# export INFLUXPASS=influxdb-password

# Split the command output on newlines only, so each CSV row is one loop item.
IFS=$'\n'
for i in $(influx -format csv -username "$INFLUXUSER" -password "$INFLUXPASS" -database _internal -execute 'show subscriptions' | tail -n +2 | grep -v name); do
  # CSV fields: 1 = database, 2 = retention policy, 3 = subscription name
  db=$(echo "$i" | cut -f 1 -d ',')
  rp=$(echo "$i" | cut -f 2 -d ',')
  sub=$(echo "$i" | cut -f 3 -d ',')
  influx -format csv -username "$INFLUXUSER" -password "$INFLUXPASS" -database _internal -execute "drop subscription \"$sub\" ON \"$db\".\"$rp\""
done
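If you would rather review the statements before running them, the same field mapping can be used to generate the DROP SUBSCRIPTION statements without executing anything. The sketch below assumes the CSV layout implied by the script above (database, retention policy, and subscription name in the first three columns, with header rows starting `name,retention_policy`). The function `drop_statements` and the sample row are hypothetical.

```python
import csv
import io


def drop_statements(csv_text):
    """Build one DROP SUBSCRIPTION statement per subscription row."""
    statements = []
    for row in csv.reader(io.StringIO(csv_text)):
        # Skip blank lines and (possibly repeated) header rows.
        if len(row) < 3 or row[:2] == ["name", "retention_policy"]:
            continue
        # Assumed column order: database, retention policy, subscription name.
        db, rp, sub = row[0], row[1], row[2]
        statements.append(f'DROP SUBSCRIPTION "{sub}" ON "{db}"."{rp}"')
    return statements


# Hypothetical sample of `influx -format csv -execute 'show subscriptions'` output.
sample = (
    "name,retention_policy,name,mode,destinations\n"
    "_internal,monitor,kapacitor-1,ANY,[http://localhost:9092]\n"
)
for statement in drop_statements(sample):
    print(statement)
```

Unlike the `grep -v name` filter in the bash script, this version only skips header rows, so a subscription whose name happens to contain the string "name" is not silently ignored.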
Configure InfluxDB subscriptions
InfluxDB subscription configuration options are available in the [subscriber] section of the influxdb.conf.
In order to use subscriptions, the enabled option in the [subscriber] section must be set to true.
Below is an example influxdb.conf subscriber configuration:
[subscriber]
  enabled = true
  http-timeout = "30s"
  insecure-skip-verify = false
  ca-certs = ""
  write-concurrency = 40
  write-buffer-size = 1000
Descriptions of [subscriber] configuration options are available in the Configuring InfluxDB documentation.
Inaccessible or decommissioned subscription endpoints
Unless a subscription is dropped, InfluxDB assumes the endpoint should always receive data and will continue to attempt to send data. If an endpoint host is inaccessible or has been decommissioned, you will see errors similar to the following:
# Some message content omitted (...) for the sake of brevity
"Post http://x.y.z.a:9092/write?consistency=...: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers)" ... service=subscriber
"Post http://x.y.z.a:9092/write?consistency=...: dial tcp x.y.z.a:9092: getsockopt: connection refused" ... service=subscriber
"Post http://x.y.z.a:9092/write?consistency=...: dial tcp 172.31.36.5:9092: getsockopt: no route to host" ... service=subscriber
In some cases, this may be caused by a networking error or something similar preventing a successful connection to the subscription endpoint. In other cases, it’s because the subscription endpoint no longer exists and the subscription hasn’t been dropped from InfluxDB.
Because InfluxDB does not know if a subscription endpoint will or will not become accessible again, subscriptions are not automatically dropped when an endpoint becomes inaccessible. If a subscription endpoint is removed, you must manually drop the subscription from InfluxDB.
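Before deciding whether to drop a subscription, it can help to check whether its destination still accepts connections. The following is a rough sketch, assuming a plain TCP connection attempt is an adequate reachability test for HTTP and HTTPS destinations. UDP destinations cannot be probed this way, so the hypothetical `endpoint_reachable` function returns `None` for them.

```python
import socket
from urllib.parse import urlparse


def endpoint_reachable(url, timeout=2.0):
    """Return True or False for http(s) URLs, None for other schemes.

    Only checks that a TCP connection can be opened; it does not
    verify that the endpoint accepts subscription writes.
    """
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        return None  # e.g. udp:// destinations are connectionless
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((parsed.hostname, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unroutable hosts.
        return False


# Usage: check a destination taken from SHOW SUBSCRIPTIONS output.
#   endpoint_reachable("http://localhost:9092")
```

A `False` result only shows that the endpoint is unreachable at that moment. Whether the outage is temporary or the endpoint has been decommissioned is still a judgment you need to make before dropping the subscription.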