examples/tutorial/jupyter/execution/pandas_on_dask/cluster/exercise_5.ipynb
GOAL: Learn how to set up a Dask cluster for Modin, connect Modin to it, and run pandas queries on the cluster.
NOTE: This exercise has extra requirements. Read instructions carefully before attempting.
This exercise shows how to start a 500+ core Dask cluster, which is not shut down until the end of the exercise.
Often in practice we need to exceed the capabilities of a single machine. Modin works and performs well both in local mode and in a cluster environment. The key advantage of Modin is that your Python code does not change between local development and cluster execution. Users are not required to think about how many workers exist or how to distribute and partition their data; Modin handles all of this seamlessly and transparently.
First of all, install the necessary dependencies in your environment:
!pip install dask_cloudprovider[aws]
The next step is to set up your AWS credentials, namely, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
and, optionally, AWS_SESSION_TOKEN (refer to the AWS CLI environment variables documentation for more insight):
import os
os.environ["AWS_ACCESS_KEY_ID"] = "<aws_access_key_id>"
os.environ["AWS_SECRET_ACCESS_KEY"] = "<aws_secret_access_key>"
os.environ["AWS_SESSION_TOKEN"] = "<aws_session_token>"
This example starts 1 scheduler node (m5.24xlarge) and 6 worker nodes (m5.24xlarge), for a total of 576 worker CPUs. Keep in mind that the scheduler node manages cluster operation but doesn't perform any execution.
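As a sanity check, the worker CPU count quoted above follows from the instance size (assuming, per the EC2 instance specifications, that an m5.24xlarge provides 96 vCPUs):

```python
# m5.24xlarge provides 96 vCPUs (per the EC2 instance specs);
# the scheduler node is excluded because it performs no execution.
vcpus_per_instance = 96
n_workers = 6
total_worker_vcpus = n_workers * vcpus_per_instance
print(total_worker_vcpus)  # 576
```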
You can check the Amazon EC2 pricing page.
A Dask cluster can be deployed in different ways (refer to the Dask documentation for more information), but in this tutorial we will use EC2Cluster from dask_cloudprovider to create and initialize a Dask cluster on Amazon Web Services (AWS).
Note: EC2Cluster uses a Docker container to run the scheduler and each of the workers. You may need a different Docker image depending on your Python version and requirements. You can find more Docker images on the daskdev page.
In the next cell you can see how the EC2Cluster is created. <b>Set your key_name and modify AWS settings as required before running it.</b>
from dask_cloudprovider.aws import EC2Cluster

n_workers = 6
cluster = EC2Cluster(
    # AWS parameters
    key_name="",  # set your key name
    region="us-west-2",
    availability_zone=["us-west-2a"],
    ami="ami-0387d929287ab193e",
    instance_type="m5.24xlarge",
    vpc="vpc-002bd14c63f227832",
    subnet_id="subnet-09860dafd79720938",
    filesystem_size=200,  # in GB
    # Dask parameters
    n_workers=n_workers,
    docker_image="daskdev/dask:latest",
    debug=True,
    security=False,
)

scheduler_address = cluster.scheduler_address
print(f"Scheduler address of the Dask cluster: {scheduler_address}")
After creating the cluster, you need to connect to it. To do this, pass either the EC2Cluster instance or the scheduler address to distributed.Client.
When you connect to the cluster, the workers may not be initialized yet, so wait for them with client.wait_for_workers.
Then you can call client.ncores() to check which workers are available and how many threads each of them uses.
from distributed import Client

client = Client(cluster)
# Or connect by address if the cluster instance is unavailable
# (cluster.scheduler_address already includes the port, 8786 by default):
# client = Client(scheduler_address)

client.wait_for_workers(n_workers)
client.ncores()
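client.ncores() returns a dictionary keyed by worker address. A small sketch of how its output could be used to confirm the cluster size (the addresses below are made up for illustration):

```python
# Hypothetical client.ncores() output for a 2-worker cluster:
ncores = {
    "tcp://10.0.0.5:34567": 96,
    "tcp://10.0.0.6:45678": 96,
}
print(len(ncores))           # number of workers: 2
print(sum(ncores.values()))  # total threads: 192
```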
After the cluster has initialized successfully, you need to configure it.
You can use plugins to install any requirements on the workers; in particular, you have to install the modin package on each worker using the PipInstall plugin.
from dask.distributed import PipInstall
client.register_plugin(PipInstall(packages=["modin"]))
If you need additional worker configuration, you can create your own WorkerPlugin or a function that will be executed on each worker via client.run().
NOTE: A Dask cluster does not check whether a plugin or function has already been called, so take this into account when using them.
In this tutorial a CSV file will be read, so you need to download it to each of the workers and to the local machine, using the same absolute path everywhere.
import os
import urllib.request

def dataset_upload(file_url, file_path):
    try:
        dir_name = os.path.dirname(file_path)
        os.makedirs(dir_name, exist_ok=True)
        if os.path.exists(file_path):
            return "File already exists."
        urllib.request.urlretrieve(file_url, file_path)
        return "OK"
    except Exception as ex:
        return str(ex)
Set the directory where the file should be downloaded (the current directory is used by default):
directory_path = "./"
Then run the dataset_upload function on all workers. As a result, you will get a dictionary mapping each worker to the result of the function execution on it:
file_path = os.path.join(os.path.abspath(directory_path), "taxi.csv")
client.run(dataset_upload, "https://modin-datasets.intel.com/testing/yellow_tripdata_2015-01.csv", file_path)
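client.run returns a dictionary keyed by worker address, so you can verify that the download succeeded everywhere. A sketch (the addresses below are made up to mirror the shape of the real output):

```python
# Hypothetical client.run(...) output for a 2-worker cluster
# (fresh workers, so every download returns "OK"):
results = {
    "tcp://10.0.0.5:34567": "OK",
    "tcp://10.0.0.6:45678": "OK",
}
assert all(status == "OK" for status in results.values())
print("dataset present on all workers")
```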
You also have to execute this function on the local machine:
dataset_upload("https://modin-datasets.intel.com/testing/yellow_tripdata_2015-01.csv", file_path)
<b>Congratulations! The cluster is now fully configured, and we can start running pandas queries.</b>
As in local mode, Modin on a cluster uses Ray as the execution engine by default, so no additional action is required to start using it. If you need another engine, specify it either through the Modin config or through the Modin environment variable before the first operation with Modin, as shown below. The full list of Modin configs and corresponding environment variables can be found in the Modin Configuration Settings section of the Modin documentation.
# Modin engine can be specified either by config
import modin.config as cfg
cfg.Engine.put("dask")
# or by setting the environment variable
# import os
# os.environ["MODIN_ENGINE"] = "dask"
Now you can use Modin on the Dask cluster.
Let's read the downloaded CSV file and execute pandas operations such as count, groupby, and map:
import modin.pandas as pd
import time
t0 = time.perf_counter()
df = pd.read_csv(file_path, quoting=3)
df_count = df.count()
df_groupby_count = df.groupby("passenger_count").count()
df_map = df.map(str)
t1 = time.perf_counter()
print(f"Full script time is {(t1 - t0):.3f} s")
Now that the computation has finished, we can shut down the client and the cluster:
client.close()
cluster.close()