
Bicycle Sharing Demand Prediction Using Apache Spark and Scala


Final project code and output for the Bicycle Sharing Demand Prediction project using Apache Spark and Scala, submitted for certification.

Run each machine learning algorithm and note the accuracy of each model.


import org.apache.spark.rdd.RDD

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.log4j._
import org.apache.spark.sql.functions.to_timestamp
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.regression.RandomForestRegressor

Data Exploration and Transformation

1. Read the dataset in Spark

val trainDF = spark.read.format("csv").option("inferSchema",true).option("header",true).load("/FileStore/tables/edureka/train.csv")
trainDF.show(10)
+----------------+------+-------+----------+-------+-----+------+--------+---------+------+----------+-----+
|        datetime|season|holiday|workingday|weather| temp| atemp|humidity|windspeed|casual|registered|count|
+----------------+------+-------+----------+-------+-----+------+--------+---------+------+----------+-----+
|01-01-2011 00:00|     1|      0|         0|      1| 9.84|14.395|      81|      0.0|     3|        13|   16|
|01-01-2011 01:00|     1|      0|         0|      1| 9.02|13.635|      80|      0.0|     8|        32|   40|
|01-01-2011 02:00|     1|      0|         0|      1| 9.02|13.635|      80|      0.0|     5|        27|   32|
|01-01-2011 03:00|     1|      0|         0|      1| 9.84|14.395|      75|      0.0|     3|        10|   13|
|01-01-2011 04:00|     1|      0|         0|      1| 9.84|14.395|      75|      0.0|     0|         1|    1|
|01-01-2011 05:00|     1|      0|         0|      2| 9.84| 12.88|      75|   6.0032|     0|         1|    1|
|01-01-2011 06:00|     1|      0|         0|      1| 9.02|13.635|      80|      0.0|     2|         0|    2|
|01-01-2011 07:00|     1|      0|         0|      1|  8.2| 12.88|      86|      0.0|     1|         2|    3|
|01-01-2011 08:00|     1|      0|         0|      1| 9.84|14.395|      75|      0.0|     1|         7|    8|
|01-01-2011 09:00|     1|      0|         0|      1|13.12|17.425|      76|      0.0|     8|         6|   14|
+----------------+------+-------+----------+-------+-----+------+--------+---------+------+----------+-----+
only showing top 10 rows

trainDF: org.apache.spark.sql.DataFrame = [datetime: string, season: int ... 10 more fields]

2. Get a summary of the data and variable types

trainDF.printSchema
root
 |-- datetime: string (nullable = true)
 |-- season: integer (nullable = true)
 |-- holiday: integer (nullable = true)
 |-- workingday: integer (nullable = true)
 |-- weather: integer (nullable = true)
 |-- temp: double (nullable = true)
 |-- atemp: double (nullable = true)
 |-- humidity: integer (nullable = true)
 |-- windspeed: double (nullable = true)
 |-- casual: integer (nullable = true)
 |-- registered: integer (nullable = true)
 |-- count: integer (nullable = true)
display(trainDF.describe())
 
+-------+----------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|summary|        datetime|            season|            holiday|        workingday|           weather|              temp|            atemp|          humidity|         windspeed|
+-------+----------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+
|  count|           10886|             10886|              10886|             10886|             10886|             10886|            10886|             10886|             10886|
|   mean|            null|2.5066139996325556|0.02856880396839978|0.6808745177291935| 1.418427337865148|20.230859819952173|23.65508405291192| 61.88645967297446|12.799395406945093|
| stddev|            null|1.1161743093443237|0.16659885062470944|0.4661591687997361|0.6338385858190968| 7.791589843987573| 8.47460062648494|19.245033277394704|  8.16453732683871|
|    min|01-01-2011 00:00|                 1|                  0|                 0|                 1|              0.82|             0.76|                 0|               0.0|
|    max|19-12-2012 23:00|                 4|                  1|                 1|                 4|              41.0|           45.455|               100|           56.9969|
+-------+----------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+

Showing all 5 rows.
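
The display above truncates the remaining columns (casual, registered, count); as an optional check, describe can be called on those columns explicitly — a small sketch, output omitted here:

// Summary statistics for the columns not visible in the display above
trainDF.describe("casual", "registered", "count").show()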

3. Decide which columns should be categorical and then convert them accordingly

//Checking unique values in each column
val exprs = trainDF.schema.fields.filter(x => x.dataType != StringType).map(x=>x.name ->"approx_count_distinct").toMap
//data.agg(exprs).show(false)
exprs: scala.collection.immutable.Map[String,String] = Map(workingday -> approx_count_distinct, windspeed -> approx_count_distinct, registered -> approx_count_distinct, count -> approx_count_distinct, atemp -> approx_count_distinct, season -> approx_count_distinct, casual -> approx_count_distinct, humidity -> approx_count_distinct, temp -> approx_count_distinct, holiday -> approx_count_distinct, weather -> approx_count_distinct)
display(trainDF.agg(exprs))
 
approx_count_distinct(workingday) = 2
approx_count_distinct(windspeed)  = 27
approx_count_distinct(registered) = 726
approx_count_distinct(count)      = 802
approx_count_distinct(atemp)      = 60
approx_count_distinct(season)     = 4

Showing all 1 rows.

//So we consider "workingday", "holiday", "season", and "weather" as categorical columns, and we apply a OneHotEncoder to the columns with more than 2 distinct values (season and weather)
val indexer = Array("season","weather").map(c=>new OneHotEncoder().setInputCol(c).setOutputCol(c + "_Vec"))
val pipeline = new Pipeline().setStages(indexer)
val df_r = pipeline.fit(trainDF).transform(trainDF).drop("season","weather")
warning: class OneHotEncoder in package feature is deprecated: `OneHotEncoderEstimator` will be renamed `OneHotEncoder` and this `OneHotEncoder` will be removed in 3.0.0.
indexer: Array[org.apache.spark.ml.feature.OneHotEncoder] = Array(oneHot_8f2a7b7ac2df, oneHot_8b19ed4e6d57)
pipeline: org.apache.spark.ml.Pipeline = pipeline_34ad2132a3f2
df_r: org.apache.spark.sql.DataFrame = [datetime: string, holiday: int ... 10 more fields]
df_r.show(5)
+----------------+-------+----------+----+------+--------+---------+------+----------+-----+-------------+-------------+
|        datetime|holiday|workingday|temp| atemp|humidity|windspeed|casual|registered|count|   season_Vec|  weather_Vec|
+----------------+-------+----------+----+------+--------+---------+------+----------+-----+-------------+-------------+
|01-01-2011 00:00|      0|         0|9.84|14.395|      81|      0.0|     3|        13|   16|(4,[1],[1.0])|(4,[1],[1.0])|
|01-01-2011 01:00|      0|         0|9.02|13.635|      80|      0.0|     8|        32|   40|(4,[1],[1.0])|(4,[1],[1.0])|
|01-01-2011 02:00|      0|         0|9.02|13.635|      80|      0.0|     5|        27|   32|(4,[1],[1.0])|(4,[1],[1.0])|
|01-01-2011 03:00|      0|         0|9.84|14.395|      75|      0.0|     3|        10|   13|(4,[1],[1.0])|(4,[1],[1.0])|
|01-01-2011 04:00|      0|         0|9.84|14.395|      75|      0.0|     0|         1|    1|(4,[1],[1.0])|(4,[1],[1.0])|
+----------------+-------+----------+----+------+--------+---------+------+----------+-----+-------------+-------------+
only showing top 5 rows
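
The deprecation warning above points to OneHotEncoderEstimator as the replacement. A minimal sketch of the same encoding with OneHotEncoderEstimator, assuming the cluster runs Spark 2.3+ (in Spark 3.x the class is again named OneHotEncoder and accepts setInputCols directly):

import org.apache.spark.ml.feature.OneHotEncoderEstimator

// Same one-hot encoding as above, with a single non-deprecated estimator for both columns
val encoderEst = new OneHotEncoderEstimator().
  setInputCols(Array("season", "weather")).
  setOutputCols(Array("season_Vec", "weather_Vec"))
val df_r2 = encoderEst.fit(trainDF).transform(trainDF).drop("season", "weather")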

4. Check for any missing values in the dataset and treat them

trainDF.select(trainDF.columns.map(c => sum(col(c).isNull.cast("int")).alias(c)): _*).show
+--------+------+-------+----------+-------+----+-----+--------+---------+------+----------+-----+
|datetime|season|holiday|workingday|weather|temp|atemp|humidity|windspeed|casual|registered|count|
+--------+------+-------+----------+-------+----+-----+--------+---------+------+----------+-----+
|       0|     0|      0|         0|      0|   0|    0|       0|        0|     0|         0|    0|
+--------+------+-------+----------+-------+----+-----+--------+---------+------+----------+-----+
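
No missing values were found, so no treatment is needed here. If the dataset did contain nulls, one option would be DataFrameNaFunctions — an illustrative sketch only, not required for this dataset:

// Drop rows that contain any null (illustrative only)
val noNullDF = trainDF.na.drop()
// Or impute selected numeric columns with default values (illustrative only)
val filledDF = trainDF.na.fill(Map("windspeed" -> 0.0, "humidity" -> 0))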

5. Explode the season column into separate columns such as season_1, season_2, ... and drop season

6. Execute the same for weather (weather_1, weather_2, ...) and drop weather

//Ans: We don't need to explode the season and weather columns explicitly, because the categorical columns with more than 2 distinct values were already handled above with a OneHotEncoder (see the sketch below for how explicit dummy columns could be built).
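
For completeness, if explicit dummy columns were required, they could be built by hand with when/otherwise — a minimal sketch assuming the four season values 1 to 4:

// Illustrative only: manual season_1..season_4 dummy columns, then drop the original season column
val seasonExploded = (1 to 4).foldLeft(trainDF) { (df, s) =>
  df.withColumn(s"season_$s", when(col("season") === s, 1).otherwise(0))
}.drop("season")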

7. Split datetime into meaningful columns such as hour, day, month, year, etc.

//Converting datetime string column to timestamp column
val df_time = df_r.withColumn("datetime", to_timestamp(col("datetime"),"d-M-y H:m"))
 
//Now splitting datetime into meaningful columns such as year, month, day, hour
val datetime_trainDF = df_time.
withColumn("year", year(col("datetime"))).
withColumn("month", month(col("datetime"))).
withColumn("day", dayofmonth(col("datetime"))).
withColumn("hour", hour(col("datetime"))).
withColumn("minute",minute(col("datetime")))
df_time: org.apache.spark.sql.DataFrame = [datetime: timestamp, holiday: int ... 10 more fields]
datetime_trainDF: org.apache.spark.sql.DataFrame = [datetime: timestamp, holiday: int ... 15 more fields]
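
The day of week may also be a useful demand feature; as an optional extension (assuming Spark 2.3+, where the dayofweek function is available):

// Optional extra feature: day of week (1 = Sunday, ..., 7 = Saturday)
val withDow = datetime_trainDF.withColumn("dayofweek", dayofweek(col("datetime")))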

8. Explore how count varies with different features such as hour, month, etc.

datetime_trainDF.groupBy("year").count.show()
datetime_trainDF.groupBy("month").count.show()
datetime_trainDF.groupBy("day").count.show()
datetime_trainDF.groupBy("hour").count.show()
datetime_trainDF.groupBy("minute").count.show()
+----+-----+
|year|count|
+----+-----+
|2012| 5464|
|2011| 5422|
+----+-----+

+-----+-----+
|month|count|
+-----+-----+
|   12|  912|
|    1|  884|
|    6|  912|
|    3|  901|
|    5|  912|
|    9|  909|
|    4|  909|
|    8|  912|
|    7|  912|
|   10|  911|
|   11|  911|
|    2|  901|
+-----+-----+

+---+-----+
|day|count|
+---+-----+
| 12|  573|
|  1|  575|
| 13|  574|
|  6|  572|
| 16|  574|
|  3|  573|
|  5|  575|
| 19|  574|
| 15|  574|
|  9|  575|
| 17|  575|
|  4|  574|
|  8|  574|
|  7|  574|
| 10|  572|
| 11|  568|
| 14|  574|
|  2|  573|
| 18|  563|
+---+-----+

+----+-----+
|hour|count|
+----+-----+
|  12|  456|
|  22|  456|
|   1|  454|
|  13|  456|
|   6|  455|
|  16|  456|
|   3|  433|
|  20|  456|
|   5|  452|
|  19|  456|
|  15|  456|
|   9|  455|
|  17|  456|
|   4|  442|
|   8|  455|
|  23|  456|
|   7|  455|
|  10|  455|
|  21|  456|
|  11|  455|
+----+-----+
only showing top 20 rows

+------+-----+
|minute|count|
+------+-----+
|     0|10886|
+------+-----+
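
Note that groupBy(...).count only reports the number of records in each group; to see how demand itself varies, the count label can be aggregated instead — a minimal sketch:

// Average demand per hour and per month, rather than the number of records
datetime_trainDF.groupBy("hour").agg(avg("count").as("avg_count")).orderBy("hour").show(24)
datetime_trainDF.groupBy("month").agg(avg("count").as("avg_count")).orderBy("month").show()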

Model Development

1. Split the dataset into train and train_test

val splitSeed = 123
val Array(train,train_test) = datetime_trainDF.randomSplit(Array(0.7,0.3),splitSeed)
splitSeed: Int = 123
train: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [datetime: timestamp, holiday: int ... 15 more fields]
train_test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [datetime: timestamp, holiday: int ... 15 more fields]

2. Try different regression algorithms such as linear regression, random forest, etc., and note the accuracy of each

//Generate Feature Column
val feature = Array("holiday","workingday","temp","atemp","humidity","windspeed","season_Vec","weather_Vec","year","month","day","hour","minute")
//Assemble Feature Column
val assembler = new VectorAssembler().setInputCols(feature).setOutputCol("features")
feature: Array[String] = Array(holiday, workingday, temp, atemp, humidity, windspeed, season_Vec, weather_Vec, year, month, day, hour, minute)
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_8de8dc426983

Linear Regression Model

//Model Building
val lr = new LinearRegression().setLabelCol("count").setFeaturesCol("features")
 
//Creating Pipeline
val pipeline = new Pipeline().setStages(Array(assembler,lr))
 
//Training Model
val lrModel = pipeline.fit(train)
val predictions = lrModel.transform(train_test)
 
//Model Summary
val evaluator = new RegressionEvaluator().setLabelCol("count").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Linear Regression Root Mean Squared Error (RMSE) on train_test data = " + rmse)
Linear Regression Root Mean Squared Error (RMSE) on train_test data = 136.5774373860326
lr: org.apache.spark.ml.regression.LinearRegression = linReg_1ea40cad23bb
pipeline: org.apache.spark.ml.Pipeline = pipeline_f2ddbe3d23c6
lrModel: org.apache.spark.ml.PipelineModel = pipeline_f2ddbe3d23c6
predictions: org.apache.spark.sql.DataFrame = [datetime: timestamp, holiday: int ... 17 more fields]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_2f1ee7361559
rmse: Double = 136.5774373860326
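
RMSE alone can be hard to interpret; the same predictions can also be scored with R2 and MAE by changing the evaluator metric — a small sketch:

// Additional metrics on the same linear regression predictions
val lrR2 = new RegressionEvaluator().setLabelCol("count").setPredictionCol("prediction").setMetricName("r2").evaluate(predictions)
val lrMae = new RegressionEvaluator().setLabelCol("count").setPredictionCol("prediction").setMetricName("mae").evaluate(predictions)
println(s"Linear Regression R2 = $lrR2, MAE = $lrMae")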

GBT Regressor

//Model Building
val gbt = new GBTRegressor().setLabelCol("count").setFeaturesCol("features")
 
//Creating pipeline
val pipeline = new Pipeline().setStages(Array(assembler,gbt))
 
//Training Model
val gbtModel = pipeline.fit(train)
val predictions = gbtModel.transform(train_test)
 
//Model Summary
val evaluator = new RegressionEvaluator().setLabelCol("count").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("GBT Regressor Root Mean Squared Error (RMSE) on train_test data = " + rmse)
GBT Regressor Root Mean Squared Error (RMSE) on train_test data = 59.3978579039326
gbt: org.apache.spark.ml.regression.GBTRegressor = gbtr_e579397772d8
pipeline: org.apache.spark.ml.Pipeline = pipeline_e9de176b1b8e
gbtModel: org.apache.spark.ml.PipelineModel = pipeline_e9de176b1b8e
predictions: org.apache.spark.sql.DataFrame = [datetime: timestamp, holiday: int ... 17 more fields]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_0359461a0553
rmse: Double = 59.3978579039326
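
Since the GBT model performs best so far, its feature importances give a rough view of which inputs drive the predictions — a hedged sketch that extracts the fitted GBTRegressionModel from the pipeline:

// The fitted GBT model is the last stage of the trained pipeline
val fittedGbt = gbtModel.stages.last.asInstanceOf[GBTRegressionModel]
println("GBT feature importances: " + fittedGbt.featureImportances)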

Decision Tree Regressor

//Model Building
val dt = new DecisionTreeRegressor().setLabelCol("count").setFeaturesCol("features")
 
//Creating Pipeline
val pipeline = new Pipeline().setStages(Array(assembler,dt))
 
//Training Model
val dtModel = pipeline.fit(train)
val predictions = dtModel.transform(train_test)
 
//Model Summary
val evaluator = new RegressionEvaluator().setLabelCol("count").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Decision Tree Regressor Root Mean Squared Error (RMSE) on train_test data = " + rmse)
Decision Tree Regressor Root Mean Squared Error (RMSE) on train_test data = 105.71823914900725
dt: org.apache.spark.ml.regression.DecisionTreeRegressor = dtr_c6a610ffbe6f
pipeline: org.apache.spark.ml.Pipeline = pipeline_88e50475cb10
dtModel: org.apache.spark.ml.PipelineModel = pipeline_88e50475cb10
predictions: org.apache.spark.sql.DataFrame = [datetime: timestamp, holiday: int ... 17 more fields]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_eb9af058ad63
rmse: Double = 105.71823914900725

Random Forest Regressor

//Model Building
val rf = new RandomForestRegressor().setLabelCol("count").setFeaturesCol("features")
 
//Creating Pipeline
val pipeline = new Pipeline().setStages(Array(assembler,rf))
 
//Training Model
val rfModel = pipeline.fit(train)
val predictions = rfModel.transform(train_test)
 
//Model Summary
val evaluator = new RegressionEvaluator().setLabelCol("count").setPredictionCol("prediction").setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println("Random Forest Regressor Root Mean Squared Error (RMSE) on train_test data = " + rmse)
Random Forest Regressor Root Mean Squared Error (RMSE) on train_test data = 108.49954488266827
rf: org.apache.spark.ml.regression.RandomForestRegressor = rfr_ab681154d918
pipeline: org.apache.spark.ml.Pipeline = pipeline_b61910bcfa69
rfModel: org.apache.spark.ml.PipelineModel = pipeline_b61910bcfa69
predictions: org.apache.spark.sql.DataFrame = [datetime: timestamp, holiday: int ... 17 more fields]
evaluator: org.apache.spark.ml.evaluation.RegressionEvaluator = regEval_607a2e7dc8bf
rmse: Double = 108.49954488266827

3. Select the best model and persist it

//After trying the different regression algorithms above, the GBT Regressor model gives the best accuracy (lowest RMSE) compared to the others, so it is the one we persist.
//gbtModel.write.overwrite().save("/FileStore/tables/model/bicycle-model")
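
Before persisting, the GBT model could optionally be tuned with cross-validation — a minimal sketch with an illustrative parameter grid (the grid values are assumptions, not tuned results):

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// Illustrative parameter grid for the GBT regressor
val paramGrid = new ParamGridBuilder().
  addGrid(gbt.maxDepth, Array(5, 7)).
  addGrid(gbt.maxIter, Array(20, 50)).
  build()

// 3-fold cross-validation over the assembler + GBT pipeline, scored by RMSE
val cv = new CrossValidator().
  setEstimator(new Pipeline().setStages(Array(assembler, gbt))).
  setEvaluator(new RegressionEvaluator().setLabelCol("count").setPredictionCol("prediction").setMetricName("rmse")).
  setEstimatorParamMaps(paramGrid).
  setNumFolds(3)

val cvModel = cv.fit(train)
val cvRmse = new RegressionEvaluator().setLabelCol("count").setMetricName("rmse").evaluate(cvModel.transform(train_test))
println("Cross-validated GBT RMSE on train_test data = " + cvRmse)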

Model Implementation and Prediction

Application Development for Model Generation

  1. Clean and Transform the data

  2. Develop the model and persist it.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.OneHotEncoder
 
object BicyclePredict{
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("kapil")
    val sc = new SparkContext(sparkConf)
    
    sc.setLogLevel("ERROR")
 
    val spark = new org.apache.spark.sql.SQLContext(sc)
    import spark.implicits._
    
    println("Reading training data...................")
    
    val trainDF = spark.read.format("csv").option("inferSchema",true).option("header",true).load("/FileStore/tables/edureka/train.csv")
    
    println("Cleaning data.................")
    
    //Converting datetime string column to timestamp column
    val df_time = trainDF.withColumn("datetime", to_timestamp(col("datetime"),"d-M-y H:m"))
 
    //Now splitting datetime into meaningful columns such as year, month, day, hour
    val datetime_trainDF = df_time.
    withColumn("year", year(col("datetime"))).
    withColumn("month", month(col("datetime"))).
    withColumn("day", dayofmonth(col("datetime"))).
    withColumn("hour", hour(col("datetime"))).
    withColumn("minute",minute(col("datetime")))   
    
    //One-hot encoding on the season and weather columns.
    val indexer = Array("season","weather").map(c=>new OneHotEncoder().setInputCol(c).setOutputCol(c + "_Vec"))
    val pipeline = new Pipeline().setStages(indexer)
    val df_r = pipeline.fit(datetime_trainDF).transform(datetime_trainDF)
    
    //split data into train test
    val splitSeed =123
    val Array(train, train_test) = df_r.randomSplit(Array(0.7, 0.3), splitSeed)
    
    //Generate Feature Column
    val feature_cols = Array("holiday","workingday","temp","atemp","humidity","windspeed","season_Vec","weather_Vec","year","month","day","hour","minute")
    
    //Assemble Feature
    val assembler = new VectorAssembler().setInputCols(feature_cols).setOutputCol("features")
    
    //Model Building
    val gbt = new GBTRegressor().setLabelCol("count").setFeaturesCol("features")
    
    val pipeline2 = new Pipeline().setStages(Array(assembler,gbt))
    
    println("Training model................")
    val gbt_model = pipeline2.fit(train)
    val predictions = gbt_model.transform(train_test)
    
    val evaluator = new RegressionEvaluator().setLabelCol("count").setPredictionCol("prediction").setMetricName("rmse")
    val rmse = evaluator.evaluate(predictions)
    println("GBT Regressor Root Mean Squared Error (RMSE) on train_test data = " + rmse)
  
    println("Persisting the model................")
    gbt_model.write.overwrite().save("/FileStore/tables/model/bicycle-model")
  }
}
//Application Execution
spark2-submit --class "BicyclePredict" --master yarn /mnt/home/edureka_836462/BicycleProject/BicycleTrain/target/scala-2.11/bicycletrain_2.11-1.0.jar

Application Development for Demand Prediction

Model Prediction Application – Write an application to predict the bike demand based on the input dataset from HDFS:

  1. Load the persisted model.

  2. Predict bike demand

  3. Persist the result to RDBMS

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.OneHotEncoder
 
object BicyclePredict {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Telecom")
    val sc = new SparkContext(sparkConf)
 
    sc.setLogLevel("ERROR")
 
    val spark = new org.apache.spark.sql.SQLContext(sc)
    import spark.implicits._
    
    println("Reading Training data.................")
    
    val testDF = spark.read.format("csv").option("inferSchema",true).option("header",true).load("/FileStore/tables/edureka/test.csv")
    
    println("Cleaning data.................")
    
    //Converting datetime string column to timestamp column
    val df_time = testDF.withColumn("datetime", to_timestamp(col("datetime"),"d-M-y H:m"))
    
    //Now splitting datetime into meaningful columns such as year, month, day, hour
    val datetime_testDF = df_time.
    withColumn("year", year(col("datetime"))).
    withColumn("month", month(col("datetime"))).
    withColumn("day", dayofmonth(col("datetime"))).
    withColumn("hour", hour(col("datetime"))).
    withColumn("minute",minute(col("datetime")))
    
    //One-hot encoding on the season and weather columns.
    val indexer = Array("season","weather").map(c=>new OneHotEncoder().setInputCol(c).setOutputCol(c + "_Vec"))
    val pipeline = new Pipeline().setStages(indexer)
    val df_r = pipeline.fit(datetime_testDF).transform(datetime_testDF)
    
    println("Loading Trained Model..............")
    val gbt_model = PipelineModel.read.load("/FileStore/tables/model/bicycle-model")
    
    println("Making predictions...........") 
    val predictions = gbt_model.transform(df_r).select($"datetime",$"prediction".as("count"))
    
    println("Persisting the result to RDBMS..............")
    predictions.write.format("jdbc").
      option("url", "jdbc:mysql://mysqldb.edu.cloudlab.com/kapil_bicycle").
      option("driver", "com.mysql.cj.jdbc.Driver").option("dbtable", "predictions").
      option("user", "labuser").
      option("password", "edureka").
      mode(SaveMode.Append).save
  }
}
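
To verify the persisted results, the predictions table can be read back over the same JDBC connection — a minimal sketch using the connection details above (assuming a SparkSession or SQLContext named spark is available):

// Read back the persisted predictions for a quick sanity check
val persisted = spark.read.format("jdbc").
  option("url", "jdbc:mysql://mysqldb.edu.cloudlab.com/kapil_bicycle").
  option("driver", "com.mysql.cj.jdbc.Driver").
  option("dbtable", "predictions").
  option("user", "labuser").
  option("password", "edureka").
  load()
persisted.show(5)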

Application for Streaming Data

Write an application to predict demand on streaming data:

1. Set up Flume to push data into the Spark Flume sink.

//Kafka topic creation:
kafka-topics --create --zookeeper ip-20-0-21-161.ec2.internal:2181 --replication-factor 1 --partitions 1 --topic edureka_836462_bicycle_kapil

Flume configuration:

agent1.sources  = source1
agent1.channels = channel1
agent1.sinks = spark
agent1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.source1.kafka.bootstrap.servers = ip-20-0-31-210.ec2.internal:9092
agent1.sources.source1.kafka.topics = edureka_836462_bicycle_kapil
agent1.sources.source1.kafka.consumer.group.id = edureka_836462_bicycle_kapil
agent1.sources.source1.channels = channel1
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = timestamp
agent1.sources.source1.kafka.consumer.timeout.ms = 100
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 10000
agent1.channels.channel1.transactionCapacity = 1000
agent1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
agent1.sinks.spark.hostname = ip-20-0-41-62.ec2.internal
agent1.sinks.spark.port = 4143
agent1.sinks.spark.channel = channel1

Run Flume agent:

flume-ng agent --conf conf --conf-file bicycle.conf --name agent1 -Dflume.root.logger=DEBUG,console

2. Configure Spark Streaming to pull data from the Spark Flume sink using receivers, predict the demand using the persisted model, and persist the result to RDBMS.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.OneHotEncoder
 
object BicycleStreaming {
  case class Bicycle(datetime: String, season: Int, holiday: Int, workingday: Int, weather: Int, temp: Double, atemp: Double, humidity: Int, windspeed: Double)
 
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("kapil")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
 
    sc.setLogLevel("ERROR")
 
    val spark = new org.apache.spark.sql.SQLContext(sc)
 
    import spark.implicits._
 
    val flumeStream = FlumeUtils.createPollingStream(ssc, "ip-20-0-41-62.ec2.internal", 4143)
 
    println("Loading tained model.............")    
    val gbt_model = PipelineModel.read.load("/user/edureka_836462/bicycle-model")
 
    
    val lines = flumeStream.map(event => new String(event.event.getBody().array(), "UTF-8"))
 
    lines.foreachRDD { rdd => 
      def row(line: List[String]): Bicycle = Bicycle(line(0), line(1).toInt, line(2).toInt,
              line(3).toInt, line(4).toInt, line(5).toDouble, line(6).toDouble, line(7).toInt,
              line(8).toDouble
              )
 
      val rows_rdd = rdd.map(_.split(",").to[List]).map(row)
      val rows_df = rows_rdd.toDF
    
      if(rows_df.count > 0) {
        
        val df_time = rows_df.withColumn("datetime",to_timestamp(col("datetime"),"d-M-y H:m"))
        val datetime_testDF = df_time.
        withColumn("year", year(col("datetime"))).
        withColumn("month", month(col("datetime"))).
        withColumn("day", dayofmonth(col("datetime"))).
        withColumn("hour", hour(col("datetime"))).
        withColumn("minute",minute(col("datetime")))
 
        //One-hot encoding on the season and weather columns.
        val indexer = Array("season","weather").map(c => new OneHotEncoder().setInputCol(c).setOutputCol(c + "_Vec"))
        val pipeline = new Pipeline().setStages(indexer)
        val df_r = pipeline.fit(datetime_testDF).transform(datetime_testDF)
 
        println("Making predictions...............")   
        val predictions =  gbt_model.transform(df_r).select($"datetime",$"prediction".as("count"))
 
        println("Persisting the result to RDBMS..................")
        predictions.write.format("jdbc").
          option("url", "jdbc:mysql://mysqldb.edu.cloudlab.com/kapil64_bicycle").
          option("driver", "com.mysql.cj.jdbc.Driver").option("dbtable", "predictions").
          option("user", "labuser").
          option("password", "edureka").
          mode(SaveMode.Append).save
      }
    }
    
    ssc.start()
    ssc.awaitTermination()    
  }
}

Run the application:

spark2-submit --packages mysql:mysql-connector-java:8.0.13 --class "BicycleStreaming" --master yarn /mnt/home/edureka_836462/BicycleProject/BicycleStreaming/target/scala-2.11/bicyclestreaming_2.11-1.0.jar

3. Push messages (via the Kafka topic that Flume consumes) to test the application. The application should process them and persist the results to RDBMS.

kafka-console-producer --broker-list ip-20-0-31-210.ec2.internal:9092 --topic edureka_836462_bicycle_kapil
1/20/2011 0:00,1,0,1,1,10.66,11.365,56,26.0027
