import $ivy.`org.apache.spark::spark-sql:3.1.1`
import $ivy.`org.typelevel::cats-core:2.3.0`
import $ivy.`com.lihaoyi::sourcecode:0.2.6`
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.{functions => f}
There is no problem in combining conventional Spark column expressions and doric columns. However, to avoid name clashes, we will use the prefix f for the former ones.
val spark = org.apache.spark.sql.SparkSession.builder().appName("test").master("local").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
spark: SparkSession = org.apache.spark.sql.SparkSession@7c5776d5
import spark.implicits._
val userDF = List(
("Foo", "Madrid", 35),
("Bar", "New York", 40),
("John", "Paris", 30)
).toDF("name_user", "city_user", "age_user")
userDF: DataFrame = [name_user: string, city_user: string ... 1 more field]
def userS(colName: String): Column =
f.col(colName + "_user")
defined function userS
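Before seeing what happens when we get the name wrong, a quick sanity check (using the userDF defined above) shows the helper resolving to an existing column:
// "name" resolves to the existing column "name_user"
userDF.select(userS("name")).show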
val userW = userS("name1") //wrong column :S
scala.util.Try(userDF.select(userW)).fold(_.printStackTrace, identity)
org.apache.spark.sql.AnalysisException: cannot resolve '`name1_user`' given input columns: [age_user, city_user, name_user];
'Project ['name1_user]
+- Project [_1#3 AS name_user#10, _2#4 AS city_user#11, _3#5 AS age_user#12]
   +- LocalRelation [_1#3, _2#4, _3#5]
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:155)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:152)
  ...
userW: Column = name1_user
res8_1: Any = ()
import $ivy.`org.hablapps::doric:0.0.1`
Specific doric imports:
import doric._
Here is a list of use cases for doric:
Let's start!
We can't mix apples and oranges, and Spark knows that. For instance, Spark complains if we try to multiply integers by booleans:
val df = List(1,2,3).toDF.select($"value" * f.lit(true))
But it complains too late, with an exception raised at runtime. If we delay the creation of the DataFrame, the error disappears ...
def df = List(1,2,3).toDF.select($"value" * f.lit(true))
... momentarily, until we eventually invoke that code:
df
Using doric, there is no need to wait so long: errors will be reported at compile-time!
// This doesn't compile
def df = List(1,2,3).toDF.select(col[Int]("value") * lit(true))
Changes in column expressions are minimal: just annotate column references with the intended type, i.e. col[Int]("value") instead of a plain col("value"). If you are not used to generic parameters, the aliases colInt, colString, etc., are also available. We will use these aliases in the sequel.
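For instance, the two styles below denote exactly the same typed column (a minimal sketch against the userDF defined above; the val names are just for illustration):
// Generic syntax and its alias are interchangeable
val age1: DoricColumn[Int] = col[Int]("age_user")
val age2: DoricColumn[Int] = colInt("age_user")
userDF.select(age2 * 2.lit).show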
Naturally, this only works if you, the programmer, know the intended type of the column at compile-time. In a purely dynamic setting, doric is useless. Note, however, that you don't need to know the whole row type in advance, as you do with Datasets. In this way, doric sits between a whole-hearted static setting and a purely dynamic one. It offers type-safety at a minimum cost, without compromising performance.
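To make the contrast concrete, here is a hedged sketch: a fully static alternative forces us to declare the whole row type up front (the User case class below is hypothetical), whereas doric only annotates the columns we actually touch:
// Whole-row typing: every field of the row must be spelled out
case class User(name_user: String, city_user: String, age_user: Int)
val userDS: Dataset[User] = userDF.as[User]
// Column-level typing with doric: only the column we use is annotated
userDF.select(colInt("age_user") + 1.lit).show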
We can also use doric columns within the context of a withColumn expression, or, in general, wherever we use plain columns: join, filter, etc.:
List(1,2,3).toDF.withColumn("other", colInt("value") * lit(1))
List(1,2,3).toDF.filter(colInt("value") > lit(3))
Join expressions are explained in a separate notebook in more detail.
Implicit type conversions in Spark are pervasive. For instance, the following code won't cause Spark to complain at all:
val df0 = spark.range(1,10).withColumn("x", f.concat(f.col("id"), f.lit("jander")))
which means that an implicit conversion from integer to string is in effect:
df0.select(f.col("x")).show
Assuming that you are certain that your column holds values of type bigint, the same code in doric won't compile:
val df = spark.range(1,10).toDF.withColumn("x", concat(colLong("id"), "jander".lit))
Note that the Spark type bigint corresponds to the Scala type Long. The correspondence between Spark and Scala types in doric is the same as the one established in Datasets by Encoder instances.
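As a quick check of this correspondence (a sketch reusing the range example above), the id column produced by spark.range has Spark type bigint, so colLong is the accessor that matches its schema:
// "id" is a bigint column, i.e. a Scala Long, so colLong matches
val ranged = spark.range(1, 4).toDF
ranged.printSchema
ranged.select(colLong("id")).show
// colInt("id") would instead be rejected when the DataFrame is built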
Still, doric will allow you to perform that operation provided that you explicitly enact the conversion:
val df = spark.range(1,10).toDF.withColumn("x", concat(colLong("id").cast[String], "jander".lit))
df.show
Let's also consider the following example:
val dfEq = List((1, "1"), (1, " 1"), (1, " 1 ")).toDF("int", "str")
dfEq.withColumn("eq", f.col("int") === f.col("str"))
What would you expect to be the result? Well, it all depends on the implicit conversion that Spark chooses to apply, if at all: 1) it may return false for the new column, given that the types of both input columns differ, thus choosing to apply no conversion; 2) it may convert the integer column into a string column; 3) it may convert strings to integers. Let's see what happens:
dfEq.show
Option 3 wins, but you can only learn this by trial and error. With doric, you can do away with all this magic and explicitly cast types, if you so desire:
// Option 1, no castings: compile error
dfEq.withColumn("eq", colInt("int") === colString("str")).show
// Option 2, casting from int to string
dfEq.withColumn("eq", colInt("int").cast[String] === colString("str")).show
// Option 3, casting from string to int, not safe!
dfEq.withColumn("eq", colInt("int") === colString("str").unsafeCast[Int]).show
Note that we can't simply cast a string to an integer, since this conversion is partial. If the programmer insists on doing this unsafe cast, doric will force her to explicitly acknowledge this fact using the conversion function unsafeCast.
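To see why the conversion is partial, consider this hedged sketch (non-numeric strings are expected to end up as null, mirroring Spark's own string-to-integer cast):
// unsafeCast is partial: "three" cannot be parsed as an Int
val partial = List("1", "2", "three").toDF("str")
partial.withColumn("asInt", colString("str").unsafeCast[Int]).show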
It's all about being explicit in order to enhance readability and avoid unexpected behaviours at runtime. Doric is a coding accelerator!
Let's suppose that our column expression refers to a non-existent column. No problem, Spark will detect that and complain with an exception at runtime:
List(1,2,3).toDF.select(f.col("id")+1)
Now, let's assume that the column exists but its type is not what we expected. Spark won't be able to detect that, since type expectations are not encoded in plain columns. Thus, the following code will compile and execute without errors:
val df = List("1","2","three").toDF.select(f.col("value") + 1)
and we will be able to run the DataFrame:
df.show
obtaining null values and garbage results, in general.
Using doric we can prevent the creation of the DataFrame, since column expressions are typed:
val df = List("1","2","three").toDF.select(colInt("value") + 1.lit)
If the column doesn't exist, doric will complain with a message similar to the one given by Spark:
val df = List("1","2","three").toDF.select(colInt("id") + 1.lit)
But note that the location of the error is also included. This will prove immensely useful, as we will see later on!
Given the following DataFrame:
val dfadd = List((1,2),(3,4)).toDF("int1", "int2")
let's try to add both columns as follows:
dfadd.withColumn("add", f.col("Int_1") + f.col("Int_2"))
Rightly, Spark complains because column "Int_1" doesn't exist. Let's fix that problem:
dfadd.withColumn("add", f.col("int1") + f.col("Int_2"))
Ooops, another error. Fortunately, this is the last one:
dfadd.withColumn("add", f.col("int1") + f.col("int2"))
But why didn't Spark give us all the errors at once? Well, a plain fail-fast strategy for error reporting is simpler. Unlike Spark, doric won't stop at the first error: it will keep accumulating errors until no more are found:
dfadd.withColumn("add", colInt("int_1") + colInt("int_2"))
Let's pretend that our business logic is very complex, and modularised in different functions. For instance:
val col1: Column = f.col("int_1")
val col2: Column = f.col("int2")
val addColumns: Column = col1 + col2
There is an error when referring to the first column and Spark reports it:
dfadd.withColumn("add", addColumns)
But Spark does not give a clue about the exact source of the error. It marks the error at the withColumn method, but the actual problem is elsewhere, in the expression col1. You have no choice but to dive into the code and perform a brute-force, exhaustive search.
Using doric, we can modularise our code without remorse:
val col1: DoricColumn[Int] = colInt("int_1")
val col2: DoricColumn[Int] = colString("int2").unsafeCast[Int]
val addColumns: DoricColumn[Int] = col1 + col2
When we attempt to compose the DataFrame:
dfadd.withColumn("add", addColumns)
we will get not only the errors, but the exact location of the culprit:
habla.doric.DoricMultiError: Found 2 errors in withColumn
Cannot resolve column name "int_1" among (int1, int2)
located at . (cmd83.sc:1)
The column with name 'int2' is of type IntegerType and it was expected to be StringType
located at . (cmd83.sc:2)
As you can see, errors are reported referring to the source files (cmd83.sc) and line numbers (1 and 2, respectively) where they are located. If you are using an IDE, you will additionally obtain a hyperlink to the error. Isn't that nice? :)