
Chapter 6: Old SQL, New Tricks - Running SQL on PySpark

python
import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("error")


Introduction

This section explains how to use the Spark SQL API in PySpark and compares it with the DataFrame API. It also covers how to switch between the two APIs seamlessly, along with some practical tips and tricks.

Running SQL with PySpark

PySpark offers two main ways to perform SQL operations:

Using spark.sql()

The spark.sql() function lets you execute SQL queries directly; the result is returned as a DataFrame.

python
# Create a table via spark.sql()
spark.sql("DROP TABLE IF EXISTS people")
spark.sql("""
CREATE TABLE people USING PARQUET
AS SELECT * FROM VALUES (1, 'Alice', 10), (2, 'Bob', 20), (3, 'Charlie', 30) t(id, name, age)
""")
python
# Use spark.sql() to select data from a table
spark.sql("SELECT name, age FROM people WHERE age > 21").show()

Using the PySpark DataFrame API

The PySpark DataFrame API provides equivalent functionality to SQL but with a Pythonic approach.

python
# Read a table using the DataFrame API
people_df = spark.read.table("people")

# Use DataFrame API to select data
people_df.select("name", "age").filter("age > 21").show()

SQL vs. DataFrame API in PySpark

Which API to use depends on your background and the specific task:

SQL API:

  • Ideal for users with SQL backgrounds who are more comfortable writing SQL queries.

DataFrame API:

  • Preferred by Python developers as it aligns with Python syntax and idioms.
  • Provides greater flexibility for complex transformations, especially with user-defined functions (UDFs); see the sketch below.
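
As a quick illustration of that flexibility, here is a minimal sketch that wraps plain Python branching in a UDF (the age_bucket helper is hypothetical, used only for this example):

python
from pyspark.sql.functions import udf

# Hypothetical helper: bucket ages using ordinary Python logic
@udf("string")
def age_bucket(age):
    return "minor" if age < 18 else "adult"

spark.read.table("people").withColumn("bucket", age_bucket("age")).show()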

Code Examples: SQL vs. DataFrame API

The following examples compare how common tasks are performed with the SQL API and the DataFrame API, to give you a sense of their differences and of when one might be more suitable than the other.

Example: SELECT and FILTER Operation

SQL API:

python
spark.sql("SELECT name FROM people WHERE age > 21").show()

DataFrame API:

python
spark.read.table("people").select("name").filter("age > 21").show()

Example: JOIN Operation

python
spark.sql("DROP TABLE IF EXISTS orders")
spark.sql("""
CREATE TABLE orders USING PARQUET
AS SELECT * FROM VALUES (101, 1, 200), (102, 2, 150), (103, 3, 300) t(order_id, customer_id, amount)
""")

SQL API:

python
spark.sql("""
SELECT p.name, o.order_id
FROM people p
JOIN orders o ON p.id = o.customer_id
""").show()

DataFrame API:

python
people_df = spark.read.table("people")
orders_df = spark.read.table("orders")
(
    people_df
        .join(orders_df, people_df.id == orders_df.customer_id)
        .select(people_df.name, orders_df.order_id)
        .show()
)

Example: GROUP BY and Aggregate Operation

SQL API:

python
spark.sql("""
SELECT p.name, SUM(o.amount) AS total_amount
FROM people p
JOIN orders o ON p.id = o.customer_id
GROUP BY p.name
""").show()

DataFrame API:

python
from pyspark.sql.functions import sum as sum_  # alias avoids shadowing Python's built-in sum

(
    people_df
        .join(orders_df, people_df.id == orders_df.customer_id)
        .groupBy("name")
        .agg(sum_("amount").alias("total_amount"))
        .show()
)

Example: Window Operations

SQL API:

python
spark.sql("""
SELECT
    p.name,
    o.amount, 
    RANK() OVER (PARTITION BY p.name ORDER BY o.amount DESC) AS rank
FROM people p
JOIN orders o ON p.id = o.customer_id
""").show()

DataFrame API:

python
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Define the window specification
window_spec = Window.partitionBy("name").orderBy(orders_df.amount.desc())

# Window operation with RANK
(
    people_df
        .join(orders_df, people_df.id == orders_df.customer_id)
        .withColumn("rank", rank().over(window_spec))
        .select("name", "amount", "rank")
        .show()
)

Example: UNION Operation

SQL API:

  • The UNION operator combines rows from two queries and removes duplicates by default.
python
spark.sql("CREATE OR REPLACE TEMP VIEW people2 AS SELECT * FROM VALUES (1, 'Alice', 10), (4, 'David', 35) t(id, name, age)")
python
spark.sql("""
SELECT * FROM people
UNION
SELECT * FROM people2
""").show()

DataFrame API:

  • The union() method is used to combine two DataFrames, but it does not remove duplicates by default.
  • To match the behavior of SQL's UNION, we use the .dropDuplicates() method to eliminate duplicates after the union operation.
python
people_df = spark.read.table("people")
people2_df = spark.read.table("people2")
# This will have duplicate values.
people_df.union(people2_df).show()
python
# Remove duplicate values
people_df.union(people2_df).dropDuplicates().show()

Example: SET Configurations

SQL API:

python
spark.sql("SET spark.sql.shuffle.partitions=8")
python
spark.sql("SET spark.sql.shuffle.partitions").show(truncate=False)

DataFrame API:

python
spark.conf.set("spark.sql.shuffle.partitions", 10)
python
spark.conf.get("spark.sql.shuffle.partitions")
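
Both routes read and write the same session configuration, so a value set through one API is immediately visible to the other:

python
# Set via the conf API, then read it back via SQL
spark.conf.set("spark.sql.shuffle.partitions", 10)
spark.sql("SET spark.sql.shuffle.partitions").show(truncate=False)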

Example: Listing Tables and Views

SQL API:

python
spark.sql("SHOW TABLES").show()

DataFrame API:

python
tables = spark.catalog.listTables()
for table in tables:
    print(f"Name: {table.name}, isTemporary: {table.isTemporary}")

DataFrame API Exclusive Functions

Certain operations are exclusive to the DataFrame API and are not supported in SQL, such as:

withColumn: Adds or modifies columns in a DataFrame.

python
# Add a new column derived from an existing one
people_df.withColumn("new_col", people_df["age"] + 10).show()
python
# Overwrite an existing column in place
people_df.withColumn("age", people_df["age"] + 10).show()

Using SQL and DataFrame API Interchangeably

PySpark supports switching between SQL and DataFrame API, making it easy to mix and match.

Chaining DataFrame Operations on SQL Outputs

Because spark.sql() returns a DataFrame, you can chain DataFrame operations directly onto the result of a SQL query, keeping transformations efficient and readable.

python
# Chaining DataFrame operations on SQL results
spark.sql("SELECT name, age FROM people").filter("age > 21").show()

Using selectExpr()

The selectExpr() method allows you to run SQL expressions within the DataFrame API.

python
people_df.selectExpr("name", "age + 1 AS age_plus_one").show()
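
Relatedly, the expr() function turns a SQL fragment into a Column object you can use anywhere the DataFrame API expects one:

python
from pyspark.sql.functions import expr

# A SQL expression embedded in a DataFrame transformation
people_df.withColumn("age_plus_one", expr("age + 1")).show()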

Querying a DataFrame in SQL

You can create a temporary view from a DataFrame and run SQL queries on it.

python
# First create a temp view on top of the DataFrame.
people_df.createOrReplaceTempView("people_view")

# Then it can be referenced in SQL.
spark.sql("SELECT * FROM people_view WHERE age > 21").show()

Use Python User-Defined Functions in SQL

You can register Python user-defined functions (UDFs) for use within SQL queries, enabling custom transformations within SQL syntax.

python
from pyspark.sql.functions import udf

# Define the UDF
@udf("string")
def uppercase_name(name):
    return name.upper()

# Register the UDF
spark.udf.register("uppercase_name", uppercase_name)

# Use it in SQL
spark.sql("SELECT name, uppercase_name(name) FROM people_view WHERE age > 21").show()