python/docs/source/user_guide/sql.ipynb
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("error")
This section explains how to use the Spark SQL API in PySpark and how it compares with the DataFrame API. It also covers how to switch between the two APIs seamlessly, along with some practical tips and tricks.
PySpark offers two main ways to perform SQL operations:
spark.sql()
The spark.sql() function allows you to execute SQL queries directly.
# Create a table via spark.sql()
spark.sql("DROP TABLE IF EXISTS people")
spark.sql("""
CREATE TABLE people USING PARQUET
AS SELECT * FROM VALUES (1, 'Alice', 10), (2, 'Bob', 20), (3, 'Charlie', 30) t(id, name, age)
""")
# Use spark.sql() to select data from a table
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
The PySpark DataFrame API provides equivalent functionality to SQL but with a Pythonic approach.
# Read a table using the DataFrame API
people_df = spark.read.table("people")
# Use DataFrame API to select data
people_df.select("name", "age").filter("age > 21").show()
When to use which API depends on your background and the specific task:
SQL API: a natural choice if you are already comfortable writing SQL, are reusing existing queries, or prefer concise, declarative statements for ad-hoc analysis.
DataFrame API: better suited when you want to build transformations programmatically in Python, compose and reuse logic, and take advantage of Python tooling.
Here are some examples comparing how common tasks are performed using the SQL API and PySpark's DataFrame API to give you an idea of their differences and when one might be more suitable than the other.
SQL API:
spark.sql("SELECT name FROM people WHERE age > 21").show()
DataFrame API:
spark.read.table("people").select("name").filter("age > 21").show()
spark.sql("DROP TABLE IF EXISTS orders")
spark.sql("""
CREATE TABLE orders USING PARQUET
AS SELECT * FROM VALUES (101, 1, 200), (102, 2, 150), (103, 3, 300) t(order_id, customer_id, amount)
""")
SQL API:
spark.sql("""
SELECT p.name, o.order_id
FROM people p
JOIN orders o ON p.id = o.customer_id
""").show()
DataFrame API:
people_df = spark.read.table("people")
orders_df = spark.read.table("orders")
(
    people_df
    .join(orders_df, people_df.id == orders_df.customer_id)
    .select(people_df.name, orders_df.order_id)
    .show()
)
SQL API:
spark.sql("""
SELECT p.name, SUM(o.amount) AS total_amount
FROM people p
JOIN orders o ON p.id = o.customer_id
GROUP BY p.name
""").show()
DataFrame API:
from pyspark.sql.functions import sum
(
    people_df
    .join(orders_df, people_df.id == orders_df.customer_id)
    .groupBy("name")
    .agg(sum("amount").alias("total_amount"))
    .show()
)
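Note that importing sum directly shadows Python's built-in sum. An equivalent way to write the same aggregation, sketched below purely as a stylistic alternative, is to import the functions module under an alias:
# Import the functions module under an alias to avoid shadowing Python built-ins
from pyspark.sql import functions as F
(
    people_df
    .join(orders_df, people_df.id == orders_df.customer_id)
    .groupBy("name")
    .agg(F.sum("amount").alias("total_amount"))
    .show()
)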
SQL API:
spark.sql("""
SELECT
p.name,
o.amount,
RANK() OVER (PARTITION BY p.name ORDER BY o.amount DESC) AS rank
FROM people p
JOIN orders o ON p.id = o.customer_id
""").show()
DataFrame API:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank
# Define the window specification
window_spec = Window.partitionBy("name").orderBy(orders_df.amount.desc())
# Window operation with RANK
(
    people_df
    .join(orders_df, people_df.id == orders_df.customer_id)
    .withColumn("rank", rank().over(window_spec))
    .select("name", "amount", "rank")
    .show()
)
SQL API:
The UNION operator combines rows from two queries and removes duplicates by default.
spark.sql("CREATE OR REPLACE TEMP VIEW people2 AS SELECT * FROM VALUES (1, 'Alice', 10), (4, 'David', 35) t(id, name, age)")
spark.sql("""
SELECT * FROM people
UNION
SELECT * FROM people2
""").show()
DataFrame API:
The union() method combines two DataFrames, but it does not remove duplicates by default.
people_df = spark.read.table("people")
people2_df = spark.read.table("people2")
# This will have duplicate values.
people_df.union(people2_df).show()
# Remove duplicate values
people_df.union(people2_df).dropDuplicates().show()
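If the two DataFrames share column names but not necessarily column order, unionByName() aligns columns by name rather than by position. The line below is a minimal sketch of that variant, reusing the DataFrames defined above.
# unionByName() matches columns by name instead of position
people_df.unionByName(people2_df).dropDuplicates().show()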
SQL API:
spark.sql("SET spark.sql.shuffle.partitions=8")
spark.sql("SET spark.sql.shuffle.partitions").show(truncate=False)
DataFrame API:
spark.conf.set("spark.sql.shuffle.partitions", 10)
spark.conf.get("spark.sql.shuffle.partitions")
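Both APIs can also restore a configuration to its default value. The following is a small sketch, assuming Spark 3.0+ for the SQL RESET statement:
# Reset the configuration to its default value
spark.sql("RESET spark.sql.shuffle.partitions")   # SQL API
spark.conf.unset("spark.sql.shuffle.partitions")  # Runtime conf API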
SQL API:
spark.sql("SHOW TABLES").show()
DataFrame API:
tables = spark.catalog.listTables()
for table in tables:
print(f"Name: {table.name}, isTemporary: {table.isTemporary}")
Certain operations are exclusive to the DataFrame API and are not supported in SQL, such as:
withColumn: Adds or modifies columns in a DataFrame.
people_df.withColumn("new_col", people_df["age"] + 10).show()
people_df.withColumn("age", people_df["age"] + 10).show()
PySpark supports switching between SQL and DataFrame API, making it easy to mix and match.
PySpark’s DataFrame API allows you to chain multiple operations together to create efficient and readable transformations.
# Chaining DataFrame operations on SQL results
spark.sql("SELECT name, age FROM people").filter("age > 21").show()
selectExpr()
The selectExpr() method allows you to run SQL expressions within the DataFrame API.
people_df.selectExpr("name", "age + 1 AS age_plus_one").show()
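A closely related helper is pyspark.sql.functions.expr(), which parses a SQL expression string into a Column usable in any DataFrame operation. The example below is a sketch of the same computation using expr():
from pyspark.sql.functions import expr
# expr() turns a SQL expression string into a Column
people_df.select("name", expr("age + 1").alias("age_plus_one")).filter(expr("age > 21")).show()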
You can create a temporary view from a DataFrame and run SQL queries on it.
# First create a temp view on top of the DataFrame.
people_df.createOrReplaceTempView("people_view")
# Then it can be referenced in SQL.
spark.sql("SELECT * FROM people_view WHERE age > 21").show()
You can register Python user-defined functions (UDFs) for use within SQL queries, enabling custom transformations within SQL syntax.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Define the UDF
@udf("string")
def uppercase_name(name):
    return name.upper()
# Register the UDF
spark.udf.register("uppercase_name", uppercase_name)
# Use it in SQL
spark.sql("SELECT name, uppercase_name(name) FROM people_view WHERE age > 21").show()