#!/usr/bin/env python
# coding: utf-8

# # Chapter 4: Bug Busting - Debugging PySpark
#
# PySpark executes applications in a distributed environment, which makes them challenging
# to monitor and debug: it can be difficult to track which nodes are executing specific
# code. However, PySpark offers several methods to help with debugging. This section
# outlines how to debug PySpark applications effectively.
#
# PySpark uses Spark as its underlying engine. It submits jobs to Spark and computes them
# through either the Spark Connect server or Py4J (Spark Classic).
#
# On the driver side, PySpark interacts with the Spark Driver on the JVM through the Spark
# Connect server or Py4J (Spark Classic). When a `pyspark.sql.SparkSession` is created and
# initialized, PySpark starts to communicate with the Spark Driver.
#
# On the executor side, Python workers are responsible for executing and managing Python
# native functions or data. These workers are launched only if the PySpark application
# requires interaction between Python and the JVM, such as Python UDF execution. They are
# started on demand, for instance, when running pandas UDFs or PySpark RDD APIs.

# ## Spark UI
#
# ### Python UDF Execution
#
# You can debug a Python UDF in PySpark by simply adding print statements. The output is
# not visible on the client/driver side because the functions are executed on the
# executors. Instead, it appears in the Spark UI. For example, if you have a working
# Python UDF:

# In[1]:


from pyspark.sql.functions import udf

@udf("integer")
def my_udf(x):
    # Do something with x
    return x


# You can add print statements for debugging as shown below:

# In[2]:


# Assumes `spark` is an active SparkSession, as predefined in the PySpark shell.
@udf("integer")
def my_udf(x):
    # Do something with x
    print("What's going on?")
    return x

spark.range(1).select(my_udf("id")).collect()


# The output can be viewed in the Spark UI under `stdout`/`stderr` on the `Executors` tab.
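# When many tasks run at once, print output from different workers is interleaved in the
# executor logs. One option is to tag each debug line with the host name and the Python
# worker's PID, which are the same PIDs that `ps` reports on that host. This is a minimal
# sketch under that assumption, not a PySpark API: `debug_log` and `my_udf_body` are
# hypothetical names. The function body is plain Python, so you can test it locally
# before wrapping it as a UDF.

```python
import os
import socket
import sys


def debug_log(msg):
    """Hypothetical helper: write a debug line to stderr, tagged with host and PID."""
    # flush=True pushes each line out immediately instead of waiting for a buffer to fill.
    print(f"[{socket.gethostname()} pid={os.getpid()}] {msg}", file=sys.stderr, flush=True)


def my_udf_body(x):
    # Hypothetical UDF body; wrap it with udf(...) to run it on executors.
    debug_log(f"my_udf_body called with x={x!r}")
    return x
```

To run it on executors, wrap it as in the cells above, for example `my_udf = udf(my_udf_body, "integer")`. The tagged lines then show up in the executor logs linked from the `Executors` tab.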
#
# ![Spark UI print](./assets/pyspark-ui-print.png)

# ### Non-Python UDF
#
# When running non-Python UDF code, you typically debug through the Spark UI or
# with `DataFrame.explain(True)`.
#
# For instance, the code below performs a join between a large DataFrame (`df1`) and a
# smaller one (`df2`):

# In[3]:


df1 = spark.createDataFrame([(x,) for x in range(100)])
df2 = spark.createDataFrame([(x,) for x in range(2)])
df1.join(df2, "_1").explain()


# `DataFrame.explain` displays the physical plan, which shows how the join will be
# executed. The physical plan represents the individual steps of the whole execution.
# Here, Spark exchanges (a.k.a. shuffles) the data and performs a sort-merge join.
#
# Once you see how the plan is generated, you can optimize the query. For example,
# because `df2` is very small, it can be broadcast to the executors to remove the shuffle:

# In[4]:


from pyspark.sql.functions import broadcast

df1.join(broadcast(df2), "_1").explain()


# As shown, the shuffle is removed and Spark performs a broadcast hash join.
#
# These optimizations can also be visualized in the Spark UI under the SQL / DataFrame
# tab after execution.

# In[5]:


df1.join(df2, "_1").collect()


# ![PySpark UI SQL](./assets/pyspark-ui-sql.png)

# In[6]:


df1.join(broadcast(df2), "_1").collect()


# ![PySpark UI SQL broadcast](./assets/pyspark-ui-sql-broadcast.png)

# ## Monitor with `top` and `ps`
#
# On the driver side, you can obtain the process ID from your PySpark shell to
# monitor resources:

# In[7]:


import os

os.getpid()


# In[8]:


# Replace 23976 with the PID printed by the previous cell.
get_ipython().run_cell_magic('bash', '', 'ps -f -p 23976\n')


# On the executor side, you can use `grep` to find the process IDs and resources of
# Python workers, because these are forked from `pyspark.daemon`.
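# On Linux, the same lookup can also be scripted in Python by reading `/proc` directly.
# This is a minimal sketch, not a PySpark API. `find_pids` and `rss_kib` are hypothetical
# helper names, and the code assumes a Linux `/proc` filesystem. Forked workers keep the
# daemon's command line, so matching on `pyspark.daemon` finds both the daemon and its
# workers, just as `grep` does.

```python
import os


def find_pids(pattern):
    """Return sorted PIDs whose command line contains `pattern` (Linux /proc only)."""
    pids = []
    if not os.path.isdir("/proc"):
        return pids  # not Linux: use `ps` instead
    for entry in os.listdir("/proc"):
        if not entry.isdigit():
            continue  # skip non-process entries such as /proc/meminfo
        try:
            with open(f"/proc/{entry}/cmdline", "rb") as f:
                # Arguments in /proc/<pid>/cmdline are separated by NUL bytes.
                cmdline = f.read().replace(b"\0", b" ").decode(errors="replace")
        except OSError:
            continue  # the process exited or is not readable
        if pattern in cmdline:
            pids.append(int(entry))
    return sorted(pids)


def rss_kib(pid):
    """Return the resident set size of `pid` in KiB, or None if unavailable."""
    try:
        with open(f"/proc/{pid}/status") as f:
            for line in f:
                if line.startswith("VmRSS:"):
                    return int(line.split()[1])  # the kernel reports this value in kB
    except OSError:
        pass  # the process exited or is not readable
    return None  # e.g. kernel threads have no VmRSS line
```

For example, running `for pid in find_pids("pyspark.daemon"): print(pid, rss_kib(pid))` on an executor host prints each daemon or worker PID with its resident memory.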
# In[9]:


get_ipython().run_cell_magic('bash', '', 'ps -fe | grep pyspark.daemon | head -n 5\n')


# Typically, users run `top` on the identified PIDs to monitor the memory usage
# of Python processes in PySpark.

# ## Use PySpark Profilers
#
# ### Memory Profiler
#
# To debug the driver side, you can use most existing Python tools, such as
# [memory_profiler](https://github.com/pythonprofilers/memory_profiler), which lets you
# check memory usage line by line. This works as long as your driver program is not
# running on another machine (as it does in YARN cluster mode). For example:

# In[10]:


get_ipython().run_cell_magic('bash', '', '\necho "from pyspark.sql import SparkSession\n#===Your function should be decorated with @profile===\nfrom memory_profiler import profile\n@profile\n#=====================================================\ndef my_func():\n    session = SparkSession.builder.getOrCreate()\n    df = session.range(10000)\n    return df.collect()\nif __name__ == \'__main__\':\n    my_func()" > profile_memory.py\n\npython -m memory_profiler profile_memory.py 2> /dev/null\n')


# The output shows how much memory each line consumes.

# #### Python/Pandas UDF
#
# Note: This section applies to Spark 4.0
#
# PySpark provides remote memory profiling with
# [memory_profiler](https://github.com/pythonprofilers/memory_profiler) for Python/Pandas
# UDFs. It can be used in editors that show line numbers, such as Jupyter notebooks. The
# SparkSession-based memory profiler is enabled by setting the runtime SQL configuration
# `spark.sql.pyspark.udf.profiler` to `memory`:

# In[11]:


from pyspark.sql.functions import pandas_udf

df = spark.range(10)

@pandas_udf("long")
def add1(x):
    return x + 1

spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")

added = df.select(add1("id"))
spark.profile.clear()
added.collect()
spark.profile.show(type="memory")


# The UDF IDs (e.g., 16) can be seen in the query plan, for example, `add1(...)#16L` in
# `ArrowEvalPython` as shown below.

# In[12]:


added.explain()


# ### Performance Profiler
#
# Note: This section applies to Spark 4.0
#
# [Python Profilers](https://docs.python.org/3/library/profile.html) are useful built-in
# features of Python itself for profiling performance. On the driver side, you can use them
# as you would for any regular Python program, because the PySpark driver is a regular
# Python process. This does not apply if your driver program runs on another machine
# (e.g., YARN cluster mode).

# In[13]:


get_ipython().run_cell_magic('bash', '', '\necho "from pyspark.sql import SparkSession\nspark = SparkSession.builder.getOrCreate()\nspark.range(10).collect()" > app.py\n\npython -m cProfile -s cumulative app.py 2> /dev/null | head -n 20\n')


# #### Python/Pandas UDF
#
# Note: This section applies to Spark 4.0
#
# PySpark provides remote Python Profilers for Python/Pandas UDFs. UDFs with
# iterators as inputs/outputs are not supported. The SparkSession-based performance
# profiler is enabled by setting the runtime SQL configuration
# `spark.sql.pyspark.udf.profiler` to `perf`, as shown below.

# In[14]:


import io
from contextlib import redirect_stdout

from pyspark.sql.functions import pandas_udf

df = spark.range(10)

@pandas_udf("long")
def add1(x):
    return x + 1

added = df.select(add1("id"))

spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
spark.profile.clear()
added.collect()

# Capture the profile output and show only its first 20 lines
output = io.StringIO()
with redirect_stdout(output):
    spark.profile.show(type="perf")

print("\n".join(output.getvalue().split("\n")[0:20]))


# The UDF IDs (e.g., 22) can be seen in the query plan, for example, `add1(...)#22L` in
# `ArrowEvalPython` below.

# In[15]:


added.explain()


# You can render the result with a preregistered renderer as shown below.

# In[16]:


# `id` selects which UDF's result to render; IDs differ between sessions, so use the one
# reported for your own query.
spark.profile.render(id=2, type="perf")  # renderer="flameprof" by default


# ![PySpark UDF profiling](./assets/pyspark-udf-profile.png)

# ## Display Stacktraces
#
# Note: This section applies to Spark 4.0
#
# By default, JVM stacktraces and Python internal tracebacks are hidden, especially
# in Python UDF execution. For example:

# In[17]:


from pyspark.sql.functions import udf

spark.range(1).select(udf(lambda x: x / 0)("id")).collect()


# To show the full internal stacktraces, disable
# `spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled` to see the complete Python
# traceback, and enable `spark.sql.pyspark.jvmStacktrace.enabled` to see the JVM stacktrace.

# In[18]:


# Full Python traceback (including PySpark-internal frames), no JVM stacktrace
spark.conf.set("spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled", False)
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", False)
spark.range(1).select(udf(lambda x: x / 0)("id")).collect()


# In[19]:


# Simplified Python traceback, plus the JVM stacktrace
spark.conf.set("spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled", True)
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", True)
spark.range(1).select(udf(lambda x: x / 0)("id")).collect()


# See also [Stack Traces](https://spark.apache.org/docs/latest/api/python/development/debugging.html#stack-traces) for more details.

# ## IDE Debugging
#
# On the driver side, you do not need any additional steps to debug your PySpark application in an IDE. Refer to the guide below:
#
# - [Setting up IDEs](https://spark.apache.org/docs/latest/api/python/development/setting_ide.html)
#
# On the executor side, setting up a remote debugger requires several steps. Refer to the guide below:
#
# - [Remote Debugging (PyCharm Professional)](https://spark.apache.org/docs/latest/api/python/development/debugging.html#remote-debugging-pycharm-professional)
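# If attaching a remote debugger is impractical, a lighter-weight option for a Python worker
# that seems to hang is the standard library's `faulthandler` module. It can dump the
# current Python stack of every thread when the process receives a signal. This is a
# minimal sketch under stated assumptions, not part of the guide above. It is Unix-only,
# `enable_stack_dump_on_signal` and `traced_udf_body` are hypothetical names, and the dump
# is written to the worker's stderr, which typically ends up in the executor logs.

```python
import faulthandler
import signal
import sys


def enable_stack_dump_on_signal(signum=signal.SIGUSR1, file=sys.stderr):
    """Dump the Python stack of every thread to `file` whenever `signum` arrives.

    Unix-only. faulthandler writes straight to the file descriptor, so `file`
    must be a real file object with a fileno(), such as sys.stderr. Calling
    this again replaces the previous registration for `signum`.
    """
    faulthandler.register(signum, file=file, all_threads=True)


def traced_udf_body(x):
    # Hypothetical UDF body: registering here installs the handler in whichever
    # Python worker process executes the UDF.
    enable_stack_dump_on_signal()
    return x
```

After the UDF has run in a worker, `kill -USR1 <pid>` (with a PID found via `ps -fe | grep pyspark.daemon`) triggers the dump. Be careful: SIGUSR1 terminates any process that has not registered a handler, including the daemon itself, so signal only workers that have executed this code.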