The RDD paper: introducing the general-purpose Spark framework
This is the next instalment in my quest to read, and help make sense of, interesting papers in the data space.
The RDD paper (Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Computing) is foundational: it established Spark as the winner of the big data framework wars of the early 2010s.
What may be surprising is that the ideas in the paper are completely straightforward to understand, even for non-developers. As part of this post I will also mention a related paper, Delay Scheduling: although you can have RDDs without delay scheduling, it is a very large part of what makes Spark fast.
- Manufacturing data
- Where is the fairy dust that makes it fast?
- The magic of Spark: winning the framework wars
- Not perfect though
Manufacturing data
Let’s start with an unrelated situation.
You are the owner of a small and thriving cleaning products factory. Your bestseller is ElephantClean, ideal for cleaning large cages. Production is increasing, and your manufacturing pipeline is at full tilt.
Getting your product in the hands of consumers involves many steps. It all starts with the raw chemicals, which are bought in bulk in numbered parcels. The chemicals are processed in batches, depending on the size of the vats needed to mix them. When the final mixture is ready, a machine fills the plastic bottles. They are then arranged in boxes ready for shipment to direct clients and distributors.
The machines that mix the chemicals are as close as possible to the raw material vats. You don’t want to pay for large container trucks, which are expensive to manage, and a shorter distance reduces turnaround time: the time from raw material arriving at the factory to a bottle containing its processed contents.
Sometimes a bottle turns out faulty. This can be detected by your QA team or, more rarely, by a dissatisfied customer. When this happens, your team traces back the failed bottle, finds which mixture went into it, and then finds all related bottles so they can be fixed for your clients. If a mixture turns out badly in the middle of the process, it is thrown out and prepared again. If some raw materials are faulty, they are re-ordered from the providers. Reasonable, isn’t it?
In data processing we do essentially the same thing: raw data goes through a pipeline of steps that turns it into processed data.
There is one final parallel I need to establish. Transformations on data are classified as either fine-grained or coarse-grained.
- A fine-grained transformation affects only a single row or data element: think of that top-notch barista roasting her own coffee, one small batch at a time.
- A coarse-grained transformation applies the same operation to the whole dataset, as in map or filter: think of an industrial roasting company going from producers all the way to home delivery of ground coffee bags (there is a small Spark sketch after this list).
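To make the coarse-grained case concrete, here is a minimal sketch of the classic Spark transformations, assuming a SparkContext named `sc` is already available (as it is in spark-shell) and a made-up input path:

```scala
// Coarse-grained transformations: one function, applied to the whole dataset.
// `sc` is the SparkContext provided by spark-shell; the path is hypothetical.
val lines  = sc.textFile("hdfs://data/orders.txt")
val errors = lines.filter(_.contains("ERROR"))   // same predicate for every line
val codes  = errors.map(_.split(" ")(1))         // same function for every line
```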
Resilient Distributed Datasets: fault-tolerant, parallel data structures that…
RDDs are an abstraction built on top of data and operations. The properties of this abstraction are what let Spark operate as you would in the factory.
An RDD defines an operation that is applied to another RDD. This chain of operations on RDDs starts with a read from a data source.
This operation (computation or read) is partitioned. For instance, when reading data from distributed storage, the partitioning could be “machine 1 reads all files starting with the letter A”. This is what makes distributing the work easy, like sending separate product batches in several trucks. Each RDD knows how to compute an output partition from its input partitions. This defines a set of dependencies per partition, and allows us to talk about narrow dependencies and wide dependencies. Narrow means that each parent partition is used by at most one output partition (the quintessential example is map); wide means a parent partition can feed multiple output partitions (the example here is join).
The output partitioning can optionally be customised, a useful ability when you know the output is best distributed in a certain way.
The final fundamental piece of an RDD is a set of preferred locations for its partitions. This allows the RDD to execute compute as close as possible to where the data is. When the RDD is defined as a read, “compute” is essentially a read, and if you are specifically using HDFS, there will be a partition per HDFS block.
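Putting those pieces together, the interface described in the paper looks roughly like this. It is a simplified, hedged sketch (names like `SimpleRDD` are mine, and the real Spark source differs in the details), but it captures the five ingredients: partitions, dependencies, a compute function, preferred locations and an optional partitioner.

```scala
// Simplified sketch of the RDD abstraction from the paper; not the actual Spark source.
trait Partition { def index: Int }

// One entry per parent RDD; narrow vs. wide is encoded in how parent
// partitions map to child partitions.
trait Dependency { def parents(childPartition: Int): Seq[Int] }

trait SimpleRDD[T] {
  def partitions: Seq[Partition]                          // how this dataset is split
  def dependencies: Seq[Dependency]                       // which parent partitions each split needs
  def compute(split: Partition): Iterator[T]              // derive one output partition
  def preferredLocations(split: Partition): Seq[String]   // e.g. hosts holding the HDFS block
  def partitioner: Option[Any => Int]                     // optional: how keys map to output partitions
}
```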
Since RDDs depend on previous RDDs, if computing a partition fails, it can be retried by repeating all the operations up to that point, and only for that partition.
The compute method has to be deterministic to allow recomputation; otherwise, re-running failed paths would result in inconsistent data. This does not necessarily rule out using random number generators, it just means that the seed has to be fixed by the driver.
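As a hedged illustration (assuming `data` is an existing RDD of doubles), you can fix one seed on the driver and derive a per-partition seed from it, so a retried partition produces exactly the same numbers:

```scala
// Deterministic "randomness": the seed is chosen once on the driver, and each
// partition derives its own seed from it, so recomputation is repeatable.
val seed = 42L
val noisy = data.mapPartitionsWithIndex { (partitionIndex, rows) =>
  val rng = new scala.util.Random(seed + partitionIndex)
  rows.map(x => x + rng.nextGaussian())
}
```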
I hope my small cleaning factory story makes sense now.
Production materials (partitions) are kept close to the machines that have to process (compute) them. Production is traced in batches (partitions) so that when something fails it can be prepared (computed) again from the previous materials (previous RDDs).
Where is the fairy dust that makes it fast?
Transformations on RDDs are lazy. This used to trip up people starting to work with Spark, but it was also one of its initial advantages. Laziness means that you can define a very long chain of transformations instantaneously, and it won’t start processing the data until an action happens. The distinction is pretty intuitive: transformations transform data, actions do something with the data.
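A minimal sketch, again assuming a SparkContext `sc` and a made-up log path: nothing touches the data until the final count.

```scala
val events = sc.textFile("hdfs://logs/2012-06-01")   // transformation: lazy, nothing read yet
val errors = events.filter(_.startsWith("ERROR"))    // transformation: lazy
val parsed = errors.map(_.split("\t")(1))            // transformation: lazy
val n      = parsed.count()                          // action: only now does the chain run
```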
This tiny, seemingly unimportant detail actually made a big difference: you could use an interactive shell (based on the Scala shell) to operate immediately on large datasets.
The second sprinkle of fairy dust is delay scheduling. This was originally proposed for MapReduce workflows in Hadoop, but it made it directly into Spark from day one. RDDs have a preferred location to compute something, which usually matches the location where the data is (disk or memory). Ideally, you want to compute at that location, but this is not always possible. A similar problem happens with MapReduce workloads: you want to avoid moving data around, but you also want to avoid leaving your machines idle while you wait for only one thing to finish.
The solution is delay scheduling. It probably sounds more interesting than it is. Imagine that you go to a big retail store to make a large purchase, a store where one of your friends works. You want to talk to him and maybe help him earn a commission, but you also want to make it home early. He’s busy. How do you solve the problem? Easy: you wait for a short while to see if he can take your order, and if he doesn’t free up, you go to another assistant. That’s delay scheduling.
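In scheduler terms: when a slot frees up on a machine, prefer a task whose data lives on that machine; if there is none, skip a few offers before settling for a non-local task. Here is a hedged sketch of the idea (the `Task` and `DelayScheduler` names and the skip counter are mine; the real implementations use a configurable wait time instead):

```scala
import scala.collection.mutable.ArrayBuffer

// Toy delay scheduler: not Spark's actual code, just the idea.
case class Task(id: Int, preferredHost: Option[String])

class DelayScheduler(pending: ArrayBuffer[Task], maxSkips: Int) {
  private var skips = 0

  // Called every time a free slot appears on `host`.
  def nextTaskFor(host: String): Option[Task] = {
    val localIdx = pending.indexWhere(_.preferredHost.contains(host))
    if (localIdx >= 0) {            // a data-local task exists: run it and reset the wait
      skips = 0
      Some(pending.remove(localIdx))
    } else if (skips < maxSkips) {  // no local task yet: decline this offer and wait
      skips += 1
      None
    } else if (pending.nonEmpty) {  // waited long enough: accept a non-local task
      skips = 0
      Some(pending.remove(0))
    } else None
  }
}
```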
The magic of Spark: winning the framework wars
Obviously, this is just part of the story. RDDs are fundamental for Spark, but the power of the framework lies in its ergonomics: its ease of use and its capabilities. It was the first framework where you could run diverse computational workloads. Users only have to write a high-level program in the driver, and the driver connects to a cluster of workers. These workers are long-lived, which means they can keep data in RAM. The driver code is what keeps track of lineage and RDD operations.
The first implementation of Spark (the one in the paper) was written on top of the Mesos cluster manager: workers and the driver were separate Mesos applications, and Mesos handled resource sharing and assignment. Nowadays Spark also runs on YARN and Kubernetes, as well as having a “standalone” mode.
Arguments to RDD operations are passed around as serialised closures (a closure is a function together with a lexical binding… sorry, won’t explain closures here 😅). To make this work, the Spark developers had to work around Scala issues with serialising closures (using reflection, which in general is pretty evil). The short version is that you write functions in Scala and they are magically shipped to the workers, where they are applied to the data.
The other advantage is the in-memory capabilities. The RDD abstraction simplifies keeping data in memory, and keeping data in memory makes it easier to re-use. As a user, you had more or less complete control over this: you could cache in memory, or persist to disk, any RDD at any time with a single command, and even choose among several storage formats (deserialised Java objects, serialised Java objects, or custom serialisers like Kryo). Coupled with the interactivity you get from laziness and a shell, this made Spark the first general-purpose system where you could process terabytes of data at interactive speeds. You could just type and get an answer, almost instantly.
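A minimal sketch of what that control looks like, assuming an RDD `parsed` like the one in the laziness sketch above (an RDD’s storage level can only be set once, hence the alternatives as comments):

```scala
import org.apache.spark.storage.StorageLevel

parsed.persist(StorageLevel.MEMORY_ONLY)        // deserialised objects in RAM; same as parsed.cache()
// parsed.persist(StorageLevel.MEMORY_ONLY_SER) // serialised in memory: slower to read, more compact
// parsed.persist(StorageLevel.DISK_ONLY)       // keep on local disk instead of recomputing
```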
The Spark scheduler (based on the Dryad scheduler, according to the paper) factors in what is in memory or on disk in each node, and uses the lineage that RDDs define to build a DAG of stages. Stages are blocks of operations that can run without needing to move data around (a data movement known as a shuffle), or in other words, operations that can be pipelined. For instance, if you are mapping and filtering, these two operations run together in one go, without moving data around. As an additional optimisation, for operations with wide dependencies intermediate records are physically materialised on the nodes holding the parent partitions, much like MapReduce materialises map outputs.
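To make the stage boundary concrete, a small sketch (assuming a SparkContext `sc` and a made-up sales file): the parsing, filtering and key extraction are pipelined into a single stage, and the reduceByKey forces a shuffle that starts a new one.

```scala
val perProduct = sc.textFile("hdfs://data/sales.csv")
  .map(_.split(","))                          // stage 1: these three are pipelined,
  .filter(_(2).toDouble > 0)                  // run in a single pass over each partition,
  .map(cols => (cols(0), cols(2).toDouble))   // with no data movement
  .reduceByKey(_ + _)                         // wide dependency: shuffle, stage 2 begins

perProduct.collect()                          // action that triggers both stages
```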
This fixation on keeping data in place made some previously cumbersome workloads easy. For instance, iterative machine learning and graph algorithms rely on heavy data re-use. One example would be the PageRank indexing algorithm, which is at its core a repeated matrix computation. When the RDD paper was written, the only way you could re-use data between jobs was to write it to a distributed filesystem, with the large cost of going through disk.
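The paper itself walks through PageRank; here is a hedged sketch in the same spirit (the edge file path and the fixed 10 iterations are made up). Note how `links` is cached because it is re-used on every iteration:

```scala
// links: page -> all pages it points to; re-used every iteration, hence the cache.
val links = sc.textFile("hdfs://data/edges.txt")          // lines like "src dst"
  .map { line => val p = line.split("\\s+"); (p(0), p(1)) }
  .groupByKey()
  .cache()

var ranks = links.mapValues(_ => 1.0)

for (_ <- 1 to 10) {
  val contribs = links.join(ranks).values.flatMap {
    case (neighbours, rank) => neighbours.map(dst => (dst, rank / neighbours.size))
  }
  ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
}
```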
The fact that lineage keeps track of the process and allows recomputation means that Spark didn’t need to checkpoint. In a distributed computation, a checkpoint is a specific point in the process where all data is stored, establishing a point you can pick up from if something fails. This does not mean that Spark never checkpoints: if the lineage gets very long, Spark will checkpoint.
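The user-facing side of this is a one-liner; a minimal sketch, assuming the `ranks` RDD from the PageRank sketch above and a hypothetical checkpoint directory:

```scala
sc.setCheckpointDir("hdfs://checkpoints/pagerank")  // stable storage for checkpoints
ranks.checkpoint()    // marked for checkpointing; materialised by the next action,
ranks.count()         // which also truncates the lineage up to this point
```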
Failure recovery and laggards (slow tasks, also known as stragglers) are handled by the scheduler as well: missing partitions are recomputed straight from available data, and laggards get backup copies speculatively launched on other available nodes.
If you want to know a bit more about how Spark works internally (with a focus on the Python integration, PySpark), you can have a look at this presentation (from the 2019 Spark+AI Summit). Press s to enter presenter mode so you can see the footnotes: they should explain more or less everything I want to say in each slide (and sometimes more).
Not perfect though
Of course, there is no framework that can do everything. Spark was not (and is not) suited for async-heavy computations, or those needing fine-grained updates to shared state. For instance, you would never write a web app or an incremental web crawler in Spark. You would totally use it to analyse the logs or output of either, though.
If you are a Spark user nowadays you may be thinking: RDDs are lame and old! That is only partially true: Catalyst optimiser or not, Datasets and DataFrames are still implemented on top of the RDD machinery.
In a future post I will cover the paper where DataFrames and Catalyst were introduced, Spark SQL: Relational Data Processing in Spark. This will probably close the loop between this post and Databricks' Delta Lake: high on ACID.