python/docs/source/user_guide/bugbusting.ipynb
PySpark executes applications in a distributed environment, making it challenging to monitor and debug these applications. It can be difficult to track which nodes are executing specific code. However, there are multiple methods available within PySpark to help with debugging. This section will outline how to effectively debug PySpark applications.
PySpark operates using Spark as its underlying engine, utilizing Spark Connect server or Py4J (Spark Classic) to submit and compute jobs in Spark.
On the driver side, PySpark interacts with the Spark Driver on JVM through Spark
Connect server or Py4J (Spark Classic). When pyspark.sql.SparkSession is created and
initialized, PySpark starts to communicate with the Spark Driver.
On the executor side, Python workers are responsible for executing and managing Python native functions or data. These workers are only launched if the PySpark application requires interaction between Python and the JVM, such as Python UDF execution. They are initiated on demand, for instance, when running pandas UDFs or PySpark RDD APIs.
Debugging a Python UDF in PySpark can be done by simply adding print statements, though the output won't be visible on the client/driver side since the functions are executed on the executors; it can be seen in the Spark UI instead. For example, suppose you have a working Python UDF:
from pyspark.sql.functions import udf
@udf("integer")
def my_udf(x):
    # Do something with x
    return x
You can add print statements for debugging as shown below:
@udf("integer")
def my_udf(x):
    # Do something with x
    print("What's going on?")
    return x
spark.range(1).select(my_udf("id")).collect()
The output can be viewed in the Spark UI under stdout/stderr on the Executors tab.
When running non-Python UDF code, debugging is typically done via the Spark UI or
by using DataFrame.explain(True).
For instance, the code below performs a join between a large DataFrame (df1) and a
smaller one (df2):
df1 = spark.createDataFrame([(x,) for x in range(100)])
df2 = spark.createDataFrame([(x,) for x in range(2)])
df1.join(df2, "_1").explain()
Using DataFrame.explain displays the physical plan, showing how the join will be
executed. The physical plan represents the individual steps of the whole execution.
Here, it exchanges (a.k.a. shuffles) the data and performs a sort-merge join.
After checking how the plans are generated via this method, users can optimize their queries.
For example, because df2 is very small, it can be broadcast to the executors,
removing the shuffle:
from pyspark.sql.functions import broadcast
df1.join(broadcast(df2), "_1").explain()
As can be seen, the shuffle is removed and a broadcast-hash-join is performed instead.
These optimizations can also be visualized in the Spark UI under the SQL / DataFrame tab after execution.
df1.join(df2, "_1").collect()
df1.join(broadcast(df2), "_1").collect()
top and ps

On the driver side, you can obtain the process ID from your PySpark shell to monitor resources:
import os; os.getpid()
%%bash
ps -fe 23976
On the executor side, you can use grep to find the process IDs and resources for
Python workers, as these are forked from pyspark.daemon.
%%bash
ps -fe | grep pyspark.daemon | head -n 5
Typically, users leverage top and the identified PIDs to monitor the memory usage of Python processes in PySpark.
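The ps and top steps above can also be scripted. A minimal stdlib sketch for POSIX systems (the rss_kb helper is hypothetical, not part of PySpark):

```python
import os
import subprocess

def rss_kb(pid: int) -> int:
    # Ask ps for the resident set size (in KB) of the given PID (POSIX).
    out = subprocess.run(
        ["ps", "-o", "rss=", "-p", str(pid)],
        capture_output=True, text=True, check=True,
    ).stdout.strip()
    return int(out)

# Monitor the current (driver) process; executor worker PIDs found via
# `ps -fe | grep pyspark.daemon` can be passed the same way.
print(f"driver RSS: {rss_kb(os.getpid())} KB")
```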
To debug the driver side, users can rely on most of the existing Python tools, such as memory_profiler, which checks memory usage line by line. Provided your driver program is not running on another machine (e.g., YARN cluster mode), you can use a memory profiler to debug memory usage on the driver side. For example:
%%bash
echo "from pyspark.sql import SparkSession
#===Your function should be decorated with @profile===
from memory_profiler import profile
@profile
#=====================================================
def my_func():
    session = SparkSession.builder.getOrCreate()
    df = session.range(10000)
    return df.collect()

if __name__ == '__main__':
    my_func()" > profile_memory.py
python -m memory_profiler profile_memory.py 2> /dev/null
This properly shows how much memory each line consumes.
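If memory_profiler is unavailable, the standard library's tracemalloc offers a coarser, dependency-free view of driver-side allocations. A minimal sketch, where the list comprehension merely stands in for real driver-side work:

```python
import tracemalloc

tracemalloc.start()
data = [list(range(1000)) for _ in range(100)]  # stand-in for driver-side work
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"current: {current} bytes, peak: {peak} bytes")
```

Unlike memory_profiler, this reports totals for the whole snapshot rather than per-line figures, but it requires no extra packages.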
PySpark provides a remote memory_profiler for Python/pandas UDFs, which can be used
in editors with line numbers such as Jupyter notebooks. The SparkSession-based memory
profiler is enabled by setting the runtime SQL configuration
spark.sql.pyspark.udf.profiler to memory:
from pyspark.sql.functions import pandas_udf
df = spark.range(10)
@pandas_udf("long")
def add1(x):
    return x + 1
spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")
added = df.select(add1("id"))
spark.profile.clear()
added.collect()
spark.profile.show(type="memory")
The UDF IDs (e.g., 16) can be seen in the query plan, for example, add1(...)#16L in
ArrowEvalPython as shown below.
added.explain()
Python profilers are useful built-in features of Python itself for profiling performance. On the driver side, you can use them as you would for regular Python programs, because PySpark on the driver side is a regular Python process, unless you are running your driver program on another machine (e.g., YARN cluster mode).
%%bash
echo "from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.range(10).collect()" > app.py
python -m cProfile -s cumulative app.py 2> /dev/null | head -n 20
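The same profiler can also be driven from the Python API via cProfile and pstats, which avoids the subprocess round-trip inside a notebook. A sketch, where work() is a placeholder for driver-side Spark calls:

```python
import cProfile
import io
import pstats

def work():
    # Placeholder for driver-side work such as spark.range(10).collect().
    return sum(i * i for i in range(100_000))

profiler = cProfile.Profile()
profiler.enable()
work()
profiler.disable()

# Print the five most expensive entries by cumulative time.
buf = io.StringIO()
pstats.Stats(profiler, stream=buf).sort_stats("cumulative").print_stats(5)
print(buf.getvalue())
```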
PySpark provides remote Python Profilers for Python/Pandas UDFs. UDFs with
iterators as inputs/outputs are not supported. SparkSession-based performance
profiler can be enabled by setting the runtime SQL configuration
spark.sql.pyspark.udf.profiler to perf. An example is as shown below.
import io
from contextlib import redirect_stdout
from pyspark.sql.functions import pandas_udf
df = spark.range(10)
@pandas_udf("long")
def add1(x):
    return x + 1
added = df.select(add1("id"))
spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
spark.profile.clear()
added.collect()
# Only show the top 20 lines
output = io.StringIO()
with redirect_stdout(output):
    spark.profile.show(type="perf")
print("\n".join(output.getvalue().split("\n")[0:20]))
The UDF IDs (e.g., 22) can be seen in the query plan, for example, add1(...)#22L in
ArrowEvalPython below.
added.explain()
We can render the result with a preregistered renderer as shown below.
spark.profile.render(id=2, type="perf") # renderer="flameprof" by default
By default, JVM stacktraces and Python-internal tracebacks are hidden, especially during Python UDF execution. For example:
from pyspark.sql.functions import udf
spark.range(1).select(udf(lambda x: x / 0)("id")).collect()
To show the whole internal stacktraces, users can disable
spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled and enable spark.sql.pyspark.jvmStacktrace.enabled,
respectively.
spark.conf.set("spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled", False)
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", False)
spark.range(1).select(udf(lambda x: x / 0)("id")).collect()
spark.conf.set("spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled", True)
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", True)
spark.range(1).select(udf(lambda x: x / 0)("id")).collect()
See also Stack Traces for more details.
PySpark provides a logging mechanism for Python workers that execute UDFs, UDTFs, Pandas UDFs, and Python data sources. When enabled, all logging output (including print statements, standard logging, and exceptions) is captured and made available for querying and analysis.
Worker logging is disabled by default. Enable it by setting the Spark SQL configuration:
spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")
All captured logs can be queried as a DataFrame:
logs = spark.tvf.python_worker_logs()
The logs DataFrame contains the following columns:
"INFO", "WARNING", "ERROR")"stdout", "stderr")func_name, class_name, custom fields)from pyspark.sql.functions import udf
import logging
import sys
@udf("string")
def my_udf(value):
    logger = logging.getLogger("my_custom_logger")
    logger.setLevel(logging.INFO)  # Set level to INFO to capture info messages
    logger.info(f"Processing value: {value}")
    logger.warning("This is a warning")
    print("This is a stdout message")  # INFO level, logger=stdout
    print("This is a stderr message", file=sys.stderr)  # ERROR level, logger=stderr
    return value.upper()
# Enable logging and execute
spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")
df = spark.createDataFrame([("hello",), ("world",)], ["text"])
df.select(my_udf("text")).show()
# Query the logs
logs = spark.tvf.python_worker_logs()
logs.select("level", "msg", "logger", "context").show(truncate=False)
You can add custom context information to your logs:
from pyspark.sql.functions import lit, udf
import logging
@udf("string")
def contextual_udf(value):
    logger = logging.getLogger("contextual")
    logger.warning(
        "Processing with extra context",
        extra={"context": {"user_id": 123, "operation": "transform"}}
    )
    return value
spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")
spark.range(1).select(contextual_udf(lit("test"))).show()
logs = spark.tvf.python_worker_logs()
logs.filter("logger = 'contextual'").select("msg", "context").show(truncate=False)
The context includes both automatic fields (like func_name) and custom fields (like user_id, operation).
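The extra mechanism is plain Python logging: fields passed via extra become attributes on the emitted LogRecord, which the worker-logging machinery then picks up. A Spark-free stdlib sketch (the Capture handler is hypothetical, for illustration only):

```python
import logging

records = []

class Capture(logging.Handler):
    # Minimal handler that stores records instead of formatting them.
    def emit(self, record):
        records.append(record)

logger = logging.getLogger("ctx_demo")
logger.addHandler(Capture())
logger.setLevel(logging.INFO)

logger.warning("Processing with extra context",
               extra={"context": {"user_id": 123, "operation": "transform"}})
print(records[0].context)
```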
Exceptions are automatically captured with full stack traces:
from pyspark.sql.functions import udf
import logging
@udf("int")
def failing_udf(x):
    logger = logging.getLogger("error_handler")
    try:
        result = 100 / x
    except ZeroDivisionError:
        logger.exception("Division by zero occurred")
        return -1
    return int(result)
spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")
spark.createDataFrame([(0,), (5,)], ["value"]).select(failing_udf("value")).show()
logs = spark.tvf.python_worker_logs()
logs.filter("logger = 'error_handler'").select("msg", "exception").show(truncate=False)
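Under the hood this relies on logging.Logger.exception, which logs at ERROR level and attaches the active exception's traceback to the record; a Spark-free stdlib sketch:

```python
import io
import logging

buf = io.StringIO()
logging.basicConfig(stream=buf, level=logging.ERROR, force=True)
log = logging.getLogger("exc_demo")

try:
    1 / 0
except ZeroDivisionError:
    # logger.exception logs at ERROR level and appends the traceback.
    log.exception("Division by zero occurred")

print(buf.getvalue())
```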
Worker logging also works with UDTFs and Python Data Sources, capturing both the class and function names:
from pyspark.sql.functions import col, udtf
import logging
@udtf(returnType="word: string, length: int")
class WordSplitter:
    def eval(self, text: str):
        logger = logging.getLogger("udtf_logger")
        logger.setLevel(logging.INFO)  # Set level to INFO to capture info messages
        words = text.split()
        logger.info(f"Processing {len(words)} words")
        for word in words:
            yield (word, len(word))
spark.conf.set("spark.sql.pyspark.worker.logging.enabled", "true")
df = spark.createDataFrame([("hello world",)], ["text"])
df.lateralJoin(WordSplitter(col("text").outer())).show()
logs = spark.tvf.python_worker_logs()
logs.filter("logger = 'udtf_logger'").select("msg", "context").show(truncate=False)
You can use standard DataFrame operations to analyze logs:
logs = spark.tvf.python_worker_logs()
# Count logs by level
logs.groupBy("level").count().show()
# Find all errors
logs.filter("level = 'ERROR'").show()
# Logs from a specific function
logs.filter("context.func_name = 'my_udf'").show()
# Logs with exceptions
logs.filter("exception is not null").show()
# Time-based analysis
logs.orderBy("ts").show()
On the driver side, no additional steps are needed to use an IDE for debugging your PySpark application. Refer to the guide below:
On the executor side, setting up the remote debugger requires several steps. Refer to the guide below: