.. _loading_data:

============
Loading Data
============

Ray Data loads data from various sources. This guide shows you how to:

* `Read files <#reading-files>`_ like images
* `Load in-memory data <#loading-data-from-other-libraries>`_ like pandas DataFrames
* `Read databases <#reading-databases>`_ like MySQL

.. _reading-files:

Reading files
=============

Ray Data reads files from local disk or cloud storage in a variety of file formats. To view the full list of supported file formats, see the :ref:`Loading Data API <loading-data-api>`.

.. tab-set::

.. tab-item:: Parquet

    To read Parquet files, call :func:`~ray.data.read_parquet`.

    .. testcode::

        import ray

        ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")

        print(ds.schema())

    .. testoutput::

        Column        Type
        ------        ----
        sepal.length  double
        sepal.width   double
        petal.length  double
        petal.width   double
        variety       string

    .. tip::

        When reading parquet files, you can take advantage of column pruning to
        efficiently filter columns at the file scan level. See
        :ref:`Parquet column pruning <parquet_column_pruning>` for more details
        on the projection pushdown feature.
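
    For example, you can pass the ``columns`` argument to read only a subset of the
    columns shown above (a minimal sketch reusing the same public iris file):

    .. testcode::

        import ray

        # Only the listed columns are read at the file scan level.
        ds = ray.data.read_parquet(
            "s3://anonymous@ray-example-data/iris.parquet",
            columns=["sepal.length", "variety"],
        )

        # The schema now contains only the two requested columns.
        print(ds.schema())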

.. tab-item:: Images

    To read raw images, call :func:`~ray.data.read_images`. Ray Data represents
    images as NumPy ndarrays.

    .. testcode::

        import ray

        ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea/JPEGImages/")

        print(ds.schema())

    .. testoutput::

        Column  Type
        ------  ----
        image   ArrowTensorTypeV2(shape=(32, 32, 3), dtype=uint8)

.. tab-item:: Text

    To read lines of text, call :func:`~ray.data.read_text`.

    .. testcode::

        import ray

        ds = ray.data.read_text("s3://anonymous@ray-example-data/this.txt")

        print(ds.schema())

    .. testoutput::

        Column  Type
        ------  ----
        text    string

.. tab-item:: CSV

    To read CSV files, call :func:`~ray.data.read_csv`.

    .. testcode::

        import ray

        ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

        print(ds.schema())

    .. testoutput::

        Column             Type
        ------             ----
        sepal length (cm)  double
        sepal width (cm)   double
        petal length (cm)  double
        petal width (cm)   double
        target             int64

.. tab-item:: Binary

    To read raw binary files, call :func:`~ray.data.read_binary_files`.

    .. testcode::

        import ray

        ds = ray.data.read_binary_files("s3://anonymous@ray-example-data/documents")

        print(ds.schema())

    .. testoutput::

        Column  Type
        ------  ----
        bytes   binary

.. tab-item:: TFRecords

    To read TFRecords files, call :func:`~ray.data.read_tfrecords`.

    .. testcode::

        import ray

        ds = ray.data.read_tfrecords("s3://anonymous@ray-example-data/iris.tfrecords")

        print(ds.schema())

    .. testoutput::
        :options: +MOCK

        Column        Type
        ------        ----
        label         binary
        petal.length  float
        sepal.width   float
        petal.width   float
        sepal.length  float

Reading files from local disk
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To read files from local disk, call a function like :func:`~ray.data.read_parquet` and
specify paths with the ``local://`` scheme. Paths can point to files or directories.

To read formats other than Parquet, see the :ref:`Loading Data API <loading-data-api>`.

.. tip::

    If your files are accessible on every node, exclude ``local://`` to parallelize the
    read tasks across the cluster.

.. testcode::
    :skipif: True

    import ray

    ds = ray.data.read_parquet("local:///tmp/iris.parquet")

    print(ds.schema())

.. testoutput::

    Column        Type
    ------        ----
    sepal.length  double
    sepal.width   double
    petal.length  double
    petal.width   double
    variety       string

Reading files from cloud storage
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To read files from cloud storage, authenticate all nodes with your cloud service provider. Then, call a function like :func:`~ray.data.read_parquet` and specify URIs with the appropriate scheme. URIs can point to buckets, folders, or objects.

To read formats other than Parquet, see the :ref:`Loading Data API <loading-data-api>`.

.. tab-set::

.. tab-item:: S3

    To read files from Amazon S3, specify URIs with the ``s3://`` scheme.

    .. testcode::

        import ray

        ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")

        print(ds.schema())

    .. testoutput::

        Column        Type
        ------        ----
        sepal.length  double
        sepal.width   double
        petal.length  double
        petal.width   double
        variety       string

    Ray Data relies on PyArrow for authentication with Amazon S3. For more on how to configure
    your credentials to be compatible with PyArrow, see their
    `S3 Filesystem docs <https://arrow.apache.org/docs/python/filesystems.html#s3>`_.
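
    If you need to pass credentials explicitly instead of relying on the environment, one
    option is to construct a ``pyarrow.fs.S3FileSystem`` yourself and pass it through the
    ``filesystem`` argument. The following is a minimal sketch; the bucket path, key pair,
    and region are placeholders:

    .. testcode::
        :skipif: True

        import pyarrow.fs
        import ray

        # Placeholder credentials and region; replace with your own values.
        filesystem = pyarrow.fs.S3FileSystem(
            access_key="YOUR_ACCESS_KEY_ID",
            secret_key="YOUR_SECRET_ACCESS_KEY",
            region="us-west-2",
        )

        ds = ray.data.read_parquet(
            "s3://your-bucket/your-data.parquet",
            filesystem=filesystem,
        )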

.. tab-item:: GCS

    To read files from Google Cloud Storage, install the
    `Filesystem interface to Google Cloud Storage <https://gcsfs.readthedocs.io/en/latest/>`_

    .. code-block:: console

        pip install gcsfs

    Then, create a ``GCSFileSystem`` and specify URIs with the ``gs://`` scheme.

    .. testcode::
        :skipif: True

        import gcsfs
        import ray

        filesystem = gcsfs.GCSFileSystem(project="my-google-project")
        ds = ray.data.read_parquet(
            "gs://...",
            filesystem=filesystem
        )

        print(ds.schema())

    .. testoutput::

        Column        Type
        ------        ----
        sepal.length  double
        sepal.width   double
        petal.length  double
        petal.width   double
        variety       string

    Ray Data relies on PyArrow for authentication with Google Cloud Storage. For more on how
    to configure your credentials to be compatible with PyArrow, see their
    `GCS Filesystem docs <https://arrow.apache.org/docs/python/filesystems.html#google-cloud-storage-file-system>`_.

.. tab-item:: ABS

    To read files from Azure Blob Storage, install the
    `Filesystem interface to Azure-Datalake Gen1 and Gen2 Storage <https://pypi.org/project/adlfs/>`_

    .. code-block:: console

        pip install adlfs

    Then, create an ``AzureBlobFileSystem`` and specify URIs with the ``az://`` scheme.

    .. testcode::
        :skipif: True

        import adlfs
        import ray

        ds = ray.data.read_parquet(
            "az://ray-example-data/iris.parquet",
            adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
        )

        print(ds.schema())

    .. testoutput::

        Column        Type
        ------        ----
        sepal.length  double
        sepal.width   double
        petal.length  double
        petal.width   double
        variety       string

    Ray Data relies on PyArrow for authentication with Azure Blob Storage. For more on how
    to configure your credentials to be compatible with PyArrow, see their
    `fsspec-compatible filesystems docs <https://arrow.apache.org/docs/python/filesystems.html#using-fsspec-compatible-filesystems-with-arrow>`_.

Reading files from NFS
~~~~~~~~~~~~~~~~~~~~~~

To read files from NFS filesystems, call a function like :func:`~ray.data.read_parquet`
and specify files on the mounted filesystem. Paths can point to files or directories.

To read formats other than Parquet, see the :ref:`Loading Data API <loading-data-api>`.

.. testcode::
    :skipif: True

    import ray

    ds = ray.data.read_parquet("/mnt/cluster_storage/iris.parquet")

    print(ds.schema())

.. testoutput::

    Column        Type
    ------        ----
    sepal.length  double
    sepal.width   double
    petal.length  double
    petal.width   double
    variety       string

Handling compressed files
~~~~~~~~~~~~~~~~~~~~~~~~~

To read a compressed file, specify ``compression`` in ``arrow_open_stream_args``. You can use any `codec supported by Arrow <https://arrow.apache.org/docs/python/generated/pyarrow.CompressedInputStream.html>`__.

.. testcode::

    import ray

    ds = ray.data.read_csv(
        "s3://anonymous@ray-example-data/iris.csv.gz",
        arrow_open_stream_args={"compression": "gzip"},
    )

Downloading files from URIs
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Sometimes you have a metadata table with a column of URIs and want to download the files that those URIs reference.

You can download the files in bulk with the :meth:`~ray.data.Dataset.with_column` method and the :func:`~ray.data.expressions.download` expression. This approach lets Ray Data download the referenced files in parallel, without you having to manage async code in your own transformations.

The following example shows how to download a batch of images from URLs listed in a Parquet file:

.. testcode::

    import ray
    from ray.data.expressions import download

    # Read a Parquet file containing a column of image URLs
    ds = ray.data.read_parquet("s3://anonymous@ray-example-data/imagenet/metadata_file.parquet")

    # Use `with_column` and `download` to download the images in parallel.
    # This creates a new column 'bytes' with the downloaded file contents.
    ds = ds.with_column(
        "bytes",
        download("image_url"),
    )

    ds.take(1)

Loading data from other libraries
=================================

Loading data from single-node data libraries
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Ray Data interoperates with libraries like pandas, NumPy, and Arrow.

.. tab-set::

.. tab-item:: Python objects

    To create a :class:`~ray.data.dataset.Dataset` from Python objects, call
    :func:`~ray.data.from_items` and pass in a list of ``Dict``. Ray Data treats
    each ``Dict`` as a row.

    .. testcode::

        import ray

        ds = ray.data.from_items([
            {"food": "spam", "price": 9.34},
            {"food": "ham", "price": 5.37},
            {"food": "eggs", "price": 0.94}
        ])

        print(ds)

    .. testoutput::

        shape: (3, 2)
        ╭────────┬────────╮
        │ food   ┆ price  │
        │ ---    ┆ ---    │
        │ string ┆ double │
        ╞════════╪════════╡
        │ spam   ┆ 9.34   │
        │ ham    ┆ 5.37   │
        │ eggs   ┆ 0.94   │
        ╰────────┴────────╯
        (Showing 3 of 3 rows)

    You can also create a :class:`~ray.data.dataset.Dataset` from a list of regular
    Python objects. In the schema, the column name defaults to "item". 

    .. testcode::

        import ray

        ds = ray.data.from_items([1, 2, 3, 4, 5])

        print(ds)

    .. testoutput::

        shape: (5, 1)
        ╭───────╮
        │ item  │
        │ ---   │
        │ int64 │
        ╞═══════╡
        │ 1     │
        │ 2     │
        │ 3     │
        │ 4     │
        │ 5     │
        ╰───────╯
        (Showing 5 of 5 rows)

.. tab-item:: NumPy

    To create a :class:`~ray.data.dataset.Dataset` from a NumPy array, call
    :func:`~ray.data.from_numpy`. Ray Data treats the outer axis as the row
    dimension.

    .. testcode::

        import numpy as np
        import ray

        array = np.arange(3)
        ds = ray.data.from_numpy(array)

        print(ds)

    .. testoutput::

        shape: (3, 1)
        ╭───────╮
        │ data  │
        │ ---   │
        │ int64 │
        ╞═══════╡
        │ 0     │
        │ 1     │
        │ 2     │
        ╰───────╯
        (Showing 3 of 3 rows)
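
    Because the outer axis is the row dimension, a two-dimensional array produces one row
    per outer slice. A small sketch (the array shape is chosen only for illustration):

    .. testcode::

        import numpy as np
        import ray

        # A (3, 4) array becomes 3 rows; each row stores a length-4 ndarray
        # in the "data" column.
        array = np.zeros((3, 4))
        ds = ray.data.from_numpy(array)

        print(ds.count())

    .. testoutput::

        3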

.. tab-item:: pandas

    To create a :class:`~ray.data.dataset.Dataset` from a pandas DataFrame, call
    :func:`~ray.data.from_pandas`.

    .. testcode::

        import pandas as pd
        import ray

        df = pd.DataFrame({
            "food": ["spam", "ham", "eggs"],
            "price": [9.34, 5.37, 0.94]
        })
        ds = ray.data.from_pandas(df)

        print(ds)

    .. testoutput::

        shape: (3, 2)
        ╭────────┬────────╮
        │ food   ┆ price  │
        │ ---    ┆ ---    │
        │ object ┆ double │
        ╞════════╪════════╡
        │ spam   ┆ 9.34   │
        │ ham    ┆ 5.37   │
        │ eggs   ┆ 0.94   │
        ╰────────┴────────╯
        (Showing 3 of 3 rows)

.. tab-item:: PyArrow

    To create a :class:`~ray.data.dataset.Dataset` from an Arrow table, call
    :func:`~ray.data.from_arrow`.

    .. testcode::

        import pyarrow as pa
        import ray

        table = pa.table({
            "food": ["spam", "ham", "eggs"],
            "price": [9.34, 5.37, 0.94]
        })
        ds = ray.data.from_arrow(table)

        print(ds)

    .. testoutput::

        shape: (3, 2)
        ╭────────┬────────╮
        │ food   ┆ price  │
        │ ---    ┆ ---    │
        │ string ┆ double │
        ╞════════╪════════╡
        │ spam   ┆ 9.34   │
        │ ham    ┆ 5.37   │
        │ eggs   ┆ 0.94   │
        ╰────────┴────────╯
        (Showing 3 of 3 rows)

.. _loading_datasets_from_distributed_df:

Loading data from distributed DataFrame libraries
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Ray Data interoperates with distributed data processing frameworks like `Daft <https://www.getdaft.io>`_,
:ref:`Dask <dask-on-ray>`, :ref:`Spark <spark-on-ray>`, :ref:`Modin <modin-on-ray>`, and
:ref:`Mars <mars-on-ray>`.

.. note::

    The Ray Community provides these operations but may not actively maintain them. If you run into issues,
    create a GitHub issue `here <https://github.com/ray-project/ray/issues>`__.

.. tab-set::

    .. tab-item:: Daft

        To create a :class:`~ray.data.dataset.Dataset` from a `Daft DataFrame <https://docs.getdaft.io/en/stable/api/dataframe/>`_, call
        :func:`~ray.data.from_daft`. This function executes the Daft DataFrame and constructs a
        ``Dataset`` backed by the Arrow data that your Daft query produces.

        .. warning::
            :func:`~ray.data.from_daft` doesn't work with PyArrow 14 and later. For more
            information, see `this issue <https://github.com/ray-project/ray/issues/54837>`__.

        .. testcode::
            :skipif: True

            import daft
            import ray

            df = daft.from_pydict({"int_col": [i for i in range(10000)], "str_col": [str(i) for i in range(10000)]})
            ds = ray.data.from_daft(df)

            ds.show(3)

        .. testoutput::

            {'int_col': 0, 'str_col': '0'}
            {'int_col': 1, 'str_col': '1'}
            {'int_col': 2, 'str_col': '2'}

    .. tab-item:: Dask

        To create a :class:`~ray.data.dataset.Dataset` from a
        `Dask DataFrame <https://docs.dask.org/en/stable/dataframe.html>`__, call
        :func:`~ray.data.from_dask`. This function constructs a
        ``Dataset`` backed by the distributed Pandas DataFrame partitions that underlie
        the Dask DataFrame.

        ..
          We skip the code snippet below because `from_dask` doesn't work with PyArrow 
          14 and later. For more information, see https://github.com/ray-project/ray/issues/54837

        .. testcode::
            :skipif: True

            import dask.dataframe as dd
            import pandas as pd
            import ray

            df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
            ddf = dd.from_pandas(df, npartitions=4)
            # Create a Dataset from a Dask DataFrame.
            ds = ray.data.from_dask(ddf)

            ds.show(3)

        .. testoutput::

            {'col1': 0, 'col2': '0'}
            {'col1': 1, 'col2': '1'}
            {'col1': 2, 'col2': '2'}

    .. tab-item:: Spark

        To create a :class:`~ray.data.dataset.Dataset` from a `Spark DataFrame
        <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/dataframe.html>`__,
        call :func:`~ray.data.from_spark`. This function creates a ``Dataset`` backed by
        the distributed Spark DataFrame partitions that underlie the Spark DataFrame.

        ..
            TODO: This code snippet might not work correctly. We should test it.

        .. testcode::
            :skipif: True

            import ray
            import raydp

            spark = raydp.init_spark(app_name="Spark -> Datasets Example",
                                    num_executors=2,
                                    executor_cores=2,
                                    executor_memory="500MB")
            df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
            ds = ray.data.from_spark(df)

            ds.show(3)

        .. testoutput::

            {'col1': 0, 'col2': '0'}
            {'col1': 1, 'col2': '1'}
            {'col1': 2, 'col2': '2'}

    .. tab-item:: Iceberg

        To create a :class:`~ray.data.dataset.Dataset` from an `Iceberg Table
        <https://iceberg.apache.org>`__,
        call :func:`~ray.data.read_iceberg`. This function creates a ``Dataset`` backed by
        the distributed files that underlie the Iceberg table.

        .. testcode::
            :skipif: True

            import ray
            from pyiceberg.expressions import EqualTo

            ds = ray.data.read_iceberg(
                table_identifier="db_name.table_name",
                row_filter=EqualTo("column_name", "literal_value"),
                catalog_kwargs={"name": "default", "type": "glue"}
            )
            ds.show(3)

        .. testoutput::
            :options: +MOCK

            {'col1': 0, 'col2': '0'}
            {'col1': 1, 'col2': '1'}
            {'col1': 2, 'col2': '2'}

    .. tab-item:: Modin

        To create a :class:`~ray.data.dataset.Dataset` from a Modin DataFrame, call
        :func:`~ray.data.from_modin`. This function constructs a ``Dataset`` backed by
        the distributed Pandas DataFrame partitions that underlie the Modin DataFrame.

        .. testcode::

            import modin.pandas as md
            import pandas as pd
            import ray

            df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
            mdf = md.DataFrame(df)
            # Create a Dataset from a Modin DataFrame.
            ds = ray.data.from_modin(mdf)

            ds.show(3)

        .. testoutput::

            {'col1': 0, 'col2': '0'}
            {'col1': 1, 'col2': '1'}
            {'col1': 2, 'col2': '2'}

    .. tab-item:: Mars

        To create a :class:`~ray.data.dataset.Dataset` from a Mars DataFrame, call
        :func:`~ray.data.from_mars`. This function constructs a ``Dataset``
        backed by the distributed Pandas DataFrame partitions that underlie the Mars
        DataFrame.

        .. testcode::
            :skipif: True

            import mars
            import mars.dataframe as md
            import pandas as pd
            import ray

            cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1)

            df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
            mdf = md.DataFrame(df, num_partitions=8)
            # Create a tabular Dataset from a Mars DataFrame.
            ds = ray.data.from_mars(mdf)

            ds.show(3)

        .. testoutput::

            {'col1': 0, 'col2': '0'}
            {'col1': 1, 'col2': '1'}
            {'col1': 2, 'col2': '2'}

.. _loading_huggingface_datasets:

Loading Hugging Face datasets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To read datasets from the Hugging Face Hub, use :func:`~ray.data.read_parquet` (or other
read functions) with the ``HfFileSystem`` filesystem. This approach provides better
performance and scalability than loading datasets into memory first.

First, install the required dependencies:

.. code-block:: console

    pip install huggingface_hub

Set your Hugging Face token to authenticate. You can read public datasets without a
token by setting the filesystem argument to ``HfFileSystem()``, but Hugging Face
applies more aggressive rate limits to unauthenticated requests.

.. code-block:: console

    export HF_TOKEN=<YOUR HUGGING FACE TOKEN>

For most Hugging Face datasets, the data is stored in Parquet files. You can directly
read from the dataset path:

.. testcode::
    :skipif: True

    import os
    import ray
    from huggingface_hub import HfFileSystem

    ds = ray.data.read_parquet(
        "hf://datasets/wikimedia/wikipedia",
        file_extensions=["parquet"],
        filesystem=HfFileSystem(token=os.environ["HF_TOKEN"]),
    )

    print(f"Dataset count: {ds.count()}")
    print(ds.schema())

.. testoutput::

    Dataset count: 61614907
    Column  Type
    ------  ----
    id      string
    url     string
    title   string
    text    string

.. tip::

    If you encounter serialization errors when reading from Hugging Face filesystems, try upgrading ``huggingface_hub`` to version 1.1.6 or later. For more details, see this issue: https://github.com/ray-project/ray/issues/59029



.. _loading_datasets_from_ml_libraries:

Loading data from ML libraries
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Ray Data interoperates with HuggingFace, PyTorch, and TensorFlow datasets.

.. tab-set::

    .. tab-item:: HuggingFace

        To load a HuggingFace Dataset into Ray Data, use the HuggingFace Hub ``HfFileSystem``
        with :func:`~ray.data.read_parquet`, :func:`~ray.data.read_csv`, or :func:`~ray.data.read_json`.
        Since HuggingFace datasets are often backed by these file formats, this approach enables efficient distributed
        reads directly from the Hub.

        .. testcode::
            :skipif: True

            import ray.data
            from huggingface_hub import HfFileSystem

            path = "hf://datasets/Salesforce/wikitext/wikitext-2-raw-v1/"
            fs = HfFileSystem()
            ds = ray.data.read_parquet(path, filesystem=fs)
            print(ds.take(5))

        .. testoutput::
            :options: +MOCK

            [{'text': '...'}, {'text': '...'}]

    .. tab-item:: PyTorch

        To convert a PyTorch dataset to a Ray Dataset, call :func:`~ray.data.from_torch`.

        .. testcode::

            import ray
            from torch.utils.data import Dataset
            from torchvision import datasets
            from torchvision.transforms import ToTensor

            tds = datasets.CIFAR10(root="data", train=True, download=True, transform=ToTensor())
            ds = ray.data.from_torch(tds)

            print(ds)

        .. testoutput::
            :options: +MOCK

            Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to data/cifar-10-python.tar.gz
            100%|███████████████████████| 170498071/170498071 [00:07<00:00, 23494838.54it/s]
            Extracting data/cifar-10-python.tar.gz to data
            Dataset(num_rows=50000, schema={item: object})


    .. tab-item:: TensorFlow

        To convert a TensorFlow dataset to a Ray Dataset, call :func:`~ray.data.from_tf`.

        .. warning::
            :func:`~ray.data.from_tf` doesn't support parallel reads. Only use this
            function with small datasets like MNIST or CIFAR.

        .. testcode::

            import ray
            import tensorflow_datasets as tfds

            tf_ds, _ = tfds.load("cifar10", split=["train", "test"])
            ds = ray.data.from_tf(tf_ds)

            print(ds)

        ..
            The following `testoutput` is mocked to avoid illustrating download logs like
            "Downloading and preparing dataset 162.17 MiB".

        .. testoutput::
            :options: +MOCK

            MaterializedDataset(
               num_blocks=...,
               num_rows=50000,
               schema={
                  id: binary,
                  image: ArrowTensorTypeV2(shape=(32, 32, 3), dtype=uint8),
                  label: int64
               }
            )

Reading databases
=================

Ray Data reads from databases like MySQL, PostgreSQL, MongoDB, and BigQuery.

.. _reading_sql:

Reading SQL databases
~~~~~~~~~~~~~~~~~~~~~

Call :func:`~ray.data.read_sql` to read data from a database that provides a
`Python DB API2-compliant <https://peps.python.org/pep-0249/>`_ connector.
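
For a quick end-to-end test without provisioning a database server, you can use Python's
built-in ``sqlite3`` module, which is DB API 2 compliant. The following sketch (the file
name and table contents are arbitrary examples) writes a small table to a local SQLite
file and reads it back:

.. testcode::
    :skipif: True

    import sqlite3

    import ray

    # Create a small example table in a local SQLite database file.
    connection = sqlite3.connect("example.db")
    connection.execute("CREATE TABLE movie(title TEXT, year INT, score FLOAT)")
    connection.execute(
        "INSERT INTO movie VALUES ('Monty Python and the Holy Grail', 1975, 8.2)"
    )
    connection.commit()
    connection.close()

    def create_connection():
        # Each read task opens its own connection to the database file.
        return sqlite3.connect("example.db")

    dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)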

.. tab-set::

    .. tab-item:: MySQL

        To read from MySQL, install
        `MySQL Connector/Python <https://dev.mysql.com/doc/connector-python/en/>`_. It's the
        first-party MySQL database connector.

        .. code-block:: console

            pip install mysql-connector-python

        Then, define your connection logic and query the database.

        .. testcode::
            :skipif: True

            import mysql.connector

            import ray

            def create_connection():
                return mysql.connector.connect(
                    user="admin",
                    password=...,
                    host="example-mysql-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
                    connection_timeout=30,
                    database="example",
                )

            # Get all movies
            dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
            # Get movies after the year 1980
            dataset = ray.data.read_sql(
                "SELECT title, score FROM movie WHERE year >= 1980", create_connection
            )
            # Get the number of movies per year
            dataset = ray.data.read_sql(
                "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
            )


    .. tab-item:: PostgreSQL

        To read from PostgreSQL, install `Psycopg 2 <https://www.psycopg.org/docs>`_. It's
        the most popular PostgreSQL database connector.

        .. code-block:: console

            pip install psycopg2-binary

        Then, define your connection logic and query the database.

        .. testcode::
            :skipif: True

            import psycopg2

            import ray

            def create_connection():
                return psycopg2.connect(
                    user="postgres",
                    password=...,
                    host="example-postgres-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
                    dbname="example",
                )

            # Get all movies
            dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
            # Get movies after the year 1980
            dataset = ray.data.read_sql(
                "SELECT title, score FROM movie WHERE year >= 1980", create_connection
            )
            # Get the number of movies per year
            dataset = ray.data.read_sql(
                "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
            )

    .. tab-item:: Snowflake

        To read from Snowflake, install the
        `Snowflake Connector for Python <https://docs.snowflake.com/en/user-guide/python-connector>`_.

        .. code-block:: console

            pip install snowflake-connector-python

        Then, define your connection logic and query the database.

        .. testcode::
            :skipif: True

            import snowflake.connector

            import ray

            def create_connection():
                return snowflake.connector.connect(
                    user=...,
                    password=...,
                    account="ZZKXUVH-IPB52023",
                    database="example",
                )

            # Get all movies
            dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
            # Get movies after the year 1980
            dataset = ray.data.read_sql(
                "SELECT title, score FROM movie WHERE year >= 1980", create_connection
            )
            # Get the number of movies per year
            dataset = ray.data.read_sql(
                "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
            )


    .. tab-item:: Databricks

        To read from Databricks, set the ``DATABRICKS_TOKEN`` environment variable to
        your Databricks warehouse access token.

        .. code-block:: console

            export DATABRICKS_TOKEN=...

        If you're not running your program on the Databricks runtime, also set the
        ``DATABRICKS_HOST`` environment variable.

        .. code-block:: console

            export DATABRICKS_HOST=adb-<workspace-id>.<random-number>.azuredatabricks.net

        Then, call :func:`ray.data.read_databricks_tables` to read from the Databricks
        SQL warehouse.

        .. testcode::
            :skipif: True

            import ray

            dataset = ray.data.read_databricks_tables(
                warehouse_id='...',  # Databricks SQL warehouse ID
                catalog='catalog_1',  # Unity catalog name
                schema='db_1',  # Schema name
                query="SELECT title, score FROM movie WHERE year >= 1980",
            )

    .. tab-item:: BigQuery

        To read from BigQuery, install the
        `Python Client for Google BigQuery <https://cloud.google.com/python/docs/reference/bigquery/latest>`_ and the `Python Client for Google BigQueryStorage <https://cloud.google.com/python/docs/reference/bigquerystorage/latest>`_.

        .. code-block:: console

            pip install google-cloud-bigquery
            pip install google-cloud-bigquery-storage

        To read data from BigQuery, call :func:`~ray.data.read_bigquery` and specify the project id, dataset, and query (if applicable).

        .. testcode::
            :skipif: True

            import ray

            # Read the entire dataset. Do not specify query.
            ds = ray.data.read_bigquery(
                project_id="my_gcloud_project_id",
                dataset="bigquery-public-data.ml_datasets.iris",
            )

            # Read from a SQL query of the dataset. Do not specify dataset.
            ds = ray.data.read_bigquery(
                project_id="my_gcloud_project_id",
                query="SELECT * FROM `bigquery-public-data.ml_datasets.iris` LIMIT 50",
            )

            # Write back to BigQuery
            ds.write_bigquery(
                project_id="my_gcloud_project_id",
                dataset="destination_dataset.destination_table",
                overwrite_table=True,
            )

.. _reading_mongodb:

Reading MongoDB
~~~~~~~~~~~~~~~

To read data from MongoDB, call :func:`~ray.data.read_mongo` and specify
the source URI, database, and collection. You also need to specify a pipeline to
run against the collection.

.. testcode::
    :skipif: True

    import ray

    # Read a local MongoDB.
    ds = ray.data.read_mongo(
        uri="mongodb://localhost:27017",
        database="my_db",
        collection="my_collection",
        pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": "sort_col"}],
    )

    # Reading a remote MongoDB is the same.
    ds = ray.data.read_mongo(
        uri="mongodb://username:[email protected]:27017/?authSource=admin",
        database="my_db",
        collection="my_collection",
        pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": "sort_col"}],
    )

    # Write back to MongoDB.
    ds.write_mongo(
        uri="mongodb://username:[email protected]:27017/?authSource=admin",
        database="my_db",
        collection="my_collection",
    )

Reading from Kafka
======================

Ray Data reads from message queues like Kafka.

.. _reading_kafka:

To read data from Kafka topics, call :func:`~ray.data.read_kafka` and specify
the topic names and broker addresses. Ray Data performs bounded reads between
a start and end offset. You can specify offsets as integers, ``"earliest"``/``"latest"``
strings, or ``datetime`` objects for time-based ranges.

First, install the required dependencies:

.. code-block:: console

    pip install confluent-kafka

Then, specify your Kafka configuration and read from topics.

.. testcode::
    :skipif: True

    import ray

    # Read from a single topic with offset range
    ds = ray.data.read_kafka(
        topics="my-topic",
        bootstrap_servers="localhost:9092",
        start_offset=0,
        end_offset=1000,
    )

    # Read from multiple topics
    ds = ray.data.read_kafka(
        topics=["topic1", "topic2"],
        bootstrap_servers="localhost:9092",
        start_offset="earliest",
        end_offset="latest",
    )

    # Read messages within a datetime range (datetimes with no timezone info are treated as UTC)
    from datetime import datetime
    ds = ray.data.read_kafka(
        topics="my-topic",
        bootstrap_servers="localhost:9092",
        start_offset=datetime(2025, 1, 1),
        end_offset=datetime(2025, 1, 2),
    )

    # Read with authentication (Confluent/librdkafka options)
    ds = ray.data.read_kafka(
        topics="secure-topic",
        bootstrap_servers="localhost:9092",
        consumer_config={
            "security.protocol": "SASL_SSL",
            "sasl.mechanism": "PLAIN",
            "sasl.username": "your-username",
            "sasl.password": "your-password",
        },
    )

    print(ds.schema())

.. testoutput::

    Column          Type
    ------          ----
    offset          int64
    key             binary
    value           binary
    topic           string
    partition       int32
    timestamp       int64
    timestamp_type  int32
    headers         map<string, binary>

Creating synthetic data
=======================

Synthetic datasets can be useful for testing and benchmarking.

.. tab-set::

    .. tab-item:: Int Range

        To create a synthetic :class:`~ray.data.Dataset` from a range of integers, call
        :func:`~ray.data.range`. Ray Data stores the integer range in a single column called
        "id".

        .. testcode::

            import ray

            ds = ray.data.range(10000)

            print(ds.schema())

        .. testoutput::

            Column  Type
            ------  ----
            id      int64

    .. tab-item:: Tensor Range

        To create a synthetic :class:`~ray.data.Dataset` containing arrays, call
        :func:`~ray.data.range_tensor`. Ray Data packs an integer range into ndarrays of
        the provided shape. In the schema, the column name defaults to "data". 

        .. testcode::

            import ray

            ds = ray.data.range_tensor(10, shape=(64, 64))

            print(ds.schema())

        .. testoutput::

            Column  Type
            ------  ----
            data    ArrowTensorTypeV2(shape=(64, 64), dtype=int64)

Loading other datasources
==========================

If Ray Data can't load your data, subclass
:class:`~ray.data.Datasource`. Then, construct an instance of your custom
datasource and pass it to :func:`~ray.data.read_datasource`. To write results, you might
also need to subclass :class:`ray.data.Datasink`. Then, create an instance of your custom
datasink and pass it to :func:`~ray.data.Dataset.write_datasink`. For more details, see
:ref:`Advanced: Read and Write Custom File Types <custom_datasource>`.

.. testcode::
    :skipif: True

    import ray

    # Read from a custom datasource.
    ds = ray.data.read_datasource(YourCustomDatasource(), **read_args)

    # Write to a custom datasink.
    ds.write_datasink(YourCustomDatasink())

Performance considerations
==========================

By default, Ray Data dynamically decides the number of output blocks for all read tasks
based on the input data size and the available resources, which works well in most cases.
However, you can override the default by setting the ``override_num_blocks`` argument.
Ray Data decides internally how many read tasks to run concurrently to best utilize the
cluster, ranging from ``1`` to ``override_num_blocks`` tasks. In other words, the higher
the ``override_num_blocks``, the smaller the data blocks in the Dataset and hence the
more opportunities for parallel execution.
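
For example, a minimal sketch (the file path is a placeholder) that caps the read at a
specific number of output blocks:

.. testcode::
    :skipif: True

    import ray

    # Ask Ray Data to split the read into up to 100 output blocks.
    ds = ray.data.read_parquet(
        "s3://your-bucket/your-data.parquet",
        override_num_blocks=100,
    )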

For more information on how to tune the number of output blocks and other suggestions
for optimizing read performance, see `Optimizing reads <performance-tips.html#optimizing-reads>`__.