Apache Spark Explained

What is Apache Spark?

Spark was originally conceived at Berkeley’s AMPLab by Matei Zaharia, who went on to cofound Databricks together with his mentor Ion Stoica, as well as Reynold Xin, Patrick Wendell, Andy Konwinski, and Ali Ghodsi. Although Spark is open source, Databricks is the main force behind Apache Spark, contributing more than 75% of Spark’s code. It also offers Databricks Cloud, a commercial product for big data analysis based on Apache Spark.

Spark wasn’t made with online transaction processing (OLTP) applications in mind (fast, numerous, atomic transactions). It’s better suited for online analytical processing (OLAP): batch jobs and data mining.

Spark’s core concept is an in-memory execution model that enables caching job data in memory instead of fetching it from disk every time, as MapReduce does. This can speed up the execution of jobs by up to 100 times compared to the same jobs in MapReduce; it has the biggest effect on iterative workloads, such as machine learning and graph algorithms, that need to reuse data.

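import org.apache.spark.sql.SparkSession

// builder() only configures the session; getOrCreate() actually creates it
val spark = SparkSession.builder().appName("Spark wordcount").getOrCreate()
val file = spark.sparkContext.textFile("hdfs://…")
// split lines into words, pair each word with 1, then count occurrences per word
val counts = file.flatMap(line => line.split(" "))
    .map(word => (word, 1)).countByKey()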

Spark Supports
– Scala
– Java
– Python
– R

What are the Spark Components?

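Spark consists of several purpose-built components: Spark Core, Spark SQL, Spark Streaming, Spark MLlib, and Spark GraphX. Together they make Spark a feature-packed unifying platform: it can be used for many tasks that previously had to be accomplished with several different frameworks.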

Spark Core

Spark Core contains basic Spark functionalities required for running jobs and needed by other components. The most important of these is the resilient distributed dataset (RDD), which is the main element of the Spark API. Spark Core contains logic for accessing various filesystems, such as HDFS, GlusterFS, Amazon S3, and so on. It also provides a means of information sharing between computing nodes with broadcast variables and accumulators. Other fundamental functions, such as networking, security, scheduling, and data shuffling, are also part of Spark Core.
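To make broadcast variables and accumulators a little more concrete, here is a minimal sketch. It assumes Spark 2.x or later running in local mode; the lookup table, the sample codes, and the names countryNames and unknownCodes are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

object SharedVariablesSketch {
  // Returns the translated names and how many codes were missing from the lookup table.
  def run(): (Array[String], Long) = {
    val spark = SparkSession.builder()
      .appName("shared-variables-sketch")
      .master("local[*]") // assumption: local mode, for illustration only
      .getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical read-only lookup table, shipped once to each executor.
    val countryNames = sc.broadcast(Map("de" -> "Germany", "fr" -> "France"))
    // Accumulator: tasks only add to it, the driver reads the total.
    val unknownCodes = sc.longAccumulator("unknownCodes")

    val codes = sc.parallelize(Seq("de", "fr", "xx", "de"))
    val names = codes.map { code =>
      countryNames.value.getOrElse(code, {
        unknownCodes.add(1) // runs only when the code is not in the table
        "unknown"
      })
    }.collect()

    val unknown: Long = unknownCodes.value
    spark.stop()
    (names, unknown)
  }

  def main(args: Array[String]): Unit = {
    val (names, unknown) = run()
    println(names.mkString(", "))
    println(s"unknown codes: $unknown")
  }
}
```

One caveat: accumulator updates made inside a transformation can be applied more than once if a task is re-executed, so treat the count as approximate outside of a simple local run like this one.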

Spark SQL

Spark SQL provides functions for manipulating large sets of distributed, structured data using an SQL subset supported by Spark and Hive SQL (HiveQL). With DataFrames introduced in Spark 1.3 and Datasets introduced in Spark 1.6, which simplified the handling of structured data and enabled radical performance optimizations, Spark SQL became one of the most important Spark components. Spark SQL can also be used for reading and writing data to and from various structured formats and data sources, such as JavaScript Object Notation (JSON) files, Parquet files (an increasingly popular file format that allows for storing a schema along with the data), relational databases, Hive, and others. Operations on DataFrames and Datasets at some point translate to operations on RDDs and execute as ordinary Spark jobs. Spark SQL provides a query optimization framework called Catalyst that can be extended by custom optimization rules. Spark SQL also includes a Thrift server, which can be used by external systems, such as business intelligence tools, to query data through Spark SQL using classic JDBC and ODBC protocols.
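As a rough illustration of querying structured data with Spark SQL, the sketch below builds a small DataFrame, registers it as a temporary view, and runs an SQL query against it. It assumes Spark 2.x or later in local mode; the events data and the column names host and level are invented for the example.

```scala
import org.apache.spark.sql.SparkSession

object SparkSqlSketch {
  // Returns (host, number of ERROR events) pairs, ordered by host.
  def run(): Array[(String, Long)] = {
    val spark = SparkSession.builder()
      .appName("spark-sql-sketch")
      .master("local[*]") // assumption: local mode, for illustration only
      .getOrCreate()
    import spark.implicits._ // enables toDF on local Scala collections

    // Hypothetical sample data: (host, log level)
    val events = Seq(
      ("web01", "ERROR"),
      ("web02", "INFO"),
      ("web01", "ERROR"),
      ("web02", "ERROR")
    ).toDF("host", "level")

    // Make the DataFrame visible to SQL under the name "events".
    events.createOrReplaceTempView("events")

    val errorsPerHost = spark.sql(
      """SELECT host, COUNT(*) AS errors
        |FROM events
        |WHERE level = 'ERROR'
        |GROUP BY host
        |ORDER BY host""".stripMargin)

    // COUNT(*) yields a bigint column, which maps to a Scala Long.
    val result = errorsPerHost.collect().map(row => (row.getString(0), row.getLong(1)))
    spark.stop()
    result
  }

  def main(args: Array[String]): Unit =
    run().foreach { case (host, errors) => println(s"$host: $errors") }
}
```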

Spark Streaming

Spark Streaming is a framework for ingesting real-time streaming data from various sources. The supported streaming sources include HDFS, Kafka, Flume, Twitter, ZeroMQ, and custom ones. Spark Streaming operations recover from failure automatically, which is important for online data processing. Spark Streaming represents streaming data using discretized streams (DStreams), which periodically create RDDs containing the data that came in during the last time window. Spark Streaming can be combined with other Spark components in a single program, unifying real-time processing with machine learning, SQL, and graph operations. This is something unique in the Hadoop ecosystem. And since Spark 2.0, the new Structured Streaming API makes Spark streaming programs more similar to Spark batch programs.

Spark MLlib

Spark MLlib is a library of machine-learning algorithms grown from the MLbase project at UC Berkeley. Supported algorithms include logistic regression, naïve Bayes classification, support vector machines (SVMs), decision trees, random forests, linear regression, and k-means clustering. Spark MLlib handles machine-learning models used for transforming datasets, which are represented as RDDs or DataFrames.

Spark GraphX

Graphs are data structures comprising vertices and the edges connecting them. GraphX provides functions for building graphs, represented as graph RDDs: EdgeRDD and VertexRDD. GraphX contains implementations of the most important algorithms of graph theory, such as PageRank, connected components, shortest paths, SVD++, and others. It also provides the Pregel message-passing API, the same API for large-scale graph processing implemented by Apache Giraph, a project that implements graph algorithms and runs on Hadoop.

What does a Spark program flow look like?

Imagine that a 300 MB log file is stored in a three-node HDFS cluster. HDFS automatically splits the file into 128 MB parts (blocks, in Hadoop terminology) and places each part on a separate node of the cluster. Let’s assume Spark is running on YARN, inside the same Hadoop cluster.

A Spark data engineer is given the task of analyzing how many errors of type OutOfMemoryError have happened during the last two weeks. Pankaj, the engineer, knows that the log file contains the last two weeks of logs of the company’s application server cluster. He sits at his laptop and starts to work.

He first starts his Spark shell and establishes a connection to the Spark cluster. Next, he loads the log file from HDFS by using this (Scala) line:

val lines = sc.textFile("hdfs://path/to/the/file")

To achieve maximum data locality, the loading operation asks Hadoop for the locations of each block of the log file and then transfers all the blocks into RAM of the cluster’s nodes. Now Spark has a reference to each of those blocks (partitions, in Spark terminology) in RAM. The sum of those partitions is a distributed collection of lines from the log file referenced by an RDD. Simplifying, we can say that RDDs allow you to work with a distributed collection the same way you would work with any local, nondistributed one. You don’t have to worry about the fact that the collection is distributed, nor do you have to handle node failures yourself.

Pankaj now has a reference to the RDD, so in order to find the error count, he first wants to remove all the lines that don’t have an OutOfMemoryError substring. This is a job for the filter function, which he calls like this:

val oomLines = lines.filter(l => l.contains("OutOfMemoryError")).cache()

Pankaj calls cache on it, which tells Spark to leave that RDD in memory across jobs. Caching is the basic component of Spark’s performance improvements we mentioned before. The benefits of caching the RDD will become apparent later.

Now he is left with only those lines that contain the error substring. For this simple example, we’ll ignore the possibility that the OutOfMemoryError string might occur in multiple lines of a single error. Our data engineer counts the remaining lines and reports the result as the number of out-of-memory errors that occurred in the last two weeks:

val result = oomLines.count()

If he now wants to further analyze lines with OutOfMemoryErrors, and perhaps call filter again (but with other criteria) on an oomLines object that was previously cached in memory, Spark won’t load the file from HDFS again, as it would normally do. Spark will load it from the cache.
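The whole flow can be tried end to end without a cluster. The sketch below is only an approximation of the scenario: it assumes Spark 2.x or later in local mode and replaces the HDFS log file with a small temporary file whose lines are made up. The second filter on web01 is a hypothetical follow-up analysis.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object OomCountSketch {
  // Returns (all OutOfMemoryError lines, those that mention host web01).
  def run(): (Long, Long) = {
    val spark = SparkSession.builder()
      .appName("oom-count-sketch")
      .master("local[*]") // assumption: local mode instead of YARN
      .getOrCreate()
    val sc = spark.sparkContext

    // Stand-in for the HDFS log file: a tiny temp file with invented lines.
    val logFile = Files.createTempFile("app-server", ".log")
    val sampleLines = Seq(
      "2016-01-01 INFO  web01 started",
      "2016-01-02 ERROR web01 java.lang.OutOfMemoryError: Java heap space",
      "2016-01-03 ERROR web02 java.lang.OutOfMemoryError: GC overhead limit exceeded",
      "2016-01-04 WARN  web02 slow response"
    )
    Files.write(logFile, sampleLines.mkString("\n").getBytes(StandardCharsets.UTF_8))

    val lines = sc.textFile(logFile.toUri.toString)
    val oomLines = lines.filter(l => l.contains("OutOfMemoryError")).cache()

    // First action: reads the file, filters it, and fills the cache.
    val total = oomLines.count()
    // Second action: reuses the cached oomLines instead of reading the file again.
    val onWeb01 = oomLines.filter(_.contains("web01")).count()

    spark.stop()
    Files.deleteIfExists(logFile)
    (total, onWeb01)
  }

  def main(args: Array[String]): Unit = {
    val (total, onWeb01) = run()
    println(s"OutOfMemoryError lines: $total, on web01: $onWeb01")
  }
}
```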

Spark Ecosystem

We’ve already mentioned the Hadoop ecosystem, consisting of interface, analytic, cluster-management, and infrastructure tools.

Spark Fundamentals

The Spark shell is also known as the Spark REPL, where the REPL acronym stands for read-eval-print loop. It reads your input, evaluates it, prints the result, and then does it all over again. That is, after a command returns a result, it doesn’t exit the scala> prompt; it stays ready for your next command (thus loop).

Launching the Spark shell

spark@spark-in-action:~$ spark-shell
 Setting default log level to "WARN".
 To adjust logging level use sc.setLogLevel(newLevel).
 Spark context Web UI available at
 Spark context available as 'sc' (master = local[*], app id = local-1577045675854).
 Spark session available as 'spark'.
 Welcome to
       ____              __
      / __/__  ___ _____/ /__
     _\ \/ _ \/ _ `/ __/  '_/
    /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
       /_/
 Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_72-internal)
 Type in expressions to have them evaluated.
 Type :help for more information.

As you can see in the output, you’re provided with the Spark context in the form of the sc variable and the Spark session as spark. The Spark context is the entry point for interacting with Spark. You use it for things like connecting to Spark from an application, configuring a session, managing job execution, loading or saving a file, and so on.

Simple line count with Spark

scala> val liclines = sc.textFile("/usr/local/spark/LICENSE")
 liclines: org.apache.spark.rdd.RDD[String] = /usr/local/spark/LICENSE MapPartitionsRDD[1] at textFile at :24

 scala> val licecount = liclines.count
 licecount: Long = 299

You now know the total number of lines in the file. What good does that do? You need to find out the number of lines BSD appears in. The idea is to run the liclines collection through a filter that sifts out the lines that don’t contain BSD:

scala> val badLines = liclines.filter(line => line.contains("BSD"))
 badLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at :26

 scala> badLines.count
 res0: Long = 33

If you’ve never used Scala, you may be wondering what the snippet with the fat arrow (=>) means. That is a Scala function literal; it defines an anonymous function that takes a string and returns true or false, depending on whether the line contains the “BSD” substring. The fat arrow basically designates the transformation that a function does on the left side of the expression, converting it to the right side, which is then returned. In this case, String (line) is transformed into a Boolean (the result of contains), which is then returned as the function’s result.

The filter function evaluates the fat-arrow function on each element of the liclines collection (each line) and returns a new collection, badLines, that has only those elements for which the fat-arrow function returned true. The fat-arrow function you use for filtering lines is anonymous, but you could also define the equivalent named function, like this:

scala> def isBSD(line: String) = { line.contains("BSD") }
 isBSD: (line: String)Boolean

or store (a reference to) the function definition in a variable:

scala> val isBSD = (line: String) => line.contains("BSD")
 isBSD: String => Boolean = <function1>

and then use it in place of the anonymous function:

scala> val bsdLines1 = liclines.filter(isBSD)
 bsdLines1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at :28
scala> bsdLines1.count
 res1: Long = 33

To print the lines containing BSD to the console, you call println for each line:

scala> bsdLines1.foreach(bLine => println(bLine))
 BSD-style licenses
 The following components are provided under a BSD-style license. See project link for details.
      (BSD 3 Clause) netlib core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)

The notion of a resilient distributed dataset

Although liclines and badLines feel and look like ordinary Scala collections (filter and foreach methods are available in ordinary Scala collections, too), they aren’t. They’re distributed collections, specific to Spark, called resilient distributed datasets, or RDDs.

The RDD is the fundamental abstraction in Spark. It represents a collection of elements that is:
– Immutable (read-only)
– Resilient (fault-tolerant)
– Distributed (dataset spread out to more than one node)

RDDs support a number of transformations that do useful data manipulation, but they always yield a new RDD instance. Once created, RDDs never change; thus the adjective immutable. Mutable state is known to introduce complexity, but besides that, having immutable collections allows Spark to provide important fault-tolerance guarantees in a straightforward manner.

The fact that the collection is distributed on a number of machines (execution contexts, JVMs) is transparent to its users, so working with RDDs isn’t much different than working with ordinary local collections like plain old lists, maps, sets, and so on. To summarize, the purpose of RDDs is to facilitate parallel operations on large datasets in a straightforward manner, abstracting away from their distributed nature and inherent fault tolerance.

RDDs are resilient because of Spark’s built-in fault recovery mechanics. Spark is capable of healing RDDs in case of node failure. Whereas other distributed computation frameworks facilitate fault tolerance by replicating data to multiple machines (so it can be restored from healthy replicas once a node fails), RDDs are different: they provide fault tolerance by logging the transformations used to build a dataset (how it came to be) rather than the dataset itself. If a node fails, only a subset of the dataset that resided on the failed node needs to be recomputed.

For example, in the previous section, the process of loading the text file yielded the liclines RDD. Then you applied the filter function to liclines, which produced the new badLines RDD. Those transformations and their ordering are referred to as RDD lineage. It represents the exact recipe for creating the badLines RDD, from start to finish.

Basic RDD actions and transformations
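You can ask an RDD to describe its own lineage with the toDebugString method. The sketch below is a small illustration that assumes Spark 2.x or later in local mode; the three sample lines and the extra map step are made up, and an in-memory collection stands in for the LICENSE file.

```scala
import org.apache.spark.sql.SparkSession

object LineageSketch {
  // Returns the textual description of the lineage of the final RDD.
  def run(): String = {
    val spark = SparkSession.builder()
      .appName("lineage-sketch")
      .master("local[*]") // assumption: local mode, for illustration only
      .getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical stand-in for the lines of a license file.
    val lines = sc.parallelize(Seq(
      "BSD-style licenses",
      "Apache License",
      "(BSD 3 Clause) netlib core"
    ))
    val bsd = lines.filter(_.contains("BSD")) // transformation 1
    val upper = bsd.map(_.toUpperCase)        // transformation 2

    // One entry per RDD in the chain, newest first.
    val lineage = upper.toDebugString
    spark.stop()
    lineage
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The printed text lists the RDD produced by map first, then the one produced by filter, and finally the ParallelCollectionRDD they were derived from. That chain is what Spark replays to rebuild lost partitions.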

There are two types of RDD operations: transformations and actions. Transformations (for example, filter or map) are operations that produce a new RDD by performing some useful data manipulation on another RDD. Actions (for example, count or foreach) trigger a computation in order to return the result to the calling program or to perform some actions on an RDD’s elements.

It’s important to understand that transformations are evaluated lazily, meaning computation doesn’t take place until you invoke an action. Once an action is triggered on an RDD, Spark examines the RDD’s lineage and uses that information to build a “graph of operations” that needs to be executed in order to compute the action. Think of a transformation as a sort of diagram that tells Spark which operations need to happen and in which order once the action gets executed.
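Lazy evaluation is easy to observe for yourself. The following sketch, which assumes Spark 2.x or later in local mode, counts how often the function passed to map is actually invoked; the accumulator name mapCalls is made up for the example.

```scala
import org.apache.spark.sql.SparkSession

object LazyEvaluationSketch {
  // Returns the number of map-function calls seen before and after the action.
  def run(): (Long, Long) = {
    val spark = SparkSession.builder()
      .appName("lazy-evaluation-sketch")
      .master("local[*]") // assumption: local mode, for illustration only
      .getOrCreate()
    val sc = spark.sparkContext

    val calls = sc.longAccumulator("mapCalls")
    val numbers = sc.parallelize(1 to 5)

    // Transformation: only recorded in the lineage, nothing runs yet.
    val doubled = numbers.map { n =>
      calls.add(1)
      n * 2
    }
    val before: Long = calls.value

    // Action: triggers the job, which finally runs the map function.
    doubled.count()
    val after: Long = calls.value

    spark.stop()
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println(s"map calls before the action: $before, after: $after")
  }
}
```

In a plain local run with no task retries, the counter should be zero before count is called and equal to the number of elements afterward.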

Map Transformation

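The map transformation allows you to apply an arbitrary function to all elements of an RDD. With the parts that aren’t relevant for this discussion removed, the map method of RDD[T] is declared as def map[U](f: (T) => U): RDD[U]. It takes a function from the element type T to some type U and returns an RDD of U, so the resulting RDD can have a different element type than the original.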

Let’s start with a basic example. If you wanted to calculate the squares of an RDD’s elements, you could easily do that using map.

scala> val numbers = sc.parallelize(10 to 50 by 10)
 numbers: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at :24

 scala> numbers.foreach(x => println(x))

 scala> val numbersSquared = numbers.map(num => num * num)
 numbersSquared: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[5] at map at :26

 scala> numbersSquared.foreach(x => println(x))

The first command in the listing, Spark context’s parallelize method, takes a Seq (Array and List classes both implement the Seq interface) and creates an RDD from its elements. The Seq’s elements get distributed to Spark executors in the process. makeRDD is an alias for parallelize, so you can use either of the two. The expression passed in as an argument (10 to 50 by 10) is Scala’s way of creating a Range, which is also an implementation of Seq.

To illustrate how map can change an RDD’s type, imagine a situation where you want to convert an RDD of integers to an RDD of strings and then reverse each of those strings:

scala> val reversed = numbersSquared.map(x => x.toString.reverse)
 reversed: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at map at :28

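 scala> reversed.foreach(x => println(x))

The same transformation can be written more concisely with Scala’s placeholder syntax, which uses an underscore:

scala> val alsorev = numbersSquared.map(_.toString.reverse)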
 alsorev: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at map at :28

 scala> alsorev.first
 res8: String = 001

 scala> alsorev.top(4)
 res9: Array[String] = Array(009, 0061, 0052, 004)

You can read the placeholder syntax in the previous example like this: “Whatever I’m invoked with, call toString and then reverse on it.” We call it a placeholder because it holds the place to be filled in with the argument to a function when the function is invoked.

In this case, as map starts going over elements, the placeholder is first replaced with the first element from the numbersSquared collection (100), then the second element (400), and so on.

distinct and flatMap transformations

scala> val lines = sc.textFile("/home/spark/client-ids.log")
 lines: org.apache.spark.rdd.RDD[String] = /home/spark/client-ids.log MapPartitionsRDD[1] at textFile at :24

 scala> val idsStr = lines.map(line => line.split(","))
 idsStr: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :26

 scala> idsStr.foreach(println(_))
 [Ljava.lang.String;@41a66b0

scala> idsStr.first
 res1: Array[String] = Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)

collect is an action that creates an array, collects all elements of an RDD into that array, and returns it as the result to your shell:

scala> idsStr.collect
 res2: Array[Array[String]] = Array(Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20))

If only there was a function that knows how to flatten nested arrays like these into a single array. There is, it is called flatMap, and it’s used exactly for situations like these, where the result of a transformation yields multiple arrays and you need to get all elements into one array. It basically works the same as map, in that it applies the provided function to all of the RDD’s elements, but the difference is that it concatenates the resulting arrays into a collection that has one level of nesting less than what it received. With the irrelevant parts removed, its signature is def flatMap[U](f: (T) => TraversableOnce[U]): RDD[U].

scala> val ids = lines.flatMap(_.split(","))
 ids: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :26

 scala> ids.collect
 res3: Array[String] = Array(15, 16, 20, 20, 77, 80, 94, 94, 98, 16, 31, 31, 15, 20)

 scala> ids.first
 res4: String = 15

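If you want to format the output of the collect method, you can use a method of the Array class called mkString, for example ids.collect.mkString("; "). To find out how many unique IDs the file contains, use the distinct transformation, which returns a new RDD with duplicate elements removed: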

scala> val lines = sc.textFile("/home/spark/client-ids.log")
 lines: org.apache.spark.rdd.RDD[String] = /home/spark/client-ids.log MapPartitionsRDD[5] at textFile at :24

 scala> val ids = lines.flatMap(_.split(","))
 ids: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at :26

 scala> ids.count
 res6: Long = 14

 scala> val uniqueIds = ids.distinct
 uniqueIds: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at distinct at :28

 scala> uniqueIds.count
 res7: Long = 8

 scala> uniqueIds.collect
 res8: Array[String] = Array(80, 20, 15, 31, 77, 98, 16, 94)
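To see map, flatMap, and distinct side by side, here is a small sketch that assumes Spark 2.x or later in local mode. Instead of reading client-ids.log, it uses three made-up comma-separated lines held in memory, so the counts differ from the shell session above.

```scala
import org.apache.spark.sql.SparkSession

object FlatMapDistinctSketch {
  // Returns (number of arrays from map, number of IDs from flatMap, sorted unique IDs).
  def run(): (Long, Long, Array[String]) = {
    val spark = SparkSession.builder()
      .appName("flatmap-distinct-sketch")
      .master("local[*]") // assumption: local mode, for illustration only
      .getOrCreate()
    val sc = spark.sparkContext

    // Hypothetical stand-in for a multi-line client ID log.
    val lines = sc.parallelize(Seq("15,16,20,20", "77,80,94", "94,98,16,31"))

    // map keeps one element per line: an RDD[Array[String]].
    val nested = lines.map(_.split(","))
    // flatMap removes one level of nesting: an RDD[String].
    val ids = lines.flatMap(_.split(","))
    // distinct drops duplicates; the order of the result is not guaranteed.
    val uniqueIds = ids.distinct()

    // Sort locally so the result is stable regardless of partitioning.
    val result = (nested.count(), ids.count(), uniqueIds.collect().sorted)
    spark.stop()
    result
  }

  def main(args: Array[String]): Unit = {
    val (arrays, ids, unique) = run()
    println(s"arrays: $arrays, ids: $ids, unique: ${unique.mkString("; ")}")
  }
}
```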
