.. _ray-Spark-deploy:

This document describes the high-level steps to run Ray clusters on a `Spark standalone cluster <https://spark.apache.org/docs/latest/spark-standalone.html>`__.
1) Create a spark application that starts a Ray cluster on spark, executes ray application code, and then shuts down the Ray cluster.

.. code-block:: python

    import ray
    from pyspark.sql import SparkSession
    from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster, MAX_NUM_WORKER_NODES

    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("Ray on spark example 1") \
            .config("spark.task.cpus", "4") \
            .getOrCreate()

        # Set up a Ray cluster on this spark application. It creates a background
        # spark job in which each spark task launches one Ray worker node.
        # The Ray head node is launched on the spark application driver side.
        # Resources (CPU / GPU / memory) allocated to each Ray worker node equal
        # the resources allocated to the corresponding spark task.
        setup_ray_cluster(max_worker_nodes=MAX_NUM_WORKER_NODES)

        # You can run any ray application code here; it will be executed
        # on the Ray cluster set up above.
        # You don't need to set an address for `ray.init`;
        # it connects to the cluster created above automatically.
        ray.init()
        ...

        # Terminate the Ray cluster explicitly.
        # If you don't call it, the Ray cluster will be terminated when
        # the spark application terminates.
        shutdown_ray_cluster()
2) Submit the spark application above to the spark standalone cluster.

.. code-block:: bash

    #!/bin/bash
    spark-submit \
      --master spark://{spark_master_IP}:{spark_master_port} \
      path/to/ray-on-spark-example1.py
1) Create a spark application that starts a long-running Ray cluster on spark. The created Ray cluster can be accessed by remote python processes.

.. code-block:: python

    import time
    from pyspark.sql import SparkSession
    from ray.util.spark import setup_ray_cluster, MAX_NUM_WORKER_NODES

    if __name__ == "__main__":
        spark = SparkSession \
            .builder \
            .appName("long running ray cluster on spark") \
            .config("spark.task.cpus", "4") \
            .getOrCreate()

        cluster_address = setup_ray_cluster(
            max_worker_nodes=MAX_NUM_WORKER_NODES
        )
        print("Ray cluster is set up, you can connect to this ray cluster "
              f"via address ray://{cluster_address}")

        # Sleep forever until the spark application is terminated;
        # at that time, the Ray cluster will also be terminated.
        while True:
            time.sleep(10)
2) Submit the spark application above to the spark standalone cluster.

.. code-block:: bash

    #!/bin/bash
    spark-submit \
      --master spark://{spark_master_IP}:{spark_master_port} \
      path/to/long-running-ray-cluster-on-spark.py
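Once the long-running application prints its ``ray://`` address, a separate python process can attach to the cluster through the Ray client. A non-runnable sketch, with a placeholder standing in for the printed address:

```python
import ray

# Replace the placeholder with the ray:// address printed by the
# long-running spark application above.
ray.init(address="ray://<cluster_address>")

# Any Ray workload submitted here runs on the remote cluster;
# `ping` is a hypothetical task used only for illustration.
@ray.remote
def ping():
    return "pong"

print(ray.get(ping.remote()))
```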
.. autofunction:: ray.util.spark.setup_ray_cluster
.. autofunction:: ray.util.spark.shutdown_ray_cluster
.. autofunction:: ray.util.spark.setup_global_ray_cluster