.. ################################################################################
   Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership.  The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
   ################################################################################

===========
Python REPL
===========

Flink comes with an integrated interactive Python Shell. It can be used in a local setup as well as in a cluster setup. See the :flinkdoc:`standalone resource provider page <docs/deployment/resource-providers/standalone/overview/>` for more information about how to set up a local Flink. You can also `build a local setup from source <https://github.com/apache/flink#building-apache-flink-from-source>`_.

.. note:: The Python Shell runs the ``python`` command. Please refer to the :flinkdoc:`First Steps guide <docs/getting-started/local_installation/>` for PyFlink installation instructions.

To use the shell with an integrated Flink cluster, install PyFlink from PyPI and execute the shell directly:

.. code-block:: bash

    # install PyFlink
    $ python -m pip install apache-flink
    # execute the shell
    $ pyflink-shell.sh local

To run the shell on a cluster, please see the Setup section below.
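
As a quick sanity check before launching the shell, the sketch below tests whether the ``pyflink`` module (installed by the ``apache-flink`` package) can be located and whether ``pyflink-shell.sh`` is on the ``PATH``. The helper name ``check_pyflink_install`` is hypothetical and not part of PyFlink; it uses only the Python standard library.

```python
import importlib.util
import shutil


def check_pyflink_install(module="pyflink", script="pyflink-shell.sh"):
    """Report whether the module and the launcher script can be found.

    Hypothetical helper for illustration only; not part of PyFlink.
    """
    return {
        # find_spec returns None when a top-level module cannot be located
        "module_found": importlib.util.find_spec(module) is not None,
        # shutil.which returns None when the script is not on PATH
        "script_path": shutil.which(script),
    }


if __name__ == "__main__":
    print(check_pyflink_install())
```

If ``module_found`` is ``False`` or ``script_path`` is ``None``, the ``pip install`` step above most likely targeted a different Python environment than the one currently active.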

Usage
-----

The shell currently supports only the Table API. The table environments are prebound automatically after startup: use ``bt_env`` to access the BatchTableEnvironment and ``st_env`` to access the StreamTableEnvironment.

Table API
~~~~~~~~~

The examples below show simple programs in the Python shell:

Streaming:

.. code-block:: python

    >>> import tempfile
    >>> import os
    >>> import shutil
    >>> sink_path = tempfile.gettempdir() + '/streaming.csv'
    >>> # remove any output left over from a previous run
    >>> if os.path.exists(sink_path):
    ...     if os.path.isfile(sink_path):
    ...         os.remove(sink_path)
    ...     else:
    ...         shutil.rmtree(sink_path)
    >>> s_env.set_parallelism(1)
    >>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    >>> st_env.create_temporary_table("stream_sink", TableDescriptor.for_connector("filesystem")
    ...     .schema(Schema.new_builder()
    ...         .column("a", DataTypes.BIGINT())
    ...         .column("b", DataTypes.STRING())
    ...         .column("c", DataTypes.STRING())
    ...         .build())
    ...     .option("path", sink_path)
    ...     .format(FormatDescriptor.for_format("csv")
    ...         .option("field-delimiter", ",")
    ...         .build())
    ...     .build())
    >>> t.select("a + 1, b, c")\
    ...     .execute_insert("stream_sink").wait()
    >>> # If the job runs in local mode, you can run the following code in the Python shell to see the result:
    >>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
    ...     print(f.read())

Batch:

.. code-block:: python

    >>> import tempfile
    >>> import os
    >>> import shutil
    >>> sink_path = tempfile.gettempdir() + '/batch.csv'
    >>> # remove any output left over from a previous run
    >>> if os.path.exists(sink_path):
    ...     if os.path.isfile(sink_path):
    ...         os.remove(sink_path)
    ...     else:
    ...         shutil.rmtree(sink_path)
    >>> b_env.set_parallelism(1)
    >>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    >>> bt_env.create_temporary_table("batch_sink", TableDescriptor.for_connector("filesystem")
    ...     .schema(Schema.new_builder()
    ...         .column("a", DataTypes.BIGINT())
    ...         .column("b", DataTypes.STRING())
    ...         .column("c", DataTypes.STRING())
    ...         .build())
    ...     .option("path", sink_path)
    ...     .format(FormatDescriptor.for_format("csv")
    ...         .option("field-delimiter", ",")
    ...         .build())
    ...     .build())
    >>> t.select("a + 1, b, c")\
    ...     .execute_insert("batch_sink").wait()
    >>> # If the job runs in local mode, you can run the following code in the Python shell to see the result:
    >>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
    ...     print(f.read())
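
Both examples repeat the same two housekeeping steps: clearing the sink path before the job runs and reading the output afterwards. The sketch below factors them into two functions using only the standard library. The names ``reset_sink_path`` and ``read_sink_output`` are hypothetical. Unlike the examples above, which read only the first directory entry, this version concatenates every regular file in the directory and skips names starting with ``.`` or ``_``, on the assumption that such names mark in-progress or marker files rather than finished output.

```python
import os
import shutil
import tempfile


def reset_sink_path(sink_path):
    """Remove a file or directory left at sink_path by a previous run."""
    if os.path.exists(sink_path):
        if os.path.isfile(sink_path):
            os.remove(sink_path)
        else:
            shutil.rmtree(sink_path)


def read_sink_output(sink_path):
    """Return the text written to sink_path (a single file or a directory of part files)."""
    if os.path.isfile(sink_path):
        with open(sink_path, "r") as f:
            return f.read()
    chunks = []
    # sort so that the result does not depend on directory listing order
    for name in sorted(os.listdir(sink_path)):
        part = os.path.join(sink_path, name)
        # assumption: names starting with "." or "_" are not finished output
        if os.path.isfile(part) and not name.startswith((".", "_")):
            with open(part, "r") as f:
                chunks.append(f.read())
    return "".join(chunks)


if __name__ == "__main__":
    # stand-in for a sink directory; no Flink job is involved here
    with tempfile.TemporaryDirectory() as tmp:
        demo = os.path.join(tmp, "demo.csv")
        os.makedirs(demo)
        with open(os.path.join(demo, "part-0"), "w") as f:
            f.write("2,hi,hello\n")
        print(read_sink_output(demo), end="")
        reset_sink_path(demo)
```

In the shell, ``reset_sink_path(sink_path)`` would replace the nested ``if`` block, and ``print(read_sink_output(sink_path))`` would replace the final ``with open(...)`` statement.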

Setup
-----

To get an overview of the options the Python Shell provides, run:

.. code-block:: bash

    pyflink-shell.sh --help

Local
~~~~~

To use the shell with an integrated Flink cluster, just execute:

.. code-block:: bash

    pyflink-shell.sh local

Remote
~~~~~~

To use the shell with a running cluster, start it with the keyword ``remote`` and supply the host and port of the JobManager:

.. code-block:: bash

    pyflink-shell.sh remote <hostname> <portnumber>
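
Before starting the shell in remote mode, it can help to confirm that the host and port you intend to pass are reachable at all. The sketch below is a plain TCP connectivity check built on the standard library; ``jobmanager_reachable`` is a hypothetical helper, not part of PyFlink. A successful connection shows only that something is listening on that port, not that it is a Flink JobManager.

```python
import socket


def jobmanager_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port opens within `timeout` seconds.

    Hypothetical helper for illustration; it does not speak any Flink protocol.
    """
    try:
        # create_connection resolves the host and attempts a TCP connect
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        # covers refused connections, timeouts, and name resolution failures
        return False


if __name__ == "__main__":
    # placeholder values; substitute the host and port of your JobManager
    print(jobmanager_reachable("localhost", 8081, timeout=1.0))
```

The ``localhost`` and ``8081`` values in the demo are placeholders chosen for illustration; use whatever ``<hostname>`` and ``<portnumber>`` you would pass to ``pyflink-shell.sh remote``.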

Yarn Python Shell cluster
~~~~~~~~~~~~~~~~~~~~~~~~~

The shell can deploy a new Flink cluster to YARN that is used exclusively by the shell, and then connect to it. You can also specify options for the YARN cluster, such as the JobManager memory or the name of the YARN application.

For example, to start a YARN cluster for the Python Shell with two TaskManagers, use the following:

.. code-block:: bash

    pyflink-shell.sh yarn -n 2

For all other options, see the full reference at the bottom.

Yarn Session
~~~~~~~~~~~~

If you have previously deployed a Flink cluster using the Flink Yarn Session, the Python shell can connect to it using the following command:

.. code-block:: bash

    pyflink-shell.sh yarn

Full Reference
~~~~~~~~~~~~~~

.. code-block:: text

    Flink Python Shell
    Usage: pyflink-shell.sh [local|remote|yarn] [options] <args>...

    Command: local [options]
    Starts Flink Python shell with a local Flink cluster
    usage:
         -h,--help   Show the help message with descriptions of all options.
    Command: remote [options] <host> <port>
    Starts Flink Python shell connecting to a remote cluster
      <host>
            Remote host name as string
      <port>
            Remote port as integer

    usage:
         -h,--help   Show the help message with descriptions of all options.
    Command: yarn [options]
    Starts Flink Python shell connecting to a yarn cluster
    usage:
         -h,--help                       Show the help message with descriptions of
                                         all options.
         -jm,--jobManagerMemory <arg>    Memory for JobManager Container with
                                         optional unit (default: MB)
         -nm,--name <arg>                Set a custom name for the application on
                                         YARN
         -qu,--queue <arg>               Specify YARN queue.
         -s,--slots <arg>                Number of slots per TaskManager
         -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with
                                         optional unit (default: MB)
    -h | --help
          Prints this usage text
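
When the shell is launched from a script, the ``yarn`` options listed above can be assembled programmatically. The sketch below builds an argument list from keyword arguments, using only the long option names that appear in the reference. ``build_yarn_shell_command`` is a hypothetical helper, and the values in the demo are illustrative rather than recommended settings.

```python
import shlex


def build_yarn_shell_command(job_manager_memory=None, name=None, queue=None,
                             slots=None, task_manager_memory=None,
                             script="pyflink-shell.sh"):
    """Return an argv list for the `yarn` command of pyflink-shell.sh.

    Hypothetical helper; only options listed in the reference above are emitted,
    and options left as None are omitted.
    """
    argv = [script, "yarn"]
    options = [
        ("--jobManagerMemory", job_manager_memory),
        ("--name", name),
        ("--queue", queue),
        ("--slots", slots),
        ("--taskManagerMemory", task_manager_memory),
    ]
    for flag, value in options:
        if value is not None:
            argv += [flag, str(value)]
    return argv


if __name__ == "__main__":
    cmd = build_yarn_shell_command(job_manager_memory=1024, slots=2, queue="default")
    # shlex.join quotes arguments so the line can be pasted into a POSIX shell
    print(shlex.join(cmd))
```

Returning a list rather than a single string keeps values such as application names with spaces intact when the result is passed to ``subprocess.run``.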