#!/usr/bin/env python # coding: utf-8 # # Quickstart: Pandas API on Spark # # This is a short introduction to pandas API on Spark, geared mainly for new users. This notebook shows you some key differences between pandas and pandas API on Spark. You can run this examples by yourself in 'Live Notebook: pandas API on Spark' at [the quickstart page](https://spark.apache.org/docs/latest/api/python/getting_started/index.html). # # Customarily, we import pandas API on Spark as follows: # In[1]: import pandas as pd import numpy as np import pyspark.pandas as ps from pyspark.sql import SparkSession # ## Object Creation # # # Creating a pandas-on-Spark Series by passing a list of values, letting pandas API on Spark create a default integer index: # In[2]: s = ps.Series([1, 3, 5, np.nan, 6, 8]) # In[3]: s # Creating a pandas-on-Spark DataFrame by passing a dict of objects that can be converted to series-like. # In[4]: psdf = ps.DataFrame( {'a': [1, 2, 3, 4, 5, 6], 'b': [100, 200, 300, 400, 500, 600], 'c': ["one", "two", "three", "four", "five", "six"]}, index=[10, 20, 30, 40, 50, 60]) # In[5]: psdf # Creating a pandas DataFrame by passing a numpy array, with a datetime index and labeled columns: # In[6]: dates = pd.date_range('20130101', periods=6) # In[7]: dates # In[8]: pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD')) # In[9]: pdf # Now, this pandas DataFrame can be converted to a pandas-on-Spark DataFrame # In[10]: psdf = ps.from_pandas(pdf) # In[11]: type(psdf) # It looks and behaves the same as a pandas DataFrame. # In[12]: psdf # Also, it is possible to create a pandas-on-Spark DataFrame from Spark DataFrame easily. # # Creating a Spark DataFrame from pandas DataFrame # In[13]: spark = SparkSession.builder.getOrCreate() # In[14]: sdf = spark.createDataFrame(pdf) # In[15]: sdf.show() # Creating pandas-on-Spark DataFrame from Spark DataFrame. # In[16]: psdf = sdf.pandas_api() # In[17]: psdf # Having specific [dtypes](http://pandas.pydata.org/pandas-docs/stable/basics.html#basics-dtypes) . Types that are common to both Spark and pandas are currently supported. # In[18]: psdf.dtypes # Here is how to show top rows from the frame below. # # Note that the data in a Spark dataframe does not preserve the natural order by default. The natural order can be preserved by setting `compute.ordered_head` option but it causes a performance overhead with sorting internally. # In[19]: psdf.head() # Displaying the index, columns, and the underlying numpy data. # In[20]: psdf.index # In[21]: psdf.columns # In[22]: psdf.to_numpy() # Showing a quick statistic summary of your data # In[23]: psdf.describe() # Transposing your data # In[24]: psdf.T # Sorting by its index # In[25]: psdf.sort_index(ascending=False) # Sorting by value # In[26]: psdf.sort_values(by='B') # ## Missing Data # Pandas API on Spark primarily uses the value `np.nan` to represent missing data. It is by default not included in computations. # # In[27]: pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E']) # In[28]: pdf1.loc[dates[0]:dates[1], 'E'] = 1 # In[29]: psdf1 = ps.from_pandas(pdf1) # In[30]: psdf1 # To drop any rows that have missing data. # In[31]: psdf1.dropna(how='any') # Filling missing data. # In[32]: psdf1.fillna(value=5) # ## Operations # ### Stats # Performing a descriptive statistic: # In[33]: psdf.mean() # ### Spark Configurations # # Various configurations in PySpark could be applied internally in pandas API on Spark. # For example, you can enable Arrow optimization to hugely speed up internal pandas conversion. See also PySpark Usage Guide for Pandas with Apache Arrow in PySpark documentation. # In[34]: prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled") # Keep its default value. ps.set_option("compute.default_index_type", "distributed") # Use default index prevent overhead. import warnings warnings.filterwarnings("ignore") # Ignore warnings coming from Arrow optimizations. # In[35]: spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True) get_ipython().run_line_magic('timeit', 'ps.range(300000).to_pandas()') # In[36]: spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False) get_ipython().run_line_magic('timeit', 'ps.range(300000).to_pandas()') # In[37]: ps.reset_option("compute.default_index_type") spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", prev) # Set its default value back. # ## Grouping # By “group by” we are referring to a process involving one or more of the following steps: # # - Splitting the data into groups based on some criteria # - Applying a function to each group independently # - Combining the results into a data structure # In[38]: psdf = ps.DataFrame({'A': ['foo', 'bar', 'foo', 'bar', 'foo', 'bar', 'foo', 'foo'], 'B': ['one', 'one', 'two', 'three', 'two', 'two', 'one', 'three'], 'C': np.random.randn(8), 'D': np.random.randn(8)}) # In[39]: psdf # Grouping and then applying the [sum()](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.groupby.GroupBy.sum.html) function to the resulting groups. # In[40]: psdf.groupby('A').sum() # Grouping by multiple columns forms a hierarchical index, and again we can apply the sum function. # In[41]: psdf.groupby(['A', 'B']).sum() # ## Plotting # In[42]: pser = pd.Series(np.random.randn(1000), index=pd.date_range('1/1/2000', periods=1000)) # In[43]: psser = ps.Series(pser) # In[44]: psser = psser.cummax() # In[45]: psser.plot() # On a DataFrame, the [plot()](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.plot.html) method is a convenience to plot all of the columns with labels: # In[46]: pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index, columns=['A', 'B', 'C', 'D']) # In[47]: psdf = ps.from_pandas(pdf) # In[48]: psdf = psdf.cummax() # In[49]: psdf.plot() # For more details, [Plotting](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/frame.html#plotting) documentation. # ## Getting data in/out # ### CSV # # CSV is straightforward and easy to use. See [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.to_csv.html) to write a CSV file and [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_csv.html) to read a CSV file. # In[50]: psdf.to_csv('foo.csv') ps.read_csv('foo.csv').head(10) # ### Parquet # # Parquet is an efficient and compact file format to read and write faster. See [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.to_parquet.html) to write a Parquet file and [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_parquet.html) to read a Parquet file. # In[51]: psdf.to_parquet('bar.parquet') ps.read_parquet('bar.parquet').head(10) # ### Spark IO # # In addition, pandas API on Spark fully supports Spark's various datasources such as ORC and an external datasource. See [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.to_orc.html) to write it to the specified datasource and [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.read_orc.html) to read it from the datasource. # In[52]: psdf.to_spark_io('zoo.orc', format="orc") ps.read_spark_io('zoo.orc', format="orc").head(10) # See the [Input/Output](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/io.html) documentation for more details.