This Getting Started section guides you through the local setup (on one machine, but in separate containers) of a Flink cluster using Docker containers.
Docker is a popular container runtime. There are official Docker images for Apache Flink available on Docker Hub. You can use the Docker images to deploy a Session or Application cluster on Docker. This page focuses on the setup of Flink on Docker and Docker Compose.
Deployment into managed containerized environments, such as [standalone Kubernetes]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" >}}) or [native Kubernetes]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}), is described on separate pages.
A Flink Session cluster can be used to run multiple jobs. Each job needs to be submitted to the cluster after the cluster has been deployed. To deploy a Flink Session cluster with Docker, you need to start a JobManager container. To enable communication between the containers, we first set a required Flink configuration property and create a network:
```sh
$ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
$ docker network create flink-network
```
Then we launch the JobManager:
```sh
$ docker run \
    --rm \
    --name=jobmanager \
    --network flink-network \
    --publish 8081:8081 \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} jobmanager
```
and one or more TaskManager containers:
```sh
$ docker run \
    --rm \
    --name=taskmanager \
    --network flink-network \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} taskmanager
```
The web interface is now available at localhost:8081.
You can now submit a job like this (assuming you have a local distribution of Flink available):

```sh
$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
```
To shut down the cluster, either terminate (e.g. with CTRL-C) the JobManager and TaskManager processes, or use docker ps to identify and docker stop to terminate the containers.
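For illustration, the flags used in the two `docker run` commands above can be assembled programmatically. This is a hypothetical Python sketch, not part of Flink or Docker tooling; the defaults mirror the values used in this section:

```python
# Hypothetical helper: builds the docker run argument vector used above.
# The image tag, network name, and property string are assumptions
# taken from this section, not from any official tooling.
def docker_run_args(role, image="flink:latest",
                    flink_properties="jobmanager.rpc.address: jobmanager",
                    network="flink-network", publish_ports=()):
    """role is one of 'jobmanager' or 'taskmanager'."""
    args = ["docker", "run", "--rm", f"--name={role}",
            "--network", network]
    for port in publish_ports:
        # only the JobManager needs 8081 published for the web UI
        args += ["--publish", port]
    args += ["--env", f"FLINK_PROPERTIES={flink_properties}", image, role]
    return args

print(" ".join(docker_run_args("jobmanager", publish_ports=("8081:8081",))))
```

The only difference between the JobManager and TaskManager invocations is the published port and the final entry-point argument.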
The Flink image contains a regular Flink distribution with its default configuration and a standard entry point script. You can run its entry point in the following modes:

* `jobmanager` (JobManager for a Session cluster)
* `standalone-job` (JobManager for an Application cluster)
* `taskmanager`
* `historyserver`

This allows you to deploy a standalone cluster (Session or Application Mode) in any containerised environment.
<span class="label label-info">Note</span> [Native Kubernetes]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}) also runs the same image by default and deploys TaskManagers on demand, so that you do not have to do this manually.
The next chapters describe how to start a single Flink Docker container for various purposes.
Once you've started Flink on Docker, you can access the Flink Web UI on localhost:8081 or submit jobs like this: `./bin/flink run ./examples/streaming/TopSpeedWindowing.jar`.
We recommend using Docker Compose for deploying Flink in Session Mode to ease system configuration.
{{< hint info >}} For high-level intuition behind the application mode, please refer to the [deployment mode overview]({{< ref "docs/deployment/overview#application-mode" >}}). {{< /hint >}}
A Flink Application cluster is a dedicated cluster which runs a single job. In this case, you deploy the cluster with the job as one step, thus, there is no extra job submission needed.
The job artifacts are included in the class path of Flink's JVM process within the container and consist of the job JAR and any additional dependencies or resources that are not part of the Flink distribution.
To deploy a cluster for a single job with Docker, you need to make the job artifacts available in the container under /opt/flink/usrlib, or pass a list of jars via the --jars argument.

To make the job artifacts available locally in the container, you can either mount a volume (or multiple volumes) with the artifacts to /opt/flink/usrlib when you start the JobManager and TaskManagers:
```sh
$ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
$ docker network create flink-network

$ docker run \
    --mount type=bind,src=/host/path/to/job/artifacts1,target=/opt/flink/usrlib/artifacts1 \
    --mount type=bind,src=/host/path/to/job/artifacts2,target=/opt/flink/usrlib/artifacts2 \
    --rm \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    --name=jobmanager \
    --network flink-network \
    flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} standalone-job \
    --job-classname com.job.ClassName \
    [--job-id <job id>] \
    [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
    [job arguments]

$ docker run \
    --mount type=bind,src=/host/path/to/job/artifacts1,target=/opt/flink/usrlib/artifacts1 \
    --mount type=bind,src=/host/path/to/job/artifacts2,target=/opt/flink/usrlib/artifacts2 \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} taskmanager
```
or extend the Flink image by writing a custom Dockerfile, build it, and use it to start the JobManager and TaskManagers:
```Dockerfile
FROM flink
ADD /host/path/to/job/artifacts/1 /opt/flink/usrlib/artifacts/1
ADD /host/path/to/job/artifacts/2 /opt/flink/usrlib/artifacts/2
```
```sh
$ docker build --tag flink_with_job_artifacts .
$ docker run \
    flink_with_job_artifacts standalone-job \
    --job-classname com.job.ClassName \
    [--job-id <job id>] \
    [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
    [job arguments]

$ docker run flink_with_job_artifacts taskmanager
```
or pass the JAR paths via the --jars argument when you start the JobManager:
```sh
$ FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
$ docker network create flink-network

$ docker run \
    --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
    --env ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-{{< version >}}.jar \
    --name=jobmanager \
    --network flink-network \
    flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} standalone-job \
    --job-classname com.job.ClassName \
    --jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar \
    [--job-id <job id>] \
    [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
    [job arguments]
```
The standalone-job argument starts a JobManager container in Application Mode.
You can provide the following additional command line arguments to the cluster entrypoint:
* `--job-classname <job class name>` (optional): Class name of the job to run.
  By default, Flink scans its class path for a JAR with a Main-Class or program-class manifest entry and chooses it as the job class. Use this command line argument to manually set the job class.
  {{< hint warning >}} This argument is required if no JAR, or more than one JAR, with such a manifest entry is available on the class path. {{< /hint >}}
* `--job-id <job id>` (optional): Manually set a Flink job ID for the job (default: 00000000000000000000000000000000).
* `--fromSavepoint /path/to/savepoint` (optional): Restore from a savepoint.
  In order to resume from a savepoint, you also need to pass the savepoint path. Note that /path/to/savepoint needs to be accessible in all Docker containers of the cluster (e.g., by storing it on a DFS, mounting it as a volume, or adding it to the image).
* `--allowNonRestoredState` (optional): Skip broken savepoint state.
  You can specify this argument to allow skipping savepoint state that cannot be restored.
* `--jars` (optional): Comma-separated paths of the job JAR and any additional artifacts.
  You can use this argument to point to job artifacts stored in a [Flink filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) or downloadable via HTTP(S). Flink fetches them during job deployment (e.g. `--jars s3://my-bucket/my-flink-job.jar` or `--jars s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar`).
If the main function of the user job main class accepts arguments, you can also pass them at the end of the docker run command.
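Since --jars accepts a comma-separated list of URIs, here is a quick sketch of how such a value splits and groups by URI scheme (a hypothetical helper for illustration, not Flink code; Flink's actual artifact fetching happens internally during deployment):

```python
from urllib.parse import urlparse

# Hypothetical sketch: split a comma-separated --jars value and group
# the entries by URI scheme. Entries without a scheme are treated as
# local paths here; this grouping is illustrative only.
def split_jars(jars_arg: str):
    grouped = {}
    for uri in filter(None, (p.strip() for p in jars_arg.split(","))):
        scheme = urlparse(uri).scheme or "local"
        grouped.setdefault(scheme, []).append(uri)
    return grouped

print(split_jars("s3://my-bucket/my-flink-job.jar,s3://my-bucket/my-flink-udf.jar"))
```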
{{< hint info >}} For high-level intuition behind the session mode, please refer to the [deployment mode overview]({{< ref "docs/deployment/overview#session-mode" >}}). {{< /hint >}}
Local deployment in the Session Mode has already been described in the Getting Started section above.
{{< top >}}
There are two distribution channels for the Flink Docker images:
* the official `flink` images on Docker Hub (reviewed by Docker)
* `apache/flink` (managed by the Flink developers)

We recommend using the official images on Docker Hub, as they are reviewed by Docker. The images on apache/flink are provided in case of delays in the review process by Docker.
Launching an image named flink:latest will pull the latest image from Docker Hub. In order to use the images hosted in apache/flink, replace flink by apache/flink. Any of the image tags (starting from Flink 1.11.3) are available on apache/flink as well.
The Flink Docker repository is hosted on Docker Hub and serves images of Flink version 1.2.1 and later. The source for these images can be found in the Apache flink-docker repository.
Images for each supported combination of Flink and Scala versions are available, and tag aliases are provided for convenience.
For example, you can use the following aliases:
* `flink:latest` → `flink:<latest-flink>-scala_<latest-scala>`
* `flink:1.11` → `flink:1.11.<latest-flink-1.11>-scala_2.12`

<span class="label label-info">Note</span> It is recommended to always use an explicit version tag of the Docker image that specifies both the needed Flink and Scala versions (for example flink:1.11-scala_2.12). This avoids class conflicts that can occur if the Flink and/or Scala versions used in the application differ from the versions provided by the Docker image.
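The recommended explicit tag format can be made concrete with a tiny helper (illustrative only; this function is not part of any Flink or Docker tooling):

```python
# Illustrative only: composes the explicit tag format recommended above,
# which pins both the Flink and the Scala version.
def explicit_tag(flink_version: str, scala_version: str) -> str:
    return f"flink:{flink_version}-scala_{scala_version}"

print(explicit_tag("1.11", "2.12"))  # flink:1.11-scala_2.12
```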
<span class="label label-info">Note</span> Prior to Flink 1.5, Hadoop dependencies were always bundled with Flink.
You can see that certain tags include the version of Hadoop (e.g. -hadoop28).
Beginning with Flink 1.5, image tags that omit the Hadoop version correspond to Hadoop-free releases of Flink
that do not include a bundled Hadoop distribution.
Docker Compose is a way to run a group of Docker containers locally. The next sections show examples of configuration files to run Flink.
Create the docker-compose.yaml file. Please check the examples in the sections below:
Launch a cluster in the foreground (use -d for background):

```sh
$ docker compose up
```

Scale the cluster up or down to N TaskManagers:

```sh
$ docker compose scale taskmanager=<N>
```

Access the JobManager container:

```sh
$ docker exec -it $(docker ps --filter name=jobmanager --format={{.ID}}) /bin/sh
```

Kill the cluster:

```sh
$ docker compose down
```
Access Web UI
When the cluster is running, you can visit the web UI at http://localhost:8081.
In Application Mode you start a Flink cluster that is dedicated to running only the Flink jobs bundled with the image.
Hence, you need to build a dedicated Flink image per application.
Please check the Application cluster section above for details.
See also how to specify the JobManager arguments in the command for the jobmanager service.
<a id="app-cluster-yml">docker-compose.yml</a> for Application Mode.
```yaml
version: "2.2"
services:
  jobmanager:
    image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
    ports:
      - "8081:8081"
    command: standalone-job --job-classname com.job.ClassName [--jars /path/to/artifact1,/path/to/artifact2] [--job-id <job id>] [--fromSavepoint /path/to/savepoint] [--allowNonRestoredState] [job arguments]
    volumes:
      - /host/path/to/job/artifacts:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        parallelism.default: 2
  taskmanager:
    image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    volumes:
      - /host/path/to/job/artifacts:/opt/flink/usrlib
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        parallelism.default: 2
```
In Session Mode you use Docker Compose to spin up a long-running Flink cluster to which you can then submit jobs.
<a id="session-cluster-yml">docker-compose.yml</a> for Session Mode:
```yaml
version: "2.2"
services:
  jobmanager:
    image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
```
In this example, you spin up a long-running session cluster and a Flink SQL CLI which submits jobs to this cluster.
<a id="session-cluster-sql-yaml">docker-compose.yml</a> for Flink SQL Client with Session Cluster:
```yaml
version: "2.2"
services:
  jobmanager:
    image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
  sql-client:
    image: flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
    command: bin/sql-client.sh
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
```
In order to start the SQL Client, run:

```sh
$ docker compose run sql-client
```
You can then start creating tables and querying them.
Note that all required dependencies (e.g. for connectors) need to be available in the cluster as well as the client. For example, if you would like to add and use the SQL Kafka Connector, you need to build a custom image.
Create a kafka.Dockerfile as follows:

```Dockerfile
FROM flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}}
ARG kafka_connector_version=4.0.0-2.0
RUN wget -P /opt/flink/lib https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/$kafka_connector_version/flink-sql-connector-kafka-$kafka_connector_version.jar
```
Then, in the docker-compose.yml, replace the image config with a build config that references the Dockerfile for the jobmanager, taskmanager and sql-client services. For example, the jobmanager service will start with the following setting:

```yaml
jobmanager:
  build:
    dockerfile: ./kafka.Dockerfile
  ...
```
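The wget line in kafka.Dockerfile follows the standard Maven repository layout. A small sketch of that URL pattern (the helper and its defaults are illustrative, not part of any Flink API):

```python
# Illustrative sketch of the Maven repository layout used by the
# Dockerfile's wget: <repo>/<artifact>/<version>/<artifact>-<version>.jar
# The default repo path below points at org/apache/flink on Maven Central.
def connector_url(artifact: str, version: str,
                  repo="https://repo.maven.apache.org/maven2/org/apache/flink") -> str:
    return f"{repo}/{artifact}/{version}/{artifact}-{version}.jar"

print(connector_url("flink-sql-connector-kafka", "4.0.0-2.0"))
```

The same pattern applies to other Flink connectors published to Maven Central, e.g. for the Elasticsearch connector, only the artifact name and version change.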
SQL Commands like ADD JAR will not work for JARs located on the host machine as they only work with the local filesystem, which in this case is Docker's overlay filesystem.
To build a custom image which has Python and PyFlink prepared, you can refer to the following Dockerfile:
```Dockerfile
{{< stable >}}
FROM flink:{{< version >}}
{{< /stable >}}
{{< unstable >}}
FROM flink:latest
{{< /unstable >}}

# install python3 and pip3
RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && rm -rf /var/lib/apt/lists/*
RUN ln -s /usr/bin/python3 /usr/bin/python

# install PyFlink
{{< stable >}}
RUN pip3 install apache-flink=={{< version >}}
{{< /stable >}}
{{< unstable >}}
COPY apache_flink*.tar.gz /
RUN pip3 install /apache_flink_libraries*.tar.gz && pip3 install /apache_flink*.tar.gz
{{< /unstable >}}
```
{{< unstable >}} <span class="label label-info">Note</span> PyFlink packages can be built according to the [development guide]({{< github_repo >}}#building-apache-flink-from-source) {{< /unstable >}}
Build the image named pyflink:latest:

```sh
$ docker build --tag pyflink:latest .
```
You can pass configuration options to the entry point as dynamic properties when you run the Flink image:

```sh
$ docker run flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} \
    <jobmanager|standalone-job|taskmanager|historyserver> \
    -D jobmanager.rpc.address=host \
    -D taskmanager.numberOfTaskSlots=3 \
    -D blob.server.port=6124
```
Options set via dynamic properties overwrite the options from [Flink configuration file]({{< ref "docs/deployment/config#flink-配置文件" >}}).
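To make the precedence concrete, here is a hypothetical sketch of folding `-D key=value` pairs into a configuration map. Flink's actual parsing lives in its entry point and CLI, so this is only an approximation:

```python
# Hypothetical sketch: dynamic properties override the base configuration,
# and a later -D for the same key overrides an earlier one.
def apply_dynamic_properties(config: dict, argv: list) -> dict:
    merged = dict(config)
    it = iter(argv)
    for arg in it:
        if arg == "-D":
            # consume the following "key=value" token
            key, _, value = next(it).partition("=")
            merged[key.strip()] = value.strip()
    return merged

base = {"jobmanager.rpc.address": "localhost"}
print(apply_dynamic_properties(
    base, ["-D", "jobmanager.rpc.address=host", "-D", "blob.server.port=6124"]))
```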
When you run the Flink image, you can also change its configuration options by setting the environment variable FLINK_PROPERTIES:
```sh
$ FLINK_PROPERTIES="jobmanager.rpc.address: host
taskmanager.numberOfTaskSlots: 3
blob.server.port: 6124
"
$ docker run --env FLINK_PROPERTIES="${FLINK_PROPERTIES}" flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} <jobmanager|standalone-job|taskmanager>
```
The [jobmanager.rpc.address]({{< ref "docs/deployment/config" >}}#jobmanager-rpc-address) option must be configured; the others are optional.
The environment variable FLINK_PROPERTIES should contain a list of Flink cluster configuration options separated by newlines,
the same way as in the [Flink configuration file]({{< ref "docs/deployment/config#flink-配置文件" >}}). FLINK_PROPERTIES takes precedence over configurations in the [Flink configuration file]({{< ref "docs/deployment/config#flink-配置文件" >}}).
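The expected shape of FLINK_PROPERTIES can be illustrated with a small parser sketch (a hypothetical helper; Flink's entry point script does the real parsing and merging):

```python
# Hypothetical sketch of the FLINK_PROPERTIES format: newline-separated
# "key: value" pairs, which take precedence over the configuration file.
def parse_flink_properties(raw: str) -> dict:
    props = {}
    for line in raw.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            key, _, value = line.partition(":")
            props[key.strip()] = value.strip()
    return props

# values from the env var override values from the configuration file
file_config = {"jobmanager.rpc.address": "localhost", "parallelism.default": "1"}
env = "jobmanager.rpc.address: host\ntaskmanager.numberOfTaskSlots: 3"
effective = {**file_config, **parse_flink_properties(env)}
print(effective)
```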
The configuration files ([Flink configuration file]({{< ref "docs/deployment/config#flink-配置文件" >}}), logging, hosts etc) are located in the /opt/flink/conf directory in the Flink image.
To provide a custom location for the Flink configuration files, you can
either mount a volume with the custom configuration files to /opt/flink/conf when you run the Flink image:
```sh
$ docker run \
    --mount type=bind,src=/host/path/to/custom/conf,target=/opt/flink/conf \
    flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} <jobmanager|standalone-job|taskmanager>
```
or add them to your custom Flink image, build and run it:
```Dockerfile
FROM flink
ADD /host/path/to/config.yaml /opt/flink/conf/config.yaml
ADD /host/path/to/log4j.properties /opt/flink/conf/log4j.properties
```
{{< hint info >}} The mounted volume must contain all necessary configuration files. The [Flink configuration file]({{< ref "docs/deployment/config#flink-配置文件" >}}) must have write permission so that the Docker entry point script can modify it in certain cases. {{< /hint >}}
As described on the [plugins]({{< ref "docs/deployment/filesystems/plugins" >}}) documentation page: in order to work, plugins must be copied to the correct location in the Flink installation inside the Docker container.
If you want to enable plugins provided with Flink (in the opt/ directory of the Flink distribution), you can pass the environment variable ENABLE_BUILT_IN_PLUGINS when you run the Flink image.
ENABLE_BUILT_IN_PLUGINS should contain a list of plugin JAR file names separated by ;. A valid plugin name is, for example, flink-s3-fs-hadoop-{{< version >}}.jar. Note that the value must be quoted so that the shell does not interpret ; as a command separator:
```sh
$ docker run \
    --env "ENABLE_BUILT_IN_PLUGINS=flink-plugin1.jar;flink-plugin2.jar" \
    flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} <jobmanager|standalone-job|taskmanager>
```
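Roughly speaking, enabling a built-in plugin links the named JAR from the distribution into its own directory under /opt/flink/plugins. The sketch below approximates that bookkeeping in Python; the directory-name derivation is an assumption for illustration, not the entry point's exact logic:

```python
import re

# Illustrative approximation only: split the semicolon-separated value and
# derive a per-plugin directory under /opt/flink/plugins. The real work is
# done by the image's entry point script.
def parse_enable_built_in_plugins(value: str):
    return [p for p in value.split(";") if p]

def plugin_dir_for(jar_name: str) -> str:
    # strip a trailing "-<version>.jar" suffix (assumed naming scheme),
    # e.g. flink-s3-fs-hadoop-1.17.0.jar -> flink-s3-fs-hadoop
    stem = re.sub(r"-\d[\w.\-]*\.jar$", "", jar_name)
    return f"/opt/flink/plugins/{stem}"

jars = parse_enable_built_in_plugins("flink-s3-fs-hadoop-1.17.0.jar;flink-plugin2-2.0.jar")
print([plugin_dir_for(j) for j in jars])
```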
There are also more advanced ways for customizing the Flink image.
Flink uses jemalloc as the default memory allocator to mitigate memory fragmentation problems (see FLINK-19125).
You can switch back to glibc as the memory allocator to restore the old behavior, or if you observe unexpected memory consumption
(please report such issues via JIRA or the mailing list), by setting the environment variable DISABLE_JEMALLOC to true:
```sh
$ docker run \
    --env DISABLE_JEMALLOC=true \
    flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} <jobmanager|standalone-job|taskmanager>
```
For users still on the glibc memory allocator, a known glibc bug is easy to trigger, especially when savepoints or full checkpoints are created with RocksDBStateBackend.
Setting the environment variable MALLOC_ARENA_MAX can avoid unbounded memory growth:
```sh
$ docker run \
    --env MALLOC_ARENA_MAX=1 \
    flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} <jobmanager|standalone-job|taskmanager>
```
There are several ways in which you can further customize the Flink image:

* enable optional libraries or plugins by moving or linking them from /opt/flink/opt into /opt/flink/lib or /opt/flink/plugins
* add other libraries to /opt/flink/lib (e.g. Hadoop)
* add other plugins to /opt/flink/plugins

You can customize the Flink image in several ways:

* **override the container entry point** with a custom script where you can run any bootstrap actions.
  At the end you can call the standard /docker-entrypoint.sh script of the Flink image with the same arguments
  as described in supported deployment modes.
The following example creates a custom entry point script which enables more libraries and plugins. The custom script, custom library and plugin are provided from a mounted volume. Then it runs the standard entry point script of the Flink image:
```sh
# create custom_lib.jar
# create custom_plugin.jar

$ echo "
# enable an optional library
ln -fs /opt/flink/opt/flink-sql-gateway-*.jar /opt/flink/lib/

# enable a custom library
ln -fs /mnt/custom_lib.jar /opt/flink/lib/

mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
# enable an optional plugin
ln -fs /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/

mkdir -p /opt/flink/plugins/custom_plugin
# enable a custom plugin
ln -fs /mnt/custom_plugin.jar /opt/flink/plugins/custom_plugin/

/docker-entrypoint.sh <jobmanager|standalone-job|taskmanager>
" > custom_entry_point_script.sh

$ chmod 755 custom_entry_point_script.sh

$ docker run \
    --mount type=bind,src=$(pwd),target=/mnt \
    flink:{{< stable >}}{{< version >}}-scala{{< scala_version >}}{{< /stable >}}{{< unstable >}}latest{{< /unstable >}} /mnt/custom_entry_point_script.sh
```
* **extend the Flink image** by writing a custom Dockerfile and building a custom image:
```Dockerfile
FROM flink
RUN set -ex; apt-get update; apt-get -y install python
ADD /host/path/to/config.yaml /container/local/path/to/custom/conf/config.yaml
ADD /host/path/to/log4j.properties /container/local/path/to/custom/conf/log4j.properties
RUN ln -fs /opt/flink/opt/flink-sql-gateway-*.jar /opt/flink/lib/.
RUN mkdir -p /opt/flink/plugins/flink-s3-fs-hadoop
RUN ln -fs /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/flink-s3-fs-hadoop/.
ENV VAR_NAME value
```
Commands for building:
```sh
$ docker build --tag custom_flink_image .
# optional push to your docker image registry if you have it,
# e.g. to distribute the custom image to your cluster
$ docker push custom_flink_image
```
{{< top >}}