
tutorials/talks_and_demos/data-ai-summit-2024.ipynb

python
!pip install daft "deltalake<0.17"
python
CI = False
python
# Skip this notebook execution in CI because it requires AWS credentials for presigned URL generation
if CI:
    import sys

    sys.exit()

Multimodal data lake annotation and indexing

Let's go from: Images in an S3 Bucket

To: Multimodal Data Lake where we can run queries efficiently to power analytics, retrieval and more!

python
import daft

IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))  # Use anonymous S3 access

daft.set_planning_config(default_io_config=IO_CONFIG)
python
df = daft.from_glob_path(
    "s3://daft-public-data/open-images/validation-images/*",
)
df.show()

Working with URLs in Daft is really easy and efficient

  • URLs are extremely common when working with multimodal data, most commonly as an https:// URL or an s3:// object store URL
  • Daft runs URL downloads using async Rust kernels, saturating your machine's network bandwidth even for millions of small files (see: demo at PyData Global 2023)
python
df = df.with_column("image_bytes", df["path"].url.download())
df.show()
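Daft's download kernels handle this concurrency for you; as a rough stdlib-only sketch of why overlapping many small fetches matters (latency is simulated with sleeps rather than real network I/O, so the numbers are purely illustrative):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_download(path):
    # Stand-in for a small-object fetch dominated by ~50 ms of network latency
    time.sleep(0.05)
    return path.encode()


paths = [f"s3://bucket/img_{i}.jpg" for i in range(40)]

start = time.monotonic()
with ThreadPoolExecutor(max_workers=32) as pool:
    blobs = list(pool.map(fake_download, paths))
concurrent_s = time.monotonic() - start

# Sequentially this would take ~40 * 0.05 = 2 s; overlapped it finishes in a fraction of that
print(f"{len(blobs)} downloads in {concurrent_s:.2f}s")
```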

Reading Images

Daft makes working with opaque file formats/encodings easy

  • Native type available for images and tensors
  • Support for arbitrary Python objects in columns so you can use all your favorite Python libraries as well for datatypes not yet supported by Daft (e.g. video, audio, PDFs)
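Decoding always starts from opaque bytes. As a minimal illustration (the helper below is our own, not part of Daft), common image formats can be recognized from their leading magic bytes before handing them to a decoder:

```python
def sniff_image_format(data):
    """Best-effort image format name from leading magic bytes, or None."""
    if data.startswith(b"\xff\xd8\xff"):
        return "JPEG"
    if data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "PNG"
    if data[:6] in (b"GIF87a", b"GIF89a"):
        return "GIF"
    return None


print(sniff_image_format(b"\x89PNG\r\n\x1a\n" + b"\x00" * 8))  # PNG
```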
python
df = df.with_column("image", df["image_bytes"].image.decode())
df.show()

Thumbnail creation

Easily create thumbnails for your images using Daft's resize expression.

python
df = df.with_column("image_thumbnail", df["image"].image.resize(32, 32))
df.show()

Running multimodal LLMs

Since we are running on just our laptop, we will be offloading our "heavy compute" (running the GPT-4o model on our images) to the OpenAI API.

If instead we wanted to run our own models or algorithms, Daft also lets us run on GPUs with the df.with_column(..., resource_request=ResourceRequest(num_gpus=1)) pattern.

python
import json
import os

import boto3
import requests

DEFAULT_PROMPT = "What’s in this image?"
api_key = os.getenv("OPENAI_API_KEY")
if api_key is None:
    raise RuntimeError("Please specify your OpenAI API key as the environment variable `OPENAI_API_KEY`.")

headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}


@daft.udf(return_dtype=daft.DataType.string())
def generate_presigned_url(s3_urls, expires_in=3600):
    """Generate presigned Amazon S3 URLs."""
    s3_client = boto3.client("s3")
    presigned_urls = []
    for s3_url in s3_urls.to_pylist():
        # removeprefix avoids str.strip's character-set behavior, which can eat leading bucket characters
        bucket, key = s3_url.removeprefix("s3://").split("/", 1)
        url = s3_client.generate_presigned_url(
            ClientMethod="get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=expires_in
        )
        presigned_urls.append(url)
    return presigned_urls


@daft.udf(return_dtype=daft.DataType.string())
def run_gpt4o_on_urls(images_urls, prompt=DEFAULT_PROMPT):
    """Run the gpt-4o LLM by making an API call to OpenAI."""
    results = []
    for url in images_urls.to_pylist():
        payload = {
            "model": "gpt-4o",
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": url}},
                    ],
                }
            ],
            "max_tokens": 300,
        }

        response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
        results.append(json.dumps(response.json()))

    return results
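Hand-splitting s3:// URLs is easy to get wrong (in particular, `str.strip("s3://")` removes any of those characters from both ends of the string, rather than the prefix). A small stdlib sketch using `urllib.parse` instead:

```python
from urllib.parse import urlparse


def split_s3_url(s3_url):
    """Split an s3:// URL into (bucket, key) using urlparse."""
    parsed = urlparse(s3_url)
    assert parsed.scheme == "s3", f"not an S3 URL: {s3_url}"
    # netloc is the bucket; path carries the key with a leading slash
    return parsed.netloc, parsed.path.lstrip("/")


print(split_s3_url("s3://daft-public-data/open-images/validation-images/img.jpg"))
# ('daft-public-data', 'open-images/validation-images/img.jpg')
```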
python
# Generate temporary URLs with a short expiration time
df = df.with_column("image_urls", generate_presigned_url(df["path"]))

# Make remote API calls to OpenAI endpoint
df = df.with_column("gpt_results", run_gpt4o_on_urls(df["image_urls"], prompt="What’s in this image?"))

# Parse JSON outputs from OpenAI endpoint
df = df.with_column("description", df["gpt_results"].json.query(".choices[0].message.content"))

df.show(3)
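The `.json.query` expression above takes a jq-style path. For reference, the equivalent plain-Python extraction on a hand-written sample payload (the shape below is illustrative only, not a captured API response):

```python
import json

# Minimal illustrative shape of a chat-completions response body
raw = json.dumps({
    "choices": [
        {"message": {"role": "assistant", "content": "A dog playing in a park."}}
    ]
})

# Mirrors the jq path .choices[0].message.content
description = json.loads(raw)["choices"][0]["message"]["content"]
print(description)  # A dog playing in a park.
```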
python
df = df.select(
    # Larger multimodal data (such as large images or documents) can be written as URLs
    "path",
    # Small multimodal data (such as thumbnails or full-form text) can be written inline
    df["image_thumbnail"].image.encode("JPEG"),
    # Metadata such as size in bytes and descriptions should be stored as per normal
    "size",
    "description",
)
python
df
python
# Limit to running just 8 rows to save your OpenAI bill...
df = df.limit(8)

df.write_deltalake("my_table.delta_lake")

Now we have our "Multimodal Data Lake"!

  1. Thumbnails readily available for visualization
  2. URLs available for access to the raw data
  3. Extracted metadata (description) available for querying
python
read_df = daft.read_deltalake("my_table.delta_lake")
read_df
python
read_df = read_df.with_column("image_thumbnail", daft.col("image_thumbnail").image.decode()).where(
    read_df["description"].str.contains("dog")
)
python
read_df.collect()