pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
This section introduces the most fundamental data structure in PySpark: the DataFrame.
A DataFrame is a two-dimensional labeled data structure with columns of potentially different types. You can think of a DataFrame like a spreadsheet, a SQL table, or a dictionary of series objects. Apache Spark DataFrames support a rich set of APIs (select columns, filter, join, aggregate, etc.) that allow you to solve common data analysis problems efficiently.
Compared to traditional relational databases, Spark DataFrames offer several key advantages for big data processing and analytics:
A note on RDDs: Direct use of RDDs is no longer supported on Spark Connect as of Spark 4.0. Interacting directly with Spark DataFrames uses a unified planning and optimization engine, allowing us to get nearly identical performance across all supported languages on Databricks (Python, SQL, Scala, and R).
There are several ways to create a DataFrame in PySpark.
The simplest way is to use the createDataFrame() method like so:
employees = [{"name": "John D.", "age": 30},
             {"name": "Alice G.", "age": 25},
             {"name": "Bob T.", "age": 35},
             {"name": "Eve A.", "age": 28}]
# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
df.show()
We can also create a DataFrame from a local CSV file:
df = spark.read.csv("../data/employees.csv", header=True, inferSchema=True)
df.show()
Or from a local JSON file:
df = spark.read.option("multiline","true").json("../data/employees.json")
df.show()
We can even create a DataFrame from another existing DataFrame, by selecting certain columns:
employees = [
    {"name": "John D.", "age": 30, "department": "HR"},
    {"name": "Alice G.", "age": 25, "department": "Finance"},
    {"name": "Bob T.", "age": 35, "department": "IT"},
    {"name": "Eve A.", "age": 28, "department": "Marketing"}
]
df = spark.createDataFrame(employees)
# Select only the name and age columns
new_df = df.select("name", "age")
If you have an existing table table_name in your Spark environment, you can create a DataFrame like this:
df = spark.read.table("table_name")
If your table is in a database, you can use JDBC to read the table into a DataFrame.
url = "jdbc:mysql://localhost:3306/mydatabase"
table = "employees"
# Depending on your environment, you may also need to supply the JDBC
# driver class here, e.g. "driver": "com.mysql.cj.jdbc.Driver"
properties = {
    "user": "username",
    "password": "password"
}
# Read table into DataFrame
df = spark.read.jdbc(url=url, table=table, properties=properties)
We can use PySpark to view and interact with our DataFrame.
df.show() displays a basic visualization of the DataFrame's contents. From our above createDataFrame() example:
employees = [{"name": "John D.", "age": 30},
             {"name": "Alice G.", "age": 25},
             {"name": "Bob T.", "age": 35},
             {"name": "Eve A.", "age": 28}]
# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
df.show()
df.show() has three optional arguments: n, truncate, and vertical.
By default, df.show() displays up to the first 20 rows of the DataFrame.
We can control the number of rows displayed by passing an argument to the show() method:
df.show(n=2)
The truncate argument controls whether long column values are shortened. By default it is True, which truncates strings longer than 20 characters; passing an integer instead truncates each value to that many characters:
df.show(truncate=3)
If we set vertical to True, the DataFrame will be displayed vertically with one line per value:
df.show(vertical=True)
We can view information about the DataFrame schema using the printSchema() method:
df.printSchema()
Let's look at some ways we can transform our DataFrames.
For more detailed information, please see the section about data manipulation, Chapter 3: Function Junction - Data manipulation with PySpark.
We can rename DataFrame columns using the withColumnRenamed() method:
df.show()
df2 = df.withColumnRenamed("name", "full_name")
df2.show()
We can filter for employees within a certain age range.
The following filter() call creates a new DataFrame containing only the rows that match our age condition:
filtered_df = df.filter((df["age"] > 26) & (df["age"] < 32))
filtered_df.show()
We can also use df.where to get the same result:
where_df = df.where((df["age"] > 26) & (df["age"] < 32))
where_df.show()
A DataFrame is an immutable distributed collection of data, only available in the current Spark session.
A table is a persistent data structure that can be accessed across multiple Spark sessions.
If you wish to make a DataFrame queryable by name, you can register it as a temporary view using the createOrReplaceTempView() method:
df.createOrReplaceTempView("employees")
Note that the lifetime of this temporary view is tied to the SparkSession that was used to create this DataFrame. To persist the data beyond this Spark session, you will need to save it to persistent storage.
There are several ways to save a DataFrame to persistent storage in PySpark. For more detailed information about saving data locally, see Chapter 7: Load and Behold - Data loading, storage, file formats.
For file-based data sources (text, Parquet, JSON, etc.), you can specify a custom table path like so:
df.write.option("path", "../dataout").saveAsTable("dataframes_savetable_example")
Even if the table is dropped, the custom table path and table data will still be there.
If no custom table path is specified, Spark will write data to a default table path under the warehouse directory. When the table is dropped, the default table path will be removed too.
To save to Hive metastore, you can use the following:
df.write.mode("overwrite").saveAsTable("schemaName.tableName")
PySpark supports native plotting, allowing users to visualize data directly from PySpark DataFrames.
The user interacts with PySpark Plotting by calling the plot property on a PySpark DataFrame and specifying the desired type of plot, either as a submethod or by setting the kind parameter. For instance:
df.plot.line(x="category", y="int_val")
or equivalently:
df.plot(kind="line", x="category", y="int_val")
The feature is powered by Plotly as the default visualization backend, offering rich, interactive plotting capabilities, while native pandas is used internally to process data for most plots.