content/influxdb3/cloud-dedicated/process-data/downsample/downsample-client-libraries.md
Query and downsample time series data stored in InfluxDB and write the downsampled data back to InfluxDB.
This guide uses Python and the InfluxDB 3 Python client library, but you can use your runtime of choice and any of the available InfluxDB 3 client libraries. This guide also assumes you have already setup your Python project and virtual environment.
Use pip to install the following dependencies:
influxdb_client_3pandaspip install influxdb3-python pandas
The downsampling process involves two InfluxDB databases. Each database has a retention period that specifies how long data persists in the database before it expires and is deleted. By using two databases, you can store unmodified, high-resolution data in a database with a shorter retention period and then downsampled, low-resolution data in a database with a longer retention period.
Ensure you have a database for each of the following:
For information about creating databases, see Create a database.
Use the InfluxDBClient3 function in the influxdb_client_3 module to
instantiate two InfluxDB clients:
Provide the following credentials for each client:
{{% code-placeholders "((RAW_|DOWNSAMPLED_)*DATABASE)_(NAME|TOKEN)" %}}
from influxdb_client_3 import InfluxDBClient3
import pandas
# Instantiate an InfluxDBClient3 client configured for your unmodified database
influxdb_raw = InfluxDBClient3(
host='{{< influxdb/host >}}',
token='DATABASE_TOKEN',
database='RAW_DATABASE_NAME'
)
# Instantiate an InfluxDBClient3 client configured for your downsampled database.
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='{{< influxdb/host >}}',
token='DATABASE_TOKEN',
database='DOWNSAMPLED_DATABASE_NAME',
org=''
)
{{% /code-placeholders %}}
The most common method used to downsample time series data is to perform aggregate or selector operations on intervals of time. For example, return the average value for each hour in the queried time range.
Use either SQL or InfluxQL to downsample data by applying aggregate or selector functions to time intervals.
{{< tabs-wrapper >}} {{% tabs "medium" %}} SQL InfluxQL {{% /tabs %}}
<!--------------------------------- BEGIN SQL --------------------------------->{{% tab-content %}}
In the SELECT clause:
DATE_BIN
to assign each row to an interval based on the row's timestamp and update
the time column with the assigned interval timestamp.
You can also use DATE_BIN_GAPFILL
to fill any gaps created by intervals with no data
(see Fill gaps in data with SQL).Include a GROUP BY clause that groups by intervals returned from the DATE_BIN
function in your SELECT clause and any other queried tags.
The example below uses GROUP BY 1 to group by the first column in the
SELECT clause.
Include an ORDER BY clause that sorts data by time.
For more information, see Aggregate data with SQL - Downsample data by applying interval-based aggregates.
SELECT
DATE_BIN(INTERVAL '1 hour', time) AS time,
room,
AVG(temp) AS temp,
AVG(hum) AS hum,
AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY time
{{% /tab-content %}}
<!---------------------------------- END SQL ----------------------------------> <!------------------------------- BEGIN INFLUXQL ------------------------------>{{% tab-content %}}
In the SELECT clause, apply an
aggregate
or selector
function to queried fields.
Include a GROUP BY clause that groups by time() at a specified interval.
SELECT
MEAN(temp) AS temp,
MEAN(hum) AS hum,
MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
{{% /tab-content %}}
<!-------------------------------- END INFLUXQL ------------------------------->{{< /tabs-wrapper >}}
Assign the query string to a variable.
Use the query method of your instantiated client
to query raw data from InfluxDB. Provide the following arguments.
sql or influxqlUse the to_pandas method to convert the returned Arrow table to a Pandas DataFrame.
{{< code-tabs-wrapper >}} {{% code-tabs %}} SQL InfluxQL {{% /code-tabs %}}
<!--------------------------------- BEGIN SQL --------------------------------->{{% code-tab-content %}}
# ...
query = '''
SELECT
DATE_BIN(INTERVAL '1 hour', time) AS time,
room,
AVG(temp) AS temp,
AVG(hum) AS hum,
AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
{{% /code-tab-content %}}
<!---------------------------------- END SQL ----------------------------------> <!------------------------------- BEGIN INFLUXQL ------------------------------>{{% code-tab-content %}}
# ...
query = '''
SELECT
MEAN(temp) AS temp,
MEAN(hum) AS hum,
MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
{{% /code-tab-content %}}
<!-------------------------------- END INFLUXQL ------------------------------->\{{< /code-tabs-wrapper >}}
For InfluxQL query results, delete (drop) the iox::measurement column before writing data back to InfluxDB.
You'll avoid measurement name conflicts when querying your downsampled data later.
Use the sort_values method to sort data in the Pandas DataFrame by time
to ensure writing back to InfluxDB is as performant as possible.
Use the write method of your instantiated downsampled client
to write the query results back to your InfluxDB database for downsampled data.
Include the following arguments:
[!Note] Columns not listed in the data_frame_tag_columns or data_frame_timestamp_column arguments are written to InfluxDB as fields.
# ...
data_frame = data_frame.sort_values(by="time")
influxdb_downsampled.write(
record=data_frame,
data_frame_measurement_name="home_ds",
data_frame_timestamp_column="time",
data_frame_tag_columns=['room']
)
{{< code-tabs-wrapper >}} {{% code-tabs %}} SQL InfluxQL {{% /code-tabs %}}
<!--------------------------------- BEGIN SQL --------------------------------->{{% code-tab-content %}}
{{% code-placeholders "((RAW_|DOWNSAMPLED_)*DATABASE)_(NAME|TOKEN)" %}}
from influxdb_client_3 import InfluxDBClient3
import pandas
influxdb_raw = InfluxDBClient3(
host='{{< influxdb/host >}}',
token='DATABASE_TOKEN',
database='RAW_DATABASE_NAME'
)
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='{{< influxdb/host >}}',
token='DATABASE_TOKEN',
database='DOWNSAMPLED_DATABASE_NAME',
org=''
)
query = '''
SELECT
DATE_BIN(INTERVAL '1 hour', time) AS time,
room,
AVG(temp) AS temp,
AVG(hum) AS hum,
AVG(co) AS co
FROM home
--In WHERE, time refers to <source_table>.time
WHERE time >= now() - INTERVAL '24 hours'
--1 refers to the DATE_BIN column
GROUP BY 1, room
ORDER BY 1
'''
table = influxdb_raw.query(query=query, language="sql")
data_frame = table.to_pandas()
data_frame = data_frame.sort_values(by="time")
influxdb_downsampled.write(
record=data_frame,
data_frame_measurement_name="home_ds",
data_frame_timestamp_column="time",
data_frame_tag_columns=['room']
)
{{% /code-placeholders %}}
{{% /code-tab-content %}}
<!---------------------------------- END SQL ----------------------------------> <!------------------------------- BEGIN INFLUXQL ------------------------------>{{% code-tab-content %}}
{{% code-placeholders "((RAW_|DOWNSAMPLED_)*DATABASE)_(NAME|TOKEN)" %}}
from influxdb_client_3 import InfluxDBClient3
import pandas
influxdb_raw = InfluxDBClient3(
host='{{< influxdb/host >}}',
token='DATABASE_TOKEN',
database='RAW_DATABASE_NAME'
)
# When writing, the org= argument is required by the client (but ignored by InfluxDB).
influxdb_downsampled = InfluxDBClient3(
host='{{< influxdb/host >}}',
token='DATABASE_TOKEN',
database='DOWNSAMPLED_DATABASE_NAME',
org=''
)
query = '''
SELECT
MEAN(temp) AS temp,
MEAN(hum) AS hum,
MEAN(co) AS co
FROM home
WHERE time >= now() - 24h
GROUP BY time(1h)
'''
# To prevent naming conflicts when querying downsampled data,
# drop the iox::measurement column before writing the data
# with the new measurement.
data_frame = data_frame.drop(columns=['iox::measurement'])
table = influxdb_raw.query(query=query, language="influxql")
data_frame = table.to_pandas()
data_frame = data_frame.sort_values(by="time")
influxdb_downsampled.write(
record=data_frame,
data_frame_measurement_name="home_ds",
data_frame_timestamp_column="time",
data_frame_tag_columns=['room']
)
{{% /code-placeholders %}}
{{% /code-tab-content %}}
<!-------------------------------- END INFLUXQL ------------------------------->{{< /code-tabs-wrapper >}}