
.. Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements. See the NOTICE file distributed with this
   work for additional information regarding copyright ownership. The ASF
   licenses this file to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance with the
   License. You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
   License for the specific language governing permissions and limitations
   under the License.

Row-based Operations
====================

This page describes how to use row-based operations in the PyFlink Table API.

Map
---


Performs a ``map`` operation with a Python :doc:`general scalar function <../udfs>` or
:doc:`vectorized scalar function <../udfs>`. The output will be flattened if the
output type is a composite type.

.. code:: python

   from pyflink.common import Row
   from pyflink.table import EnvironmentSettings, TableEnvironment
   from pyflink.table.expressions import col
   from pyflink.table.udf import udf

   env_settings = EnvironmentSettings.in_batch_mode()
   table_env = TableEnvironment.create(env_settings)

   table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

   @udf(result_type='ROW<id BIGINT, data STRING>')
   def func1(id: int, data: str) -> Row:
       return Row(id, data * 2)

   # the input columns are specified as the inputs
   table.map(func1(col('id'), col('data'))).execute().print()
   # result is
   #+----------------------+--------------------------------+
   #|                   id |                           data |
   #+----------------------+--------------------------------+
   #|                    1 |                           HiHi |
   #|                    2 |                     HelloHello |
   #+----------------------+--------------------------------+

It also supports taking a ``Row`` object (containing all the columns of the
input table) as input.

.. code:: python

   @udf(result_type='ROW<id BIGINT, data STRING>')
   def func2(data: Row) -> Row:
       return Row(data.id, data.data * 2)

   # specify the function without the input columns
   table.map(func2).execute().print()
   # result is
   #+----------------------+--------------------------------+
   #|                   id |                           data |
   #+----------------------+--------------------------------+
   #|                    1 |                           HiHi |
   #|                    2 |                     HelloHello |
   #+----------------------+--------------------------------+

.. note::

   The input columns should not be specified when using ``func2`` in the
   ``map`` operation.
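For comparison, the two invocation styles side by side, reusing ``func1`` and
``func2`` defined above:

.. code:: python

   # explicit input columns: the function receives the individual columns
   table.map(func1(col('id'), col('data'))).execute().print()
   # no input columns: the function receives the whole Row
   table.map(func2).execute().print()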

A :doc:`vectorized scalar function <../udfs>` could also be used in the ``map``
operation. Note that in this case the input type and output type are
``pandas.DataFrame`` instead of ``Row``.

.. code:: python

   import pandas as pd
   @udf(result_type='ROW<id BIGINT, data STRING>', func_type='pandas')
   def func3(data: pd.DataFrame) -> pd.DataFrame:
       res = pd.concat([data.id, data.data * 2], axis=1)
       return res

   table.map(func3).execute().print()
   # result is
   #+----------------------+--------------------------------+
   #|                   id |                           data |
   #+----------------------+--------------------------------+
   #|                    1 |                           HiHi |
   #|                    2 |                     HelloHello |
   #+----------------------+--------------------------------+

FlatMap
-------

Performs a ``flat_map`` operation with a Python :doc:`table function <../udfs>`.

.. code:: python

   from pyflink.common import Row
   from pyflink.table.udf import udtf
   from pyflink.table import EnvironmentSettings, TableEnvironment

   env_settings = EnvironmentSettings.in_batch_mode()
   table_env = TableEnvironment.create(env_settings)

   table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id', 'data'])

   @udtf(result_types=['INT', 'STRING'])
   def split(x: Row) -> Row:
       for s in x.data.split(","):
           yield x.id, s

   # use split in flat_map
   table.flat_map(split).execute().print()
   # result is
   #+-------------+--------------------------------+
   #|          f0 |                             f1 |
   #+-------------+--------------------------------+
   #|           1 |                             Hi |
   #|           1 |                          Flink |
   #|           2 |                          Hello |
   #+-------------+--------------------------------+

The Python :doc:`table function <../udfs>` could also be used in the
``join_lateral`` and ``left_outer_join_lateral`` operations.

.. code:: python

   # use table function in join_lateral or left_outer_join_lateral
   table.join_lateral(split.alias('a', 'b')).execute().print()
   # result is
   #+----------------------+--------------------------------+-------------+--------------------------------+
   #|                   id |                           data |           a |                              b |
   #+----------------------+--------------------------------+-------------+--------------------------------+
   #|                    1 |                       Hi,Flink |           1 |                             Hi |
   #|                    1 |                       Hi,Flink |           1 |                          Flink |
   #|                    2 |                          Hello |           2 |                          Hello |
   #+----------------------+--------------------------------+-------------+--------------------------------+
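``left_outer_join_lateral`` works analogously, except that input rows for which
the table function emits no output are still kept, with the joined columns set
to NULL. A minimal sketch reusing the ``split`` function from above:

.. code:: python

   # left outer variant: every input row is preserved even if split yields
   # nothing for it; the columns a and b are then NULL
   table.left_outer_join_lateral(split.alias('a', 'b')).execute().print()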

Aggregate
---------


Performs an ``aggregate`` operation with a Python :doc:`general aggregate function <../udfs>` or :doc:`vectorized aggregate function <../udfs>`.

.. code:: python

   from pyflink.common import Row
   from pyflink.table import EnvironmentSettings, TableEnvironment
   from pyflink.table.expressions import col
   from pyflink.table.udf import AggregateFunction, udaf

   class CountAndSumAggregateFunction(AggregateFunction):

       def get_value(self, accumulator):
           return Row(accumulator[0], accumulator[1])

       def create_accumulator(self):
           return Row(0, 0)

       def accumulate(self, accumulator, row):
           accumulator[0] += 1
           accumulator[1] += row.b

       def retract(self, accumulator, row):
           accumulator[0] -= 1
           accumulator[1] -= row.b

       def merge(self, accumulator, accumulators):
           for other_acc in accumulators:
               accumulator[0] += other_acc[0]
               accumulator[1] += other_acc[1]

       def get_accumulator_type(self):
           return 'ROW<a BIGINT, b BIGINT>'

       def get_result_type(self):
           return 'ROW<a BIGINT, b BIGINT>'

   function = CountAndSumAggregateFunction()
   agg = udaf(function,
              result_type=function.get_result_type(),
              accumulator_type=function.get_accumulator_type(),
              name=str(function.__class__.__name__))

   # aggregate with a python general aggregate function

   env_settings = EnvironmentSettings.in_streaming_mode()
   table_env = TableEnvironment.create(env_settings)
   t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])

   result = t.group_by(col('a')) \
       .aggregate(agg.alias("c", "d")) \
       .select(col('a'), col('c'), col('d'))
   result.execute().print()

   # the result is
   #+----+----------------------+----------------------+----------------------+
   #| op |                    a |                    c |                    d |
   #+----+----------------------+----------------------+----------------------+
   #| +I |                    1 |                    2 |                    5 |
   #| +I |                    2 |                    1 |                    1 |
   #+----+----------------------+----------------------+----------------------+

   # aggregate with a python vectorized aggregate function
   env_settings = EnvironmentSettings.in_batch_mode()
   table_env = TableEnvironment.create(env_settings)

   t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])

   pandas_udaf = udaf(lambda df: (df.b.mean(), df.b.max()),
                      result_type='ROW<a FLOAT, b INT>',
                      func_type="pandas")
   t.aggregate(pandas_udaf.alias("a", "b")) \
    .select(col('a'), col('b')).execute().print()

   # the result is
   #+--------------------------------+-------------+
   #|                              a |           b |
   #+--------------------------------+-------------+
   #|                            2.0 |           3 |
   #+--------------------------------+-------------+

.. note::

   Similar to the ``map`` operation, if you specify the aggregate function
   without the input columns in the ``aggregate`` operation, it will take a
   ``Row`` or ``pandas.DataFrame`` as input, containing all the columns of the
   input table including the grouping keys.

.. note::

   You have to close the ``aggregate`` with a select statement, and the select
   statement should not contain aggregate functions. Besides, the output of
   ``aggregate`` will be flattened if it is a composite type.
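Conversely, if the input columns are specified explicitly, the function
receives only those columns. A minimal sketch, assuming a hypothetical
``mean_udaf`` applied to the batch table ``t`` from above:

.. code:: python

   # hypothetical example: with an explicit input column, the pandas UDAF
   # receives just column b (as a pandas.Series) rather than the whole DataFrame
   mean_udaf = udaf(lambda b: b.mean(), result_type='FLOAT', func_type='pandas')
   t.group_by(col('a')) \
    .aggregate(mean_udaf(col('b')).alias('avg_b')) \
    .select(col('a'), col('avg_b')) \
    .execute().print()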

FlatAggregate
-------------

Performs a ``flat_aggregate`` operation with a Python general :doc:`table aggregate function <../udfs>`.

Similar to GroupBy Aggregation, ``flat_aggregate`` groups the inputs on the grouping keys. Different from an ``AggregateFunction``, a ``TableAggregateFunction`` could return 0, 1, or more records for a grouping key. Similar to ``aggregate``, you have to close the ``flat_aggregate`` with a select statement, and the select statement should not contain aggregate functions.

.. code:: python

   from pyflink.common import Row
   from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
   from pyflink.table.expressions import col
   from pyflink.table.udf import udtaf, TableAggregateFunction

   class Top2(TableAggregateFunction):

       def emit_value(self, accumulator):
           yield Row(accumulator[0])
           yield Row(accumulator[1])

       def create_accumulator(self):
           return [None, None]

       def accumulate(self, accumulator, row):
           if row.a is not None:
               if accumulator[0] is None or row.a > accumulator[0]:
                   accumulator[1] = accumulator[0]
                   accumulator[0] = row.a
               elif accumulator[1] is None or row.a > accumulator[1]:
                   accumulator[1] = row.a

       def get_accumulator_type(self):
           return 'ARRAY<BIGINT>'

       def get_result_type(self):
           return 'ROW<a BIGINT>'

   env_settings = EnvironmentSettings.in_streaming_mode()
   table_env = TableEnvironment.create(env_settings)

   # the result type and accumulator type can also be specified in the udtaf decorator:
   # top2 = udtaf(Top2(),
   #              result_type=DataTypes.ROW([DataTypes.FIELD("a", DataTypes.BIGINT())]),
   #              accumulator_type=DataTypes.ARRAY(DataTypes.BIGINT()))
   top2 = udtaf(Top2())

   t = table_env.from_elements([(1, 'Hi', 'Hello'),
                                (3, 'Hi', 'hi'),
                                (5, 'Hi2', 'hi'),
                                (7, 'Hi', 'Hello'),
                                (2, 'Hi', 'Hello')], ['a', 'b', 'c'])

   # call the function "inline" without registration in Table API
   t.group_by(col('b')).flat_aggregate(top2).select(col('*')).execute().print()

   # the result is:
   #+----+--------------------------------+----------------------+
   #| op |                              b |                    a |
   #+----+--------------------------------+----------------------+
   #| +I |                            Hi2 |                    5 |
   #| +I |                            Hi2 |               <NULL> |
   #| +I |                             Hi |                    7 |
   #| +I |                             Hi |                    3 |
   #+----+--------------------------------+----------------------+