MLlib - Prediction of Stock Prices using Financial KPIs

Financial KPIs can be used to drive investment decisions, so my goal was to create a comprehensive set of KPIs across different dimensions. In this document we use EDGAR to calculate KPIs that measure the following dimensions of a reporting company:

  • Profitability
  • Liquidity
  • Efficiency
  • Innovation
  • Growth
  • Leadership
  • Surprises

The expectation is that the stock price of companies with better KPIs grows faster than that of their competitors, so in this document we evaluate the KPIs against the stock prices. The solution is based on the following functionality

Setup

We install the following libraries with the help of Maven:

In [20]:
%classpath config resolver maven-public http://software.pschatzmann.ch/repository/maven-public/
%%classpath add mvn 
ch.pschatzmann:smart-edgar:LATEST
ch.pschatzmann:jupyter-jdk-extensions:LATEST
ch.pschatzmann:investor:LATEST
org.apache.spark:spark-sql_2.11:2.3.2
org.apache.spark:spark-mllib_2.11:2.3.2
Added new repo: maven-public
In [21]:
import ch.pschatzmann.edgar.reporting.company._
import ch.pschatzmann.display.Displayers
import ch.pschatzmann.stocks._
import ch.pschatzmann.stocks.ta4j.indicator._
import ch.pschatzmann.display.Table
import org.ta4j.core.indicators.helpers._
import org.ta4j.core.indicators._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer,VectorAssembler,Imputer}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.DoubleType

Displayers.setup
Out[21]:
true

Financial KPIs

In my last blog I described how to calculate financial KPIs with the help of Smart EDGAR. Here is the result:

In [22]:
val ticker = "AAPL"
Out[22]:
AAPL
In [23]:
val kpi = new CompanyEdgarValuesDB(new CompanySelection().setTradingSymbol(ticker))
    .setUseArrayList(true)
    .setAddTime(true)
    .setFilter(new FilterQuarterlyCumulated())
    .setParameterNames("NetIncomeLoss","OperatingIncomeLoss","ResearchAndDevelopmentExpense","OperatingExpenses","CostsAndExpenses",
        "CashAndCashEquivalentsAtCarryingValue","AvailableForSaleSecuritiesCurrent","AccountsReceivableNetCurrent",
        "Revenues","SalesRevenueNet","SalesRevenueGoodsNet","InventoryNet","AssetsCurrent",
        "LiabilitiesCurrent","Assets","EarningsPerShareBasic","MarketableSecuritiesCurrent",
        "StockholdersEquity", "Assets","Liabilities","ProfitLoss","GrossProfit", "AccountsReceivableNetCurrent",
        "NetCashProvidedByUsedInOperatingActivities","NetCashProvidedByUsedInOperatingActivitiesContinuingOperations","CostOfGoodsSold",
        "CostOfGoodsAndServicesSold","CostOfRevenue", "SalesRevenueNet")
//  Handling of alternatives / renames
    .addFormula("SalesRevenue","Edgar.coalesce('SalesRevenueNet', 'SalesRevenueGoodsNet','Revenues')")
    .addFormula("TotalRevenue","Edgar.coalesce('Revenues', 'SalesRevenueNet')")
    .addFormula("OperatingCashFlow","Edgar.coalesce('NetCashProvidedByUsedInOperatingActivities','NetCashProvidedByUsedInOperatingActivitiesContinuingOperations')")
    .addFormula("COGS","Edgar.coalesce('CostOfGoodsSold', 'CostOfGoodsAndServicesSold','CostOfRevenue')")
    .addFormula("Profit","Edgar.coalesce('ProfitLoss', 'GrossProfit', 'NetIncomeLoss')")
//  Profitability
    .addFormula("ExpensesOp","Edgar.coalesce('OperatingExpenses', 'CostsAndExpenses')")
    .addFormula("GrossProfitMargin","(TotalRevenue - COGS) / TotalRevenue")
    .addFormula("OperatingProfitMargin", "OperatingIncomeLoss / SalesRevenue")
    .addFormula("OperatingExpensesRatio","ExpensesOp / TotalRevenue ")
    .addFormula("ReturnonEquity","NetIncomeLoss / StockholdersEquity")
    .addFormula("TotalCapitalEmployed","Assets - LiabilitiesCurrent")
    .addFormula("OperatingCashFlowCapitalRatio","OperatingCashFlow / TotalCapitalEmployed")
    .addFormula("NetProfitMargin","Profit / TotalRevenue")
    .addFormula("DebtToEquityRatio","Liabilities / StockholdersEquity")
    .addFormula("EarningsPerShare","Edgar.coalesce('EarningsPerShareBasic')")
//  Liquidity
    .addFormula("ShortTermInvestements","Edgar.coalesce('AvailableForSaleSecuritiesCurrent','MarketableSecuritiesCurrent')")
    .addFormula("CurrentRatio","AssetsCurrent / LiabilitiesCurrent")
    .addFormula("QuickRatio","(CashAndCashEquivalentsAtCarryingValue + AccountsReceivableNetCurrent + ShortTermInvestements) / LiabilitiesCurrent")
    .addFormula("WorkingCapital","AssetsCurrent - LiabilitiesCurrent")
//  Efficiency
    .addFormula("InventoryTurnover","SalesRevenue / InventoryNet")
    .addFormula("AccountsReceivableTurnoverRatio", "SalesRevenue / AccountsReceivableNetCurrent")
    .addFormula("ReturnOnAssets", "NetIncomeLoss / Assets")
    .addFormula("SalesWorkingCapitalRatio","SalesRevenue / WorkingCapital")
//  Innovation
    .addFormula("SalesResearchRatio","ResearchAndDevelopmentExpense / SalesRevenue * 100")
    .addFormula("NetIncomeResearchRatio","ResearchAndDevelopmentExpense / NetIncomeLoss * 100")
//  Growth
    .addFormula("SalesRevenueChange", "Edgar.percentChange('SalesRevenue')" )  
    .addFormula("NetIncomeChange", "Edgar.percentChange('NetIncomeLoss')" )  
    .addFormula("EarningsPerShareChange", "Edgar.percentChange('EarningsPerShare')" )  
    .addFormula("ResearchAndDevelopmentChange","Edgar.percentChange('ResearchAndDevelopmentExpense')" )
//  Surprises
    .addFormula("NetIncomeSurprise","Edgar.surprisePercent('NetIncomeLoss')")
    .addFormula("RevenueSurprise","Edgar.surprisePercent('SalesRevenue')")
//  Leadership
    .addFormula("MarketShareOfYear","Edgar.marketShare()")
    .addFormula("MarketShareChange", "Edgar.percentChange('MarketShareOfYear')" )  
    .removeParameterNames("Assets","AssetsCurrent","AccountsReceivableNetCurrent","AvailableForSaleSecuritiesCurrent",
        "CashAndCashEquivalentsAtCarryingValue","Revenues","SalesRevenueNet",
        "SalesRevenueGoodsNet","CostOfGoodsSold","InventoryNet","Liabilities","LiabilitiesCurrent",
        "NetIncomeLoss", "OperatingExpenses", "OperatingIncomeLoss","ResearchAndDevelopmentExpense",
        "Profit", "COGS" ,"TotalRevenue", "SalesRevenue", "StockholdersEquity", 
        "WorkingCapital", "TotalCapitalEmployed", "OperatingCashFlow",
        "NetCashProvidedByUsedInOperatingActivities","NetCashProvidedByUsedInOperatingActivitiesContinuingOperations",
        "CostOfGoodsAndServicesSold","ProfitLoss","GrossProfit","EarningsPerShareBasic",
        "CostOfRevenue","CostsAndExpenses","ExpensesOp","MarketableSecuritiesCurrent","ShortTermInvestements")


var table = kpi.getTable
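
To make the formula helpers more concrete, here is a minimal plain-Scala sketch of what a coalesce and a percent-change helper could look like. The functions `coalesce` and `percentChange` below are hypothetical stand-ins written for illustration; the actual Smart EDGAR implementation of `Edgar.coalesce` and `Edgar.percentChange` may behave differently (for example in how it scales the result or treats missing periods).

```scala
// Hypothetical stand-ins for Edgar.coalesce and Edgar.percentChange.
// One filing is modelled as a map from parameter name to reported value.
type Filing = Map[String, Double]

// Return the value of the first parameter name that is present in the filing.
def coalesce(filing: Filing, names: String*): Option[Double] =
  names.find(name => filing.contains(name)).map(name => filing(name))

// Change of each value relative to the previous one, in percent.
// The first period has no predecessor, so it has no value.
def percentChange(values: Seq[Double]): Seq[Option[Double]] =
  None +: values.zip(values.drop(1)).map { case (prev, cur) =>
    if (prev == 0.0) None else Some((cur - prev) / math.abs(prev) * 100.0)
  }

// Example with invented numbers: this filing reports Revenues and CostOfRevenue only.
val filing: Filing = Map("Revenues" -> 200.0, "CostOfRevenue" -> 120.0)
val totalRevenue = coalesce(filing, "Revenues", "SalesRevenueNet")
val cogs = coalesce(filing, "CostOfGoodsSold", "CostOfGoodsAndServicesSold", "CostOfRevenue")
val grossProfitMargin = for (r <- totalRevenue; c <- cogs) yield (r - c) / r
```

The point of the coalesce step is that companies report the same concept under different parameter names, so each derived KPI is defined once against a unified name.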

Impact on Stock Prices

EDGAR filings are made quarterly, whereas we have stock prices for each trading day. To bridge this difference in frequency we take the quarterly moving average (a quarter has roughly 62 trading days).

We want to predict whether the stock price is going up or down. Therefore we use the OffsetIndicator to shift the future moving average to the current date and calculate the percent difference between the two.

In [24]:
var series = Context.getStockData(ticker).toTimeSeries
var closePrice = new ClosePriceIndicator(series);

// Getting a longer SMA (e.g. over 1 quarter of ticks)
var current = new SMAIndicator(closePrice, 62);
var future = new OffsetIndicator(current, -62);
var forecast = new PercentChangeIndicator(current, future)

Table.create(forecast, future, current)
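
The idea behind the three indicators can be sketched in plain Scala without ta4j. This is a simplified illustration, not the library's implementation: the helper names `sma` and `forwardChange` are made up, the offset is given as a positive number of steps into the future, and the change is assumed to be computed as (future - current) / current.

```scala
// Simple moving average over `window` prices.
// Positions with fewer than `window` prices available have no value in this sketch.
def sma(prices: Vector[Double], window: Int): Vector[Option[Double]] =
  prices.indices.toVector.map { i =>
    if (i + 1 < window) None
    else Some(prices.slice(i + 1 - window, i + 1).sum / window)
  }

// Relative change between the average `offset` steps ahead and the current average.
// There is no value where either average is missing.
def forwardChange(avg: Vector[Option[Double]], offset: Int): Vector[Option[Double]] =
  avg.indices.toVector.map { i =>
    for {
      now   <- avg(i)
      later <- avg.lift(i + offset).flatten
      if now != 0.0
    } yield (later - now) / now
  }

// Example with invented prices, a window of 2 and an offset of 2.
val prices = Vector(10.0, 12.0, 14.0, 16.0, 18.0, 20.0)
val average = sma(prices, 2)
val change = forwardChange(average, 2)
```

Note that the last `offset` positions have no forward change because the future average is not known yet. The same applies to the most recent quarter of the real data.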
In [25]:
var historicValues = forecast.toHistoricValues

// Map a change value to a label using thresholds of +/-0.05
def classifyChange(value:Double):String = {
    if (value >= 0.05) {
        "positive"
    } else if (value <= -0.05) {
        "negative"
    } else {
        "neutral"
    }
}

// Look up the forecast value at the given date (or at the next available date)
def getValue(date:String):Double = {
    historicValues.getNextValue(date).getValue
}

def classifyDate(date:Option[Any]): String = {
    val value = getValue(date.get.toString)
    return classifyChange(value)
}
Out[25]:
historicValues: ch.pschatzmann.stocks.integration.HistoricValues = ch.pschatzmann.stocks.integration.HistoricValues@553675e5
classifyChange: (value: Double)String
getValue: (date: String)Double
classifyDate: (date: Option[Any])String
In [26]:
classifyChange(0)
Out[26]:
neutral
In [27]:
classifyChange(0.1)
Out[27]:
positive
In [28]:
classifyChange(0.01)
Out[28]:
neutral
In [29]:
classifyChange(-0.1)
Out[29]:
negative
In [30]:
classifyDate(Option("2017-04-01"))
Out[30]:
negative

We have the prices for all trading days, but we want to keep only the values that are relevant at the quarterly filing dates and add them to our KPI data. Unfortunately some filings fall on dates without trading, so we need to take the next available value. This can be done with the getNextValue method:

In [31]:
import scala.collection.JavaConverters._

var historicValues = forecast.toHistoricValues
var resultList = table.toList.asScala
    .map(map => map.asScala.asInstanceOf[scala.collection.mutable.Map[String,Any]])  
    .map(map => map -= ("companyName","sicDescription", "identifier"))
    .map(map => map += ("label" -> classifyDate(map.get("date"))))

s"Number of records: ${table.getRowCount} / ${resultList.size}"
Out[31]:
Number of records: 42 / 42
In [32]:
val table = Table.create(resultList.slice(2,resultList.size-1).map(m => m.asJava).asJava)
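
The "next available value" lookup can be illustrated with a sorted map from the Java standard library. This is only a sketch of the idea and not how `getNextValue` is implemented; the dates and values are invented, and the helper name `nextValue` is hypothetical.

```scala
import java.time.LocalDate
import java.util.TreeMap

// Invented change values keyed by trading day; the weekend of 2017-04-01 is absent.
val changes = new TreeMap[LocalDate, Double]()
changes.put(LocalDate.parse("2017-03-31"), 0.02)
changes.put(LocalDate.parse("2017-04-03"), -0.06)
changes.put(LocalDate.parse("2017-04-04"), -0.07)

// Value at the given date or, if that date is missing, at the next later date.
// ceilingEntry returns null when no such entry exists, which Option turns into None.
def nextValue(date: String): Option[Double] =
  Option(changes.ceilingEntry(LocalDate.parse(date))).map(_.getValue)

val onTradingDay = nextValue("2017-03-31")   // exact match
val onWeekend    = nextValue("2017-04-01")   // falls through to 2017-04-03
val afterLastDay = nextValue("2017-04-05")   // nothing available
```

A filing dated after the last known price has no value at all, which is one reason why records at the edges of the series may need to be dropped.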

Machine Learning

We will use Spark MLlib for the implementation of our machine learning. First we need to start a Spark session.

In [33]:
val spark = SparkSession.builder()
                        .appName("EDGAR KPIs")
                        .master("local[*]")
                        .config("spark.ui.enabled", "false")
                        .getOrCreate()
Out[33]:
org.apache.spark.sql.SparkSession@31a9f196

We load the data into Spark. The easiest way is to go via a CSV string:

In [34]:
import spark.implicits._

val csvData: Dataset[String] = spark.sparkContext.parallelize(table.toCSV.stripMargin.lines.toList).toDS()
val data = spark.read.option("header", true).option("inferSchema",true).csv(csvData)

data.printSchema()
root
 |-- AccountsReceivableTurnoverRatio: double (nullable = true)
 |-- CurrentRatio: double (nullable = true)
 |-- DebtToEquityRatio: double (nullable = true)
 |-- EarningsPerShare: double (nullable = true)
 |-- EarningsPerShareChange: double (nullable = true)
 |-- GrossProfitMargin: double (nullable = true)
 |-- InventoryTurnover: double (nullable = true)
 |-- MarketShareChange: double (nullable = true)
 |-- MarketShareOfYear: double (nullable = true)
 |-- NetIncomeChange: double (nullable = true)
 |-- NetIncomeResearchRatio: double (nullable = true)
 |-- NetIncomeSurprise: double (nullable = true)
 |-- NetProfitMargin: double (nullable = true)
 |-- OperatingCashFlowCapitalRatio: double (nullable = true)
 |-- OperatingExpensesRatio: double (nullable = true)
 |-- OperatingProfitMargin: double (nullable = true)
 |-- QuickRatio: double (nullable = true)
 |-- ResearchAndDevelopmentChange: double (nullable = true)
 |-- ReturnOnAssets: double (nullable = true)
 |-- ReturnonEquity: double (nullable = true)
 |-- RevenueSurprise: double (nullable = true)
 |-- SalesResearchRatio: double (nullable = true)
 |-- SalesRevenueChange: double (nullable = true)
 |-- SalesWorkingCapitalRatio: double (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- incorporation: string (nullable = true)
 |-- label: string (nullable = true)
 |-- location: string (nullable = true)
 |-- tradingSymbol: string (nullable = true)

Out[34]:
org.apache.spark.sql.SparkSession$implicits$@170d25b9

All double columns of our dataset are valid KPIs that can be used by our machine learning. Selecting the columns by type also lets us exclude KPIs that are not available: FB, for example, does not report any inventory, so it has no InventoryTurnover values and the standard Spark import infers a String column for it.

In [35]:
val kpis = data.schema.filter(sf => sf.dataType==DoubleType).map(sf => sf.name).toArray
Out[35]:
[AccountsReceivableTurnoverRatio, CurrentRatio, DebtToEquityRatio, EarningsPerShare, EarningsPerShareChange, GrossProfitMargin, InventoryTurnover, MarketShareChange, MarketShareOfYear, NetIncomeChange, NetIncomeResearchRatio, NetIncomeSurprise, NetProfitMargin, OperatingCashFlowCapitalRatio, OperatingExpensesRatio, OperatingProfitMargin, QuickRatio, ResearchAndDevelopmentChange, ReturnOnAssets, ReturnonEquity, RevenueSurprise, SalesResearchRatio, SalesRevenueChange, SalesWorkingCapitalRatio]

We also need to handle the situation that not all KPI values are available: we impute the missing values and split our data into a training and a test set.

In [36]:
val imputer = new Imputer()
  .setInputCols(kpis)
  .setOutputCols(kpis)

val imputerModel = imputer.fit(data)
val dataImputed = imputerModel.transform(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = dataImputed.randomSplit(Array(0.7, 0.3))
Out[36]:
[AccountsReceivableTurnoverRatio: double, CurrentRatio: double ... 27 more fields]
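
By default the Spark Imputer replaces missing values with the mean of the respective column. The following plain-Scala sketch shows that strategy on a single column; the helper `imputeMean` and the sample values are made up for illustration and missing values are modelled as `None`.

```scala
// Replace missing entries (None) of one column with the mean of the present entries.
def imputeMean(column: Seq[Option[Double]]): Seq[Double] = {
  val present = column.flatten
  require(present.nonEmpty, "cannot impute a column without any values")
  val mean = present.sum / present.size
  column.map(value => value.getOrElse(mean))
}

// Invented example: the second quarter has no InventoryTurnover value.
val inventoryTurnover = Seq(Some(10.0), None, Some(14.0))
val imputed = imputeMean(inventoryTurnover)
```

Mean imputation keeps all records usable for training, at the cost of pulling the imputed records towards the column average.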

Machine Learning Pipeline

We define the machine learning pipeline and train the model.

  • The features need to be assembled into a vector
  • We define a StringIndexer and IndexToString for the label
  • We use a RandomForestClassifier
In [37]:
val assembler = new VectorAssembler()
  .setInputCols(kpis)
  .setOutputCol("features")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Define a random forest classifier (it is trained later as part of the pipeline).
val model = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")
  .setNumTrees(20)

// Chain indexer, assembler, random forest and label converter in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer,assembler, model, labelConverter))

// Train model. This also runs the indexer.
val trainedModel = pipeline.fit(trainingData)
Out[37]:
pipeline_baf2494ac621
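
The role of the StringIndexer and IndexToString stages can be sketched in plain Scala: labels are mapped to numeric indices ordered by descending frequency, and predictions are mapped back to label strings. The helper names below are hypothetical, and the alphabetical tie-break is an assumption of this sketch rather than documented Spark 2.3 behaviour.

```scala
// Order the distinct labels by descending frequency; the most frequent label gets index 0.
// Ties are broken alphabetically (an assumption made for this sketch).
def fitLabels(labels: Seq[String]): Array[String] =
  labels.groupBy(identity).toSeq
    .sortBy { case (label, occurrences) => (-occurrences.size, label) }
    .map { case (label, _) => label }
    .toArray

// Label to index (what the classifier is trained on) and index back to label.
def toIndex(ordered: Array[String], label: String): Double = ordered.indexOf(label).toDouble
def toLabel(ordered: Array[String], index: Double): String = ordered(index.toInt)

// Invented label column.
val observed = Seq("negative", "neutral", "negative", "positive", "negative", "neutral")
val ordered = fitLabels(observed)
val neutralIndex = toIndex(ordered, "neutral")
val roundTrip = toLabel(ordered, neutralIndex)
```

Fitting the indexer on the whole dataset, as done in the pipeline, ensures that a label which happens to occur only in the test split still receives an index.
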
In [38]:
// We calculate the Accuracy over the training data
val predictionsTrain = trainedModel.transform(trainingData)
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictionsTrain)

s"Accuracy on training data = $accuracy"
Out[38]:
Accuracy on training data = 1.0

Evaluate the Model

We evaluate the model by calculating the accuracy on the test data.

In [39]:
// Make predictions.
val predictions = trainedModel.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label").show()

// Compare the predictions with the true labels and compute the test accuracy.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)

s"Accuracy for test data = $accuracy"
+--------------+--------+
|predictedLabel|   label|
+--------------+--------+
|      negative|negative|
|       neutral|negative|
|       neutral| neutral|
|      negative|positive|
|      negative|negative|
|      negative|negative|
|      negative| neutral|
|      negative|negative|
|      negative|positive|
|      negative|negative|
|      negative|negative|
+--------------+--------+

Out[39]:
Accuracy for test data = 0.6363636363636364
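
As a cross-check, the accuracy can be recomputed by hand from the (predictedLabel, label) pairs in the table above. The sketch below also computes a simple majority-class baseline, which is an addition for comparison and not part of the original notebook: it is the accuracy obtained by always predicting the most frequent true label.

```scala
// (predictedLabel, label) pairs copied from the table above.
val rows = Seq(
  "negative" -> "negative", "neutral"  -> "negative", "neutral"  -> "neutral",
  "negative" -> "positive", "negative" -> "negative", "negative" -> "negative",
  "negative" -> "neutral",  "negative" -> "negative", "negative" -> "positive",
  "negative" -> "negative", "negative" -> "negative"
)

// Accuracy: share of rows where the prediction equals the true label.
val testAccuracy = rows.count { case (predicted, label) => predicted == label }.toDouble / rows.size

// Majority-class baseline: always predict the most frequent true label.
val majorityLabel = rows.map { case (_, label) => label }.groupBy(identity).maxBy(_._2.size)._1
val baselineAccuracy = rows.count { case (_, label) => label == majorityLabel }.toDouble / rows.size
```

For this particular split, 7 of the 11 predictions are correct, and always predicting "negative" would also be right for 7 of the 11 records.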

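It seems that the amount of data from one company is not sufficient to give a stable, reliable result: the model reaches an accuracy of 1.0 on the training data but only about 0.64 on the 11 test records, and the accuracy depends very much on how the dataset is split into training and test data.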

In the next blog I will extend the logic to use many ticker symbols.
