Chapter 1: DataFrames - A view into your structured data

python
pip install pyspark
python
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

This section introduces the most fundamental data structure in PySpark: the DataFrame.

A DataFrame is a two-dimensional labeled data structure with columns of potentially different types. You can think of a DataFrame like a spreadsheet, a SQL table, or a dictionary of series objects. Apache Spark DataFrames support a rich set of APIs (select columns, filter, join, aggregate, etc.) that allow you to solve common data analysis problems efficiently.

Compared to traditional relational databases, Spark DataFrames offer several key advantages for big data processing and analytics:

  • Distributed computing: Spark distributes data across multiple nodes in a cluster, allowing for parallel processing of big data.
  • In-memory processing: Spark performs computations in memory, which can be significantly faster than disk-based processing.
  • Schema flexibility: Unlike traditional databases, PySpark DataFrames support schema evolution and dynamic typing.
  • Fault tolerance: PySpark DataFrames are built on top of Resilient Distributed Datasets (RDDs), which are inherently fault-tolerant. Spark automatically handles node failures and data replication, ensuring data reliability and integrity.

A note on RDDs: Direct use of RDDs is no longer supported on Spark Connect as of Spark 4.0. DataFrames use a unified planning and optimization engine, allowing us to get nearly identical performance across all supported languages on Databricks (Python, SQL, Scala, and R).

Create a DataFrame

There are several ways to create a DataFrame in PySpark.

From a list of dictionaries

The simplest way is to use the createDataFrame() method like so:

python
employees = [{"name": "John D.", "age": 30},
  {"name": "Alice G.", "age": 25},
  {"name": "Bob T.", "age": 35},
  {"name": "Eve A.", "age": 28}]

# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
df.show()
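
By default, createDataFrame() infers column names and types from the data. If you want explicit control over the schema, you can pass one in; a minimal sketch using a list of tuples:

python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define column names and types explicitly instead of relying on inference
schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=True)
])

df = spark.createDataFrame([("John D.", 30), ("Alice G.", 25)], schema=schema)
df.show()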

From a local file

We can also create a DataFrame from a local CSV file:

python
df = spark.read.csv("../data/employees.csv", header=True, inferSchema=True)
df.show()

Or from a local JSON file:

python
df = spark.read.option("multiline","true").json("../data/employees.json")
df.show()
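
Note that inferSchema=True requires an extra pass over the file. For repeated or production reads, you can supply the schema up front; a sketch, assuming the CSV has name and age columns:

python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Supplying a schema skips the extra pass that inferSchema performs
schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType())
])

df = spark.read.csv("../data/employees.csv", header=True, schema=schema)
df.show()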

From an existing DataFrame

We can even create a DataFrame from another existing DataFrame, by selecting certain columns:

python
employees = [
  {"name": "John D.", "age": 30, "department": "HR"},
  {"name": "Alice G.", "age": 25, "department": "Finance"},
  {"name": "Bob T.", "age": 35, "department": "IT"},
  {"name": "Eve A.", "age": 28, "department": "Marketing"}
]
df = spark.createDataFrame(employees)

# Select only the name and age columns
new_df = df.select("name", "age")
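
select() also accepts Column expressions, so you can derive new columns while selecting; for example (age_plus_one is an illustrative name):

python
# Select a column and a derived expression in one step
new_df = df.select(df["name"], (df["age"] + 1).alias("age_plus_one"))
new_df.show()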

From a table

If you have an existing table table_name in your Spark environment, you can create a DataFrame like this:

python
df = spark.read.table("table_name")

From a database

If your table is in a database, you can use JDBC to read the table into a DataFrame.

python
url = "jdbc:mysql://localhost:3306/mydatabase"
table = "employees"
properties = {
  "user": "username",
  "password": "password"
}

# Read table into DataFrame
df = spark.read.jdbc(url=url, table=table, properties=properties)
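
Note that Spark needs the appropriate JDBC driver on its classpath. One way is to pull it in when building the session; a sketch, assuming MySQL Connector/J (the Maven coordinates and version are illustrative):

python
from pyspark.sql import SparkSession

# The coordinates below are an example; match them to your database and driver version
spark = SparkSession.builder \
    .appName("jdbc-example") \
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.33") \
    .getOrCreate()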

View the DataFrame

We can use PySpark to view and interact with our DataFrame.

Display the DataFrame

df.show() prints a tabular preview of the DataFrame's contents. Using our createDataFrame() example from above:

python
employees = [{"name": "John D.", "age": 30},
  {"name": "Alice G.", "age": 25},
  {"name": "Bob T.", "age": 35},
  {"name": "Eve A.", "age": 28}]

# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
python
df.show()

df.show() takes three optional arguments: n, truncate, and vertical.

By default, df.show() displays up to the first 20 rows of the DataFrame. We can control the number of rows displayed by passing an argument to the show() method:

python
df.show(n=2)

The truncate argument controls how much of each value is displayed. By default, values longer than 20 characters are truncated; pass an integer to truncate at a different length:

python
df.show(truncate=3)

If we set vertical to True, the DataFrame will be displayed vertically with one line per value:

python
df.show(vertical=True)

We can view information about the DataFrame schema using the printSchema() method:

python
df.printSchema()
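
For the employees DataFrame created above, this prints something like the following (dictionary keys are sorted alphabetically during schema inference, and Python integers are inferred as longs):

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)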

DataFrame Manipulation

Let's look at some ways we can transform our DataFrames.

For more detailed information, please see the section about data manipulation, Chapter 3: Function Junction - Data manipulation with PySpark.

Rename columns

We can rename DataFrame columns using the withColumnRenamed() method:

python
df.show()
df2 = df.withColumnRenamed("name", "full_name")
df2.show()
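
To rename several columns at once, Spark 3.4 and later also provide withColumnsRenamed(), which takes a mapping of old names to new names:

python
# Rename multiple columns in one call (available since Spark 3.4)
df3 = df.withColumnsRenamed({"name": "full_name", "age": "age_in_years"})
df3.show()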

Filter rows

We can filter for employees within a certain age range. The following filter() call creates a new DataFrame containing only the rows that match our age condition:

python
filtered_df = df.filter((df["age"] > 26) & (df["age"] < 32))
filtered_df.show()

We can also use df.where to get the same result:

python
where_df = df.where((df["age"] > 26) & (df["age"] < 32))
where_df.show()
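
The same condition can also be written with the col() helper from pyspark.sql.functions, which builds the expression without referencing the DataFrame variable directly:

python
from pyspark.sql.functions import col

# col("age") builds a Column expression by name
filtered_df = df.filter((col("age") > 26) & (col("age") < 32))
filtered_df.show()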

DataFrames vs. Tables

A DataFrame is an immutable distributed collection of data, only available in the current Spark session.

A table is a persistent data structure that can be accessed across multiple Spark sessions.

If you wish to query a DataFrame as if it were a table, you can register it as a temporary view using the createOrReplaceTempView() method:

python
df.createOrReplaceTempView("employees")
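
Once registered, the view can be queried with Spark SQL:

python
# Query the temporary view using SQL
spark.sql("SELECT name, age FROM employees WHERE age > 26").show()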

Note that the lifetime of this temporary view is tied to the SparkSession that was used to create this DataFrame. To persist the data beyond this Spark session, you will need to save it to persistent storage.

Save DataFrame to Persistent Storage

There are several ways to save a DataFrame to persistent storage in PySpark. For more detailed information about saving data locally, see Chapter 7: Load and Behold - Data loading, storage, file formats.

Save to file-based data source

For file-based data sources (text, Parquet, JSON, etc.), you can specify a custom table path like so:

python
df.write.option("path", "../dataout").saveAsTable("dataframes_savetable_example")

Even if the table is dropped, the custom table path and table data will still be there.

If no custom table path is specified, Spark will write data to a default table path under the warehouse directory. When the table is dropped, the default table path will be removed too.

Save to Hive metastore

To save to Hive metastore, you can use the following:

python
df.write.mode("overwrite").saveAsTable("schemaName.tableName")
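
Note that writing to a Hive metastore requires a SparkSession built with Hive support enabled. A minimal sketch (the app name is illustrative):

python
from pyspark.sql import SparkSession

# enableHiveSupport() wires the session up to the Hive metastore
spark = SparkSession.builder \
    .appName("hive-example") \
    .enableHiveSupport() \
    .getOrCreate()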

Native DataFrame Plotting

PySpark supports native plotting, allowing users to visualize data directly from PySpark DataFrames.

The user interacts with PySpark plotting by calling the plot property on a PySpark DataFrame and specifying the desired type of plot, either as a method on the plot accessor or by setting the kind parameter. For instance:

python
df.plot.line(x="category", y="int_val")

or equivalently:

python
df.plot(kind="line", x="category", y="int_val")

The feature is powered by Plotly as the default visualization backend, offering rich, interactive plotting capabilities, while native pandas is used internally to process data for most plots.
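
As an end-to-end sketch, assuming a small DataFrame with category and int_val columns, the plot call returns a Plotly figure that you can display or save:

python
# Build a small DataFrame and draw an interactive line chart
data = [("A", 10), ("B", 25), ("C", 17)]
df = spark.createDataFrame(data, ["category", "int_val"])

fig = df.plot.line(x="category", y="int_val")
fig.show()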