This tutorial is based on Spark SQL Guide - Getting started.
For this demo we used the city of Linz tree cadastre dataset ("Baumkataster") made available by Open Data Österreich and downloadable from here.
We're going to start by creating a Spark session. Our Spark job will be named "Python Spark SQL basic example", and the variable spark will hold our Spark session.
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.getOrCreate()
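As an optional sanity check, you can inspect the newly created session; spark.version and the application name are part of the public API:

# Optional sanity check: the Spark version and the app name we just set.
print(spark.version)
print(spark.sparkContext.appName)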
Read the file into a Spark DataFrame.
df = spark.read \
.load("FME_BaumdatenBearbeitet_OGD_20190205.csv",
format="csv", sep=";", header="true", encoding="iso-8859-1")
Note: we assume that the file FME_BaumdatenBearbeitet_OGD_20190205.csv
is in your local working directory. If at this point you get an error message like AnalysisException: 'Path does not exist',
check your Spark configuration for how file paths are resolved.
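If relative paths are ambiguous on your setup (local filesystem vs. HDFS), passing a fully qualified URI helps; a minimal sketch, assuming the CSV sits in /home/user (replace with your own directory):

# A fully qualified file:// URI avoids ambiguity about the default filesystem.
# /home/user is an assumed location -- adjust to where you saved the CSV.
df = spark.read \
    .load("file:///home/user/FME_BaumdatenBearbeitet_OGD_20190205.csv",
          format="csv", sep=";", header="true", encoding="iso-8859-1")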
Show the first three rows of the Spark DataFrame:
df.show(3)
+-------+------+------------+----------+-----+-----------+-----+-----------------+-----------+---+-----+------+----------------+----------------+
|Flaeche|BaumNr|     Gattung|       Art|Sorte|NameDeutsch|Hoehe|Schirmdurchmesser|Stammumfang|Typ| XPos|  YPos|             lon|             lat|
+-------+------+------------+----------+-----+-----------+-----+-----------------+-----------+---+-----+------+----------------+----------------+
|      0|     0|   ZumTesten|         0|    0|   20190205|    0|                0|          0|  0|70000|350000|14,2757549011314|48,2844031941042|
|    870|  021a|     Quercus|       sp.|    -|      Eiche|    6|                3|         31|  L|72431|354949|14,3093549528352|48,3286271802142|
|    572|   127|Liriodendron|tulipifera|    -| Tulpenbaum|    5|                2|         21|  L|71171|353742|14,2921648325343|48,3179178510249|
+-------+------+------------+----------+-----+-----------+-----+-----------------+-----------+---+-----+------+----------------+----------------+
only showing top 3 rows
For pretty-printing you can use toPandas()
df.toPandas().head(3)
|   | Flaeche | BaumNr | Gattung | Art | Sorte | NameDeutsch | Hoehe | Schirmdurchmesser | Stammumfang | Typ | XPos | YPos | lon | lat |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | 0 | ZumTesten | 0 | 0 | 20190205 | 0 | 0 | 0 | 0 | 70000 | 350000 | 14,2757549011314 | 48,2844031941042 |
| 1 | 870 | 021a | Quercus | sp. | - | Eiche | 6 | 3 | 31 | L | 72431 | 354949 | 14,3093549528352 | 48,3286271802142 |
| 2 | 572 | 127 | Liriodendron | tulipifera | - | Tulpenbaum | 5 | 2 | 21 | L | 71171 | 353742 | 14,2921648325343 | 48,3179178510249 |
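Keep in mind that toPandas() collects the entire DataFrame onto the driver, so on larger datasets it is safer to restrict the rows first, for example:

# Collect only the rows we actually want to display.
df.limit(3).toPandas()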
Show the number of trees of each species (count the occurrences of each German name in df
and sort by count):
df.groupBy("NameDeutsch").count().orderBy('count', ascending=False).show()
+--------------------+-----+
|         NameDeutsch|count|
+--------------------+-----+
|        Winter-Linde| 1583|
|          Weiß-Birke| 1442|
|         Spitz-Ahorn| 1273|
|         Stiel-Eiche| 1228|
|               Ahorn| 1079|
|           Hainbuche| 1036|
|       Gemeine Esche|  987|
|Ahornblättrige-Pl...|  961|
|            Rotbuche|  747|
|               Linde|  688|
|          Feld-Ahorn|  637|
|          Berg-Ahorn|  566|
|    Säulen-Hainbuche|  507|
|Gemeine Rosskastanie|  461|
|          Tulpenbaum|  436|
|             Robinie|  358|
|        Silber-Weide|  353|
|      Schwarz-Kiefer|  321|
|           Baumhasel|  258|
|    Serbische Fichte|  257|
+--------------------+-----+
only showing top 20 rows
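Note that the very first row of df.show(3) above looks like a test record (Gattung "ZumTesten"). If you want to exclude it before aggregating, a filter along these lines works (a sketch based on the column values visible above):

from pyspark.sql.functions import col

# Drop the apparent test record before counting species.
df.filter(col("Gattung") != "ZumTesten") \
  .groupBy("NameDeutsch") \
  .count() \
  .orderBy("count", ascending=False) \
  .show()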
An example of an SQL query (see Running SQL Queries Programmatically): let's sort the trees by height ("Hoehe").
df.createOrReplaceTempView("baeume")
spark.sql("SELECT BaumNr, NameDeutsch, Hoehe, lat, lon FROM baeume order by Hoehe desc").show()
+------+--------------------+-----+----------------+----------------+
|BaumNr|         NameDeutsch|Hoehe|             lat|             lon|
+------+--------------------+-----+----------------+----------------+
|   844|            Weißdorn|   99|48,3198141692826|14,3032456456049|
|   005|           Hainbuche|   99|48,3107341721266|14,2818381176027|
|   129|          Weiß-Birke|   99|48,2764221420506|14,3029622419918|
|   037|Gemeine Kiefer / ...|   95|48,3021818905511|14,2769106844535|
|   007|          Tulpenbaum|   90|48,2765758955147|14,3027882173276|
|   051|          Weiß-Birke|    9|48,2671468323820|14,2832242240464|
|   001|    Säulen-Hainbuche|    9|48,2973425597313|14,3138560746343|
|   009|        Sommer-Linde|    9|48,2383321988402|14,3668869798697|
|   025|       Schwarz-Birke|    9|48,2749605071833|14,2781176169409|
|   004|            Rotbuche|    9|48,2768058741599|14,2821412254345|
|   011|             Robinie|    9|48,2382302739335|14,3347263036350|
|   009|          Götterbaum|    9|48,2766683843868|14,2814265266950|
|   014|          Götterbaum|    9|48,2764131088558|14,2809709854517|
|   004|            Rotbuche|    9|48,2768218589229|14,2821505374108|
|   002|        Winter-Linde|    9|48,2960178194628|14,2818038617467|
|   001|        Silber-Ahorn|    9|48,2777290002478|14,2797511856080|
|   036|               Ahorn|    9|48,2335935143941|14,3798873585795|
|   047|        Sommer-Linde|    9|48,2346725447652|14,3758058184719|
|   056|           Hainbuche|    9|48,2784551013593|14,3103718222569|
|   022|          Weiß-Birke|    9|48,2781732375800|14,2972684731463|
+------+--------------------+-----+----------------+----------------+
only showing top 20 rows
Something looks off in this result: after 99, 95 and 90 the list drops straight to 9. The heights of 99 may well be placeholder values, but the jump itself is a schema issue: we read the CSV with header="true" and without inferSchema, so every column, including Hoehe, is a string, and ORDER BY therefore sorts lexicographically.
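A sketch of a numeric sort, casting Hoehe to an integer inside the query (CAST is standard Spark SQL):

# Cast Hoehe to INT so ORDER BY compares numbers, not strings.
spark.sql("""
    SELECT BaumNr, NameDeutsch, CAST(Hoehe AS INT) AS Hoehe, lat, lon
    FROM baeume
    ORDER BY Hoehe DESC
""").show()

Alternatively, pass inferSchema="true" when reading the CSV so that Spark detects numeric columns itself.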
Spark properties control most application settings and are configured separately for each application. These properties can be set directly on a SparkConf
passed to your SparkContext
(from the Apache Spark documentation).
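For example, a minimal sketch of building a SparkConf by hand before any session exists (the executor memory value is purely illustrative, not a recommendation):

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Collect properties on a SparkConf and hand it to the session builder.
conf = SparkConf() \
    .setAppName("Python Spark SQL basic example") \
    .set("spark.executor.memory", "2g")  # illustrative value

spark = SparkSession.builder.config(conf=conf).getOrCreate()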
We've already seen how to modify the SparkConf
when we created our Spark application session with the command:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .getOrCreate()
Let us look at the rest of the Spark configuration.
spark.sparkContext._conf.getAll()
[('spark.app.id', 'local-1574021194317'),
 ('spark.driver.port', '34544'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'Python Spark SQL basic example'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', 'c100.local'),
 ('spark.ui.showConsoleProgress', 'true')]
The property spark.app.name
is the name of our app that we just defined.
Another important property is spark.master
. This defines the master URL for the Spark application. A list of all admissible values for spark.master
is given here: master-urls.
In this example the Spark master URL is local[*]
, which means that our Spark application runs locally with as many worker threads as there are logical cores on the machine.
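You can also look up a single property instead of listing all of them; spark.conf.get is part of the public runtime-configuration API:

# Read individual properties from the running session.
spark.conf.get("spark.master")    # 'local[*]'
spark.conf.get("spark.app.name")  # 'Python Spark SQL basic example'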
If you have a Hadoop cluster available you can deploy your Spark application on YARN by setting spark.master to yarn
. Let's do that and then check the Spark configuration once again.
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.master('yarn') \
.getOrCreate()
spark.sparkContext._conf.getAll()
[('spark.master', 'yarn'),
 ('spark.app.id', 'local-1574021194317'),
 ('spark.driver.port', '34544'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'Python Spark SQL basic example'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', 'c100.local'),
 ('spark.ui.showConsoleProgress', 'true')]
With this configuration our Spark application will run on the Hadoop cluster and its resources will be managed by YARN. Notice, though, that spark.app.id above is unchanged: getOrCreate() returns the session that is already running and only updates the configuration entry. To genuinely restart on YARN, call spark.stop() first and then build the new session.
Note: If the Hadoop cluster is configured with HDFS as its default filesystem, then you need to upload your CSV file to HDFS in order to be able to read it:
hdfs dfs -put FME_BaumdatenBearbeitet_OGD_20190205.csv FME_BaumdatenBearbeitet_OGD_20190205.csv
and then you can just use `.load(...)` again.
%%bash
hdfs dfs -put FME_BaumdatenBearbeitet_OGD_20190205.csv
hdfs dfs -ls FME_BaumdatenBearbeitet_OGD_20190205.csv
-rw-r--r-- 3 datalab supergroup 2623070 2019-11-17 20:53 FME_BaumdatenBearbeitet_OGD_20190205.csv
df = spark.read \
.load("FME_BaumdatenBearbeitet_OGD_20190205.csv",
format="csv", sep=";", header="true", encoding="iso-8859-1")
Let's now re-run the previous commands. This time the application is going to be deployed on the cluster.
df.createOrReplaceTempView("baeume")
spark.sql("SELECT BaumNr, NameDeutsch, Hoehe, lat, lon FROM baeume order by Hoehe desc").show()
+------+--------------------+-----+----------------+----------------+
|BaumNr|         NameDeutsch|Hoehe|             lat|             lon|
+------+--------------------+-----+----------------+----------------+
|   844|            Weißdorn|   99|48,3198141692826|14,3032456456049|
|   005|           Hainbuche|   99|48,3107341721266|14,2818381176027|
|   129|          Weiß-Birke|   99|48,2764221420506|14,3029622419918|
|   037|Gemeine Kiefer / ...|   95|48,3021818905511|14,2769106844535|
|   007|          Tulpenbaum|   90|48,2765758955147|14,3027882173276|
|   051|          Weiß-Birke|    9|48,2671468323820|14,2832242240464|
|   001|    Säulen-Hainbuche|    9|48,2973425597313|14,3138560746343|
|   009|        Sommer-Linde|    9|48,2383321988402|14,3668869798697|
|   025|       Schwarz-Birke|    9|48,2749605071833|14,2781176169409|
|   004|            Rotbuche|    9|48,2768058741599|14,2821412254345|
|   011|             Robinie|    9|48,2382302739335|14,3347263036350|
|   009|          Götterbaum|    9|48,2766683843868|14,2814265266950|
|   014|          Götterbaum|    9|48,2764131088558|14,2809709854517|
|   004|            Rotbuche|    9|48,2768218589229|14,2821505374108|
|   002|        Winter-Linde|    9|48,2960178194628|14,2818038617467|
|   001|        Silber-Ahorn|    9|48,2777290002478|14,2797511856080|
|   036|               Ahorn|    9|48,2335935143941|14,3798873585795|
|   047|        Sommer-Linde|    9|48,2346725447652|14,3758058184719|
|   056|           Hainbuche|    9|48,2784551013593|14,3103718222569|
|   022|          Weiß-Birke|    9|48,2781732375800|14,2972684731463|
+------+--------------------+-----+----------------+----------------+
only showing top 20 rows
Note: After you're done, it's important to close the Spark session in order to release cluster resources.
spark.stop()