Quickstart: Pandas API on Spark

This is a short introduction to pandas API on Spark, geared mainly for new users. This notebook shows you some key differences between pandas and pandas API on Spark. You can run these examples yourself in 'Live Notebook: pandas API on Spark' at the quickstart page.

Customarily, we import pandas API on Spark as follows:

python
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

Object Creation

Creating a pandas-on-Spark Series by passing a list of values, letting pandas API on Spark create a default integer index:

python
s = ps.Series([1, 3, 5, np.nan, 6, 8])
python
s

Creating a pandas-on-Spark DataFrame by passing a dict of objects that can be converted into a series-like structure:

python
psdf = ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])
python
psdf

Creating a pandas DataFrame by passing a NumPy array, with a datetime index and labeled columns:

python
dates = pd.date_range('20130101', periods=6)
python
dates
python
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
python
pdf

Now, this pandas DataFrame can be converted to a pandas-on-Spark DataFrame:

python
psdf = ps.from_pandas(pdf)
python
type(psdf)

It looks and behaves the same as a pandas DataFrame.

python
psdf
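
Conversely, a pandas-on-Spark DataFrame can be converted back to a pandas DataFrame with to_pandas(). Keep in mind that this collects all the data to the driver, so it is only suitable when the data fits in memory. A minimal sketch:

python
pdf_roundtrip = psdf.to_pandas()  # Collects all data to the driver.
type(pdf_roundtrip)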

It is also easy to create a pandas-on-Spark DataFrame from a Spark DataFrame.

Creating a Spark DataFrame from a pandas DataFrame.

python
spark = SparkSession.builder.getOrCreate()
python
sdf = spark.createDataFrame(pdf)
python
sdf.show()

Creating a pandas-on-Spark DataFrame from the Spark DataFrame.

python
psdf = sdf.pandas_api()
python
psdf
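
The reverse direction is just as simple; a pandas-on-Spark DataFrame provides to_spark() to obtain a Spark DataFrame again. A minimal sketch:

python
sdf_roundtrip = psdf.to_spark()  # Convert back into a Spark DataFrame.
sdf_roundtrip.show()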

Having specific dtypes. Types that are common to both Spark and pandas are currently supported.

python
psdf.dtypes

Here is how to show the top rows of the frame.

Note that the data in a Spark DataFrame does not preserve the natural order by default. The natural order can be preserved by setting the compute.ordered_head option, but this causes a performance overhead because it requires an internal sort.

python
psdf.head()
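
As a sketch of the option mentioned above, setting compute.ordered_head preserves the natural order in head() at the cost of an internal sort:

python
ps.set_option("compute.ordered_head", True)  # Preserve natural ordering in head().
psdf.head()
python
ps.reset_option("compute.ordered_head")  # Restore the default.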

Displaying the index, columns, and the underlying NumPy data.

python
psdf.index
python
psdf.columns
python
psdf.to_numpy()

Showing a quick statistical summary of your data:

python
psdf.describe()

Transposing your data

python
psdf.T

Sorting by its index

python
psdf.sort_index(ascending=False)

Sorting by value

python
psdf.sort_values(by='B')

Missing Data

Pandas API on Spark primarily uses the value np.nan to represent missing data. By default, missing values are not included in computations.

python
pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])
python
pdf1.loc[dates[0]:dates[1], 'E'] = 1
python
psdf1 = ps.from_pandas(pdf1)
python
psdf1

Dropping any rows that have missing data:

python
psdf1.dropna(how='any')

Filling missing data.

python
psdf1.fillna(value=5)
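
fillna() can also propagate existing values instead of filling with a constant; as a short sketch, method='ffill' carries the last valid observation forward:

python
psdf1.fillna(method='ffill')  # Forward-fill within each column.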

Operations

Stats

Performing a descriptive statistic:

python
psdf.mean()

Spark Configurations

Various configurations in PySpark can be applied internally in pandas API on Spark. For example, you can enable Arrow optimization to significantly speed up internal pandas conversion. See also the PySpark Usage Guide for Pandas with Apache Arrow (https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html) in the PySpark documentation.

python
prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")  # Keep its default value.
ps.set_option("compute.default_index_type", "distributed")  # Use the distributed default index to avoid indexing overhead.
import warnings
warnings.filterwarnings("ignore")  # Ignore warnings coming from Arrow optimizations.
python
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
%timeit ps.range(300000).to_pandas()
python
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)
%timeit ps.range(300000).to_pandas()
python
ps.reset_option("compute.default_index_type")
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", prev)  # Set its default value back.

Grouping

By “group by” we are referring to a process involving one or more of the following steps:

  • Splitting the data into groups based on some criteria
  • Applying a function to each group independently
  • Combining the results into a data structure
python
psdf = ps.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
                          'foo', 'bar', 'foo', 'foo'],
                    'B': ['one', 'one', 'two', 'three',
                          'two', 'two', 'one', 'three'],
                    'C': np.random.randn(8),
                    'D': np.random.randn(8)})
python
psdf

Grouping and then applying the sum() function to the resulting groups.

python
psdf.groupby('A').sum()

Grouping by multiple columns forms a hierarchical index, and again we can apply the sum() function.

python
psdf.groupby(['A', 'B']).sum()
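
If you prefer the grouping keys as regular columns rather than a hierarchical index, groupby() also accepts as_index=False; a minimal sketch:

python
# Keep the grouping columns as ordinary columns instead of an index.
psdf.groupby(['A', 'B'], as_index=False).sum()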

Plotting

python
pser = pd.Series(np.random.randn(1000),
                 index=pd.date_range('1/1/2000', periods=1000))
python
psser = ps.Series(pser)
python
psser = psser.cummax()
python
psser.plot()

On a DataFrame, the plot() method is a convenience to plot all of the columns with labels:

python
pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
                   columns=['A', 'B', 'C', 'D'])
python
psdf = ps.from_pandas(pdf)
python
psdf = psdf.cummax()
python
psdf.plot()
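
Pandas API on Spark renders plots with plotly by default; as a sketch, the backend can be switched through the plotting.backend option, for example to matplotlib:

python
ps.set_option("plotting.backend", "matplotlib")  # Switch from the default (plotly).
psdf.plot()
python
ps.reset_option("plotting.backend")  # Restore the default backend.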

For more details, see the Plotting documentation.

Getting data in/out

CSV

CSV is straightforward and easy to use. See DataFrame.to_csv() to write a CSV file and read_csv() to read a CSV file.

python
psdf.to_csv('foo.csv')
ps.read_csv('foo.csv').head(10)
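
Note that, because writes are distributed, to_csv() outputs a directory of part files rather than a single CSV file; the num_files parameter controls how many part files are produced. A minimal sketch:

python
# Coalesce the output into a single part file within the 'foo.csv' directory.
psdf.to_csv('foo.csv', num_files=1)
ps.read_csv('foo.csv').head(10)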

Parquet

Parquet is an efficient and compact file format that is fast to both read and write. See DataFrame.to_parquet() to write a Parquet file and read_parquet() to read a Parquet file.

python
psdf.to_parquet('bar.parquet')
ps.read_parquet('bar.parquet').head(10)

Spark IO

In addition, pandas API on Spark fully supports Spark's various data sources, such as ORC and external data sources. See DataFrame.spark.to_spark_io() to write to a specified data source and read_spark_io() to read from one.

python
psdf.spark.to_spark_io('zoo.orc', format="orc")
ps.read_spark_io('zoo.orc', format="orc").head(10)

See the Input/Output documentation for more details.