Chapter 8 Starting Your Machine Learning Pipeline

The first step in starting your machine learning and data science pipeline is to set up your compute environment and point it to your data.

In Spark, you’ll need to create a SparkContext. This constructor provides Spark with the details of the cluster: how and where to access it, additional package directories, and so on. You’ll use this context to create new RDDs or DataFrames.

8.1 Finding the SparkR Library

In order to create a SparkContext from your RStudio Server environment, you’ll need access to the SparkR library. Since Spark 1.4, the SparkR package has shipped directly with the core Spark distribution. Therefore, you do not need to install SparkR from CRAN or a development version from GitHub; you only need to add the SparkR library to your library paths in order to access it.

An environment variable called “SPARK_HOME” has been set that points to the Spark installation directory; within it, the subdirectory “R/lib” contains the SparkR library. You can add this location to your library paths in your Rprofile, as we did in Section 7, or, if you prefer not to edit your Rprofile, at the start of your R session.

list.files(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"))
## [1] "SparkR"     "sparkr.zip"

To add the SparkR library to your library paths for the current session, use the .libPaths function to include the directory in the search path for R’s library trees. The library paths can also be changed in the Rprofile, either per user or system wide; see the help page ?Startup for more details on R’s startup mechanism.

.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
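
If you would rather make this permanent, a minimal sketch of the Rprofile approach (assuming SPARK_HOME is already set when R starts) is to add something like the following to your ~/.Rprofile, so SparkR is always on the search path:

# Hypothetical ~/.Rprofile entry: prepend SparkR's library directory at startup
.First <- function() {
  spark_home <- Sys.getenv("SPARK_HOME")
  if (nzchar(spark_home)) {
    .libPaths(c(file.path(spark_home, "R", "lib"), .libPaths()))
  }
}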

8.2 Creating a Spark Context

To create a SparkContext, use the sparkR.init function, passing in options for the environment parameters, application properties, any Spark packages you depend on, and any other additional Spark parameters. In order to create and manipulate DataFrames, we will also need a SQLContext, which can be created from the SparkContext using the sparkRSQL.init function.

library(dplyr)
library(SparkR)

sparkEnvir <- list(spark.executor.instances = '10',
                   spark.yarn.executor.memoryOverhead = '8000')

sc <- sparkR.init(
  sparkEnvir = sparkEnvir,
  sparkPackages = "com.databricks:spark-csv_2.10:1.3.0"
)
## Launching java with spark-submit command /usr/hdp/current/spark-client/bin/spark-submit  --packages com.databricks:spark-csv_2.10:1.3.0 sparkr-shell /tmp/RtmpB87OWy/backend_port19d82074c3f
sqlContext <- sparkRSQL.init(sc)

We added the sparkPackages argument and set it to the spark-csv package provided by Databricks, which will allow us to read CSV files into Spark DataFrames. Databricks is a prominent vendor in the Spark ecosystem and the developer of many Spark packages (think of them as the RStudio of the Spark ecosystem).

After you are done with your Spark session, you can terminate the connection to the Spark backend by running sparkR.stop().

8.3 Creating DataFrames

Using our sqlContext variable, we can create DataFrames from local R data.frames, from Hive tables, or from other data sources. For example, you can create a Spark DataFrame from a local R data.frame using the createDataFrame function.

8.3.1 From Local R data.frames

Creating Spark DataFrames from local R data.frames might not seem like a great idea: if the data fits in an R data.frame, it fits in a single node’s memory, so why distribute it? One reason is that you may be testing your methods on a sample dataset, planning to scale out your analysis to larger datasets once your tests pass. For expository purposes it is also a useful exercise, as it exposes you to the fundamentals of Spark DataFrames and the SparkR syntax.

As a first example, we’ll import data from the nycflights13 package into a Spark DataFrame, and use SparkR’s data aggregation functions to tabulate the number of flights to each destination for flights originating from JFK.

library(nycflights13)
flights <- createDataFrame(sqlContext, nycflights13::flights)
jfk_flights <- filter(flights, flights$origin == "JFK")
# Group the flights by destination and aggregate by the number of flights
dest_flights <- summarize(
  group_by(jfk_flights, jfk_flights$dest), 
  count = n(jfk_flights$dest)
  )
# Now sort by the `count` column and print the first few rows
head(arrange(dest_flights, desc(dest_flights$count)))

This same analysis could be streamlined using the %>% operator exposed by the magrittr package to improve the readability of the pipeline:

library(magrittr)
dest_flights <- filter(flights, flights$origin == "JFK") %>% 
  group_by(flights$dest) %>% 
  summarize(count = n(flights$dest))
dest_flights %>% arrange(desc(dest_flights$count)) %>% head
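
Note that dest_flights is itself a Spark DataFrame; head only brings back a handful of rows. Since the full aggregated table of destinations is small, you can pull it into a local R data.frame with collect if you want to inspect or plot it locally. A minimal sketch:

# Collect the (small) aggregated result into a local R data.frame
local_dest <- collect(arrange(dest_flights, desc(dest_flights$count)))
str(local_dest)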

8.3.2 Creating DataFrames from CSV Files

Since we imported the spark-csv package, we can import CSV files into DataFrames. We will be using the full airlines dataset, which I assume you have already ingested into Blob storage by following the steps in Section 6.

Our data directory contains a couple of data directories (virtually everything in Azure Storage is a simple blob!). Let’s see what we have using the rxHadoopListFiles command, which is simply a wrapper around the Hadoop shell command hadoop fs -ls:

data_dir <- "/user/RevoShare/alizaidi"
rxHadoopListFiles(data_dir)

Taking a look at the individual directories, we can see how many files there are in the Airlines and Weather directories:

rxHadoopCommand("fs -ls /user/RevoShare/alizaidi/AirOnTimeCSV | head")
rxHadoopCommand("fs -ls /user/RevoShare/alizaidi/delayDataLarge/WeatherRaw | head")

Let’s read the airlines directory and the weather directory into Spark DataFrames. We will use SparkR’s read.df function, pointing it at the com.databricks.spark.csv data source.

airPath <- file.path(data_dir, "AirOnTimeCSV")
weatherPath <- file.path(data_dir, "Weather") # pre-processed weather data

# create a SparkR DataFrame for the airline data

airDF <- read.df(sqlContext, airPath, source = "com.databricks.spark.csv", 
                 header = "true", inferSchema = "true")

# Create a SparkR DataFrame for the weather data

weatherDF <- read.df(sqlContext, weatherPath, source = "com.databricks.spark.csv", 
                     header = "true", inferSchema = "true")
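
Because we set inferSchema = "true", spark-csv makes an extra pass over the data to guess the column types. It is worth verifying what it inferred; printSchema prints the column names and types of a DataFrame, shown here for the airlines data:

# Inspect the schema that spark-csv inferred for the airlines data
printSchema(airDF)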

Note that it took more than six minutes to load the airlines data into a Spark DataFrame. Subsequent operations on the airDF object, however, should be much faster, especially once the DataFrame is cached in memory (see Section 8.4).

Let’s count the number of rows in each of our DataFrames and print the first few rows:

library(SparkR)
lapply(list(airDF, weatherDF), count)
# [[1]]
# [1] 148619655
# 
# [[2]]
# [1] 14829028

lapply(list(airDF, weatherDF), head)
# [[1]]
#   YEAR MONTH DAY_OF_MONTH DAY_OF_WEEK    FL_DATE UNIQUE_CARRIER TAIL_NUM FL_NUM
# 1 1987    10            1           4 1987-10-01             AA               1
# 2 1987    10            2           5 1987-10-02             AA               1
# 3 1987    10            3           6 1987-10-03             AA               1
# 4 1987    10            4           7 1987-10-04             AA               1
# 5 1987    10            5           1 1987-10-05             AA               1
# 6 1987    10            6           2 1987-10-06             AA               1
#   ORIGIN_AIRPORT_ID ORIGIN ORIGIN_STATE_ABR DEST_AIRPORT_ID DEST DEST_STATE_ABR
# 1             12478    JFK               NY           12892  LAX             CA
# 2             12478    JFK               NY           12892  LAX             CA
# 3             12478    JFK               NY           12892  LAX             CA
# 4             12478    JFK               NY           12892  LAX             CA
# 5             12478    JFK               NY           12892  LAX             CA
# 6             12478    JFK               NY           12892  LAX             CA
#   CRS_DEP_TIME DEP_TIME DEP_DELAY DEP_DELAY_NEW DEP_DEL15 DEP_DELAY_GROUP TAXI_OUT
# 1          900      901         1             1         0               0         
# 2          900      901         1             1         0               0         
# 3          900      859        -1             0         0              -1         
# 4          900      900         0             0         0               0         
# 5          900      902         2             2         0               0         
# 6          900      900         0             0         0               0         
#   WHEELS_OFF WHEELS_ON TAXI_IN CRS_ARR_TIME ARR_TIME ARR_DELAY ARR_DELAY_NEW
# 1                                      1152     1117       -35             0
# 2                                      1152     1137       -15             0
# 3                                      1152     1111       -41             0
# 4                                      1152     1116       -36             0
# 5                                      1152     1119       -33             0
# 6                                      1152       NA        NA            NA
#   ARR_DEL15 ARR_DELAY_GROUP CANCELLED CANCELLATION_CODE DIVERTED CRS_ELAPSED_TIME
# 1         0              -2         0                          0              352
# 2         0              -1         0                          0              352
# 3         0              -2         0                          0              352
# 4         0              -2         0                          0              352
# 5         0              -2         0                          0              352
# 6        NA              NA         0                          1              352
#   ACTUAL_ELAPSED_TIME AIR_TIME FLIGHTS DISTANCE DISTANCE_GROUP CARRIER_DELAY
# 1                 316                1     2475             10              
# 2                 336                1     2475             10              
# 3                 312                1     2475             10              
# 4                 316                1     2475             10              
# 5                 317                1     2475             10              
# 6                  NA                1     2475             10              
#   WEATHER_DELAY NAS_DELAY SECURITY_DELAY LATE_AIRCRAFT_DELAY 
# 1                                                            
# 2                                                            
# 3                                                            
# 4                                                            
# 5                                                            
# 6                                                            
# 
# [[2]]
#   Visibility DryBulbCelsius DewPointCelsius RelativeHumidity WindSpeed Altimeter
# 1   4.000000       0.000000       -1.000000        92.000000  0.000000 29.690000
# 2  10.000000       7.000000       -3.000000        49.000000 11.000000 29.790000
# 3   3.000000       1.000000        0.000000        92.000000  3.000000 29.710000
# 4  10.000000       7.000000       -5.000000        42.000000 18.000000 29.710000
# 5   1.250000       3.000000        0.000000        82.000000  6.000000 29.720000
# 6  10.000000       8.000000       -4.000000        44.000000 14.000000 29.770000
#   AdjustedYear AdjustedMonth AdjustedDay AdjustedHour AirportID
# 1         2007             5           4           18     15177
# 2         2007             5           4            6     15177
# 3         2007             5           4           17     15177
# 4         2007             5           4            9     15177
# 5         2007             5           4           10     15177
# 6         2007             5           4            7     15177

8.4 Caching DataFrames

Spark provides a few different options for caching DataFrames.
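
As a minimal sketch of the simplest option, cache marks a DataFrame for in-memory storage with the default storage level; the cache is lazy and is only materialized the first time an action touches the data:

# Mark the airlines DataFrame for in-memory caching (lazy)
airDF <- cache(airDF)
# The cache is materialized by the first action, e.g. a count
count(airDF)
# persist() accepts an explicit storage level, e.g. "MEMORY_AND_DISK"
# persist(weatherDF, "MEMORY_AND_DISK")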