Back to Flink

Temporal Table Function

docs/content.zh/docs/concepts/sql-table-concepts/temporal_table_function.md

0.4-rc14.4 KB
Original Source
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -->

Temporal Table Function

A Temporal table function provides access to the version of a temporal table at a specific point in time. In order to access the data in a temporal table, one must pass a [time attribute]({{<ref "docs/concepts/sql-table-concepts/time_attributes">}}) that determines the version of the table that will be returned. Flink uses the SQL syntax of [table functions]({{<ref "docs/dev/table/functions/udfs" >}}#table-functions) to provide a way to express it.

Unlike a versioned table, temporal table functions can only be defined on top of append-only streams — it does not support changelog inputs. Additionally, a temporal table function cannot be defined in pure SQL DDL.

Defining a Temporal Table Function

Temporal table functions can be defined on top of append-only streams using the [Table API]({{< ref "docs/dev/table/tableApi" >}}). The table is registered with one or more key columns, and a time attribute used for versioning.

Suppose we have an append-only table of currency rates that we would like to register as a temporal table function.

sql
SELECT * FROM currency_rates;

update_time   currency   rate
============= =========  ====
09:00:00      Yen        102
09:00:00      Euro       114
09:00:00      USD        1
11:15:00      Euro       119
11:49:00      Pounds     108

Using the Table API, we can register this stream using currency for the key and update_time as the versioning time attribute.

{{< tabs "066b6695-5bc3-4d7a-9033-ff6b1d14b3a1" >}} {{< tab "Java" >}}

java
TemporalTableFunction rates = tEnv
    .from("currency_rates")
    .createTemporalTableFunction("update_time", "currency");
 
tEnv.createTemporarySystemFunction("rates", rates);                                                           

{{< /tab >}} {{< tab "Scala" >}}

scala
rates = tEnv
    .from("currency_rates")
    .createTemporalTableFunction("update_time", "currency")
 
tEnv.createTemporarySystemFunction("rates", rates);                                                           

{{< /tab >}} {{< tab "Python" >}}

python
Still not supported in Python Table API.

{{< /tab >}} {{< /tabs >}}

Temporal Table Function Join

Once defined, a temporal table function is used as a standard [table function]({{< ref "docs/dev/table/functions/udfs" >}}#table-functions). Append-only tables (left input/probe side) can join with a temporal table (right input/build side), i.e., a table that changes over time and tracks its changes, to retrieve the value for a key as it was at a particular point in time.

Consider an append-only table orders that tracks customers' orders in different currencies.

sql
SELECT * FROM orders;

order_time amount currency
========== ====== =========
10:15        2    Euro
10:30        1    USD
10:32       50    Yen
10:52        3    Euro
11:04        5    USD

Given these tables, we would like to convert orders to a common currency — USD.

{{< tabs "7ec4efc6-41ae-42c1-a261-4a94dd3b44e0" >}} {{< tab "SQL" >}}

sql
SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency

{{< /tab >}} {{< tab "Java" >}}

java
Table result = orders
    .joinLateral(call("rates", $("o_proctime")), $("o_currency").isEqual($("r_currency")))
    .select($("(o_amount").times($("r_rate")).sum().as("amount"));

{{< /tab >}} {{< tab "Scala" >}}

scala
val result = orders
    .joinLateral($"rates(order_time)", $"orders.currency = rates.currency")
    .select($"(o_amount * r_rate).sum as amount"))

{{< /tab >}} {{< tab "Python" >}}

python
Still not supported in Python API.

{{< /tab >}} {{< /tabs >}}

{{< top >}}