Back to Daft

Using Ray for Scaling Up

tutorials/text_to_image/using_cloud_with_ray.ipynb

0.7.104.0 KB
Original Source

Using Ray for Scaling Up

Daft's default Native runner is great for experimentation on your laptop, but when it comes times to running much more computationally expensive jobs that need to take advantage of large scale parallelism, you can run Daft on a Ray cluster instead.

What is a Ray Cluster, and why do I need it?

Ray is a framework that exposes a Python interface for running distributed computation over a cluster of machines. Daft is built to use Ray as a backend for running dataframe operations, allowing it to scale to huge amounts of data and computation.

However even if you do not have a big cluster to use Ray, you can run Ray locally on your laptop (in which case it would spin up a Ray cluster of just a single machine: your laptop), and using Daft's Ray backend would allow Daft to fully utilize your machine's cores.

Let's get started!

python
!pip install daft[ray]
!pip install Pillow
python
CI = False
python
import daft

USE_RAY = False if CI else True
NUM_ROWS_LIMIT = 16 if CI else 160
IO_CONFIG = daft.io.IOConfig(
    s3=daft.io.S3Config(anonymous=True, region_name="us-west-2")
)  # Use anonymous-mode for accessing AWS S3
PARQUET_URL = "s3://daft-public-data/tutorials/laion-parquet/train-00000-of-00001-6f24a7497df494ae.parquet"

daft.set_planning_config(default_io_config=IO_CONFIG)

By default, Daft uses the "Python Runner" which runs all processing in a single Python process.

To activate the RayRunner, you can either:

  1. Use the DAFT_RUNNER=ray and optionally the RAY_ADDRESS environment variables
  2. Call daft.set_runner_ray(...) at the start of your program.

We'll demonstrate option 2 here!

python
import daft

if USE_RAY:
    RAY_ADDRESS = None
    daft.set_runner_ray(
        # You may provide Daft with the address to an existing Ray cluster if you have one!
        # If this is not provided, Daft will default to spinning up a single-node Ray cluster consisting of just your current local machine
        address=RAY_ADDRESS,
    )

Let's try to download the images from our previous Text-to-Image Generatation tutorial with the RayRunner instead.

We limit the dataset to 160 rows and repartition it into 8 partitions for demonstration purposes. This just means that our data will be divided into 8 approximately equal-sized "chunks".

python
from daft import col

parquet_df = daft.read_parquet(PARQUET_URL, io_config=IO_CONFIG).limit(NUM_ROWS_LIMIT).repartition(8)
parquet_df.collect()

Download data from URLs

Now, let's try downloading the data from the URLs with .download()!

python
images_df = parquet_df.with_column("images", col("URL").download(on_error="null"))
images_df.collect()

On Google Colab, it should take approximately 10 seconds, vs about 20 seconds with the Py Runner!

With exactly the same code, we were able to achieve a 2x speedup in execution - what happened here?

It turns out that our workload is IO Bound because most of the time is spent waiting for data to be downloaded from the URL.

By default, the .download() UDF requests num_cpus=1. Since our Google Colab machine has 2 CPUs, the RayRunner is able to run two of these UDFs in parallel, hence achieving a 2x increase in throughput!

Remote Ray Clusters

We have seen that using the RayRunner even locally provides us with some speedup already. However, the real power of distributed computing is in allowing us to access thousands of CPUs and GPUs in the cloud, on a remote Ray cluster.

For example, UDFs that request for a single GPU with can run in parallel across hundreds of GPUs on a remote Ray cluster, effortlessly scaling your workloads up to take full advantage of the available hardware.

To run Daft on large clusters, check out Eventual where you have access to a fully managed platform for running Daft at scale.