Chapter 4: Bug Busting - Debugging PySpark

python/docs/source/user_guide/bugbusting.ipynb


PySpark runs applications in a distributed environment, which makes them hard to monitor and debug: it is often unclear which node is executing a given piece of code. PySpark nevertheless offers several ways to debug applications, and this chapter outlines how to use them effectively.

PySpark uses Spark as its underlying engine and submits and computes jobs through a Spark Connect server or Py4J (Spark Classic).

On the driver side, PySpark communicates with the Spark driver running on the JVM through the Spark Connect server or Py4J (Spark Classic). This communication starts when pyspark.sql.SparkSession is created and initialized.

On the executor side, Python workers execute and manage Python-native functions and data. They are launched on demand, only when the application needs Python and the JVM to interact, for instance when running Python UDFs, pandas UDFs, or PySpark RDD APIs.

Spark UI

Python UDF Execution

The simplest way to debug a Python UDF is to add print statements. Because the function runs on the executors, the output does not appear on the client/driver side; it can be viewed in the Spark UI instead. For example, suppose you have a working Python UDF:

python
from pyspark.sql.functions import udf

@udf("integer")
def my_udf(x):
    # Do something with x
    return x

You can add print statements for debugging as shown below:

python
@udf("integer")
def my_udf(x):
    # Do something with x
    print("What's going on?")
    return x

spark.range(1).select(my_udf("id")).collect()

The output appears in the Spark UI on the Executors tab, under stdout/stderr.
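When several executors run the same UDF, a bare message is hard to attribute to a worker. The following is a minimal sketch, assuming you are free to change the UDF body: it prefixes each message with the host name and process ID so that entries under stdout/stderr can be matched to a Python worker. The name `debug_identity` is hypothetical, and the plain function is kept separate from the UDF wrapper so that it can also be called directly on the driver.

```python
import os
import socket
import sys


def debug_identity(x):
    """Return x unchanged after writing a tagged debug line to stderr."""
    # Host name and PID identify which Python worker handled the value.
    tag = f"[{socket.gethostname()} pid={os.getpid()}]"
    print(f"{tag} my_udf received {x!r}", file=sys.stderr)
    return x


# On a cluster the function could be wrapped like the UDF above
# (assumes an active SparkSession named `spark`):
#   from pyspark.sql.functions import udf
#   my_udf = udf(debug_identity, "integer")
#   spark.range(1).select(my_udf("id")).collect()
```

Calling `debug_identity(7)` locally writes one tagged line to stderr and returns 7, which makes it easy to check the message format before running on executors.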

Non-Python UDF

When running non-Python UDF code, debugging is typically done via the Spark UI or by using DataFrame.explain(True).

For instance, the code below performs a join between a large DataFrame (df1) and a smaller one (df2):

python
df1 = spark.createDataFrame([(x,) for x in range(100)])
df2 = spark.createDataFrame([(x,) for x in range(2)])
df1.join(df2, "_1").explain()

DataFrame.explain displays the physical plan, which shows how the join will be executed. Each node in the plan is an individual step of the overall execution. Here, the plan exchanges (that is, shuffles) the data and performs a sort-merge join.

After inspecting the plan this way, users can optimize their queries. For example, because df2 is very small, it can be broadcast to the executors, which removes the shuffle:

python
from pyspark.sql.functions import broadcast

df1.join(broadcast(df2), "_1").explain()

In the resulting plan, the shuffle is removed and a broadcast hash join is performed.

These optimizations can also be visualized in the Spark UI under the SQL / DataFrame tab after execution.

python
df1.join(df2, "_1").collect()

python
df1.join(broadcast(df2), "_1").collect()
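To make the difference between the two join strategies more concrete, here is a toy, single-process illustration in plain Python. It is not Spark's implementation (Spark's operators work on partitions and are considerably more involved); it only shows why the sort-merge variant needs both sides ordered by key, while the broadcast-hash variant only needs the small side in a hash table. Both function names are hypothetical.

```python
def sort_merge_join(left, right):
    """Join two lists of (key, value) pairs by sorting both sides first."""
    left = sorted(left, key=lambda kv: kv[0])
    right = sorted(right, key=lambda kv: kv[0])
    out, j = [], 0
    for lk, lv in left:
        # Advance the right cursor past keys smaller than the current left key.
        while j < len(right) and right[j][0] < lk:
            j += 1
        # Emit one row per matching right-side entry.
        k = j
        while k < len(right) and right[k][0] == lk:
            out.append((lk, lv, right[k][1]))
            k += 1
    return out


def broadcast_hash_join(large, small):
    """Join by building a hash table from the small side only."""
    table = {}
    for k, v in small:
        table.setdefault(k, []).append(v)
    # The large side is scanned once in its existing order; no sort is needed.
    return [(k, lv, sv) for k, lv in large for sv in table.get(k, [])]


large = [(x, f"l{x}") for x in range(100)]
small = [(x, f"s{x}") for x in range(2)]
print(sort_merge_join(large, small) == broadcast_hash_join(large, small))
```

Both functions produce the same rows for this input; the difference lies in how much data has to be reordered first, which is what the shuffle in the physical plan reflects.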

Monitor with top and ps

On the driver side, you can obtain the process ID from your PySpark shell to monitor resources:

python
import os; os.getpid()

python
%%bash
# Replace 23976 with the PID returned above; -p selects that process only
ps -fp 23976

On the executor side, you can use grep to find the process IDs and resources for Python workers, as these are forked from pyspark.daemon.

python
%%bash
ps -fe | grep pyspark.daemon | head -n 5

Typically, users leverage top and the identified PIDs to monitor the memory usage of Python processes in PySpark.
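The same check can be scripted. The sketch below is one possible approach, assuming a Unix-like machine where `ps` accepts `-e` and `-o pid,rss,command`: it lists processes whose command line mentions `pyspark.daemon` together with their resident memory. The helper names `parse_ps` and `python_worker_memory` are hypothetical, and the script has to run on the machine that hosts the Python workers.

```python
import subprocess


def parse_ps(text, needle="pyspark.daemon"):
    """Return (pid, rss_kib, command) for each ps row mentioning `needle`."""
    rows = []
    for line in text.splitlines():
        parts = line.split(None, 2)  # pid, rss, rest of the command line
        if len(parts) != 3:
            continue
        pid, rss, command = parts
        # The header row is skipped because its first fields are not numeric.
        if pid.isdigit() and rss.isdigit() and needle in command:
            rows.append((int(pid), int(rss), command.strip()))
    return rows


def python_worker_memory():
    """Run ps on the local machine and report Python worker memory."""
    # -e selects every process; -o picks the columns. rss is in kilobytes.
    out = subprocess.run(
        ["ps", "-e", "-o", "pid,rss,command"],
        capture_output=True, text=True, check=True,
    ).stdout
    return parse_ps(out)


if __name__ == "__main__":
    for pid, rss, command in python_worker_memory():
        print(f"{pid}\t{rss} KiB\t{command}")
```

Keeping the parsing separate from the `ps` call makes it possible to test the filter on captured output without a running cluster.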

Use PySpark Profilers

Memory Profiler

On the driver side, most existing Python tools work as usual, for example memory_profiler, which reports memory usage line by line. This applies as long as the driver program is not running on another machine (e.g., YARN cluster mode). For example:

python
%%bash

echo "from pyspark.sql import SparkSession
#===Your function should be decorated with @profile===
from memory_profiler import profile
@profile
#=====================================================
def my_func():
    session = SparkSession.builder.getOrCreate()
    df = session.range(10000)
    return df.collect()
if __name__ == '__main__':
    my_func()" > profile_memory.py

python -m memory_profiler profile_memory.py 2> /dev/null

The report shows how much memory each line consumes.
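If installing memory_profiler is not an option, a rougher view is available from the standard library. The sketch below swaps in `tracemalloc` rather than memory_profiler, so it is a different technique: it reports which source lines hold the most Python-allocated memory after a function runs, and it does not see memory used by the JVM. `top_allocations` and `build_rows` are hypothetical names, and `build_rows` merely stands in for driver-side work such as `df.collect()`.

```python
import tracemalloc


def top_allocations(func, limit=3):
    """Run func() under tracemalloc; return its result and the top allocation sites."""
    tracemalloc.start()
    try:
        result = func()
        # Take the snapshot while the result is still alive.
        snapshot = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()
    # Group live allocations by source line, largest first.
    stats = snapshot.statistics("lineno")[:limit]
    return result, stats


def build_rows():
    # Stand-in for driver-side work such as df.collect().
    return [str(i) * 10 for i in range(10000)]


rows, stats = top_allocations(build_rows)
for stat in stats:
    print(stat)
```

Each printed entry names a file and line number together with the total size and number of blocks allocated there.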

Python/Pandas UDF

<div class="alert alert-block alert-info"> <b>Note:</b> This section applies to Spark 4.0 </div>

PySpark provides a remote memory_profiler for Python/Pandas UDFs. It can be used in editors that show line numbers, such as Jupyter notebooks. The SparkSession-based memory profiler is enabled by setting the runtime SQL configuration spark.sql.pyspark.udf.profiler to memory:

python
from pyspark.sql.functions import pandas_udf

df = spark.range(10)

@pandas_udf("long")
def add1(x):
    return x + 1

spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")

added = df.select(add1("id"))
spark.profile.clear()
added.collect()
spark.profile.show(type="memory")

The UDF IDs (e.g., 16) can be seen in the query plan, for example, add1(...)#16L in ArrowEvalPython as shown below.

python
added.explain()

Performance Profiler

<div class="alert alert-block alert-info"> <b>Note:</b> This section applies to Spark 4.0 </div>

Python's built-in profilers are useful for profiling performance. On the driver side, you can use them as you would for any regular Python program, because the PySpark driver is a regular Python process unless the driver program runs on another machine (e.g., YARN cluster mode).

python
%%bash

echo "from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.range(10).collect()" > app.py

python -m cProfile -s cumulative app.py  2> /dev/null | head -n 20
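Profiling the whole script includes interpreter and import overhead. When only one driver-side call is of interest, `cProfile` can also be used programmatically. The following is a minimal sketch using only the standard library; `profile_call` and `driver_work` are hypothetical names, and `driver_work` stands in for an action such as `spark.range(10).collect()`.

```python
import cProfile
import io
import pstats


def profile_call(func, top=5):
    """Profile func() and return (result, text report sorted by cumulative time)."""
    profiler = cProfile.Profile()
    result = profiler.runcall(func)
    buf = io.StringIO()
    # Sort by cumulative time, matching `-s cumulative` on the command line.
    pstats.Stats(profiler, stream=buf).sort_stats("cumulative").print_stats(top)
    return result, buf.getvalue()


def driver_work():
    # Stand-in for a driver-side action such as spark.range(10).collect().
    return sum(i * i for i in range(10000))


result, report = profile_call(driver_work)
print(report)
```

The report has the same columns as the command-line output, restricted to the `top` entries with the highest cumulative time.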

Python/Pandas UDF

<div class="alert alert-block alert-info"> <b>Note:</b> This section applies to Spark 4.0 </div>

PySpark provides remote Python profilers for Python/Pandas UDFs. UDFs with iterators as inputs/outputs are not supported. The SparkSession-based performance profiler is enabled by setting the runtime SQL configuration spark.sql.pyspark.udf.profiler to perf, as shown below.

python
import io
from contextlib import redirect_stdout

from pyspark.sql.functions import pandas_udf

df = spark.range(10)
@pandas_udf("long")
def add1(x):
    return x + 1

added = df.select(add1("id"))

spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
spark.profile.clear()
added.collect()

# Only show the first 20 lines of output
output = io.StringIO()
with redirect_stdout(output):
    spark.profile.show(type="perf")

print("\n".join(output.getvalue().split("\n")[0:20]))

The UDF IDs (e.g., 22) can be seen in the query plan, for example, add1(...)#22L in ArrowEvalPython below.

python
added.explain()

We can render the result with a preregistered renderer as shown below.

python
spark.profile.render(id=2, type="perf")  # id: a UDF ID from the query plan; renderer="flameprof" by default

Display Stacktraces

<div class="alert alert-block alert-info"> <b>Note:</b> This section applies to Spark 4.0 </div>

By default, JVM stack traces and Python-internal tracebacks are hidden, especially in Python UDF executions. For example:

python
from pyspark.sql.functions import udf

spark.range(1).select(udf(lambda x: x / 0)("id")).collect()

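To show the full internal stack traces, disable spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled (when true, it hides Python-internal frames) and enable spark.sql.pyspark.jvmStacktrace.enabled (when true, it adds the JVM stack trace). The cells below run the same failing UDF with both settings turned off and then with both turned on: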

python
# Both off: full (non-simplified) Python traceback, JVM stack trace hidden
spark.conf.set("spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled", False)
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", False)
spark.range(1).select(udf(lambda x: x / 0)("id")).collect()

python
# Both on: simplified Python traceback, JVM stack trace included
spark.conf.set("spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled", True)
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", True)
spark.range(1).select(udf(lambda x: x / 0)("id")).collect()

See also Stack Traces for more details.
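Debug settings like these are easy to leave switched on by accident. One way to limit their scope is a small context manager that sets a configuration for the duration of a block and restores the previous state afterwards. This is a sketch, not part of PySpark: `temporary_conf` is a hypothetical helper, and it assumes the configuration object offers `get(key, default)`, `set(key, value)` and `unset(key)` in the way `spark.conf` does, with `get(key, None)` returning `None` for a key that has not been set explicitly. `DictConf` is an in-memory stand-in so the example runs without Spark.

```python
from contextlib import contextmanager


@contextmanager
def temporary_conf(conf, key, value):
    """Set conf[key] for the duration of a with-block, then restore it."""
    previous = conf.get(key, None)  # None is taken to mean "not explicitly set"
    conf.set(key, value)
    try:
        yield
    finally:
        if previous is None:
            conf.unset(key)
        else:
            conf.set(key, previous)


class DictConf:
    """Hypothetical in-memory stand-in for spark.conf."""

    def __init__(self):
        self._values = {}

    def get(self, key, default=None):
        return self._values.get(key, default)

    def set(self, key, value):
        self._values[key] = value

    def unset(self, key):
        self._values.pop(key, None)


conf = DictConf()
with temporary_conf(conf, "spark.sql.pyspark.jvmStacktrace.enabled", "true"):
    print(conf.get("spark.sql.pyspark.jvmStacktrace.enabled"))
print(conf.get("spark.sql.pyspark.jvmStacktrace.enabled"))
```

With a real session, `spark.conf` would take the place of `conf`; whether `get` and `unset` behave as assumed should be checked against the Spark version in use.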

Python Worker Logging

<div class="alert alert-block alert-info"> <b>Note:</b> This section applies to Spark 4.1 </div>

PySpark provides a logging mechanism for Python workers that execute UDFs, UDTFs, Pandas UDFs, and Python data sources. When enabled, all logging output (including print statements, standard logging, and exceptions) is captured and made available for querying and analysis.

Enabling Worker Logging

Worker logging is disabled by default. Enable it by setting the Spark SQL configuration:

python
spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")

Accessing Logs

All captured logs can be queried as a DataFrame:

python
logs = spark.tvf.python_worker_logs()

The logs DataFrame contains the following columns:

  • ts: Timestamp of the log entry
  • level: Log level (e.g., "INFO", "WARNING", "ERROR")
  • logger: Logger name (e.g., custom logger name, "stdout", "stderr")
  • msg: The log message
  • context: A map containing contextual information (e.g., func_name, class_name, custom fields)
  • exception: Exception details (if an exception was logged)

Examples

Basic UDF Logging

python
from pyspark.sql.functions import udf
import logging
import sys

@udf("string")
def my_udf(value):
    logger = logging.getLogger("my_custom_logger")
    logger.setLevel(logging.INFO)  # Set level to INFO to capture info messages
    logger.info(f"Processing value: {value}")
    logger.warning("This is a warning")
    print("This is a stdout message")  # INFO level, logger=stdout
    print("This is a stderr message", file=sys.stderr)  # ERROR level, logger=stderr
    return value.upper()

# Enable logging and execute
spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")
df = spark.createDataFrame([("hello",), ("world",)], ["text"])
df.select(my_udf("text")).show()

# Query the logs
logs = spark.tvf.python_worker_logs()
logs.select("level", "msg", "logger", "context").show(truncate=False)
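The `setLevel(logging.INFO)` call in the UDF matters because of how the standard `logging` module resolves levels: a logger without its own level inherits the root logger's level, which is WARNING unless it has been changed. The sketch below shows this with the standard library alone, outside Spark; the logger name `level_demo` is arbitrary.

```python
import logging

logger = logging.getLogger("level_demo")

# A new logger has no level of its own and falls back to the root logger's level.
info_before = logger.isEnabledFor(logging.INFO)
warning_before = logger.isEnabledFor(logging.WARNING)

# After lowering the level, INFO messages are no longer filtered out.
logger.setLevel(logging.INFO)
info_after = logger.isEnabledFor(logging.INFO)

print(info_before, warning_before, info_after)
```

In a fresh interpreter with default logging settings, `info_before` is False while the other two values are True, which is why `logger.info(...)` calls are dropped unless the level is lowered first.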

Logging with Custom Context

You can add custom context information to your logs:

python
from pyspark.sql.functions import lit, udf
import logging

@udf("string")
def contextual_udf(value):
    logger = logging.getLogger("contextual")
    logger.warning(
        "Processing with extra context",
        extra={"context": {"user_id": 123, "operation": "transform"}}
    )
    return value

spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")
spark.range(1).select(contextual_udf(lit("test"))).show()

logs = spark.tvf.python_worker_logs()
logs.filter("logger = 'contextual'").select("msg", "context").show(truncate=False)

The context includes both automatic fields (like func_name) and custom fields (like user_id, operation).
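The `extra` argument used above is a feature of the standard `logging` module: every key in the dictionary becomes an attribute on the resulting log record. The sketch below shows only that standard-library side, using a hypothetical in-memory handler (`ListHandler`); how PySpark's worker logging turns the attribute into the context column is not shown here.

```python
import logging


class ListHandler(logging.Handler):
    """Collect log records in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


handler = ListHandler()
logger = logging.getLogger("contextual_demo")
logger.addHandler(handler)
logger.propagate = False  # keep the demo output out of the root logger

logger.warning(
    "Processing with extra context",
    extra={"context": {"user_id": 123, "operation": "transform"}},
)

record = handler.records[0]
print(record.levelname, record.getMessage(), record.context)
```

Because the custom fields travel on the record itself, they can be checked this way in a local unit test before the function is wrapped as a UDF.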

Exception Logging

Exceptions are automatically captured with full stack traces:

python
from pyspark.sql.functions import udf
import logging

@udf("int")
def failing_udf(x):
    logger = logging.getLogger("error_handler")
    try:
        result = 100 / x
    except ZeroDivisionError:
        logger.exception("Division by zero occurred")
        return -1
    return int(result)

spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")
spark.createDataFrame([(0,), (5,)], ["value"]).select(failing_udf("value")).show()

logs = spark.tvf.python_worker_logs()
logs.filter("logger = 'error_handler'").select("msg", "exception").show(truncate=False)

UDTF and Python Data Source Logging

Worker logging also works with UDTFs and Python Data Sources, capturing both the class and function names:

python
from pyspark.sql.functions import col, udtf
import logging

@udtf(returnType="word: string, length: int")
class WordSplitter:
    def eval(self, text: str):
        logger = logging.getLogger("udtf_logger")
        logger.setLevel(logging.INFO)  # Set level to INFO to capture info messages
        words = text.split()
        logger.info(f"Processing {len(words)} words")
        for word in words:
            yield (word, len(word))

spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")
df = spark.createDataFrame([("hello world",)], ["text"])
df.lateralJoin(WordSplitter(col("text").outer())).show()

logs = spark.tvf.python_worker_logs()
logs.filter("logger = 'udtf_logger'").select("msg", "context").show(truncate=False)

Querying and Analyzing Logs

You can use standard DataFrame operations to analyze logs:

python
logs = spark.tvf.python_worker_logs()

# Count logs by level
logs.groupBy("level").count().show()

# Find all errors
logs.filter("level = 'ERROR'").show()

# Logs from a specific function
logs.filter("context.func_name = 'my_udf'").show()

# Logs with exceptions
logs.filter("exception is not null").show()

# Time-based analysis
logs.orderBy("ts").show()

IDE Debugging

On the driver side, no additional steps are needed to debug your PySpark application in an IDE. Refer to the guide below:

On the executor side, setting up the remote debugger requires several steps. Refer to the guide below: