---
title: Extend plugins with API features and state management
description: The Processing engine includes an API that allows your plugins to interact with your data, build and write line protocol, and maintain state between executions.
url: https://docs.influxdata.com/influxdb3/enterprise/plugins/extend-plugin/
estimated_tokens: 11695
product: InfluxDB 3 Enterprise
version: enterprise
---

# Extend plugins with API features and state management

The Processing Engine includes a shared API that your plugins can use to interact with data, write new records in line protocol format, and maintain state between executions. These capabilities let you build plugins that transform, analyze, and respond to time series data as it flows through your database.

The plugin API lets you:

-   [Write data](#write-data)
-   [Query data](#query-data)
-   [Log messages for monitoring and debugging](#log-messages-for-monitoring-and-debugging)
-   [Maintain state with the in-memory cache](#maintain-state-with-the-in-memory-cache)
    -   [Store and retrieve cached data](#store-and-retrieve-cached-data)
    -   [Store cached data with expiration](#store-cached-data-with-expiration)
    -   [Share data across plugins](#share-data-across-plugins)
    -   [Build a counter](#building-a-counter)
-   [Guidelines for in-memory caching](#guidelines-for-in-memory-caching)
    -   [Use TTL appropriately](#use-ttl-appropriately)
    -   [Consider cache limitations](#consider-cache-limitations)

## Get started with the shared API

Each plugin automatically has access to the shared API through the `influxdb3_local` object. You don’t need to import any libraries. The API becomes available as soon as your plugin runs.

If your plugin requires third-party Python packages (like `pandas`, `requests`, or `influxdb3-python`), see [Manage plugin dependencies](/influxdb3/enterprise/plugins/#manage-plugin-dependencies) for installation instructions.

## Write data

To write data into your database, use the `LineBuilder` API to create line protocol data:

```python
# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)

# Write the data to the database
influxdb3_local.write(line)
```

InfluxDB 3 buffers your writes while the plugin runs and flushes them when the plugin completes.

The following shows the `LineBuilder` Python implementation:

```python
from typing import Optional
from collections import OrderedDict

class InfluxDBError(Exception):
    """Base exception for InfluxDB-related errors"""
    pass

class InvalidMeasurementError(InfluxDBError):
    """Raised when measurement name is invalid"""
    pass

class InvalidKeyError(InfluxDBError):
    """Raised when a tag or field key is invalid"""
    pass

class InvalidLineError(InfluxDBError):
    """Raised when a line protocol string is invalid"""
    pass

class LineBuilder:
    def __init__(self, measurement: str):
        if ' ' in measurement:
            raise InvalidMeasurementError("Measurement name cannot contain spaces")
        self.measurement = measurement
        self.tags: OrderedDict[str, str] = OrderedDict()
        self.fields: OrderedDict[str, str] = OrderedDict()
        self._timestamp_ns: Optional[int] = None

    def _validate_key(self, key: str, key_type: str) -> None:
        """Validate that a key does not contain spaces, commas, or equals signs."""
        if not key:
            raise InvalidKeyError(f"{key_type} key cannot be empty")
        if ' ' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain spaces")
        if ',' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain commas")
        if '=' in key:
            raise InvalidKeyError(f"{key_type} key '{key}' cannot contain equals signs")

    def tag(self, key: str, value: str) -> 'LineBuilder':
        """Add a tag to the line protocol."""
        self._validate_key(key, "tag")
        self.tags[key] = str(value)
        return self

    def uint64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an unsigned integer field to the line protocol."""
        self._validate_key(key, "field")
        if value < 0:
            raise ValueError(f"uint64 field '{key}' cannot be negative")
        self.fields[key] = f"{value}u"
        return self

    def int64_field(self, key: str, value: int) -> 'LineBuilder':
        """Add an integer field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = f"{value}i"
        return self

    def float64_field(self, key: str, value: float) -> 'LineBuilder':
        """Add a float field to the line protocol."""
        self._validate_key(key, "field")
        # Check if value has no decimal component
        self.fields[key] = f"{int(value)}.0" if value % 1 == 0 else str(value)
        return self

    def string_field(self, key: str, value: str) -> 'LineBuilder':
        """Add a string field to the line protocol."""
        self._validate_key(key, "field")
        # Escape backslashes first, then quotes, so that the backslashes
        # added for quotes are not themselves doubled
        escaped_value = value.replace('\\', '\\\\').replace('"', '\\"')
        self.fields[key] = f'"{escaped_value}"'
        return self

    def bool_field(self, key: str, value: bool) -> 'LineBuilder':
        """Add a boolean field to the line protocol."""
        self._validate_key(key, "field")
        self.fields[key] = 't' if value else 'f'
        return self

    def time_ns(self, timestamp_ns: int) -> 'LineBuilder':
        """Set the timestamp in nanoseconds."""
        self._timestamp_ns = timestamp_ns
        return self

    def build(self) -> str:
        """Build the line protocol string."""
        # Start with measurement name (escape commas only)
        line = self.measurement.replace(',', '\\,')

        # Add tags if present
        if self.tags:
            tags_str = ','.join(
                f"{k}={v}" for k, v in self.tags.items()
            )
            line += f",{tags_str}"

        # Add fields (required)
        if not self.fields:
            raise InvalidLineError(f"At least one field is required: {line}")

        fields_str = ','.join(
            f"{k}={v}" for k, v in self.fields.items()
        )
        line += f" {fields_str}"

        # Add timestamp if present
        if self._timestamp_ns is not None:
            line += f" {self._timestamp_ns}"

        return line
```
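
When escaping string field values, the order of the replacements matters: if double quotes are escaped before backslashes, the backslash added for each quote is doubled by the second replacement. The following standalone sketch escapes backslashes first. It uses the hypothetical helpers `escape_string_field` and `quote_string_field` rather than the class itself.

```python
def escape_string_field(value: str) -> str:
    """Escape a string for use as a line protocol string field value.

    Hypothetical helper for illustration. Backslashes are escaped first so
    that the backslashes introduced for double quotes are not doubled.
    """
    return value.replace("\\", "\\\\").replace('"', '\\"')


def quote_string_field(value: str) -> str:
    """Wrap the escaped value in double quotes."""
    return f'"{escape_string_field(value)}"'


print(quote_string_field('say "hi"'))  # prints: "say \"hi\""
print(quote_string_field("C:\\temp"))  # prints: "C:\\temp"
```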

## Query data

Your plugins can execute SQL queries and process results directly:

```python
# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

# Parameterized query for safer execution
params = {"table": "metrics", "threshold": 90}
results = influxdb3_local.query("SELECT * FROM $table WHERE value > $threshold", params)
```

Query results are a `List[Dict[str, Any]]`: each dictionary represents a row, with column names as keys and column values as the corresponding values.
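
Because each row is a plain dictionary, you can process results with ordinary Python. The following sketch averages one column across rows. The `average_column` helper and the sample rows are hypothetical and only illustrate the row shape described above.

```python
from typing import Any, Dict, List, Optional


def average_column(rows: List[Dict[str, Any]], column: str) -> Optional[float]:
    """Return the mean of `column` across rows, skipping missing or null values."""
    values = [row[column] for row in rows if row.get(column) is not None]
    if not values:
        return None
    return sum(values) / len(values)


# Sample rows shaped like query results (illustrative data only)
rows = [
    {"host": "a", "value": 91.0},
    {"host": "b", "value": 95.0},
    {"host": "c", "value": None},
]
print(average_column(rows, "value"))  # prints: 93.0
```

In a plugin, you would pass the list returned by `influxdb3_local.query()` as `rows`.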

## Log messages for monitoring and debugging

Use the shared API’s `info`, `warn`, and `error` functions to log messages from your plugin. Each function accepts one or more arguments, converts them to strings, and logs them as a space-separated message.

Add logging to monitor plugin execution and assist with debugging:

```python
influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")

# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)
```

The system writes all log messages to the server logs and stores them in [system tables](/influxdb3/enterprise/reference/cli/influxdb3/show/system/summary/), where you can query them using SQL.
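
One way to use these functions is to wrap a unit of work so that its start, completion, and any failure are logged consistently. The sketch below relies only on the `info` and `error` functions described above; `run_step` is a hypothetical helper.

```python
def run_step(influxdb3_local, name, func):
    """Run func(), logging its start, completion, or failure (hypothetical helper)."""
    influxdb3_local.info("Starting step", name)
    try:
        result = func()
    except Exception as exc:
        # Multiple arguments are logged as one space-separated message
        influxdb3_local.error("Step failed:", name, repr(exc))
        return None
    influxdb3_local.info("Finished step", name)
    return result
```

Returning `None` on failure keeps the plugin running; re-raise the exception instead if a failed step should stop the execution.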

## Maintain state with the in-memory cache

The Processing Engine provides an in-memory cache that lets your plugins store and retrieve data between executions. Because the cache lives in memory, its contents do not survive a server restart.

Access the cache using the `cache` property of the shared API:

```python
# Basic usage pattern  
influxdb3_local.cache.METHOD(PARAMETERS)
```

`cache` provides the following methods to retrieve and manage cached values:

| Method | Parameters | Returns | Description |
| --- | --- | --- | --- |
| `put` | `key` (str): The key to store the value under; `value` (Any): Any Python object to cache; `ttl` (Optional[float], default=None): Time in seconds before expiration; `use_global` (bool, default=False): If True, uses global namespace | None | Stores a value in the cache with an optional time-to-live |
| `get` | `key` (str): The key to retrieve; `default` (Any, default=None): Value to return if key not found; `use_global` (bool, default=False): If True, uses global namespace | Any | Retrieves a value from the cache or returns default if not found |
| `delete` | `key` (str): The key to delete; `use_global` (bool, default=False): If True, uses global namespace | bool | Deletes a value from the cache. Returns True if deleted, False if not found |
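
To exercise plugin logic outside the server, for example in unit tests, you can substitute a small stand-in that mirrors the method signatures in the table. The `FakeCache` class below is a hypothetical test double, not the Processing Engine's implementation. The injectable `clock` parameter is an assumption added so that tests can simulate TTL expiry without sleeping.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class FakeCache:
    """In-memory stand-in for influxdb3_local.cache (for tests only)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        # One dict per namespace; each entry is (value, expiry time or None)
        self._stores: Dict[bool, Dict[str, Tuple[Any, Optional[float]]]] = {
            False: {},  # trigger-specific namespace
            True: {},   # global namespace
        }

    def put(self, key: str, value: Any, ttl: Optional[float] = None,
            use_global: bool = False) -> None:
        expires_at = None if ttl is None else self._clock() + ttl
        self._stores[use_global][key] = (value, expires_at)

    def get(self, key: str, default: Any = None, use_global: bool = False) -> Any:
        entry = self._stores[use_global].get(key)
        if entry is None:
            return default
        value, expires_at = entry
        if expires_at is not None and self._clock() >= expires_at:
            # Entry has expired: drop it and behave as if it were missing
            del self._stores[use_global][key]
            return default
        return value

    def delete(self, key: str, use_global: bool = False) -> bool:
        # Mirrors the documented return value: True if a key was removed
        return self._stores[use_global].pop(key, None) is not None
```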

### Understanding cache namespaces

The cache system offers two distinct namespaces:

| Namespace | Scope | Best For |
| --- | --- | --- |
| Trigger-specific (default) | Isolated to a single trigger | Plugin state, counters, timestamps specific to one plugin |
| Global | Shared across all triggers | Configuration, lookup tables, service states that should be available to all plugins |

### Common cache operations

-   [Store and retrieve cached data](#store-and-retrieve-cached-data)
-   [Store cached data with expiration](#store-cached-data-with-expiration)
-   [Share data across plugins](#share-data-across-plugins)
-   [Build a counter](#building-a-counter)

### Store and retrieve cached data

```python
import time

# Store a value
influxdb3_local.cache.put("last_run_time", time.time())

# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)

# Delete a cached value
influxdb3_local.cache.delete("temporary_data")
```

### Store cached data with expiration

```python
# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)
```

### Share data across plugins

```python
# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)

# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)
```
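
One pattern that combines both namespaces is to let a trigger-specific value override a shared default. The sketch below checks the trigger-specific namespace first and falls back to the global one. `get_setting` is a hypothetical helper, and it treats a cached value of `None` as missing.

```python
def get_setting(influxdb3_local, key, fallback=None):
    """Look up key in the trigger-specific namespace, then the global one.

    Hypothetical helper. A cached value of None is treated as missing.
    """
    value = influxdb3_local.cache.get(key)
    if value is None:
        value = influxdb3_local.cache.get(key, use_global=True)
    return fallback if value is None else value
```

For example, `get_setting(influxdb3_local, "threshold", fallback=90)` returns the trigger's own threshold if one is cached, otherwise the shared value, otherwise `90`.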

### Building a counter

You can track how many times a plugin has run:

```python
# Get current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)

# Increment counter
counter += 1

# Store the updated value
influxdb3_local.cache.put("execution_count", counter)

influxdb3_local.info(f"This plugin has run {counter} times")
```

## Guidelines for in-memory caching

To get the most out of the in-memory cache, follow these guidelines:

-   [Use the trigger-specific namespace](#use-the-trigger-specific-namespace)
-   [Use TTL appropriately](#use-ttl-appropriately)
-   [Cache computation results](#cache-computation-results)
-   [Warm the cache](#warm-the-cache)
-   [Consider cache limitations](#consider-cache-limitations)

### Use the trigger-specific namespace

The Processing Engine provides a cache that supports stateful operations while maintaining isolation between different triggers. For most use cases, use the trigger-specific namespace to keep plugin state isolated. Use the global namespace only when you need to share data across triggers.

### Use TTL appropriately

Set appropriate expiration times based on how frequently your data changes:

```python
# Cache external API responses for 5 minutes  
influxdb3_local.cache.put("weather_data", api_response, ttl=300)
```

### Cache computation results

Store the results of expensive calculations that you use frequently:

```python
# Cache aggregated statistics  
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)
```
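
A common way to apply this is a get-or-compute helper that runs the expensive function only on a cache miss. In this sketch, `cached_call` is a hypothetical helper, and a cached value of `None` is treated as a miss.

```python
def cached_call(influxdb3_local, key, compute, ttl=None):
    """Return the cached value for key, computing and caching it on a miss."""
    value = influxdb3_local.cache.get(key)
    if value is None:
        # Cache miss: run the expensive function once and store the result
        value = compute()
        influxdb3_local.cache.put(key, value, ttl=ttl)
    return value
```

With this helper, the example above could be written as `cached_call(influxdb3_local, "daily_stats", lambda: calculate_statistics(data), ttl=3600)`.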

### Warm the cache

For critical data, prime the cache at startup. This can be especially useful for global namespace data where multiple triggers need the data:

```python
# Initialize the cache only if the key is missing
if influxdb3_local.cache.get("lookup_table") is None:
    influxdb3_local.cache.put("lookup_table", load_lookup_data())
```

### Consider cache limitations

-   **Memory usage**: Because the system stores cache contents in memory, monitor your memory usage when caching large datasets.
-   **Server restarts**: Because the server clears the cache on restart, design your plugins to handle cache initialization (see [Warm the cache](#warm-the-cache)).
-   **Concurrency**: When multiple trigger instances might update the same cache key at the same time, a plugin can read out-of-date values or overwrite another instance's update.
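
The concurrency caveat applies to any read-modify-write sequence, such as the counter pattern. The following sketch uses a plain dictionary in place of the cache to show, with a deterministic interleaving, how two overlapping executions can lose an update. It illustrates the hazard only and makes no claim about how the engine schedules triggers.

```python
# A plain dict stands in for the shared cache in this illustration
cache = {"execution_count": 0}

# Two overlapping executions both read the counter before either writes
seen_by_a = cache["execution_count"]
seen_by_b = cache["execution_count"]

# Each writes back its own incremented copy; the second overwrites the first
cache["execution_count"] = seen_by_a + 1
cache["execution_count"] = seen_by_b + 1

print(cache["execution_count"])  # prints: 1, although two executions ran
```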

## Next steps

With an understanding of the shared plugin API, you can start building data workflows that transform, analyze, and respond to your time series data.

For official plugins and examples that you can extend, see the [plugin library](/influxdb3/enterprise/plugins/library/).

#### Related

-   [influxdb3 create trigger](/influxdb3/enterprise/reference/cli/influxdb3/create/trigger/)
-   [influxdb3 test](/influxdb3/enterprise/reference/cli/influxdb3/test/)
-   [Processing engine reference](/influxdb3/enterprise/reference/processing-engine/)

