// Notebook setup: register Maven Central and fetch every dependency the
// notebook needs (Spark SQL, the almond Spark integration, Monocle, and
// the spark-optics library under discussion).
import coursier._
interp.repositories() ++= Seq(MavenRepository("https://repo1.maven.org/maven2"))
// NOTE(review): duplicate of the import above — harmless in a REPL cell.
import coursier._
import $ivy.`org.apache.spark::spark-sql:2.3.1`
import $ivy.`sh.almond::almond-spark:0.4.0`
import $ivy.`com.github.julien-truffaut::monocle-core:1.5.0`
import $ivy.`com.github.julien-truffaut::monocle-macro:1.5.0`
import $ivy.`org.hablapps::spark-optics:0.1.0`
import org.apache.log4j.{Level, Logger}
// Silence log4j so the cell output stays readable.
Logger.getLogger("org").setLevel(Level.OFF)
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import $ivy.$ import $ivy.$ import $ivy.$ import $ivy.$ import $ivy.$ import org.apache.log4j.{Level, Logger} import org.apache.spark.sql.functions._ import org.apache.spark.sql._
// Build a local SparkSession wired into the notebook (NotebookSparkSession
// comes from almond-spark and renders Spark progress in the cell output).
val sparkSession = NotebookSparkSession.builder().master("local").appName("jupiter").getOrCreate()
// Only show errors from Spark itself.
sparkSession.sparkContext.setLogLevel("ERROR")
Loading spark-stubs Getting spark JARs Creating SparkSession
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
sparkSession: SparkSession = org.apache.spark.sql.SparkSession@34c7457e
// REPL workaround: registers this cell's enclosing instance as an outer scope
// so Spark's encoders can instantiate the case classes defined below when
// decoding rows (needed because REPL classes are inner classes).
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
// Domain model used throughout the notebook: a four-level nested product type
// (Employee -> Company -> Address -> Street) that maps onto Spark's nested
// struct columns. Marked `final` — case classes should not be subclassed,
// as that would break their equals/hashCode contract.
final case class Street(number: Int, name: String)
final case class Address(city: String, street: Street)
final case class Company(name: String, address: Address)
final case class Employee(name: String, company: Company)
defined class Street defined class Address defined class Company defined class Employee
Spark lenses. Spark has a columnar format; the columns can be of any basic SQL type: integers, floats, strings, timestamps, dates. Spark also allows us to use complex structures, such as structs — what would be a product in an ADT. Arrays and maps are also considered complex types. In our case we are going to focus on structs only, and to make it easier, we will first create a case class that will be our default structure. Due to issues with creating case classes in Jupyter, we already have them precompiled in the project, and they follow this code: case class Street(number: Int, name: String) case class Address(city: String, street: Street) case class Company(name: String, address: Address) case class Employee(name: String, company: Company)
//import org.habla.sparklens.{Employee,Company,Address,Street}
// Sample value exercising the full four-level nested structure.
val employee = Employee("john", Company("awesome inc", Address("london", Street(23, "high street"))))
employee: Employee = Employee( "john", Company("awesome inc", Address("london", Street(23, "high street"))) )
import sparkSession.implicits._
// Single-row DataFrame whose schema mirrors the nested case classes:
// name: string, company: struct<name, address: struct<city, street: struct<number, name>>>
val df = List(employee).toDS.toDF
import sparkSession.implicits._ df: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>]
// Render the single row and the nested schema.
df.show
df.printSchema
+----+--------------------+ |name| company| +----+--------------------+ |john|[awesome inc, [lo...| +----+--------------------+ root |-- name: string (nullable = true) |-- company: struct (nullable = true) | |-- name: string (nullable = true) | |-- address: struct (nullable = true) | | |-- city: string (nullable = true) | | |-- street: struct (nullable = true) | | | |-- number: integer (nullable = false) | | | |-- name: string (nullable = true)
As you can see, we now have a dataframe representation of the employee: the name is a string element, and the company is a struct that also has complex types inside. Due to the SQL-oriented nature of the Dataset API, it is hard to modify a single element while keeping the rest of the structure the same, even for first-level data.
// Modifying a top-level column: re-select every column, replacing `name`
// with the transformed value and re-aliasing it to keep the schema intact.
val employeeNameChanged = df.select(concat(df("name"),lit("!!!")).as("name"),df("company"))
employeeNameChanged.show
employeeNameChanged.printSchema
+-------+--------------------+ | name| company| +-------+--------------------+ |john!!!|[awesome inc, [lo...| +-------+--------------------+ root |-- name: string (nullable = true) |-- company: struct (nullable = true) | |-- name: string (nullable = true) | |-- address: struct (nullable = true) | | |-- city: string (nullable = true) | | |-- street: struct (nullable = true) | | | |-- number: integer (nullable = false) | | | |-- name: string (nullable = true)
employeeNameChanged: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>]
And for the structs? Let's try to change the name of the company.
// Modifying a field one level down: the enclosing struct must be rebuilt by
// hand, re-listing every sibling column and re-aliasing each piece.
val companyNameChanged = df.select(
df("name"),
struct(
concat(df("company.name"),lit("!!!")).as("name"),
df("company.address")
).as("company")
)
companyNameChanged.show
companyNameChanged.printSchema
+----+--------------------+ |name| company| +----+--------------------+ |john|[awesome inc!!!, ...| +----+--------------------+ root |-- name: string (nullable = true) |-- company: struct (nullable = false) | |-- name: string (nullable = true) | |-- address: struct (nullable = true) | | |-- city: string (nullable = true) | | |-- street: struct (nullable = true) | | | |-- number: integer (nullable = false) | | | |-- name: string (nullable = true)
companyNameChanged: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>]
OMG!! 😱😱😱 We have to keep track of the name of the transformed element, and also of all its parents!!! But if we had our case classes, how would we do this?
// Plain Scala: `copy` handles the first level easily, but each extra level
// of nesting requires another nested `copy`.
employee.copy(name = employee.name+"!!!")
employee.copy(company = employee.company.copy(name = employee.company.name+"!!!"))
res9_0: Employee = Employee( "john!!!", Company("awesome inc", Address("london", Street(23, "high street"))) ) res9_1: Employee = Employee( "john", Company("awesome inc!!!", Address("london", Street(23, "high street"))) )
Well, it's shorter, but it's still a pain in the back. Luckily we have optics :D
import monocle.Lens
import monocle.macros.GenLens
// One Monocle lens per nesting level, generated by macro.
val company : Lens[Employee, Company] = GenLens[Employee](_.company)
val address : Lens[Company , Address] = GenLens[Company](_.address)
val street : Lens[Address , Street] = GenLens[Address](_.street)
val streetName: Lens[Street , String] = GenLens[Street](_.name)
// Compose the four lenses to focus Employee -> company -> address -> street -> name.
val employeeStreet = company composeLens address composeLens street composeLens streetName
import monocle.Lens import monocle.macros.GenLens company: Lens[Employee, Company] = ammonite.$sess.cmd10$Helper$$anon$1@44dfd41d address: Lens[Company, Address] = ammonite.$sess.cmd10$Helper$$anon$2@295d28eb street: Lens[Address, Street] = ammonite.$sess.cmd10$Helper$$anon$3@7f95e953 streetName: Lens[Street, String] = ammonite.$sess.cmd10$Helper$$anon$4@69d4a9e5 employeeStreet: monocle.PLens[Employee, Employee, String, String] = monocle.PLens$$anon$1@5391f7fe
// `modify` lifts a String => String function into an Employee => Employee
// that rewrites only the focused field.
val streetChanger:Employee => Employee = employeeStreet.modify(_ + "!!!")
streetChanger(employee)
streetChanger: Employee => Employee = <function1> res11_1: Employee = Employee( "john", Company("awesome inc", Address("london", Street(23, "high street!!!"))) )
That easy? Wish there was something like this in spark...
import org.hablapps.sparkOptics.Lens
import org.hablapps.sparkOptics.syntax._
// spark-optics: a lens focused via a dotted column path, bound to df's schema.
val lens = Lens("company.address.street.name")(df.schema)
// `modify` returns every top-level column with only the focused field changed,
// so the select preserves the whole structure.
val transformedDF = df.select(lens.modify(concat(_,lit("!!!"))):_*)
transformedDF.printSchema
transformedDF.as[Employee].head
root |-- name: string (nullable = true) |-- company: struct (nullable = false) | |-- name: string (nullable = true) | |-- address: struct (nullable = false) | | |-- city: string (nullable = true) | | |-- street: struct (nullable = false) | | | |-- number: integer (nullable = true) | | | |-- name: string (nullable = true)
import org.hablapps.sparkOptics.Lens import org.hablapps.sparkOptics.syntax._ lens: Lens = Lens(company.address.street.name) transformedDF: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>] res14_5: Employee = Employee( "john", Company("awesome inc", Address("london", Street(23, "high street!!!"))) )
Hold on, explain that to me. Start from the beginning, and make it like Monocle. OK, let's create our first lens.
import org.hablapps.sparkOptics.ProtoLens.ProtoLens
// A ProtoLens is a StructType => Lens: the column name without a schema yet.
val companyProtoLens: ProtoLens = Lens("company")
//the name of the column, similar to the "_.company" of "GenLens[Employee](_.company),
//this is the element that we will focus in the structure
val companyLens: Lens = companyProtoLens(df.schema)
//providing the schema, it's similar to the "Employee" of "GenLens[Employee](_.company)
//this is the context of the lens
import org.hablapps.sparkOptics.ProtoLens.ProtoLens companyProtoLens: types.StructType => Lens = <function1> companyLens: Lens = Lens(company)
The first difference between Monocle and sparkOptics: Monocle, thanks to Scala's static typing, returns compile errors if you try to do a GenLens[Employee](_.unknownField). Spark SQL, however, is dynamically typed — but lenses help you make your transformations a little bit safer.
import scala.util.Try
// Binding a ProtoLens to a schema that lacks the column fails fast with an
// assertion error — invalid lenses cannot be created.
val unknownFieldLens:ProtoLens = Lens("unknownField")
Try{unknownFieldLens(df.schema)}
import scala.util.Try unknownFieldLens: types.StructType => Lens = <function1> res16_2: Try[Lens] = Failure( java.lang.AssertionError: assertion failed: the column unknownField not found in [name,company] )
It's not a compile error, but it's something! You can create a ProtoLens (a lens with only the column name defined), and when you try to generate a Lens from it, it gives you an error — you can't create invalid lenses! But let's see how we can compose new lenses.
import org.apache.spark.sql.types.StructType
// Manual composition: building each inner lens requires digging out the
// struct's schema by hand at every level (find the field, cast its dataType).
val companyL: Lens = Lens("company")(df.schema)
val companySchema = df.schema.fields.find(_.name == "company").get.dataType.asInstanceOf[StructType]
val addressL = Lens("address")(companySchema)
val addressSchema = companySchema.fields.find(_.name == "address").get.dataType.asInstanceOf[StructType]
val streetL = Lens("street")(addressSchema)
val streetSchema = addressSchema.fields.find(_.name == "street").get.dataType.asInstanceOf[StructType]
val streetNameL = Lens("name")(streetSchema)
// composeLens chains spark-optics lenses exactly like Monocle's.
val employeeCompanyStreetName = companyL composeLens addressL composeLens streetL composeLens streetNameL
// `set` yields the full column list with the focused field replaced.
val modifiedDF = df.select(employeeCompanyStreetName.set(lit("new street name")):_*)
modifiedDF.printSchema
modifiedDF.as[Employee].head
root |-- name: string (nullable = true) |-- company: struct (nullable = false) | |-- name: string (nullable = true) | |-- address: struct (nullable = false) | | |-- city: string (nullable = true) | | |-- street: struct (nullable = false) | | | |-- number: integer (nullable = true) | | | |-- name: string (nullable = false)
import org.apache.spark.sql.types.StructType companyL: Lens = Lens(company) companySchema: StructType = StructType( StructField("name", StringType, true, {}), StructField( "address", StructType( StructField("city", StringType, true, {}), StructField( "street", StructType( StructField("number", IntegerType, false, {}), StructField("name", StringType, true, {}) ), true, {} ) ), true, {} ) ) addressL: Lens = Lens(address) addressSchema: StructType = StructType( StructField("city", StringType, true, {}), StructField( "street", StructType( StructField("number", IntegerType, false, {}), StructField("name", StringType, true, {}) ), true, {} ) ) streetL: Lens = Lens(street) streetSchema: StructType = StructType( StructField("number", IntegerType, false, {}), StructField("name", StringType, true, {}) ) streetNameL: Lens = Lens(name) employeeCompanyStreetName: Lens = Lens(company.address.street.name) modifiedDF: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>] res17_11: Employee = Employee( "john", Company("awesome inc", Address("london", Street(23, "new street name"))) )
Still too much code, passing the schema of each element around all the time... In Spark, schemas are recursive: they contain not only the schema of the current level, but also those of all the sub-elements. So we can take advantage of the ProtoLenses.
// Lens + ProtoLens composition: only the root needs an explicit schema;
// each composeProtoLens extracts (and validates) the sub-schema for us.
val shorterLens =
Lens("company")(df.schema) composeProtoLens Lens("address") composeProtoLens Lens("street") composeProtoLens Lens("name")
val modifiedDF = df.select(shorterLens.modify(upper):_*)
modifiedDF.printSchema
modifiedDF.as[Employee].head
root |-- name: string (nullable = true) |-- company: struct (nullable = false) | |-- name: string (nullable = true) | |-- address: struct (nullable = false) | | |-- city: string (nullable = true) | | |-- street: struct (nullable = false) | | | |-- number: integer (nullable = true) | | | |-- name: string (nullable = true)
shorterLens: Lens = Lens(company.address.street.name) modifiedDF: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>] res18_3: Employee = Employee( "john", Company("awesome inc", Address("london", Street(23, "HIGH STREET"))) )
We first created a Lens, and then composed it with ProtoLenses; during the composition the lens extracts the schema of the selected element for you, checking that it exists. Still too much code? You can compose with a syntax closer to Spark.
// Shortest form: a dotted path builds the whole composed lens in one call.
val flashLens = Lens("company.address.street.name")(df.schema)
val modifiedDF = df.select(flashLens.modify(upper):_*)
modifiedDF.printSchema
modifiedDF.as[Employee].head
root |-- name: string (nullable = true) |-- company: struct (nullable = false) | |-- name: string (nullable = true) | |-- address: struct (nullable = false) | | |-- city: string (nullable = true) | | |-- street: struct (nullable = false) | | | |-- number: integer (nullable = true) | | | |-- name: string (nullable = true)
flashLens: Lens = Lens(company.address.street.name) modifiedDF: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>] res19_3: Employee = Employee( "john", Company("awesome inc", Address("london", Street(23, "HIGH STREET"))) )
Want to see how much code that example would have taken otherwise?
// The same transformation written with the raw DataFrame API: every parent
// struct has to be rebuilt and re-aliased by hand, level by level.
val mDF = df.select(df("name"),struct(
df("company.name").as("name"),
struct(
df("company.address.city").as("city"),
struct(
df("company.address.street.number").as("number"),
upper(df("company.address.street.name")).as("name")
).as("street")
).as("address")
).as("company"))
mDF.printSchema
val longCodeEmployee = mDF.as[Employee].head
// Sanity check: the hand-written version equals the lens-based version.
longCodeEmployee == modifiedDF.as[Employee].head
root |-- name: string (nullable = true) |-- company: struct (nullable = false) | |-- name: string (nullable = true) | |-- address: struct (nullable = false) | | |-- city: string (nullable = true) | | |-- street: struct (nullable = false) | | | |-- number: integer (nullable = true) | | | |-- name: string (nullable = true)
mDF: DataFrame = [name: string, company: struct<name: string, address: struct<city: string, street: struct<number: int, name: string>>>] longCodeEmployee: Employee = Employee( "john", Company("awesome inc", Address("london", Street(23, "HIGH STREET"))) ) res20_3: Boolean = true
This is only for a 4 levels depth structure, and each level only 2 elements, imagine for a larger structure. 😱
Why use these utilities? Why not Datasets? The Dataset API is great, but it has the problem that it can only work with well-defined case classes and can't work with interfaces. So when you need to abstract, you only have the DataFrame API. Using ProtoLenses, you can interact with the common elements of different dataframes, producing simple, reusable and clear code. Do all your Kafka topics share common metadata fields? Create lenses for them.
// `prune` drops the focused column from the resulting schema entirely.
df.select(flashLens.prune(Vector.empty):_*).schema
res21: StructType = StructType( StructField("name", StringType, true, {}), StructField( "company", StructType( StructField("name", StringType, true, {}), StructField( "address", StructType( StructField("city", StringType, true, {}), StructField( "street", StructType(StructField("number", IntegerType, true, {})), false, {} ) ), false, {} ) ), false, {} ) )
// `rename` changes only the focused column's name, keeping the nesting intact.
df.select(flashLens.rename("newName"):_*).schema
res22: StructType = StructType( StructField("name", StringType, true, {}), StructField( "company", StructType( StructField("name", StringType, true, {}), StructField( "address", StructType( StructField("city", StringType, true, {}), StructField( "street", StructType( StructField("number", IntegerType, true, {}), StructField("newName", StringType, true, {}) ), false, {} ) ), false, {} ) ), false, {} ) )
// `modifyDF` applies the transformation directly to a DataFrame; `get` then
// selects the focused (now duplicated) string value.
flashLens.modifyDF(c => concat(c,c))(df).select(flashLens.get).as[String].head
res33: String = "high streethigh street"
// The schema is unchanged by modifyDF — only the value was transformed.
flashLens.modifyDF(c => concat(c,c))(df).schema
res35: StructType = StructType( StructField("name", StringType, true, {}), StructField( "company", StructType( StructField("name", StringType, true, {}), StructField( "address", StructType( StructField("city", StringType, true, {}), StructField( "street", StructType( StructField("number", IntegerType, true, {}), StructField("name", StringType, true, {}) ), false, {} ) ), false, {} ) ), false, {} ) )