This lab requires some knowledge of Scala, Spark SQL window functions, and logistic regression.
{
/* ~/sparkapps/logr10/logr12h.scala
This script should download prices and predict the daily direction of GSPC.
It should generate a label, which I assume depends on price calculations.
A label should classify an observation as down or up. Down is 0.0, up is 1.0.
It should generate independent features from slopes of moving averages of prices.
It should create a Logistic Regression model from many years of features.
Demo:
spark-shell -i logr12h.scala
*/
import org.apache.spark.sql.SQLContext
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
import sys.process._
// I should get prices:
"/usr/bin/curl -L ml4.herokuapp.com/csv/GSPC.csv -o /tmp/gspc.csv"!
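// SQLContext is deprecated since Spark 2.0; the shell's built-in session offers the same reader as spark.read.
// CSV support is built into Spark 2.x, so format("csv") would also work here.
val sqlContext = new SQLContext(sc)
val dp10df = sqlContext
  .read
  .format("com.databricks.spark.csv")
  .option("header","true")
  .option("inferSchema","true")
  .load("/tmp/gspc.csv")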
dp10df.createOrReplaceTempView("tab")
spark.sql("SELECT COUNT(Date),MIN(Date),MAX(Date),MIN(Close),MAX(Close)FROM tab").show
// I should compute a label I can use to classify observations.
// leadp is the next row's Close; pctlead is the percent change from Close to leadp.
var sqls = "SELECT Date, Close, LEAD(Close,1) OVER (ORDER BY Date) leadp FROM tab ORDER BY Date"
val dp11df = spark.sql(sqls); dp11df.createOrReplaceTempView("tab")
sqls = "SELECT Date, Close, 100*(leadp-Close)/Close pctlead FROM tab ORDER BY Date"
val dp12df = spark.sql(sqls); dp12df.createOrReplaceTempView("tab")
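// I should compute moving averages of Close.
// Note: ROWS BETWEEN n PRECEDING AND CURRENT ROW spans n+1 rows, so mavg2 averages 3 closes, mavg3 averages 4, and so on.
sqls = "SELECT Date, Close, pctlead"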
sqls=sqls++",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS mavg2"
sqls=sqls++",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS mavg3"
sqls=sqls++",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS mavg4"
sqls=sqls++",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 5 PRECEDING AND CURRENT ROW) AS mavg5"
sqls=sqls++",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS mavg6"
sqls=sqls++",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 7 PRECEDING AND CURRENT ROW) AS mavg7"
sqls=sqls++",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 8 PRECEDING AND CURRENT ROW) AS mavg8"
sqls=sqls++",AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS mavg9"
sqls=sqls++" FROM tab ORDER BY Date"
val dp13df=spark.sql(sqls);dp13df.createOrReplaceTempView("tab")
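// I should compute the slope of each moving average: its one-row change divided by its current value.
// The first row has no LAG value, so its slopes are NULL.
sqls = "SELECT Date, Close, pctlead"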
sqls=sqls++",(mavg2-LAG(mavg2,1)OVER(ORDER BY Date))/mavg2 AS slp2 "
sqls=sqls++",(mavg3-LAG(mavg3,1)OVER(ORDER BY Date))/mavg3 AS slp3 "
sqls=sqls++",(mavg4-LAG(mavg4,1)OVER(ORDER BY Date))/mavg4 AS slp4 "
sqls=sqls++",(mavg5-LAG(mavg5,1)OVER(ORDER BY Date))/mavg5 AS slp5 "
sqls=sqls++",(mavg6-LAG(mavg6,1)OVER(ORDER BY Date))/mavg6 AS slp6 "
sqls=sqls++",(mavg7-LAG(mavg7,1)OVER(ORDER BY Date))/mavg7 AS slp7 "
sqls=sqls++",(mavg8-LAG(mavg8,1)OVER(ORDER BY Date))/mavg8 AS slp8 "
sqls=sqls++",(mavg9-LAG(mavg9,1)OVER(ORDER BY Date))/mavg9 AS slp9 "
sqls=sqls++" FROM tab ORDER BY Date"
val dp14df=spark.sql(sqls);dp14df.createOrReplaceTempView("tab")
// For the class boundary, I should get the avg of pctlead over the training period.
val training_period = " WHERE Date BETWEEN '1986-01-01' AND '2015-12-31' "
sqls = "SELECT AVG(pctlead) FROM tab"++training_period
val class_df = spark.sql(sqls)
val class_boundry = class_df.first()(0).asInstanceOf[Double]
class_df.show
println("class_boundry: ")
println(class_boundry)
// UNDER CONSTRUCTION
}
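To make the window arithmetic in the script concrete, here is a minimal plain-Scala sketch of the same two steps: a trailing moving average and its slope. It runs without Spark. The object name SlopeSketch, the helper names, and the prices are made up for illustration; they are not part of the script.

```scala
// Plain-Scala sketch of the SQL window arithmetic; the prices are made-up values.
object SlopeSketch {
  // Mean of the current element and up to n preceding ones, mirroring
  // AVG(Close) OVER (ORDER BY Date ROWS BETWEEN n PRECEDING AND CURRENT ROW).
  // The frame holds n+1 values once enough history exists.
  def movingAvg(prices: Vector[Double], n: Int): Vector[Double] =
    prices.indices.toVector.map { i =>
      val frame = prices.slice(math.max(0, i - n), i + 1)
      frame.sum / frame.size
    }

  // (mavg - LAG(mavg, 1)) / mavg. The first row has no lagged value,
  // so it is None here (Spark SQL yields NULL there).
  def slopes(mavg: Vector[Double]): Vector[Option[Double]] =
    mavg.indices.toVector.map { i =>
      if (i == 0) None else Some((mavg(i) - mavg(i - 1)) / mavg(i))
    }

  def main(args: Array[String]): Unit = {
    val close = Vector(10.0, 12.0, 14.0, 16.0)
    val mavg2 = movingAvg(close, 2)
    println(mavg2)
    println(slopes(mavg2))
  }
}
```

With these four prices, movingAvg(close, 2) averages three values on the last two rows, which is the n+1 frame size that the SQL ROWS clause produces.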
When I ran the script, I saw something like this:
dan@h80:~/ml4/public/class04/logr10 $ spark-shell -i logr12h.scala
Spark context Web UI available at http://192.168.1.80:4042
Spark context available as 'sc' (master = local[*], app id = local-1515735392841).
Spark session available as 'spark'.
Loading logr12h.scala...
warning: there was one deprecation warning; re-run with -deprecation for details
warning: there was one feature warning; re-run with -feature for details
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 1252k  100 1252k    0     0  1307k      0 --:--:-- --:--:-- --:--:-- 6455k
+-----------+-------------------+-------------------+----------+----------+
|count(Date)|          min(Date)|          max(Date)|min(Close)|max(Close)|
+-----------+-------------------+-------------------+----------+----------+
|      17116|1950-01-03 00:00:00|2018-01-09 00:00:00|     16.66|2753.52002|
+-----------+-------------------+-------------------+----------+----------+
+------------------+
|      avg(pctlead)|
+------------------+
|0.0368842385145014|
+------------------+
class_boundry:
0.0368842385145014
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
scala>
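The script stops after printing the class boundary, so the label itself is not built yet. A minimal plain-Scala sketch of how it could be derived follows. It assumes an observation is "up" (1.0) when pctlead is strictly greater than the boundary and "down" (0.0) otherwise; that rule, the object name LabelSketch, and the prices are my assumptions, not something the script confirms.

```scala
// Plain-Scala sketch of the label step; the prices are made-up values.
object LabelSketch {
  // Percent change from each close to the next one: 100 * (lead - close) / close.
  // The last observation has no next close, so it is None (NULL in Spark SQL).
  def pctLead(close: Vector[Double]): Vector[Option[Double]] =
    close.indices.toVector.map { i =>
      if (i + 1 < close.size) Some(100.0 * (close(i + 1) - close(i)) / close(i))
      else None
    }

  // Assumed rule: up (1.0) when pctlead exceeds the boundary, down (0.0) otherwise.
  def label(pctlead: Double, boundary: Double): Double =
    if (pctlead > boundary) 1.0 else 0.0

  def main(args: Array[String]): Unit = {
    val close = Vector(100.0, 101.0, 100.5, 102.0)
    val leads = pctLead(close).flatten
    // Mean pctlead over the sample, playing the role of class_boundry in the script.
    val boundary = leads.sum / leads.size
    leads.foreach(p => println(f"$p%.4f -> ${label(p, boundary)}"))
  }
}
```

Using the training-period mean as the boundary, rather than 0, should split the training observations into two classes of more similar size, since the average daily change over the period is slightly positive.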