Apache Spark Tutorial (Part 2 - RDD)
Resilient Distributed Datasets (RDD): An RDD is an abstraction, the fundamental unit of data and computation in Spark. As the name indicates, RDDs have two key features among others:
They are resilient: If the data in memory is lost, an RDD can be recreated
They are distributed: They hold Java or Python objects that are distributed across a cluster
More details about RDD will be discussed later in this post.
Sample Scala program:
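As a minimal sketch, assuming a spark-shell session where sc (the SparkContext) is already defined and the sample fruits.txt file used later in this post, a simple word-count style job looks like this:
/* A minimal sample job run from the spark-shell */
val lines = sc.textFile("file:///home/ubuntu/spark-data/fruits.txt")

// Transformations: split each line into words and count occurrences per word.
val wordCounts = lines
  .flatMap(line => line.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// Action: bring the results back to the driver and print them.
wordCounts.collect().foreach(println)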
You can monitor the jobs that are running on this cluster from the Spark UI, which runs by default on port 4040. If you navigate your browser to http://localhost:4040, you should see the following Spark driver program UI:
The UI gives you an overview of each job's type, its submission date/time, the amount of time it took, and the number of stages it passed through. To look at the details of a job, simply click its description, which takes you to another page detailing all the completed stages. If you click through to an individual stage, you can see detailed metrics for that stage.
RDD (Transformations & Actions)
Recall from the introduction that RDDs are resilient (they can be recreated if the data in memory is lost) and distributed (their elements are spread across the machines of a cluster).
For example, an Airbus A350 has roughly 6,000 sensors across the entire plane and generates 2.5 TB of data per day, while the newer model expected to launch in 2020 will generate roughly 7.5 TB per day. From a data engineering point of view, it might be important to understand the data pipeline, but from an analyst's or data scientist's point of view, the major concern is to analyze the data irrespective of its size and the number of nodes across which it is stored. This is where the neatness of the RDD concept comes into play: the sensor data can be encapsulated as an RDD, and any transformation or action you perform on the RDD applies across the entire dataset. Six months' worth of data for an A350 would be approximately 450 TB and would need to sit across multiple machines.
For the sake of discussion, we assume that you are working on a cluster of four worker machines. Your data would be partitioned across the workers as follows:
The figure illustrates that an RDD is a distributed collection of data, and that the framework distributes the data across the cluster. Distributing data across a set of machines brings its own set of challenges, including recovering from node failures. RDDs are resilient because they can be recomputed from the RDD lineage graph, which is essentially a graph of all the parent RDDs from which the RDD was derived. In addition to resilience, distribution, and representing a dataset, an RDD has various other distinguishing qualities:
In Memory: An RDD is a memory-resident collection of objects. We'll look at options where an RDD can be stored in memory, on disk, or both. However, the execution speed of Spark stems from the fact that the data is in memory, and is not fetched from disk for each operation.
Partitioned: A partition is a division of a logical dataset into independent parts. Partitioning is a de facto performance-optimization technique in distributed systems, used to minimize network traffic, which is a killer for high-performance workloads. The objective of partitioning key-value data is to collocate similar ranges of keys and, in effect, minimize shuffling. Data inside an RDD is split into partitions that are spread across the nodes of the cluster. We'll discuss this in more detail later in this post.
Typed: Data in an RDD is strongly typed. When you create an RDD, all the elements are typed depending on the data type.
Lazy evaluation: The transformations in Spark are lazy, which means data inside RDD is not available until you perform an action. You can, however, make the data available at any time using a count() action on the RDD. We'll discuss this later and the benefits associated with it.
Immutable: An RDD once created cannot be changed. It can, however, be transformed into a new RDD by performing a set of transformations on it.
Parallel: An RDD is operated on in parallel. Since the data is spread across a cluster in various partitions, each partition is operated on in parallel.
Cacheable: Since RDDs are lazily evaluated, any action on an RDD will cause it to re-evaluate all the transformations that led to its creation. This is generally not desirable behavior on large datasets, so Spark offers the option to cache the data in memory or on disk (see the short sketch after this list). We'll discuss caching later.
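To make lazy evaluation and caching concrete, here is a minimal sketch, again assuming the spark-shell and the sample fruits.txt file; the exact output depends on your data:
/* Lazy evaluation and caching */
val rdd = sc.textFile("file:///home/ubuntu/spark-data/fruits.txt")

// Nothing has been read yet: map() only records the transformation.
val upper = rdd.map(fruit => fruit.toUpperCase)

// Mark the RDD to be cached so repeated actions do not recompute its lineage.
upper.cache()

// The first action triggers the actual read and transformation...
println(upper.count())

// ...and subsequent actions reuse the cached partitions.
upper.take(2).foreach(println)

// getNumPartitions shows how many partitions the data was split into.
println(upper.getNumPartitions)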
A typical Spark program flow with an RDD includes the following steps (an end-to-end sketch follows this list):
Creation of an RDD from a data source.
A set of transformations, for example, filter, map, join, and so on.
Persisting the RDD to avoid re-execution.
Calling actions on the RDD to start performing parallel operations across the cluster.
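Putting the four steps together, a minimal end-to-end sketch (assuming the spark-shell and the same hypothetical sample file) might look like this:
// 1. Creation of an RDD from a data source.
val fruitLines = sc.textFile("file:///home/ubuntu/spark-data/fruits.txt")

// 2. A set of transformations.
val berries = fruitLines.filter(fruit => fruit.contains("berry")).map(fruit => fruit.toLowerCase)

// 3. Persisting the RDD to avoid re-execution.
berries.persist()

// 4. Calling actions to perform parallel operations across the cluster.
println(berries.count())
berries.collect().foreach(println)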
There are two major ways of creating an RDD:
Parallelizing an existing collection in your driver program.
Parallelized collections are created by calling the parallelize() method on the SparkContext within your driver program. The parallelize() method asks Spark to create a new RDD based on the dataset provided. Once the local collection/dataset is converted into an RDD, it can be operated on in parallel. parallelize() is often used for prototyping and rarely in production environments, because the entire dataset must be available on a single machine.
Example:
val namesList = sc.parallelize(List("rob", "james", "ardian", "greg", "paul", "jochen"))
namesList.count()
Creating an RDD by referencing an external data source, for example, a local filesystem, HDFS, HBase, or any data source capable of providing a Hadoop InputFormat.
For production use, Spark can load data from any storage source supported by Hadoop, ranging from a text file on your local file system to data available on HDFS, HBase, or Amazon S3. The SparkContext methods listed below are commonly used for this (a brief usage sketch follows the list):
hadoopFile(): Create an RDD from a Hadoop file with an arbitrary input format
objectFile(): Load an RDD saved as SequenceFile containing serialized objects, with NullWritable keys and BytesWritable values that contain a serialized partition
sequenceFile(): Create an RDD from a Hadoop SequenceFile with given key and value types
textFile(): A method that we have already seen, which reads a text file from either HDFS or a local file system and returns an RDD of strings
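A brief usage sketch of these methods from the spark-shell; the paths and key/value types here are hypothetical examples:
import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// textFile: read a local or HDFS text file as an RDD of strings.
val textLines = sc.textFile("hdfs:///data/input.txt")

// hadoopFile: read a file with an arbitrary (old-API) Hadoop InputFormat.
val rawRecords = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/raw.txt")

// sequenceFile: read a Hadoop SequenceFile with the given key and value types.
val keyValuePairs = sc.sequenceFile("hdfs:///data/pairs.seq", classOf[IntWritable], classOf[Text])

// objectFile: load an RDD previously saved with saveAsObjectFile.
val restored = sc.objectFile[String]("hdfs:///data/objects")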
Operations on RDD:
Two major types of operations can be performed on an RDD. They are:
Transformations
Actions
Transformations: Transformations are operations that create a new dataset, since RDDs are immutable. They transform data from one form to another, which could result in amplification of the data, reduction of the data, or a totally different shape altogether. These operations do not return any value to the driver program; they are lazily evaluated, which is one of the main benefits of Spark.
Actions: Actions are operations that return a value to the driver program. As previously discussed, all transformations in Spark are lazy, which essentially means that Spark remembers all the transformations carried out on an RDD, and applies them in the most optimal fashion when an action is called.
RDD Operations:
val fruits = sc.textFile("file:///home/ubuntu/spark-data/fruits.txt")
val yellowThings = sc.textFile("file:///home/ubuntu/spark-data/yellowthings.txt")
What are RDD operations?
RDDs support two types of operations: transformations and actions. Transformations create a new dataset from an existing one. Transformations are lazy, meaning that no transformation is executed until you execute an action. Actions return a value to the driver program after running a computation on the dataset.
RDD transformations: the following are examples of some of the common transformations available.
/* map */
val fruitsReversed = fruits.map((fruit) => fruit.reverse)
fruitsReversed.foreach(println)
/* filter */
val shortFruits = fruits.filter((fruit) => fruit.length <= 5)
shortFruits.foreach(println)
/* flatMap */
val characters = fruits.flatMap((fruit) => fruit.toList)
characters.foreach(println)
/* union */
val fruitsAndYellowThings = fruits.union(yellowThings)
fruitsAndYellowThings.foreach(println)
/* intersection */
val yellowFruits = fruits.intersection(yellowThings)
yellowFruits.foreach(println)
/* distinct */
val distinctFruitsAndYellowThings = fruitsAndYellowThings.distinct()
distinctFruitsAndYellowThings.foreach(println)
/* groupByKey */
val yellowThingsByFirstLetter = yellowThings.map((thing) => (thing(0), thing)).groupByKey()
yellowThingsByFirstLetter.foreach(println)
/* reduceByKey */
val numFruitsByLength = fruits.map((fruit) => (fruit.length, 1)).reduceByKey((x, y) => x + y)
numFruitsByLength.foreach(println)
RDD actions: the following are examples of some of the common actions available.
/* collect */
val fruitsArray = fruits.collect()
val yellowThingsArray = yellowThings.collect()
/* count */
val numFruits = fruits.count()
/* take */
val first3Fruits = fruits.take(3)
/* reduce */
val letterSet = fruits.map((fruit) => fruit.toSet).reduce((x, y) => x ++ y)