tutorials/talks_and_demos/data-ai-summit-2024.ipynb
!pip install daft "deltalake<0.17"
CI = False
# Skip this notebook execution in CI because it requires AWS credentials for presigned URL generation
if CI:
    import sys

    sys.exit()
Let's go from: Images in an S3 Bucket
To: a Multimodal Data Lake where we can run queries efficiently to power analytics, retrieval, and more!
import daft
IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True)) # Use anonymous S3 access
daft.set_planning_config(default_io_config=IO_CONFIG)
df = daft.from_glob_path(
"s3://daft-public-data/open-images/validation-images/*",
)
df.show()
Daft can download data from any https:// URL or s3:// object store URL.
df = df.with_column("image_bytes", df["path"].url.download())
df.show()
Daft makes working with opaque file formats and encodings easy.
df = df.with_column("image", df["image_bytes"].decode_image())
df.show()
Easily create thumbnails for your images using the image.resize(...) Daft expression.
df = df.with_column("image_thumbnail", df["image"].image.resize(32, 32))
df.show()
Since we are running on just our laptop, we will offload our "heavy compute" (running the GPT-4o model on our images) to the OpenAI API.
If instead we wanted to run our own models or algorithms, Daft also lets us run on GPUs with the df.with_column(..., resource_request=ResourceRequest(num_gpus=1)) pattern, sketched below.
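For reference, here is a minimal sketch of that pattern. The classify_images UDF and load_my_model are hypothetical placeholders, and the exact resource-request API depends on your Daft version:
from daft.resource_request import ResourceRequest

@daft.udf(return_dtype=daft.DataType.string())
def classify_images(images):
    # Hypothetical: load a model onto the GPU once per UDF instance, then run inference per batch
    model = load_my_model()  # placeholder for your own model-loading code
    return [model.predict(img) for img in images.to_pylist()]

df = df.with_column(
    "label",
    classify_images(df["image"]),
    resource_request=ResourceRequest(num_gpus=1),  # ask Daft's scheduler for 1 GPU
)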
import json
import os
import boto3
import requests
DEFAULT_PROMPT = "What’s in this image?"
api_key = os.getenv("OPENAI_API_KEY")
if api_key is None:
    raise RuntimeError("Please specify your OpenAI API key as the environment variable `OPENAI_API_KEY`.")
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
@daft.udf(return_dtype=daft.DataType.string())
def generate_presigned_url(s3_urls, expires_in=3600):
    """Generate presigned Amazon S3 URLs."""
    s3_client = boto3.client("s3")
    presigned_urls = []
    for s3_url in s3_urls.to_pylist():
        # str.strip("s3://") would strip *characters*, not the prefix, so slice the prefix off instead
        bucket, key = s3_url[len("s3://") :].split("/", 1)
        url = s3_client.generate_presigned_url(
            ClientMethod="get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=expires_in
        )
        presigned_urls.append(url)
    return presigned_urls
@daft.udf(return_dtype=daft.DataType.string())
def run_gpt4o_on_urls(images_urls, prompt=DEFAULT_PROMPT):
    """Run the gpt-4o LLM by making an API call to OpenAI."""
    results = []
    for url in images_urls.to_pylist():
        payload = {
            "model": "gpt-4o",
            "messages": [
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": url}},
                    ],
                }
            ],
            "max_tokens": 300,
        }
        response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
        results.append(json.dumps(response.json()))
    return results
# Generate temporary URLs with a short expiration time
df = df.with_column("image_urls", generate_presigned_url(df["path"]))
# Make remote API calls to OpenAI endpoint
df = df.with_column("gpt_results", run_gpt4o_on_urls(df["image_urls"], prompt="What’s in this image?"))
# Parse JSON outputs from OpenAI endpoint
df = df.with_column("description", df["gpt_results"].json.query(".choices[0].message.content"))
df.show(3)
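Since the stored results are full OpenAI JSON payloads, other fields can be pulled out with additional jq filters too. A small sketch, assuming the standard chat-completions response shape (the total_tokens column name is our own):
# Pull token usage out of the same stored JSON blobs
df = df.with_column("total_tokens", df["gpt_results"].json.query(".usage.total_tokens"))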
df = df.select(
    # Larger multimodal data (such as large images or documents) can be written as URLs
    "path",
    # Small multimodal data (such as thumbnails or full-form text) can be written inline
    df["image_thumbnail"].image.encode("JPEG"),
    # Metadata such as size in bytes and descriptions should be stored as per normal
    "size",
    "description",
)
df
# Limit to running just 8 rows to save your OpenAI bill...
df = df.limit(8)
df.write_deltalake("my_table.delta_lake")
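Re-running this cell would write into the existing table again; a sketch, assuming your Daft version's write_deltalake accepts a mode argument:
# Overwrite the existing table contents on re-runs (mode support assumed for your Daft version)
df.write_deltalake("my_table.delta_lake", mode="overwrite")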
Now we have all of our data (paths, thumbnails, sizes, and each image's description) available for querying.
read_df = daft.read_deltalake("my_table.delta_lake")
read_df
read_df = read_df.with_column("image_thumbnail", daft.col("image_thumbnail").image.decode()).where(
    read_df["description"].str.contains("dog")
)
read_df.collect()