Class04 Answer:

Fit LogisticRegression model to training data.

This lab was easy.


{
/* ~/sparkapps/logr10/logr12m.scala
This script should download prices and predict daily direction of GSPC.
It should generate a label which I assume to be dependent on price calculations.
A label should classify an observation as down or up. Down is 0.0, up is 1.0.
It should generate independent features from slopes of moving averages of prices.
It should create a Logistic Regression model from many years of features.
Demo:
spark-shell -i logr12m.scala
*/

import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
import sys.process._

// Download the GSPC price history to /tmp/gspc.csv.
// `.!` runs the command and returns its exit status. Calling it as a method
// (instead of the postfix form `"cmd"!`) avoids the postfix-operator feature warning.
val curlExit = "/usr/bin/curl -L ml4.herokuapp.com/csv/GSPC.csv -o /tmp/gspc.csv".!

// A failed download is reported but not fatal: a copy of the file left by an
// earlier run can still be loaded below.
if (curlExit != 0) {
  Console.err.println(s"WARNING: curl exited with status $curlExit; /tmp/gspc.csv may be missing or stale")
}

// Reuse the SQLContext that belongs to the shell's SparkSession (`spark`)
// rather than building one through the deprecated `new SQLContext(sc)` constructor.
val sqlContext = spark.sqlContext

// Read the CSV with a header row and let Spark infer the column types.
val dp10df = sqlContext
  .read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("inferSchema","true")
  .load("/tmp/gspc.csv")

// Every later query reads from the temp view named "tab"; each stage re-registers it.
dp10df.createOrReplaceTempView("tab")

// Sanity check: row count plus date and price ranges of the downloaded data.
spark.sql("SELECT COUNT(Date),MIN(Date),MAX(Date),MIN(Close),MAX(Close)FROM tab").show

// Stage 1: leadp is the Close of the next row in Date order (LEAD by 1).
// `sqls` stays a var because later statements in this script reassign it.
var sqls="SELECT Date,Close,LEAD(Close,1)OVER(ORDER BY Date) leadp FROM tab ORDER BY Date"
val dp11df=spark.sql(sqls)
dp11df.createOrReplaceTempView("tab")

// Stage 2: pctlead is the percent change from Close to leadp; the label is derived from it later.
sqls="SELECT Date,Close,100*(leadp-Close)/Close pctlead FROM tab ORDER BY Date"
val dp12df=spark.sql(sqls)
dp12df.createOrReplaceTempView("tab")

// Stage 3: moving averages of Close.
// Note: mavgN spans the current row plus the N preceding rows, i.e. up to N+1 closes.
val mavgSelect = Seq(
  "SELECT Date, Close, pctlead",
  ",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS mavg2",
  ",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS mavg3",
  ",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS mavg4",
  ",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS mavg5",
  ",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS mavg6",
  ",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 7 PRECEDING AND CURRENT ROW) AS mavg7",
  ",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 8 PRECEDING AND CURRENT ROW) AS mavg8",
  ",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS mavg9",
  " FROM tab ORDER BY Date"
)
sqls = mavgSelect.mkString
val dp13df=spark.sql(sqls)
dp13df.createOrReplaceTempView("tab")

// Stage 4: slpN is the one-row change of mavgN relative to mavgN.
// LAG has no previous row for the earliest Date, so slpN is NULL there.
val slpSelect = Seq(
  "SELECT Date, Close, pctlead",
  ",(mavg2-LAG(mavg2,1)OVER(ORDER BY Date))/mavg2 AS slp2 ",
  ",(mavg3-LAG(mavg3,1)OVER(ORDER BY Date))/mavg3 AS slp3 ",
  ",(mavg4-LAG(mavg4,1)OVER(ORDER BY Date))/mavg4 AS slp4 ",
  ",(mavg5-LAG(mavg5,1)OVER(ORDER BY Date))/mavg5 AS slp5 ",
  ",(mavg6-LAG(mavg6,1)OVER(ORDER BY Date))/mavg6 AS slp6 ",
  ",(mavg7-LAG(mavg7,1)OVER(ORDER BY Date))/mavg7 AS slp7 ",
  ",(mavg8-LAG(mavg8,1)OVER(ORDER BY Date))/mavg8 AS slp8 ",
  ",(mavg9-LAG(mavg9,1)OVER(ORDER BY Date))/mavg9 AS slp9 ",
  " FROM tab ORDER BY Date"
)
sqls = slpSelect.mkString
val dp14df=spark.sql(sqls)
dp14df.createOrReplaceTempView("tab")

// Class boundary: the average of pctlead over the training period.
// Observations above the boundary are labelled up (1.0), the rest down (0.0).

// SQL fragment reused wherever the training window is selected.
val training_period = " WHERE Date BETWEEN'1986-01-01'AND'2015-12-31' "

sqls = "SELECT AVG(pctlead) FROM tab"++training_period

val class_df = spark.sql(sqls)

// AVG() is NULL when no row in the window has a pctlead. Fail with a clear
// message instead of silently treating a NULL average as 0.0.
val class_boundry_row = class_df.first()
require(!class_boundry_row.isNullAt(0), "AVG(pctlead) is NULL; no usable rows for"++training_period)

val class_boundry = class_boundry_row.getDouble(0)

// Map pctlead to a label. The parameter is Double so the value is compared with
// the Double boundary at full precision; a Float parameter would narrow the
// column first and could flip labels for values very close to the boundary.
val pctlead2label = udf((pctlead:Double)=> if (pctlead > class_boundry) 1.0 else 0.0)

// Add the label to the DataFrame of observations.
val dp15df = dp14df.withColumn("label",pctlead2label(col("pctlead")))

// Pack the eight slope columns into one ml Vector for the "features" column.
// Parameters are Double: Vectors.dense stores doubles, so taking Float here
// would only round each slope to single precision before widening it again.
val fill_vec = udf((
  slp2:Double
  ,slp3:Double
  ,slp4:Double
  ,slp5:Double
  ,slp6:Double
  ,slp7:Double
  ,slp8:Double
  ,slp9:Double
  )=> Vectors.dense(slp2, slp3, slp4, slp5, slp6, slp7, slp8, slp9)
)

// Column names in the same order as the UDF parameters.
val slp_cols = Seq("slp2","slp3","slp4","slp5","slp6","slp7","slp8","slp9").map(name => col(name))

// NOTE(review): rows whose slp values are NULL need checking — confirm how the
// UDF treats NULL inputs before training on a window that includes such rows.
val dp16df = dp15df.withColumn("features", fill_vec(slp_cols: _*))

// Publish the labelled, vectorised observations and select the training window.
dp16df.createOrReplaceTempView("tab")
val train_df = spark.sql("SELECT * FROM tab" + training_period)

// LogisticRegression is an 'Estimator'. Its setters return the estimator itself,
// so the iteration cap and regularisation strength are set at construction.
val lr = new LogisticRegression().setMaxIter(1234).setRegParam(0.01)

// Fit a LogisticRegression model to the training observations using the
// parameters stored in lr.
// The fit fails with an ugly error if train_df has any nulls.
val model1 = lr.fit(train_df)

// UNDER CONSTRUCTION
}

I saw something like this:


dan@h80:~/ml4/public/class04/logr10 $ 
dan@h80:~/ml4/public/class04/logr10 $ spark-shell -i logr12m.scala
Spark context Web UI available at http://192.168.1.80:4042
Spark context available as 'sc' (master = local[*], app id = local-1515736782075).
Spark session available as 'spark'.
Loading logr12m.scala...
warning: there was one deprecation warning; re-run with -deprecation for details
warning: there was one feature warning; re-run with -feature for details
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 1252k  100 1252k    0     0  1307k      0 --:--:-- --:--:-- --:--:-- 3095k
+-----------+-------------------+-------------------+----------+----------+
|count(Date)|          min(Date)|          max(Date)|min(Close)|max(Close)|
+-----------+-------------------+-------------------+----------+----------+
|      17116|1950-01-03 00:00:00|2018-01-09 00:00:00|     16.66|2753.52002|
+-----------+-------------------+-------------------+----------+----------+


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/
         
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
Type in expressions to have them evaluated.
Type :help for more information.

scala> 
scala> 

Class04 Lab


learn4.us About Blog Contact Class01 Class02 Class03 Class04 Class05 Class06 Class07 Class08 Class09 Class10 dan101 Forum Google Hangout Vboxen