Chapter 2 Apache Spark

On top of its core engine, Apache Spark provides four main components:

  1. Spark SQL
  2. Spark Streaming
  3. MLlib - Machine Learning Library
  4. GraphX - Graph Computation Library

The Spark project is constantly under development, and the AMPLab is frequently adding new packages and implementing new ideas. An overview of the current state of the ecosystem can be found here: Berkeley Data Analytics Stack

Even though Spark was envisioned as a better way of running iterative jobs on distributed datasets, it is complementary to Hadoop and can run on top of the Hadoop file system (HDFS). This deployment is supported either through Hadoop's own resource manager, YARN, or through a third-party cluster manager such as Mesos.

2.1 Functional Programming and Lazy Evaluation

In functional programming, all constructs are functions and all computations are evaluated as function calls (higher-order functions are functions that take other functions as arguments or return them as results; think of function composition). Pure functional languages, such as Haskell, aim for purity of evaluation: they rarely change state or mutate objects, and they avoid side effects.

Scala, in which roughly 80% of Spark is written, has full support for functional programming. Although it is not a pure functional programming language like Haskell, it is a hybrid that combines object-oriented programming in the style of Java with functional programming in the style of Haskell.

Lazy evaluation defers the computation of a function until its result is needed. This makes it easy to build up higher-order functions and then optimize their computation, evaluating only what is necessary and reordering computations where beneficial. This is particularly useful for data science, where the programmer often manipulates large amounts of data through complex functional pipelines.
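Both ideas can be illustrated in plain R, without Spark. The short sketch below uses base R's higher-order functions (Map, Filter, Reduce) and R's lazy evaluation of function arguments; all of the function names are standard base R.

# Base R's higher-order functions take other functions as arguments,
# mirroring the Spark transformations and actions discussed below.
squares <- Map(function(x) x^2, 1:5)            # apply a function to each element
evens   <- Filter(function(x) x %% 2 == 0, 1:10)
total   <- Reduce(`+`, 1:10)                    # fold a collection with a binary function

# R also evaluates function arguments lazily: the second argument below is
# never used, so the call to stop() is never executed.
first <- function(x, y) x
first(42, stop("this is never evaluated"))      # returns 42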

In Spark, all functions belong to one of two classes: transformations and actions. Transformations and actions are applied to RDDs in a lazy fashion, i.e., Spark only computes transformations when their results are required by an action. This allows Spark to optimize the sequence of transformations, and gives the developer the freedom to write complex functional pipelines without worrying prematurely about optimization (which would be evil)! More precisely, Spark operations build up a DAG (Directed Acyclic Graph) of computations, which is only executed, stage by stage, when an action requests a result.
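To make this concrete, here is a minimal sketch in SparkR, the R API used throughout these notes. It assumes a local Spark installation and reaches the non-exported RDD functions through the triple colon operator, as the later examples do; nothing is computed until the reduce action on the last line. Later sketches in this chapter reuse the sc created here.

library(SparkR)

sc <- sparkR.init(master = "local[2]", appName = "LazyEvaluationDemo")

nums    <- SparkR:::parallelize(sc, 1:1000)       # no computation yet
squares <- SparkR:::map(nums, function(x) x^2)    # transformation: only recorded in the DAG
total   <- SparkR:::reduce(squares, `+`)          # action: triggers the whole pipeline
total                                             # 333833500, the sum of the first 1000 squares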

2.2 Distributed Programming Abstractions

A Spark program consists of two parts: a driver program and worker programs. The driver program runs on the driver machine, whereas the worker programs run on cluster nodes (or in local threads when running on a single machine). The driver program communicates with the cluster manager (for example, YARN) to negotiate resources for a Spark job, and it launches the parallel operations on the cluster. The Spark user therefore writes functions in the driver program, and they are automatically shipped to the nodes of the cluster.

In order to access Spark, the driver program needs to create a SparkContext object, which provides the connection to the computing cluster and tells Spark how to reach it. Inside a Spark shell (including the PySpark and SparkR shells), the SparkContext is created automatically and assigned to a variable called sc. When connecting to Spark from a separate R client using the SparkR package, you need to create the SparkContext yourself.

The most important parameter for a SparkContext is the master parameter, which determines the type and size of the cluster to use:

Master Parameter     Description
local                run Spark locally with one worker thread (no parallelism)
local[K]             run Spark locally with K worker threads (most commonly set to the number of available cores)
spark://HOST:PORT    connect to a Spark standalone cluster; PORT depends on configuration (7077 by default)
mesos://HOST:PORT    connect to a Mesos cluster; PORT depends on configuration (5050 by default)
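
As a brief, hedged illustration, the master parameter is what you pass to sparkR.init when connecting from R; the host names and application name below are placeholders.

# Local mode with two worker threads (used in the sketch in the previous section):
# sc <- sparkR.init(master = "local[2]", appName = "SparkRDemo")

# Connecting to a standalone cluster or a Mesos cluster instead:
# sc <- sparkR.init(master = "spark://master-host:7077", appName = "SparkRDemo")
# sc <- sparkR.init(master = "mesos://master-host:5050", appName = "SparkRDemo")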

2.3 RDDs

RDDs, short for Resilient Distributed Datasets, are the primary abstraction for data in Spark. They are immutable, meaning that once created they cannot be modified: a transformation does not modify an RDD in place, but instead produces a new RDD whose contents you receive once an action is applied. An RDD is a read-only collection of objects distributed across the nodes of the cluster. These collections are called resilient because of their fault-tolerant nature: Spark tracks the lineage information of each RDD, so any portion lost to a failure can be efficiently recomputed and the RDD rebuilt. An RDD is represented as a Scala object and can be created in several ways:

  1. from a file in HDFS or any other storage system;
  2. as a parallelized collection spread across the nodes, for example by parallelizing a Python or R collection such as a list;
  3. as a transformation of another RDD;
  4. by changing the persistence of an existing RDD, such as requesting that it be cached in memory.

The various functions for creating an RDD provide an argument for specifying its number of partitions. The more partitions an RDD has, the more parallelism the program can achieve (if there are more partitions than worker machines, some workers hold multiple partitions).
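
A short, hedged sketch of the first two creation methods, using the non-exported RDD functions and the sc from the earlier example; the HDFS path is a placeholder, and the function names follow the SparkR 1.x internals.

# Parallelize a local R collection, explicitly splitting it into 4 partitions
rdd <- SparkR:::parallelize(sc, 1:100, numSlices = 4)

# Create an RDD from a text file (the path is a placeholder)
# lines <- SparkR:::textFile(sc, "hdfs:///data/sample.txt", minPartitions = 8)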

By default, Spark recomputes an RDD each time you run an action on it. In order to reuse an RDD, you can persist it, which allows the RDD to be stored in memory or on disk, as outlined below.
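
A hedged sketch of RDD persistence: the RDD-level cache, persist and count methods are not exported by the SparkR package, so, as with the other RDD examples in this chapter, they are reached through the package internals and may differ between SparkR versions.

rdd    <- SparkR:::parallelize(sc, 1:1000)
cached <- SparkR:::cache(rdd)              # mark the RDD to be kept in memory
# an explicit storage level can be requested instead:
# persisted <- SparkR:::persist(rdd, "MEMORY_AND_DISK")

SparkR:::count(cached)                     # first action computes and caches the partitions
SparkR:::count(cached)                     # later actions reuse the cached data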

Spark applications are called drivers, and a driver can perform one of two kinds of operations on a dataset: actions and transformations. An action performs a computation on a dataset and returns a value to the driver; a transformation creates a new dataset from an existing one. Transformations are lazily evaluated, and are therefore not computed immediately, but only when an action requires them. Examples of actions include reducing a dataset with a function and iterating over a dataset (running a function on each element, similar to the Map operation). Examples of transformations include the Map and Filter operations, as well as the Cache operation (which marks an RDD to be saved in memory or on disk).

2.3.1 Common Transformations and Actions

Here are some of the most common transformations when working with RDDs:

Transformation         Description
map(func)              return a new distributed dataset formed by passing each element of the source through the function func
filter(func)           return a new dataset formed by selecting the elements of the source for which func returns TRUE
distinct([numTasks])   return a new dataset containing the distinct elements of the source dataset
flatMap(func)          similar to map, but func returns a sequence instead of a single value, and the results are flattened
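
A hedged sketch of the first three transformations through the SparkR internals (the internal name of the filter transformation is filterRDD; names may differ between SparkR versions, and sc is assumed from the earlier examples).

rdd     <- SparkR:::parallelize(sc, c(1, 2, 2, 3, 4, 4))
doubled <- SparkR:::map(rdd, function(x) x * 2)                # map
evens   <- SparkR:::filterRDD(rdd, function(x) x %% 2 == 0)    # filter
uniques <- SparkR:::distinct(rdd)                              # distinct
collect(uniques)    # nothing above is computed until this action runs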

These transformations are not evaluated when called, but only when their results are needed by an action. Here are some common actions:

Action                       Description
reduce(func)                 aggregate the elements of the dataset using func, which must take two arguments and return a single value
take(n)                      return an array with the first n elements of the dataset
collect()                    return all the elements of the dataset as an array (make sure it fits in the memory of the driver)
takeOrdered(n, key = func)   return the first n elements in ascending order, or in the order specified by the optional key function

The reduce function must be associative and commutative so that it can correctly compute in parallel.
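
A hedged sketch of these actions, again through the SparkR internals (sc is assumed from earlier, and takeOrdered follows the internal RDD API, which may differ between versions). The comments on the last lines illustrate why reduce needs an associative and commutative function.

rdd <- SparkR:::parallelize(sc, c(5, 3, 9, 1, 7), numSlices = 2)

SparkR:::reduce(rdd, `+`)        # 25: safe, since addition is associative and commutative
SparkR:::take(rdd, 2)            # the first two elements: list(5, 3)
collect(rdd)                     # all elements (keep this small enough for driver memory)
SparkR:::takeOrdered(rdd, 3)     # the three smallest elements: list(1, 3, 5)

# SparkR:::reduce(rdd, `-`)      # unsafe: subtraction is neither associative nor commutative,
#                                # so the result depends on how partitions are combined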

Since the introduction of the DataFrames API in Spark 1.3, most data science applications will not need to create and compute with RDDs directly. In fact, the R API, SparkR, does not even export many of these RDD transformations in the package's namespace. However, they can still be called by using the triple colon operator (e.g., SparkR:::flatMap).

Here’s a very simple example of using flatMap through the SparkR API:

library(SparkR)

sparkEnvir <- list(spark.executor.instances = '10',
                   spark.yarn.executor.memoryOverhead = '8000')

sc <- sparkR.init(
  sparkEnvir = sparkEnvir,
  sparkPackages = "com.databricks:spark-csv_2.10:1.3.0"
)
## Launching java with spark-submit command /usr/hdp/current/spark-client/bin/spark-submit  --packages com.databricks:spark-csv_2.10:1.3.0 sparkr-shell /tmp/RtmpG3xXzh/backend_port52fd431065da
sqlContext <- sparkRSQL.init(sc)

rdd <- SparkR:::parallelize(sc, 1:10)
multiplyByTwo <- SparkR:::flatMap(rdd, function(x) { list(x*2, x*10) })
results <- collect(multiplyByTwo)
unlist(results)
##  [1]   2  10   4  20   6  30   8  40  10  50  12  60  14  70  16  80  18
## [18]  90  20 100
sparkR.stop()

2.4 DataFrames

When working with relational data for structured data processing, most data scientists will think of using SQL, due to its highly efficient relational algebra. Spark provides a SQL interface through Spark SQL, along with a special context for working with relational data, appropriately named the SQLContext.

Spark SQL is a Spark module for structured data processing. While the RDD API is great for generic data storage, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL, the DataFrames API and the Datasets API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between the various APIs based on which provides the most natural way to express a given transformation.

DataFrames are RDDs with the added specification that the data is organized into named columns. DataFrames use the Catalyst query optimizer to make querying more efficient. Catalyst leverages advanced Scala functional programming features, such as pattern matching and quasiquotes, to build an extensible query optimizer. So while conceptually Spark DataFrames look very much like R data.frames and Python's pandas DataFrames, they are actually highly optimized Spark objects that run in Spark's SQL context, a highly optimized environment for working with structured data.

Catalyst provides two methods of query optimization: (1) rule-based optimization and (2) cost-based optimization. Unlike ordinary Spark operations, which are described by a DAG (directed acyclic graph), Catalyst's primary data type is a tree (an abstract syntax tree, to be precise) consisting of node objects, each describing a node type and a set of children.
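
A hedged sketch of working with DataFrames from SparkR: it assumes a SparkContext sc and SQLContext sqlContext created as in the earlier RDD example, it uses R's built-in faithful dataset, and the optimizer output printed by explain varies across Spark versions.

df <- createDataFrame(sqlContext, faithful)     # promote a local data.frame to a Spark DataFrame
waiting_long <- filter(df, df$waiting > 70)     # a DataFrame transformation, still lazy
head(waiting_long)                              # an action: triggers execution

# explain() prints the logical and physical plans produced by the Catalyst optimizer
explain(waiting_long, extended = TRUE)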

2.5 Datasets

A Dataset is a new experimental interface added in Spark 1.6 that tries to combine the benefits of RDDs (strong typing and the ability to use powerful lambda functions) with the benefits of Spark SQL's optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations. Furthermore, Datasets are an abstraction above DataFrames: a DataFrame is simply a Dataset of Row objects.

The unified Dataset API can be used in both Scala and Java. Python does not yet have support for the Dataset API, but due to its dynamic nature many of the benefits are already available (i.e., you can access the fields of a row by name naturally, as in row.columnName). Full Python support is expected in Spark 2.0, and it is not yet known when SparkR will support Datasets.

2.6 MLlib

ML Pipelines

2.7 Spark APIs

There are several high-level APIs for Spark: Java, Scala, Python and R, all built on an optimized engine that supports general execution graphs. The Scala, Python and R APIs all ship with HDInsight Spark Premium clusters, and the first two can also be used through Jupyter kernels, available from the Jupyter dashboard: https://<>.azurehdinsight.net/jupyter/tree/.

2.7.1 Scala

The main Spark shell exposes the Scala API. It can be accessed directly from the bin directory of your Spark installation, as well as from the Jupyter notebooks running on your HDInsight cluster, where you can also find demo notebooks illustrating how to use the API. While it is the most advanced and mature of the APIs discussed in this section, it won’t be as familiar to data scientists, who are more likely to know Python and R than Scala.

2.7.2 PySpark

The PySpark API might be the most commonly used API, due to Python’s ubiquity in data science. It is perhaps second in maturity only to the Scala API, and when working with DataFrames it provides nearly the same performance as native Spark applications written in Scala. The PySpark kernel exposes all of the transformation and action functions described in the section on RDDs above.

2.7.3 SparkR

The SparkR API provides two ways of interacting with Spark through R: a package called SparkR, which creates the necessary abstractions in your R session to access Spark’s functionality, and a sparkR shell, which loads the library on top of the interactive R shell and creates the necessary SparkContext automatically. To use the shell, see this quickstart. The examples in these notes use the SparkR library from RStudio, but you could use it from any R environment of your choice. We describe how to load the SparkR library in the next section, and how to configure your .Rprofile file to load the package automatically when your R session starts. As of Spark 1.4, the SparkR package is bundled with Spark; for earlier versions of Spark, you need to install the library directly from the AMPLab GitHub repository, which requires the devtools package: devtools::install_github('amplab-extras/SparkR-pkg').
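
As a hedged sketch of loading the bundled SparkR package from an ordinary R session (rather than the sparkR shell): the SPARK_HOME path below matches the HDInsight location visible in the spark-submit output earlier in this chapter, but it is an assumption and will differ on other installations, as will the choice of master. The Sys.setenv, .libPaths and library calls are also what you would place in .Rprofile to make the package available at startup.

# Point R at the Spark installation and add its bundled SparkR package to the library path
Sys.setenv(SPARK_HOME = "/usr/hdp/current/spark-client")   # assumed installation path
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)

sc <- sparkR.init(master = "yarn-client", appName = "SparkRFromRStudio")
sqlContext <- sparkRSQL.init(sc)

# ... work with Spark ...

sparkR.stop()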