%classpath config resolver jitpack.io https://jitpack.io

%%classpath add mvn
com.github.twosigma flint master-SNAPSHOT
org.apache.spark spark-sql_2.11 2.2.1
org.apache.spark spark-mllib_2.11 2.2.1
org.scalanlp breeze_2.11 0.13.2

// Create the Spark session
import com.twosigma.flint.timeseries.CSV
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("Simple Application")
  .master("local[4]")
  .config("spark.ui.enabled", "false")
  .getOrCreate()

// Create a TimeSeriesRDD from a CSV file (sorted = true asserts rows are already ordered by time)
val tsRdd = CSV.from(
  spark.sqlContext,
  "../resources/data/flint-demo.csv",
  header = true,
  dateFormat = "yyyyMMdd HH:mm:ss.SSS",
  sorted = true
)

// Basic operations
import org.apache.spark.sql.types.{DoubleType, IntegerType}
import org.apache.spark.sql.Row

// Shift time forward by 25ns for rows with id 3; leave other rows unchanged
def changeTimeFunction(id: Int, time: Long): Long =
  if (id == 3) time + 25 else time

val priceAsInteger = tsRdd.cast("price" -> IntegerType)
val filteredRowsByPrice = tsRdd.keepRows { row: Row => row.getAs[Double]("price") > 4.0 }
val timeColumnOnly = tsRdd.keepColumns("time")
val withoutIdColumn = tsRdd.deleteColumns("id")
val renamedColumns = tsRdd.renameColumns("id" -> "ticker", "price" -> "highPrice")
val updatedTimeColumn = tsRdd.setTime { row: Row =>
  changeTimeFunction(row.getAs[Int]("id"), row.getAs[Long]("time"))
}

// Create columns
val newHighPriceColumn = tsRdd.addColumns(
  "highPrice" -> DoubleType -> { r: Row => r.getAs[Double]("price") + 1.5 }
)

// addColumnsForCycle computes one value per row from all rows sharing the same timestamp
val results = tsRdd.addColumnsForCycle(
  "adjustedPrice" -> DoubleType -> { rows: Seq[Row] =>
    rows.map { row => (row, row.getAs[Double]("price") * rows.size) }.toMap
  }
)

// Group functions
import com.twosigma.flint.timeseries.Windows

val groupedByCycle = tsRdd.groupByCycle()
// Build a clock of interval boundaries: the times divisible by 100 for id 3
val intervals = tsRdd
  .keepRows { row: Row => row.getAs[Long]("time") % 100 == 0 }
  .keepRows { row: Row => row.getAs[Int]("id") == 3 }
  .keepColumns("time")
val groupedByInterval = tsRdd.groupByInterval(intervals)
val groupedByWindows = tsRdd.addWindows(Windows.pastAbsoluteTime("1000ns"))

// Temporal join functions
val leftTSRdd = tsRdd
  .keepRows { row: Row => row.getAs[Long]("time") % 100 == 0 }
  .keepColumns("time", "price")
val rightTSRdd = tsRdd
  .keepRows { row: Row => row.getAs[Long]("time") % 100 != 0 }
  .keepColumns("time", "id")
// leftJoin matches each left row with the closest right row at most 50ns in the past;
// futureLeftJoin looks at most 50ns into the future instead
val leftJoin = leftTSRdd.leftJoin(rightTSRdd, tolerance = "50ns")
val futureLeftJoin = leftTSRdd.futureLeftJoin(rightTSRdd, tolerance = "50ns")

// Summarize functions
import com.twosigma.flint.timeseries.Summarizers

val summarizedCycles = tsRdd.summarizeCycles(Summarizers.sum("price"))

// stat.regression
import breeze.linalg.DenseVector
import com.twosigma.flint.math.stats.regression.{OLSMultipleLinearRegression, WeightedLabeledPoint}

// Generate a random data set from a linear model with beta = [1.0, 2.0] and intercept = 3.0
val data = WeightedLabeledPoint.generateSampleData(spark.sparkContext, DenseVector(1.0, 2.0), 3.0)

// Fit the data using OLS linear regression
val model = OLSMultipleLinearRegression.regression(data)

// Retrieve the estimated beta and intercept
val denseVector = model.estimateRegressionParameters

spark.close()
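
// A few more summarizer calls, sketched against the same Flint TimeSeriesRDD API
// used above (summarize and summarizeWindows accept the same Summarizers factories
// as summarizeCycles); run these before spark.close().
val totalPrice = tsRdd.summarize(Summarizers.sum("price"))                 // a single summary row for the whole RDD
val totalPricePerId = tsRdd.summarize(Summarizers.sum("price"), Seq("id")) // one summary row per id
val pastWindowMean = tsRdd.summarizeWindows(
  Windows.pastAbsoluteTime("1000ns"),    // same window spec used in addWindows above
  Summarizers.mean("price")
)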
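
// TimeSeriesRDD transformations are lazy, so none of the vals above compute anything
// until an action runs; toDF exposes the underlying DataFrame for inspection.
// Run before spark.close().
leftJoin.toDF.show(5)
summarizedCycles.toDF.show(5)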
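
// Sanity-check sketch: print the fitted parameters and eyeball them against the
// generating model (intercept 3.0, beta [1.0, 2.0]). How the intercept and betas
// are packed in the vector follows Flint's OLS implementation, so treat this as
// an informal check rather than an assertion. Run before spark.close().
println(s"Estimated regression parameters: $denseVector")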