Chapter 10 Data Manipulation with SparkR

Now that we have our two datasets saved as Spark DataFrames, we can apply standard data manipulation techniques to explore and visualize our data.

First, we’ll use the rename function to rename our columns, and the select function to select the columns we need. We’ll also round the scheduled departure time down to the full hour. These SparkR functions look just like the verbs from the dplyr package for data manipulation, but are designed to work with Spark DataFrames.

system.time(airDF <- rename(airDF,
                            ArrDel15 = airDF$ARR_DEL15,
                            Year = airDF$YEAR,
                            Month = airDF$MONTH,
                            DayofMonth = airDF$DAY_OF_MONTH,
                            DayOfWeek = airDF$DAY_OF_WEEK,
                            Carrier = airDF$UNIQUE_CARRIER,
                            OriginAirportID = airDF$ORIGIN_AIRPORT_ID,
                            DestAirportID = airDF$DEST_AIRPORT_ID,
                            CRSDepTime = airDF$CRS_DEP_TIME,
                            CRSArrTime =  airDF$CRS_ARR_TIME,
                            Distance = airDF$DISTANCE,
                            DepDelay = airDF$DEP_DELAY,
                            ArrDelay = airDF$ARR_DELAY
                            )
            )
  #  user  system elapsed 
  # 0.136   0.000   0.242 

# Select desired columns from the flight data. 
varsToKeep <- c("ArrDel15", "Year", "Month", "DayofMonth", "DayOfWeek", "Carrier",
                "OriginAirportID", "DestAirportID", "CRSDepTime", "CRSArrTime",
                "Distance", "DepDelay", "ArrDelay")
system.time(airDF <- select(airDF, varsToKeep))
  #  user  system elapsed 
  # 0.064   0.000   0.112 

# Round scheduled departure time (hhmm) down to the full hour, so it can
# later be matched against hourly weather observations.
system.time(airDF$CRSDepTime <- floor(airDF$CRSDepTime / 100))
  # user  system elapsed 
  #  0.00    0.00    0.06 

10.1 Data Aggregations

SparkR is great at merges and data aggregations. For instance, suppose we want to see the average departure delay for each carrier, arranged in descending order. The following example shows just how simple the SparkR syntax is.

sum_df <- airDF %>% select("Carrier", "DepDelay") %>% 
  group_by(airDF$Carrier) %>% 
  summarize(count = n(airDF$Carrier), 
            ave_delay = mean(airDF$DepDelay))
  #  user  system elapsed 
  # 0.024   0.000   0.055 

The syntax is almost exactly like that of the dplyr package, and the %>% operator makes chaining operations exceptionally simple. Note that the above operation will not run until we call an action on sum_df. Right now it is just a series of transformations: a recipe for the computation, whose actual evaluation is deferred until we call an action.
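
To make the laziness concrete, here is a minimal sketch (the filter threshold is purely illustrative): the transformation returns immediately because it only extends the execution plan, while the action forces Spark to evaluate it.

# Transformation: returns instantly; Spark only records the operation
high_delay <- SparkR::filter(sum_df, sum_df$ave_delay > 10)

# Action: triggers evaluation of the whole chain of transformations
SparkR::count(high_delay)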

10.2 Collecting Results to Local Data Frames

In order to evaluate and bring the summarized data into an R data.frame, we can use the collect action.

sum_local <- sum_df %>% collect()
  #  user  system elapsed 
  # 0.616   0.536 337.758 
library(dplyr)
sum_local %>% arrange(desc(ave_delay))
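
Keep in mind that collect brings every row of the DataFrame to the driver, so reserve it for suitably small results such as this aggregated summary. For a quick peek at a large DataFrame, head or take move only a handful of rows:

# Fetch just the first rows rather than the entire DataFrame
head(sum_df)       # first 6 rows as a local data.frame
take(sum_df, 10)   # first 10 rows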

10.3 Dimple Bar Charts

Now that our data resides as a local data.frame, we can plot it using any R plotting library.

# reload the summary saved in an earlier session (assumed to contain sum_local)
load("local_df.RData")
library(rcdimple)
sum_local %>% 
  dimple(x ="Carrier", y = "ave_delay", z =  "count", type = "bar") %>%
  add_title(html = "<h4>Average Delay in Minutes by Carrier</h4>" ) %>% 
  zAxis(outputFormat = "#,### ")
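
Because sum_local is an ordinary data.frame, a static version of the same chart is just as easy with, say, ggplot2 (a sketch, assuming the package is installed):

library(ggplot2)
ggplot(sum_local, aes(x = reorder(Carrier, -ave_delay), y = ave_delay)) +
  geom_bar(stat = "identity") +
  labs(title = "Average Delay in Minutes by Carrier",
       x = "Carrier", y = "Average delay (minutes)")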

To make the weather data correspond to the airline data, let us aggregate it by date, hour, and airport, taking the average of each measurement. If you are familiar with the dplyr package, the SparkR syntax will feel immediately familiar.

weatherAgg <- weatherDF %>% 
  SparkR::group_by("AdjustedYear", "AdjustedMonth", "AdjustedDay", "AdjustedHour", "AirportID") %>%
  SparkR::summarize(Visibility = mean(weatherDF$Visibility),
                    DryBulbCelsius = mean(weatherDF$DryBulbCelsius),
                    DewPointCelsius = mean(weatherDF$DewPointCelsius),
                    RelativeHumidity = mean(weatherDF$RelativeHumidity),
                    WindSpeed = mean(weatherDF$WindSpeed),
                    Altimeter = mean(weatherDF$Altimeter))
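
Because weatherAgg will feed two separate joins below (one for the origin airport and one for the destination), it can be worth caching it so the aggregation is not recomputed for the second join. A minimal sketch:

# Cache the aggregated weather data so both joins reuse the same result
persist(weatherAgg, "MEMORY_ONLY")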

10.4 Merging Data

We can use SparkR for merging datasets as well. Let’s merge the airline dataset with the weather dataset: we’ll first attach weather data for the origination airport, and then for the destination airport. To keep the data at a manageable size, we remove the redundant columns after each join. Finally, we save the DataFrame as CSV files stored in HDFS so that we can access it later.

joinedDF <- SparkR::join(
  airDF,
  weatherAgg,
  airDF$OriginAirportID == weatherAgg$AirportID &
    airDF$Year == weatherAgg$AdjustedYear &
    airDF$Month == weatherAgg$AdjustedMonth &
    airDF$DayofMonth == weatherAgg$AdjustedDay &
    airDF$CRSDepTime == weatherAgg$AdjustedHour,
  joinType = "left_outer"
)

vars <- names(joinedDF)
varsToDrop <- c('AdjustedYear', 'AdjustedMonth', 'AdjustedDay', 'AdjustedHour', 'AirportID')
varsToKeep <- vars[!(vars %in% varsToDrop)]
joinedDF1 <- select(joinedDF, varsToKeep)

joinedDF2 <- rename(joinedDF1,
                    VisibilityOrigin = joinedDF1$Visibility,
                    DryBulbCelsiusOrigin = joinedDF1$DryBulbCelsius,
                    DewPointCelsiusOrigin = joinedDF1$DewPointCelsius,
                    RelativeHumidityOrigin = joinedDF1$RelativeHumidity,
                    WindSpeedOrigin = joinedDF1$WindSpeed,
                    AltimeterOrigin = joinedDF1$Altimeter
)


joinedDF3 <- join(
  joinedDF2,
  weatherAgg,
  airDF$DestAirportID == weatherAgg$AirportID &
    airDF$Year == weatherAgg$AdjustedYear &
    airDF$Month == weatherAgg$AdjustedMonth &
    airDF$DayofMonth == weatherAgg$AdjustedDay &
    airDF$CRSDepTime == weatherAgg$AdjustedHour,
  joinType = "left_outer"
)

# Remove redundant columns
vars <- names(joinedDF3)
varsToDrop <- c('AdjustedYear', 'AdjustedMonth', 'AdjustedDay', 'AdjustedHour', 'AirportID')
varsToKeep <- vars[!(vars %in% varsToDrop)]
joinedDF4 <- select(joinedDF3, varsToKeep)

joinedDF5 <- rename(joinedDF4,
                    VisibilityDest = joinedDF4$Visibility,
                    DryBulbCelsiusDest = joinedDF4$DryBulbCelsius,
                    DewPointCelsiusDest = joinedDF4$DewPointCelsius,
                    RelativeHumidityDest = joinedDF4$RelativeHumidity,
                    WindSpeedDest = joinedDF4$WindSpeed,
                    AltimeterDest = joinedDF4$Altimeter
                    )


# repartition so the result is written out as a manageable number of part files
joinedDF5 <- repartition(joinedDF5, 80)

# write result to directory of CSVs
write.df(joinedDF5, file.path("/user/RevoShare/alizaidi/delayDataLarge",
                              "JoinAirWeatherDelay"),
         "com.databricks.spark.csv", "overwrite", 
         header = "true")
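
Because the joined data now lives in HDFS, a later session can pick it up directly with read.df; a sketch reusing the path and CSV source from above:

joinedDF_reloaded <- read.df(sqlContext,
                             "/user/RevoShare/alizaidi/delayDataLarge/JoinAirWeatherDelay",
                             source = "com.databricks.spark.csv",
                             header = "true", inferSchema = "true")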

# We can shut down the SparkR Spark context now
sparkR.stop()

10.5 Exploring Credit Scores for Mortgage Borrowers

Freddie Mac and Fannie Mae make their single-family loan-level data publicly available (for research and non-commercial use). This data includes credit attributes for new loan originations, as well as a time series of performance metrics for each loan sponsored by these GSEs (Government Sponsored Enterprises).

We have loaded both portions of these data into separate data directories on our Azure Storage Account. Let’s import the acquisition data (credit profiles for newly originated mortgages) and rename its columns to more manageable names.

10.5.1 Ingesting Originations Data into Spark DataFrames

data_dir <- "/user/RevoShare/alizaidi"

acquisition_import <- function(datadir) {

  library(magrittr)

  acquisition <- file.path(datadir, "Acquisition")

  originations <- read.df(sqlContext, acquisition,
                          source = "com.databricks.spark.csv",
                          header = "false", inferSchema = "true", delimiter = "|")

  originations %<>% SparkR::rename(loan_id  = originations$C0,
                                   orig_chn = originations$C1,
                                   seller_name = originations$C2,
                                   orig_rt = originations$C3,
                                   orig_amt = originations$C4,
                                   orig_trm = originations$C5,
                                   orig_dte = originations$C6,
                                   frst_dte = originations$C7,
                                   oltv = originations$C8,
                                   ocltv = originations$C9,
                                   num_bo = originations$C10,
                                   dti = originations$C11,
                                   cscore_b = originations$C12,
                                   fthb_flg = originations$C13,
                                   purpose = originations$C14,
                                   prop_typ = originations$C15,
                                   num_unit  = originations$C16,
                                   occ_stat  = originations$C17,
                                   state  = originations$C18,
                                   zip_3 = originations$C19,
                                   mi_pct  = originations$C20,
                                   product_type = originations$C21,
                                   cscore_co  = originations$C22)
  return(originations)
}

originations <- acquisition_import(datadir = data_dir)
originations %>% persist("MEMORY_ONLY")
## DataFrame[loan_id:bigint, orig_chn:string, seller_name:string, orig_rt:double, orig_amt:int, orig_trm:int, orig_dte:string, frst_dte:string, oltv:int, ocltv:int, num_bo:int, dti:int, cscore_b:int, fthb_flg:string, purpose:string, prop_typ:string, num_unit:int, occ_stat:string, state:string, zip_3:int, mi_pct:int, product_type:string, cscore_co:int]
originations %>% nrow
## [1] 21706905
originations %>% head
##        loan_id orig_chn                               seller_name orig_rt
## 1 100007365142        R                   JPMORGAN CHASE BANK, NA   8.000
## 2 100011322040        C                              AMTRUST BANK   7.750
## 3 100015192562        R                                     OTHER   8.500
## 4 100015874399        C                        CITIMORTGAGE, INC.   8.750
## 5 100017922445        C                              AMTRUST BANK   8.250
## 6 100020205696        R FIRST TENNESSEE BANK NATIONAL ASSOCIATION   7.625
##   orig_amt orig_trm orig_dte frst_dte oltv ocltv num_bo dti cscore_b
## 1    75000      360  12/1999  02/2000   79    NA      1  62      763
## 2   123000      360  11/1999  01/2000   80    NA      1  28      750
## 3    51000      360  02/2000  04/2000   95    NA      1  27      686
## 4   242000      360  02/2000  04/2000   95    NA      1  47      706
## 5   240000      360  12/1999  02/2000   77    NA      2  19      737
## 6   225000      360  07/1999  09/1999   64    NA      2  21      793
##   fthb_flg purpose prop_typ num_unit occ_stat state zip_3 mi_pct
## 1        N       R       SF        1        P    PA   173     NA
## 2        N       P       SF        1        P    MO   630     NA
## 3        N       P       SF        1        P    GA   316     25
## 4        N       P       SF        1        P    FL   335     30
## 5        N       P       SF        1        P    MI   483     NA
## 6        N       P       SF        1        P    WA   980     NA
##   product_type cscore_co
## 1          FRM        NA
## 2          FRM        NA
## 3          FRM        NA
## 4          FRM        NA
## 5          FRM       731
## 6          FRM       770

We encapsulated our import into an R function to make the overall operation a bit more readable.

10.5.2 Calculating State Level Credit Attributes

Let’s extract the month and year from each loan’s origination date, and then calculate the average value for a few different credit attributes at the state level.

originations_dates <- function(orig_df = originations) {

  # orig_dte has the form MM/YYYY (e.g., "12/1999"); these index positions
  # match the substr() behavior of the SparkR release used here
  orig_df <- orig_df %>% SparkR::mutate(month = substr(orig_df$orig_dte, 1, 2),
                                        year = substr(orig_df$orig_dte, 5, 8))

  return(orig_df)

}
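
A quick sanity check that the new columns were parsed as expected (an illustrative call):

originations_dates(originations) %>%
  SparkR::select("orig_dte", "month", "year") %>%
  head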


originations_state <- function(orig_df = originations_dates(originations)) {

  orig_df %>% SparkR::group_by(orig_df$state, orig_df$year) %>%
    SparkR::summarize(ave_dti = mean(orig_df$dti),
                      ave_ltv = mean(orig_df$oltv),
                      ave_cltv = mean(orig_df$ocltv),
                      ave_fico = mean(orig_df$cscore_b)) -> orig_summary

  return(orig_summary)

}

orig_summary <- originations_state() %>% as.data.frame
sparkR.stop()

10.5.3 Credit Attribute Choropleths

We can visualize how these average values evolve over time with a choropleth. We will use the easy-to-use rMaps package for this visualization.

library(rMaps)
orig_summary %>%
  dplyr::mutate(year = as.numeric(year)) %>%
  rMaps::ichoropleth(ave_fico ~ state, data = .,
                     animate = "year",
                     geographyConfig = list(popupTemplate = "#!function(geo, data) {
                                         return '<div class=\"hoverinfo\"><strong>'+
                                         data.state+ '<br>' + 'Average Credit Score for Loans Originated in '+ data.year + ': ' +
                                         data.ave_fico.toFixed(0) +
                                         '</strong></div>';}!#")) -> fico.map
fico.map$show('iframesrc', cdn = TRUE)