Databricks' Delta Lake: high on ACID
After reading the Snowflake paper, I got curious about how similar engines work. Also, as I mentioned in that article, I like knowing how the data sausage is made. So, here I will summarise the Delta Lake paper by Databricks.
- What is Delta Lake?
- Problems and benefits with cloud object stores
- Storage format and performance implications
- Storage Access Protocols
- Impact of the protocol
- Known limitations
- Wrap-up
What is Delta Lake?
Delta Lake started as an internal Databricks product and was open sourced at the 2019 North American Spark Summit. Its goal is to offer ACID guarantees (Atomicity, Consistency, Isolation, Durability, the set of properties databases offer) over cloud-based object storage (such as Amazon’s S3), as well as to optimise Spark workloads. How? Stay tuned and keep reading.
It also enables read-only access from Hive, Presto, AWS Athena, AWS Redshift, Snowflake and more. That is, these systems can read data stored as Delta. Write from CDC (change-data-capture) systems like Fivetran, Informatica, Qlik and Talend is also available.
It has also sped up access from BI tools via a new offering, Delta Engine, a proprietary full-stage, vectorised native execution engine. There are no technical details in this paper, or anywhere else (except for a presentation I’m yet to watch), about Delta Engine. I wonder if it has some relationship with Flare.
Note: it is interesting that reading from Delta is achieved via the symlink manifest file offered by Hive, which defines which files from the folder should be visible to the reading system.
Problems and benefits with cloud object stores
I will use OS to mean object store from now on, since they are mentioned constantly. There will be no mention of operating systems in this post, so all appearances of OS mean object store.
Why would anyone want to use cloud OSs to store their data? Obviously, for cost reasons. OSs are the cheapest storage available, and at scale. Imagine the alternative: having your own HDFS cluster, with its NameNode and all the DataNodes required to store this data. If you run the numbers, you’ll see how large the difference in cost is, as well as the difference in management required. There are some catches, especially when comparing against HDFS. Latency in HDFS is pretty low (at least for a decently configured cluster compared against any OS), which makes querying OSs slower than querying HDFS or using a proper database system. Overcoming this latency is one of the fundamental goals of Delta Lake (and of Snowflake, but that was the subject of another post).
Using an OS has several tradeoffs to balance. Parallelism, sequential I/O and network bandwidth all need to be considered to optimise usage. A simple example is full utilisation of network bandwidth: peak throughput for a single read from an OS is around 50-100 MB/s, but most compute instances (the ones reading this data over the network) have 10 Gbit network cards or virtual cards. Without adequate parallelisation of reads, a lot of that bandwidth sits idle.
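As a back-of-the-envelope check (using the ballpark figures above, not measurements), you can estimate how many concurrent reads it takes to keep such a link busy:

```python
# Rough estimate of how many parallel object reads are needed to saturate
# a compute instance's network link. Figures are the ballpark numbers
# quoted above, not measurements.
NIC_GBIT_PER_S = 10          # network card, in gigabits per second
PER_READ_MB_PER_S = 75       # single OS read stream, midpoint of 50-100 MB/s

nic_mb_per_s = NIC_GBIT_PER_S * 1000 / 8        # ~1250 MB/s of raw bandwidth
parallel_reads = nic_mb_per_s / PER_READ_MB_PER_S

print(f"~{parallel_reads:.0f} concurrent reads to saturate the NIC")   # ~17
```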
OSs don’t work as real filesystems, even if they look like they do. They actually work as key-value stores, where a key looks like a path. For example, if you have ever used Redis (or Aerospike), they follow more or less the same approach. Note that this is not the case for HDFS, which actually works like a real filesystem: that is what the FS part of HDFS stands for. This key-centeredness of OSs explains one of the main issues with using them: LIST operations, where you request a set or range of keys, are costly (unless you request things in a very specific way).
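For illustration, this is roughly what a prefix-scoped LIST looks like against S3 with boto3 (the bucket and prefix are made up); each page returns at most 1,000 keys, which is why listing a table made of millions of objects gets expensive:

```python
import boto3

s3 = boto3.client("s3")

# List keys under a "folder" by asking for a key prefix. Each page returns
# at most 1,000 keys, so a large table means many sequential round trips --
# this is the listing cost Delta sidesteps for data files.
paginator = s3.get_paginator("list_objects_v2")
pages = paginator.paginate(Bucket="my-data-lake", Prefix="mytable/date=2020-01-01/")
for page in pages:
    for obj in page.get("Contents", []):
        print(obj["Key"], obj["Size"])
```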
To overcome this, you may need to parallelise listing. Databricks (not Delta Lake, but Databricks’ version of Apache Spark) goes one step further, and worker nodes in Spark jobs issue LIST operations themselves. One of the advantages of Delta is not needing to issue LIST at all: that alone is already a large win.
Another issue: there is no generic, cheap rename operation in OSs, nor are in-place updates to object content available. If you need to update a line in the middle of a file, you need to rewrite the whole file. This makes mutability really hard to handle.
Each OS has its own consistency model, and they get complex. All of them are eventually consistent, but the details of what happens right after a write operation can vary wildly (read-after-write is basically where many concurrency issues appear; think of the classic bank account deposit/withdrawal situation). No OS is consistent across several keys, though.
There are some obvious approaches when storing relational data in an OS. You want to be able to load data sequentially and with good compression. Why good compression? This needs further explanation.
Due to the speed of the network versus the speed of compression, you’d rather spend CPU time decompressing: see this article about Blosc for some details. To maximise compression while enabling sequential reads, you need to choose a columnar format, like Parquet or ORC. Both formats have the additional advantage of carrying headers and footers with extra metadata, which act like partial zone maps (although you’d have to request them for each file, adding latency).
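As an illustration of that footer metadata, per-column statistics can be read without scanning the data pages; a minimal sketch with pyarrow (the file name is made up):

```python
import pyarrow.parquet as pq

# Read only the Parquet footer: per-row-group, per-column statistics
# (min/max, null counts) are available without touching the data pages.
md = pq.ParquetFile("part-00000.parquet").metadata
for rg in range(md.num_row_groups):
    col = md.row_group(rg).column(0)      # first column of each row group
    stats = col.statistics
    if stats is not None:
        print(rg, col.path_in_schema, stats.min, stats.max, stats.null_count)
```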
You want to have large files to compensate for network latency, since requesting many small files means you are handshaking too much as well as requesting many parts of files that are not data (headers, metadata). But not too large, since deletes/updates require rewriting the whole file and you don’t want to rewrite large files.
You should use some partitioning scheme, like Hive’s filename partitioning. Sadly, partitioning schemes and OSs together offer neither isolation nor atomicity, so you’d end up with only the C and D of ACID. Rolling back is hard, too, since versioning in an OS does not map cleanly to what a database rollback would require.
There are several existing approaches (if you count Delta) to using OSs to store relational data. The most common is the directory of files, something similar to Hive or HBase. On its own, there will be no atomicity and only eventual consistency. Given the slow metadata and listing capabilities, you end up with poor performance. You also lose any management (and auditing) capabilities unless you add them somehow.
Alternatively, you can have custom storage based on an OS. This is what Snowflake does. In this case, all I/O needs to pass through a metadata service (that is the Cloud Layer in Snowflake), adding latency to each query. Since the engine is custom, you’d need engineering work to connect to other systems, unless the storage and engine were open-source based or the company offered the connectors already. And you are tied to this particular vendor for a long, long time. Note that Hive is an open-source implementation of this approach, although it comes with its own issues.
Finally, you can use the Delta approach: metadata stored in the OS itself. Apache Hudi and Apache Iceberg are two open-source implementations that were concurrently tackling this problem while Delta Lake was being developed. They don’t necessarily offer all the advantages of Delta Lake, some of which will be detailed later.
Storage format and performance implications
Delta stores data using Parquet, a format that is familiar to anyone in the data space: columnar, with good compression and performance. The layout in the OS uses “folders” following Hive’s naming scheme for partitioning (like year=2020). Individual files are named using a GUID (Globally Unique Identifier).
The bulk of the ACID transaction control is handled via a write-ahead log, stored in the folder _delta_log in the OS. The log is formed of transaction-recording JSON files, zero-padded and incrementally numbered. Occasionally there will be checkpointing log files, where several previous logs are coalesced (a sketch of the resulting layout follows the list of actions below).
Each JSON file records an array of actions that have been applied to the data in the table:
- Change metadata: The metadata record contains the schema, column names, partitioning columns… The first record will always contain this. Note that this is very interesting information if you want to keep a catalog up to date or write your metadata engine: you only need to read this small JSON file.
- Add/Remove file: This is the fundamental action. Used when adding data or removing/updating data. When this action is recorded, statistics of the new file (minimum, maximum, etc) are added to the metadata section.
- Protocol evolution: This is internal for Delta, to know which version of the library can load these files.
- Adding provenance: To store information about who made the changes to the file. For auditing.
- Update application transaction ID: Stores information about which application has written this. Especially useful for streaming, to enable exactly-once semantics for user applications (and used internally for Spark Streaming). This goes into the field txn (I will mention this field again later).
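Here is the promised sketch of how a table and its log might be laid out, with a trimmed, made-up commit record (the real action schema has more fields than shown):

```python
import json

# Illustrative layout of a Delta table on the object store (names made up):
#
#   mytable/
#     date=2020-01-01/part-<guid>.snappy.parquet    <- data files
#     _delta_log/
#       00000000000000000000.json                   <- one record per commit
#       00000000000000000001.json
#       00000000000000000010.checkpoint.parquet     <- coalesced checkpoint
#       _last_checkpoint                             <- pointer to the latest checkpoint
#
# Each .json record holds one action per line. A trimmed, made-up commit
# that appends a single file could look like this:
commit = [
    {"commitInfo": {"operation": "WRITE", "timestamp": 1595889600000}},
    {"add": {"path": "date=2020-01-01/part-0a1b2c3d.snappy.parquet",
             "partitionValues": {"date": "2020-01-01"},
             "size": 1048576,
             "stats": json.dumps({"numRecords": 42,
                                  "minValues": {"amount": 1},
                                  "maxValues": {"amount": 990}})}},
]
for action in commit:
    print(json.dumps(action))
```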
During checkpointing, log files are squashed together, writing new files to the OS. This acts like a form of materialization: the records define a set of operations forming a view (an isolated one) based on several files, and the checkpoint materializes it in a new set of files. The only records left in the checkpoint are the transaction information, protocol changes and metadata changes. Individual add/remove actions get squashed away, like in a squash commit in Git. By default, Delta checkpoints every 10 transactions, but the user can either configure this or trigger checkpointing manually (with the OPTIMIZE SQL extension).
Checkpointing and change recording allow for efficient time travel and rollback. These are handled with the SQL extensions AS OF timestamp and VERSION AS OF commit_id, which can be used to see the table at time X or at record ID X respectively.
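In PySpark the same capability is exposed through reader options (a sketch, assuming a Spark session with the Delta Lake package configured; the path is made up):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the table as it was at a given commit id...
df_v10 = (spark.read.format("delta")
          .option("versionAsOf", 10)
          .load("s3://my-data-lake/mytable"))

# ...or as it was at a given timestamp.
df_then = (spark.read.format("delta")
           .option("timestampAsOf", "2020-06-01 00:00:00")
           .load("s3://my-data-lake/mytable"))
```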
Since who changed what is also stored, there is a pretty good level of audit logging built into the system, via the provenance action.
By storing metadata information in separate files, there is no need to issue any LIST commands for data files (possibly millions), only for transaction record files (dozens). Schema evolution is relatively easy: since the schema is stored in the records, many schema changes can be acted on transactionally and some can even be compacted (column drops for instance). There is no detail about how column additions are handled, although I imagine that the Delta connector is more lenient than the Spark Parquet reader (which would fail for non-existing columns when a query plan demands them). Some form of null-if-not-present needs to be offered in this case.
Databricks’ own Delta Lake uses Z-Ordering to optimise the storage (data layout optimisation). You can even choose the ordering approach and columns via an SQL extension (ZORDER BY). If you use Databricks, you also get auto-optimise. There are no details about what this does, but it is not that hard to check the most common query paths and automatically set the Z-Ordering to speed up those queries. There are no details either about how Databricks implements Z-Ordering internally, but from comments in the performance evaluation section of the paper, it’s not “fully” Z-Ordering. But close. For each column in the Z-index, the whole dataset is at least partially ordered, as in, each file contains a chunk of the ordered set. Then each file has reasonable min-max metadata fields that allow skipping it when querying outside those ranges. Compare this with Redshift’s interleaved indices (this PDF from Chartio also talks about zone maps and is an excellent resource).
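A toy model of that skipping idea (not Databricks’ implementation, just the principle): with per-file min/max values for a column, a range predicate can discard whole files before reading them.

```python
# Toy model of min/max data skipping -- not Databricks' implementation.
# Each data file carries the min and max of a column (in Delta, these come
# from the stats in the log's add actions); a range predicate prunes files.
files = [
    {"path": "part-0001.parquet", "min_ts": "2020-01-01", "max_ts": "2020-01-15"},
    {"path": "part-0002.parquet", "min_ts": "2020-01-14", "max_ts": "2020-02-02"},
    {"path": "part-0003.parquet", "min_ts": "2020-02-01", "max_ts": "2020-02-20"},
]

def files_for_range(lo, hi):
    """Keep only files whose [min, max] interval overlaps [lo, hi]."""
    return [f["path"] for f in files if f["max_ts"] >= lo and f["min_ts"] <= hi]

print(files_for_range("2020-02-05", "2020-02-28"))   # only part-0003 survives
```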
Thanks to Josep Martínez Vilà for giving me two Databricks-related Z-order links:
Storage Access Protocols
The access protocol is the foundation to enable ACID transactions. Reads and writes are handled differently, since the problems presented are also of different kinds. It is important to keep in mind that transaction records (the JSON file names) are serial.
Read protocol
Reads are isolated either via snapshot isolation or due to the serializability of the record logs. Transaction isolation is at the table level. Given that files are immutable, once files are read they can be efficiently cached locally in worker nodes until there are metadata changes. The protocol for a read is as follows:
1. First read the _last_checkpoint object, and if it exists use it to find the latest record ID to read.
2. Issue a LIST request for a prefix based on the latest record checkpoint ID, or starting at 0. With this, the current state of the table can be reconstructed. Eventual consistency can make this LIST fail, returning non-contiguous records (skipping a few). The client reader then knows to wait for the intermediate records to be available, since they need to exist.
3. The current state of the table is reconstructed from checkpoints and/or actions. Reading all these (Parquet files for the data and JSON records for additional metadata) will be done in parallel.
4. Metadata statistics from each file (coming from the JSON records) will be used to determine which files are relevant for the current query.
5. Data is read (possibly in parallel) from the OS. Due to eventual consistency, some workers may not see the objects they have to access. They will keep retrying until they are present.
Note that with this scheme, the clear weak point is making sure the latest record (or checkpoint) is actually present, not just eventually present. This is handled by the write protocol below.
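To make the replay part concrete, here is a toy version of reconstructing the live file set from the log, run against a local copy of a table (the path is hypothetical; checkpoints and retry-on-missing-record logic are left out for brevity):

```python
import json
import pathlib

def live_files(table_path):
    """Toy replay of a Delta log to get the current set of data files.

    A real reader would start from the latest checkpoint instead of
    version 0 and retry records that are not yet visible in the OS.
    """
    log = pathlib.Path(table_path) / "_delta_log"

    files = set()
    # Record names are zero-padded, so lexical sort equals commit order.
    for record in sorted(log.glob("*.json")):
        for line in record.read_text().splitlines():
            action = json.loads(line)
            if "add" in action:                 # file became part of the table
                files.add(action["add"]["path"])
            elif "remove" in action:            # file was deleted or rewritten
                files.discard(action["remove"]["path"])
    return files

print(live_files("mytable"))
```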
Write protocol
1. Find a recent log record ID using the first 2 steps of the read protocol. This finds record r, and the write step will create record r+1.
2. Read the required data as in the read protocol, to reconstruct the state of the table.
3. Create any new files the transaction needs to create (object names will be GUIDs). This may be done in parallel.
4. Attempt to write the transaction log record r+1, if no other client has done so (note that this is where things get interesting/tricky: this needs to be done atomically). If the transaction fails, it can be retried, possibly reusing the data already written in Parquet.
5. Optionally, checkpoint and make _last_checkpoint reference r+1.
Step 4 depends on the cloud OS implementation. Google Cloud Storage and Azure Blob Store support atomic put-if-absent, which solves the issue. S3 does not have this property, so Databricks (in their internal offering) runs a lightweight coordination service that makes sure only one client can write the record for a given ID (which makes it similar to the custom-engine scenario, although it only applies to write operations, not reads). In the open-source version, all writes going through the same driver are guaranteed to get different log record IDs. That is not a great solution if you use more than one driver/job writing to the same place, but to be fair, if you do, you deserve whatever happens. If you use HDFS or Azure Data Lake Storage, Delta uses an atomic rename to turn a temporarily written record file into the final one.
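What step 4 boils down to is a conditional write; a minimal sketch, assuming a generic put_if_absent(key, body) primitive (how that primitive is implemented is exactly the OS-specific part described above):

```python
import json

class CommitConflict(Exception):
    """Another writer claimed this log record first; re-read the log and retry."""

def commit(put_if_absent, log_prefix, r, actions):
    """Sketch of step 4: atomically claim log record r+1.

    put_if_absent(key, body) must return True only if the key did not exist:
    a conditional put on GCS/Azure, a coordination service on S3 (Databricks),
    or an atomic rename on HDFS/ADLS.
    """
    key = f"{log_prefix}/{r + 1:020d}.json"
    body = "\n".join(json.dumps(a) for a in actions)
    if not put_if_absent(key, body):
        # Someone else wrote r+1: the transaction can be retried, possibly
        # reusing the Parquet data files already written in step 3.
        raise CommitConflict(key)
    return r + 1
```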
Impact of the protocol
The need to keep record logs and fight with the eventual consistency of the OS reduces the transaction rate Delta Lake can handle. Step 4 of the write protocol is the limiting factor: the need to wait for the latency of put-if-absent. The rate mentioned in the paper is “several transactions per second”. There is no further qualification of this “several”, but we can assume it’s less than 20-30 (otherwise you get into the “dozens” area).
Crucially though, the paper says that even for streaming this transaction rate is enough, because Delta Lake is optimised for streaming ingest and consumption. It compacts small writes transactionally in a way readers don’t see: you can keep reading the “old” files while another job creates a compaction of new files with a later record log. This is what allows efficient UPSERT/DELETE/MERGE operations, and it effectively offers snapshot isolation.
It uses the txn field to handle exactly-once semantics (I’m not exactly sure how they handle this, I’d need to review the code to be sure). Since records have increasing identifiers, it is easy to read from the tail of the table in streaming systems.
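Reading that tail from Spark Structured Streaming is just a streaming read over the table (a sketch, assuming a Spark session with the Delta Lake package configured; paths are made up):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Treat the Delta table as a streaming source: new commits at the tail of
# the log show up as new micro-batches.
stream = spark.readStream.format("delta").load("s3://my-data-lake/mytable")

query = (stream.writeStream
         .format("console")                                   # any sink; console for illustration
         .option("checkpointLocation", "s3://my-data-lake/_chk/mytable")
         .start())
```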
Known limitations
These are the limitations identified in the paper:
- Transactions are limited to a single table. This is a reasonable decision to take, since handling transactions across tables causes a lot of contention (and potentially, locks).
- For streaming, the limiting factor is the latency of the underlying OS. There is not a lot that can be done here.
- Except for the min/max per column added to the metadata, there are no other secondary indexes. According to the paper there is a working prototype of Bloom filter indices, something you can also find in Snowflake.
Wrap-up
I expected a bit more detail in this paper, to be fair. Although it is detailed enough to know how Delta works, it really doesn’t scratch the itch of knowing the nitty-gritty of some decisions and choices. In particular, there is no mention of how Z-Ordering is achieved, or of the implications of checkpointing. Let me explain.
One of the main selling points from Databricks in the past two years has been GDPR right-to-remove capabilities. In your classical OS data lake, a removal request implies:
- find all files with that identifier,
- write files without that identifier,
- rename the new to the old or whatever you need to make it consistent.
Databricks argues that this is expensive due to the reading, writing and handling. I agree that handling this process is a pain, but I still don’t see where the particular speed-up comes from when using Delta. Compaction is not free: some worker node that you are paying for will eventually run steps 1-3. Automatically, yes, but it will still cost compute power.
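For reference, the Delta version of that removal (the three steps above still happen, just driven by the engine) looks roughly like this; the table path and predicate are made up, and the snippet assumes a Spark session configured with Delta Lake:

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

table = DeltaTable.forPath(spark, "s3://my-data-lake/users")

# Rewrites only the files containing the offending identifier; old files are
# dropped from the log and physically deleted later by VACUUM.
table.delete("user_id = 'id-to-be-forgotten'")
```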
The speed-up promises, though, seem realistic. Immutability without needing to issue LIST requests is a big speed-up for caching, and zone maps/Z-Ordering should be a significant speed-up for many workloads. After all, the fastest way to scan a file is knowing the data is not there and not loading it at all.