#!/usr/bin/env python
# coding: utf-8

# # Chapter 4: Bug Busting - Debugging PySpark
#
# PySpark executes applications in a distributed environment, which makes them
# challenging to monitor and debug: it can be difficult to track which nodes are
# executing specific code. However, PySpark offers multiple methods to help with
# debugging. This section outlines how to debug PySpark applications effectively.
#
# PySpark operates using Spark as its underlying engine, utilizing the Spark Connect
# server or Py4J (Spark Classic) to submit and compute jobs in Spark.
#
# On the driver side, PySpark interacts with the Spark Driver on the JVM through the
# Spark Connect server or Py4J (Spark Classic). When `pyspark.sql.SparkSession` is
# created and initialized, PySpark starts to communicate with the Spark Driver.
#
# On the executor side, Python workers execute and manage Python native functions and
# data. They are launched only when the PySpark application requires interaction
# between Python and the JVM, such as Python UDF execution, and are initiated
# on demand, for instance when running pandas UDFs or PySpark RDD APIs.

# ## Spark UI
#
# ### Python UDF Execution
#
# Debugging a Python UDF in PySpark can be done by simply adding print statements,
# although the output is not visible on the client/driver side because the functions
# are executed on the executors; it can be viewed in the Spark UI instead. For example,
# suppose you have a working Python UDF:

# In[1]:


from pyspark.sql.functions import udf


@udf("integer")
def my_udf(x):
    # Do something with x
    return x


# You can add print statements for debugging as shown below:

# In[2]:


@udf("integer")
def my_udf(x):
    # Do something with x
    print("What's going on?")
    return x


spark.range(1).select(my_udf("id")).collect()


# The output can be viewed in the Spark UI under `stdout`/`stderr` on the `Executors` tab.

# ### Non-Python UDF
#
# When running non-Python UDF code, debugging is typically done via the Spark UI or
# by using `DataFrame.explain(True)`.
#
# For instance, the code below performs a join between a large DataFrame (`df1`) and a
# smaller one (`df2`):

# In[3]:


df1 = spark.createDataFrame([(x,) for x in range(100)])
df2 = spark.createDataFrame([(x,) for x in range(2)])
df1.join(df2, "_1").explain()


# `DataFrame.explain` displays the physical plan, which shows how the join will be
# executed. The physical plan represents the individual steps of the whole execution.
# Here, the data is exchanged (shuffled) and a sort-merge join is performed.
#
# After checking how the plans are generated this way, users can optimize their queries.
# For example, because `df2` is very small, it can be broadcast to the executors,
# which removes the shuffle:

# In[4]:


from pyspark.sql.functions import broadcast

df1.join(broadcast(df2), "_1").explain()


# As can be seen, the shuffle is removed and a broadcast hash join is performed instead.
#
# These optimizations can also be visualized in the Spark UI under the SQL / DataFrame
# tab after execution.

# In[5]:


df1.join(df2, "_1").collect()


# In[6]:


df1.join(broadcast(df2), "_1").collect()


# ## Monitor with `top` and `ps`
#
# On the driver side, you can obtain the process ID from your PySpark shell to
# monitor resources:

# In[7]:


import os; os.getpid()


# In[8]:


get_ipython().run_cell_magic('bash', '', 'ps -fe 23976\n')
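# If you prefer to stay inside the PySpark shell, a similar driver-side check can be
# done programmatically. The cell below is a minimal sketch (not part of the original
# example) that uses only the Python standard library to report the driver's PID and
# its peak resident set size; note that `ru_maxrss` is reported in kilobytes on Linux
# and in bytes on macOS.

# In[ ]:


# Inspect the driver process from within Python instead of an external `ps`/`top`.
import os
import resource

print("Driver PID:", os.getpid())

# Peak resident set size (RSS) of the driver process so far.
# ru_maxrss is in kilobytes on Linux and in bytes on macOS.
print("Peak RSS:", resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)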
# On the executor side, you can use `grep` to find the process IDs and resources of the
# Python workers, because they are forked from `pyspark.daemon`.

# In[9]:


get_ipython().run_cell_magic('bash', '', 'ps -fe | grep pyspark.daemon | head -n 5\n')


# Typically, users combine `top` with the identified PIDs to monitor the memory usage
# of the Python processes in PySpark.

# ## Use PySpark Profilers
#
# ### Memory Profiler
#
# To debug the driver side, you can use most of the existing Python tools, such as
# [memory_profiler](https://github.com/pythonprofilers/memory_profiler), which lets you
# check memory usage line by line. As long as your driver program is not running on
# another machine (e.g., YARN cluster mode), you can use it to profile the driver.
# For example:

# In[10]:


get_ipython().run_cell_magic('bash', '', '\necho "from pyspark.sql import SparkSession\n#===Your function should be decorated with @profile===\nfrom memory_profiler import profile\n@profile\n#=====================================================\ndef my_func():\n    session = SparkSession.builder.getOrCreate()\n    df = session.range(10000)\n    return df.collect()\nif __name__ == \'__main__\':\n    my_func()" > profile_memory.py\n\npython -m memory_profiler profile_memory.py 2> /dev/null\n')


# The output shows, line by line, how much memory each statement consumes.

# #### Python/Pandas UDF
#