Chapter 9 Exploratory Data Analysis with SparkR

9.1 SparkR the Explorer

SparkR has a limited API for modeling. As of Spark 1.6.1, the only modeling function SparkR supports is glm. Many more modeling functions are available in the R Server RevoScaleR library, and they can be computed using a Spark compute context. We will discuss these in more detail in subsequent chapters.

Although its modeling support is limited, SparkR is versatile for data exploration. This chapter shows how easy it is to create data exploration pipelines with SparkR and open source R packages.

9.2 Doing Data Aggregations with SparkR Efficiently

Because Spark evaluates objects lazily, a little thought about how we structure our data analysis pipelines can yield large speedups. In this section, we will analyze the average arrival delay for flights, grouped by carrier, origin and destination. We need only a handful of columns for this analysis, so we can drop the rest.

select_cols <- function(sparkDF = airDF) {
  
  library(magrittr)
  
  # keep only the columns needed for the analysis;
  # refer to the function argument, not the global airDF
  sparkDF %>% 
    SparkR::select(sparkDF$FL_DATE, sparkDF$DAY_OF_WEEK,
                   sparkDF$UNIQUE_CARRIER, sparkDF$ORIGIN,
                   sparkDF$DEST, sparkDF$ARR_DELAY) -> skinny_df
  
  # rename the columns (new_name = existing column)
  skinny_df %<>% SparkR::rename(
    flight_date = skinny_df$FL_DATE,
    day_of_week = skinny_df$DAY_OF_WEEK,
    carrier = skinny_df$UNIQUE_CARRIER, 
    origin = skinny_df$ORIGIN,
    destination = skinny_df$DEST, 
    arrival_delay = skinny_df$ARR_DELAY
  )
  
  return(skinny_df)
  
}

air_df <- select_cols()

The function above defines a transformation on our airlines dataset: it selects the columns we need for our analysis and renames them. We have not called any actions yet, so the result is simply a promise from Spark to evaluate the pipeline later.
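The idea of a promise can be illustrated with a rough analogy in base R. The sketch below does not use SparkR at all. It relies on R's own `delayedAssign()`, and the names `evaluated`, `skinny`, `before`, `after` and `n_rows` are hypothetical. It only shows the general pattern of recording work now and running it on first use. Spark's lazy evaluation is implemented differently.

```r
# Analogy only: base R promises, not SparkR. All names are hypothetical.
evaluated <- FALSE

# delayedAssign() registers the expression without running it, loosely
# like a Spark transformation that only records a step in the plan.
delayedAssign("skinny", {
  evaluated <- TRUE
  mtcars[, c("mpg", "cyl")]
})

before <- evaluated   # nothing has been computed yet

# Touching the value plays the role of an action such as collect().
n_rows <- nrow(skinny)

after <- evaluated    # the expression ran when skinny was first used
```

Comparing `before` and `after` shows that the work happened only when the value was needed, which is the behavior we rely on when chaining Spark transformations.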

9.3 From Spark DataFrames to Local Dataframes

For our next step, we will group by carrier, origin and destination, and calculate the average arrival delay. The resulting DataFrame should be small, so we will collect the results and save them to a local R data.frame. Observe how simple it is to create a pipeline that starts with a Spark DataFrame and ends with a data.frame you can work with locally.

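agg_delay <- function(airdf = air_df) {
  
  library(magrittr)
  
  # n() and mean() are qualified so that they still resolve to the SparkR
  # versions if dplyr is attached; n() counts non-missing arrival delays
  airdf %>% SparkR::group_by(airdf$carrier,
                             airdf$origin,
                             airdf$destination) %>% 
    SparkR::summarize(counts = SparkR::n(airdf$arrival_delay),
                      ave_delay = SparkR::mean(airdf$arrival_delay)) -> summary_df
  
  return(summary_df)
  
}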

agg_df <- agg_delay()
agg_df_local <- agg_df %>% SparkR::collect() %>% dplyr::tbl_df
save(agg_df_local, file = "aggflightslocal.RData")

If you are familiar with the dplyr grammar of data manipulation, you will be pleased at how directly that knowledge transfers to manipulating Spark DataFrames (and you may wonder why dplyr does not yet offer a supported backend for Spark DataFrames). We saved our local data.frame to disk so that we do not have to re-run the Spark steps.
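To make the parallel concrete, here is a minimal sketch of the same aggregation written with dplyr against a small local data.frame. The toy data and the names `flights_local` and `summary_local` are hypothetical and are not part of the airlines dataset. The use of `sum(!is.na(...))` and `na.rm = TRUE` is an assumption made to mirror how Spark's count and average skip null values.

```r
library(dplyr)

# Hypothetical toy data standing in for the renamed airline columns.
flights_local <- data.frame(
  carrier       = c("AA", "AA", "AA", "DL"),
  origin        = c("JFK", "JFK", "JFK", "ATL"),
  destination   = c("ORD", "ORD", "ORD", "JFK"),
  arrival_delay = c(10, 20, NA, -5),
  stringsAsFactors = FALSE
)

# Same shape as agg_delay(): group, then count and average the delays.
# Missing delays are skipped in both the count and the mean.
summary_local <- flights_local %>%
  group_by(carrier, origin, destination) %>%
  summarize(counts    = sum(!is.na(arrival_delay)),
            ave_delay = mean(arrival_delay, na.rm = TRUE)) %>%
  ungroup()
```

The verbs and their order match the SparkR pipeline. The main visible difference is that dplyr refers to columns by bare name, while the SparkR version uses `airdf$column`.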

9.4 Plotting Results

We deliberately kept the carrier column in our analysis, in case we wanted to visualize or analyze delays by that feature. For now, let us narrow our focus to simply the routes of our flights.

# load the agg_df_local calculated above
load("aggflightslocal.RData")

delays_routes <- function(delay_df = agg_df_local) {
  
  library(dplyr)
  
  delay_df %>%
    group_by(origin, destination) %>%
    summarize(total = sum(counts), 
              arrival_delay = weighted.mean(ave_delay, counts)) -> route_delays
  
  return(route_delays)
 
  
}

delays_routes()
## Source: local data frame [9,171 x 4]
## Groups: origin [?]
## 
##    origin destination total arrival_delay
##     (chr)       (chr) (dbl)         (dbl)
## 1     ABE         ALB     2    23.0000000
## 2     ABE         ATL 18482     7.4414566
## 3     ABE         AVP  1587     2.3238815
## 4     ABE         AZO     0           NaN
## 5     ABE         BDL     1     1.0000000
## 6     ABE         BHM     1    -3.0000000
## 7     ABE         BWI  2502     4.3033573
## 8     ABE         CLE  6331    -2.4669089
## 9     ABE         CLT  8531     0.1807525
## 10    ABE         CVG  6654     0.4804629
## ..    ...         ...   ...           ...
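The route-level delay uses `weighted.mean()` rather than a plain mean because each carrier contributes a different number of flights. The sketch below uses made-up numbers (the vectors `counts` and `ave_delay` are hypothetical) to show why the weighting matters, and why a route with no recorded delays, such as ABE to AZO above, ends up as NaN.

```r
# Hypothetical per-carrier summaries for a single route.
counts    <- c(100, 300)   # flights with a recorded arrival delay
ave_delay <- c(12, 4)      # mean arrival delay for each carrier

# Weighting each carrier's mean by its count recovers the route-level mean:
# (100 * 12 + 300 * 4) / (100 + 300)
route_mean <- weighted.mean(ave_delay, counts)

# An unweighted mean of means over-weights the smaller carrier.
naive_mean <- mean(ave_delay)

# A group with no recorded delays has count 0 and a missing mean. The
# weights then sum to zero, so the weighted mean is 0 / 0.
empty_route <- weighted.mean(NA_real_, 0)
```

Here `route_mean` and `naive_mean` differ, and `is.nan(empty_route)` is `TRUE`. Filtering on `total` before plotting, as done in the next step, removes such empty routes.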

Let us make a heatmap of the delays for just a few routes. We will use the ggplot2 package to build our base plot, and then use the plotly package to make the plot interactive!

library(ggplot2)

plot_route_delays <- function(min_routes = 10) {
  
  library(dplyr)
  library(ggplot2)
  
  gplot <- delays_routes() %>%
    filter(total > min_routes) %>% 
    arrange(desc(total)) %>% 
    filter(origin %in% c("JFK", "LGA", "IAD", "DCA",
                         "ATL", "DFW", "ORD", "IAH", 
                         "DEN", "CLT"),
           destination %in% c("ATL", "BDL", "BOS", "JFK", 
                              "IAD", "LGA", "DCA", "DFW",
                              "ORD", "IAH", "DEN", "CLT")) %>% 
    ggplot(aes(x = origin, y = destination, fill = arrival_delay)) + 
    geom_tile(color = "white") + 
    scale_fill_gradient(low = "white", high = "steelblue")
  
  return(gplot)
  
}

gplot <- plot_route_delays(100) + theme_bw()
library(plotly)
ggplotly(gplot, width = 650)