docs/content/docs/learn-flink/table_api.md
The focus of this training is to broadly cover the Table API well enough that you will be able to get started writing streaming analytics applications.
The Table API is Flink's declarative, relational API. You describe what you want to compute, and Flink figures out how to compute it efficiently. The same queries work on both batch and streaming data without modification, and Flink's optimizer automatically selects efficient execution plans.
The Table API works with structured data that has a defined schema. Every table has named columns with specific data types.
Flink supports a rich set of data types for table columns:
* `STRING`, `INT`, `BIGINT`, `DOUBLE`, `BOOLEAN`, `TIMESTAMP`
* `ARRAY`, `MAP`, `ROW` (for nested structures)
* `RAW` (for opaque byte data in UDFs)
* `INTERVAL`, `NULL`

Tables can be created from [external systems]({{< ref "docs/connectors/table/overview" >}}) (Kafka, files, databases), from DataStreams, or inline using values.
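For example, a table backed by an external system is usually declared with DDL. Here is a minimal sketch, assuming a hypothetical Kafka topic named `orders` with JSON records; the connector options are illustrative rather than a complete configuration:

```java
TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.inStreamingMode());

// Hypothetical Kafka-backed table; topic and options are illustrative.
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  product STRING," +
    "  amount INT," +
    "  price DOUBLE," +
    "  order_time TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")");
```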
{{< top >}}
This example takes a table of records about people as input and filters it to only include adults.
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class Example {

    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
            EnvironmentSettings.inStreamingMode());

        // An inline table of (name, age) rows.
        Table people = tableEnv.fromValues(
            DataTypes.ROW(
                DataTypes.FIELD("name", DataTypes.STRING()),
                DataTypes.FIELD("age", DataTypes.INT())
            ),
            Row.of("Alice", 35),
            Row.of("Bob", 35),
            Row.of("Charlie", 2)
        );

        // Keep only rows where age >= 18.
        Table adults = people
            .filter($("age").isGreaterOrEqual(18))
            .select($("name"), $("age"));

        adults.execute().print();
    }
}
```
The output will look something like this:
```
+----+--------------------------------+-------------+
| op |                           name |         age |
+----+--------------------------------+-------------+
| +I |                          Alice |          35 |
| +I |                            Bob |          35 |
+----+--------------------------------+-------------+
```
The `op` column shows the change type of each row: `+I` (INSERT) means the row is being
inserted into the result table. The section on changelogs below explains these flags in
more detail.
{{< top >}}
Every Table API program needs a `TableEnvironment`. It is the central entry point for creating tables, registering user-defined functions, and executing queries:
```java
// For streaming applications
TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.inStreamingMode());

// For batch applications
TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.inBatchMode());
```
When you call execute() on a table, Flink compiles your table program into an optimized job
graph and submits it for execution.
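As a sketch of the two most common ways to trigger execution (the `orders` source table and the `sink_table` name here are placeholders for tables you have registered):

```java
// Collect the result to the client and print it, e.g. for ad-hoc inspection.
Table result = tableEnv.from("orders").filter($("amount").isGreater(0));
result.execute().print();

// Or write the result into a previously registered sink table.
result.executeInsert("sink_table");
```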
{{< top >}}
Use select() to choose which columns to include and to create new computed columns.
The $() function references columns by name.
```java
Table result = orders
    .select($("product"), $("amount"), $("price").times($("amount")).as("total"));
```
Use filter() or where() (they are equivalent) to keep only rows matching a condition.
```java
Table filtered = orders
    .filter($("amount").isGreater(0))
    .filter($("status").isNotEqual("cancelled"));
```
Use addColumns() to add new computed columns, renameColumns() to rename existing ones,
and dropColumns() to remove columns.
```java
Table result = orders
    .addColumns($("price").times($("amount")).as("total"))
    .renameColumns($("product").as("item"))
    .dropColumns($("internal_id"));
```
The Table API includes many built-in functions for common operations:
```java
import org.apache.flink.table.expressions.TimeIntervalUnit;

import static org.apache.flink.table.api.Expressions.*;

Table result = products
    .select(
        $("name").upperCase(),                          // String functions
        $("price").round(2),                            // Math functions
        $("created_at").extract(TimeIntervalUnit.HOUR)  // Temporal functions
    );
```
For a complete list of built-in functions, see the [Built-in Functions]({{< ref "docs/dev/table/functions/systemFunctions" >}}) reference.
{{< top >}}
Use groupBy() with aggregate functions to compute summary statistics.
```java
Table counts = orders
    .groupBy($("product"))
    .select(
        $("product"),
        $("amount").sum().as("total_sold"),
        $("product").count().as("order_count")
    );
```
Common aggregate functions include sum(), count(), avg(), min(), max().
In streaming mode, aggregations produce updating results. Each time a new row arrives for a group, Flink updates the aggregate and emits a new result for that group.
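You can see this with a small self-contained sketch (using the same imports as the first example); the inline rows are made up for illustration:

```java
Table orders = tableEnv.fromValues(
    DataTypes.ROW(
        DataTypes.FIELD("product", DataTypes.STRING()),
        DataTypes.FIELD("amount", DataTypes.INT())
    ),
    Row.of("widget", 2),
    Row.of("widget", 3)
);

// In streaming mode this prints -U/+U rows as the sum for "widget" is updated.
orders
    .groupBy($("product"))
    .select($("product"), $("amount").sum().as("total_sold"))
    .execute()
    .print();
```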
{{< top >}}
One of the most important concepts to understand when using the Table API for streaming is stream-table duality: a stream can be viewed as a table (with rows being inserted over time), and a table can be viewed as a changelog stream (each change to the table is an event).
Simple operations like filter() and select() produce append-only tables. New rows are
inserted, but existing rows are never modified or deleted. These produce only +I (INSERT) flags.
Aggregations and other stateful operations produce updating tables. When the aggregated result for a group changes, Flink must retract the old value and insert the new one.
The changelog flags you may see are:
| Flag | Meaning |
|---|---|
| `+I` | INSERT: A new row is added |
| `-D` | DELETE: An existing row is removed |
| `-U` | UPDATE_BEFORE: The old value before an update (retraction) |
| `+U` | UPDATE_AFTER: The new value after an update |
For example, if you count orders by product and three orders for "widget" arrive:
```
+I [widget, 1]   -- First order: count is 1
-U [widget, 1]   -- Retract previous count
+U [widget, 2]   -- Second order: count is now 2
-U [widget, 2]   -- Retract previous count
+U [widget, 3]   -- Third order: count is now 3
```
This matters when connecting to sinks. Append-only sinks (like files) can only accept +I
operations. Updating sinks (like databases or Kafka upsert topics) can handle all operations.
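If you want to consume these flags programmatically, you can turn an updating table into a changelog stream. The sketch below assumes a `StreamTableEnvironment` (the bridge between the DataStream and Table APIs); each emitted `Row` carries its change flag as a `RowKind`:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// An updating table: the count per product changes as rows arrive.
Table counts = tableEnv
    .fromValues("widget", "gadget", "widget").as("product")
    .groupBy($("product"))
    .select($("product"), $("product").count().as("cnt"));

// Each Row in the stream carries a RowKind (+I, -U, +U, -D).
DataStream<Row> changelog = tableEnv.toChangelogStream(counts);
changelog.print();

env.execute();
```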
{{< top >}}
Unbounded aggregations keep state forever, which is not practical for many streaming use cases. Windows let you aggregate over bounded portions of the stream.
Here's an example using a tumbling window to count orders per hour:
```java
import org.apache.flink.table.api.Tumble;

import static org.apache.flink.table.api.Expressions.*;

Table hourlyStats = orders
    .window(Tumble.over(lit(1).hours()).on($("order_time")).as("w"))
    .groupBy($("product"), $("w"))
    .select(
        $("product"),
        $("w").start().as("window_start"),
        $("w").end().as("window_end"),
        $("amount").sum().as("total_sold")
    );
```
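For this to work in streaming mode, `order_time` must be an event-time attribute with a watermark. A minimal sketch of how that might be declared in DDL, using the built-in `datagen` connector as a stand-in source and an arbitrary 5-second watermark delay:

```java
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  product STRING," +
    "  amount INT," +
    "  order_time TIMESTAMP(3)," +
    // The WATERMARK clause marks order_time as an event-time attribute.
    "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'datagen'" +
    ")");
```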
Windowed aggregations produce append-only results because each window emits its final result once and never updates it. This makes them ideal for sinks that don't support updates.
For more on windows, see the [Group Windows]({{< ref "docs/dev/table/tableApi" >}}#group-windows) documentation.
{{< top >}}
When built-in functions don't meet your needs, you can create user-defined functions (UDFs).
A ScalarFunction takes one row and produces one value.
```java
import org.apache.flink.table.functions.ScalarFunction;

public class HashFunction extends ScalarFunction {
    public String eval(String input) {
        return Integer.toHexString(input.hashCode());
    }
}

// Register and use
tableEnv.createTemporaryFunction("hash", HashFunction.class);
Table result = orders.select($("id"), call("hash", $("customer_name")).as("hashed_name"));
```
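You can also call a scalar function inline by class, without registering it first:

```java
Table result = orders.select(
    $("id"),
    call(HashFunction.class, $("customer_name")).as("hashed_name"));
```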
A TableFunction takes one row and produces zero or more rows. Use joinLateral() to apply it.
```java
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// The hint tells Flink the row type that this function emits.
@FunctionHint(output = @DataTypeHint("ROW<tag STRING>"))
public class SplitFunction extends TableFunction<Row> {
    public void eval(String str) {
        for (String s : str.split(",")) {
            collect(Row.of(s.trim()));
        }
    }
}

// Use with joinLateral
tableEnv.createTemporaryFunction("split", SplitFunction.class);
Table result = orders.joinLateral(call("split", $("tags")).as("tag"));
```
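Note that joinLateral() drops input rows for which the function emits no rows. To keep them, with the function's columns set to NULL, use leftOuterJoinLateral():

```java
Table result = orders.leftOuterJoinLateral(call("split", $("tags")).as("tag"));
```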
For more details on UDFs, see the [User-defined Functions]({{< ref "docs/dev/table/functions/udfs" >}}) documentation.
{{< top >}}
When declarative operations aren't enough, Process Table Functions (PTFs) provide full control
over processing. PTFs are the Table API's "escape hatch" for complex logic, similar to how
ProcessFunction works in the DataStream API.
PTFs can:

* take one or more tables as arguments,
* keep state that is scoped to a partition of the input,
* use timers and time attributes, and
* emit any number of output rows.
Here's a simple stateful PTF that counts occurrences per key:
```java
import org.apache.flink.table.annotation.*;
import org.apache.flink.table.functions.ProcessTableFunction;
import org.apache.flink.types.Row;

public class CountingFunction extends ProcessTableFunction<String> {

    // State that Flink manages for each partition of the input.
    public static class CountState {
        public long count = 0L;
    }

    public void eval(
            @StateHint CountState state,
            @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
        state.count++;
        collect("Seen " + state.count + " rows for this partition");
    }
}

// Use with partitionBy for per-key state
Table result = events
    .partitionBy($("user_id"))
    .process(CountingFunction.class);
```
PTFs bridge the gap between the declarative Table API and low-level stream processing. Use them when you need fine-grained control over state, timers, or event-by-event processing logic that cannot be expressed with standard operations.
For the complete PTF guide, see [Process Table Functions]({{< ref "docs/dev/table/functions/ptfs" >}}).
{{< top >}}
At this point you know enough to get started coding and running a simple Table API application. Clone the {{< training_repo >}}, and after following the instructions in the README, try the Table API exercises.
{{< top >}}