tutorials/text_to_image/using_cloud_with_ray.ipynb
Daft's default Native runner is great for experimentation on your laptop, but when it comes time to run much more computationally expensive jobs that need large-scale parallelism, you can run Daft on a Ray cluster instead.
Ray is a framework that exposes a Python interface for running distributed computation over a cluster of machines. Daft is built to use Ray as a backend for running dataframe operations, allowing it to scale to huge amounts of data and computation.
Even if you do not have a big cluster, you can still run Ray locally on your laptop. In that case Ray spins up a cluster of just a single machine (your laptop), and Daft's Ray backend allows Daft to fully utilize your machine's cores.
!pip install daft[ray]
!pip install Pillow
CI = False
import daft
USE_RAY = not CI
NUM_ROWS_LIMIT = 16 if CI else 160
# Use anonymous mode for accessing AWS S3
IO_CONFIG = daft.io.IOConfig(
    s3=daft.io.S3Config(anonymous=True, region_name="us-west-2")
)
PARQUET_URL = "s3://daft-public-data/tutorials/laion-parquet/train-00000-of-00001-6f24a7497df494ae.parquet"
daft.set_planning_config(default_io_config=IO_CONFIG)
By default, Daft uses the "Python Runner" which runs all processing in a single Python process.
To activate the RayRunner, you can either:
1. Set the DAFT_RUNNER=ray and optionally the RAY_ADDRESS environment variables
2. Call daft.set_runner_ray(...) at the start of your program.
We'll demonstrate option 2 here!
import daft
if USE_RAY:
    RAY_ADDRESS = None
    daft.set_runner_ray(
        # You may provide Daft with the address to an existing Ray cluster if you have one!
        # If this is not provided, Daft will default to spinning up a single-node Ray cluster consisting of just your current local machine
        address=RAY_ADDRESS,
    )
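For comparison, the environment-variable route mentioned above could be sketched as follows. This is a minimal illustration that uses only the standard library: `configure_runner_env` is a hypothetical helper (not part of Daft), and it assumes Daft reads `DAFT_RUNNER` and `RAY_ADDRESS` when it first selects a runner, so it should run before any Daft work is executed.

```python
import os
from typing import Dict, Optional


def configure_runner_env(use_ray: bool, ray_address: Optional[str] = None) -> Dict[str, str]:
    """Set the runner environment variables (hypothetical helper, not a Daft API)."""
    applied: Dict[str, str] = {}
    if use_ray:
        applied["DAFT_RUNNER"] = "ray"
        # Only set RAY_ADDRESS when connecting to an existing cluster;
        # leaving it unset corresponds to the local single-node case.
        if ray_address is not None:
            applied["RAY_ADDRESS"] = ray_address
    os.environ.update(applied)
    return applied


# Assumption: call this before Daft runs anything, so the setting is picked up.
applied = configure_runner_env(use_ray=True)
print(applied)  # {'DAFT_RUNNER': 'ray'}
```

In practice you would more commonly export these variables in your shell or job configuration rather than set them from Python; the helper only makes the two settings explicit.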
Let's try to download the images from our previous Text-to-Image Generation tutorial with the RayRunner instead.
We limit the dataset to 160 rows and repartition it into 8 partitions for demonstration purposes. This just means that our data will be divided into 8 approximately equal-sized "chunks".
from daft import col
parquet_df = daft.read_parquet(PARQUET_URL, io_config=IO_CONFIG).limit(NUM_ROWS_LIMIT).repartition(8)
parquet_df.collect()
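To make "approximately equal-sized chunks" concrete, here is a plain-Python sketch of splitting 160 rows into 8 partitions. It is only an illustration of the idea: `split_into_partitions` is a hypothetical helper, and Daft's actual repartitioning strategy may assign rows to partitions differently.

```python
from typing import List, Sequence


def split_into_partitions(rows: Sequence[int], num_partitions: int) -> List[Sequence[int]]:
    """Split rows into contiguous chunks whose sizes differ by at most one."""
    base, extra = divmod(len(rows), num_partitions)
    partitions = []
    start = 0
    for i in range(num_partitions):
        # The first `extra` partitions absorb one leftover row each.
        size = base + (1 if i < extra else 0)
        partitions.append(rows[start:start + size])
        start += size
    return partitions


rows = list(range(160))  # stand-in for the 160 rows kept by .limit()
partitions = split_into_partitions(rows, 8)
sizes = [len(p) for p in partitions]
print(sizes)  # [20, 20, 20, 20, 20, 20, 20, 20]
```

Each partition is a unit of work that can be processed independently, which is what lets a runner hand different partitions to different workers.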
Now, let's try downloading the data from the URLs with .download()!
images_df = parquet_df.with_column("images", col("URL").download(on_error="null"))
images_df.collect()
On Google Colab, it should take approximately 10 seconds, versus about 20 seconds with the Python Runner!
With exactly the same code, we were able to achieve a 2x speedup in execution - what happened here?
It turns out that our workload is I/O-bound: most of the time is spent waiting for data to be downloaded from the URLs.
By default, the .download() UDF requests num_cpus=1. Since our Google Colab machine has 2 CPUs, the RayRunner is able to run two of these UDFs in parallel, hence achieving a 2x increase in throughput!
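The effect can be reproduced in miniature without Daft or Ray. The sketch below simulates I/O-bound downloads with `time.sleep` and compares one worker against two. It uses threads from the standard library rather than Ray workers, and the `example.com` URLs and `fake_download` function are made up for illustration, so treat it as an analogy rather than a description of how the RayRunner schedules work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_download(url: str) -> int:
    """Pretend to fetch a URL: the sleep stands in for network latency."""
    time.sleep(0.05)
    return len(url)


def timed_run(urls, max_workers):
    """Run all downloads with the given number of workers and time the batch."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(fake_download, urls))
    return results, time.perf_counter() - start


urls = [f"https://example.com/image_{i}.jpg" for i in range(8)]  # hypothetical URLs
serial_results, serial_seconds = timed_run(urls, max_workers=1)
parallel_results, parallel_seconds = timed_run(urls, max_workers=2)
print(f"1 worker: {serial_seconds:.2f}s, 2 workers: {parallel_seconds:.2f}s")
```

Because each task spends its time waiting rather than computing, doubling the number of concurrent tasks roughly halves the wall-clock time, which mirrors the speedup observed above.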
We have seen that using the RayRunner even locally provides us with some speedup already. However, the real power of distributed computing is in allowing us to access thousands of CPUs and GPUs in the cloud, on a remote Ray cluster.
For example, UDFs that each request a single GPU can run in parallel across hundreds of GPUs on a remote Ray cluster, scaling your workloads up to take full advantage of the available hardware.
To run Daft on large clusters, check out Eventual, which offers a fully managed platform for running Daft at scale.