# Durability and Redo Logging

24 Jan 2022

The most fundamental property a database can provide is durability. That is, once I’ve told you that your write has been accepted, if a mouse chews through the power cord for the server rack, the write will not be lost.

This obviously is only possible to a degree. If someone goes into your SSD with a magnetized needle and a steady hand and tweaks their bank balance, then (short of replication) the best you can probably do is detect that it’s been changed via a checksum, unless, of course, they had the foresight to update that as well.

If we had to enumerate a very rough list of the types of relevant failures that can occur, we might wind up with something like this:

• Application crash (say, it gets kill -9’d), a “soft crash.”
• Transient file system error.
• Operating system crash.
• Corrupted writes.
• Lost writes.
• Obliteration of the hard drive (meteor).

Certainly only some of these can be dealt with, but in this post we’re going to talk about how to deal with problems that could be described as “fail-stop”: either our program crashing, or the computer it’s running on ceasing to run (in either case, with its durable storage remaining intact). I have to caveat this post with the fact that we are ignoring large swathes of failure modes. If you’re curious about the depths to which the “writing a file” rabbit hole goes, I recommend Dan Luu’s post on the matter. That said, the solution to most of these problems is “use checksums,” which is not that exciting, which is why I’m omitting it from this discussion.

This post is something of a spiritual successor to Compaction. I think they both stand on their own, but I suspect reading both will be a more satisfying experience than either alone.

## A non-durable solution

We’re going to start with a strawman. Let’s implement a simple database that writes out its contents when we’re done with it.

Our approach will be this:

• When we create a database, we point it at a file. It checks if that file exists. If it does, it reads it, parses its contents as JSON, and populates its internals with it. Otherwise, it creates an empty hash table.
• To do a write, we update our internal hash table.
• To flush, we encode our database state as JSON and write it out to the file.
• When we are done with the database, we call flush to write it out to disk.
struct Db {
    data: HashMap<String, String>,
    fname: PathBuf,
}

impl Db {
    fn new<P>(f: P) -> Result<Self>
    where
        P: AsRef<Path>,
    {
        let data = std::fs::read_to_string(&f)
            .and_then(|v| Ok(serde_json::from_str(v.as_str())?))
            .unwrap_or_else(|_| HashMap::new());
        Ok(Db {
            data,
            fname: f.as_ref().to_path_buf(),
        })
    }

    fn get(&self, k: &str) -> Option<&String> {
        self.data.get(k)
    }

    fn set(&mut self, k: &str, v: &str) {
        self.data.insert(k.to_owned(), v.to_owned());
    }

    fn delete(&mut self, k: &str) {
        self.data.remove(k);
    }

    fn flush(&self) -> Result<()> {
        std::fs::write(&self.fname, serde_json::to_vec(&self.data)?)?;
        Ok(())
    }
}


This appears to work:

fn main() -> Result<()> {
    let mut db = Db::new("db_data")?;
    db.set("foo", "bar");
    db.flush()?;
    Ok(())
}


After running it and checking the data file, we see it has indeed serialized itself:

{"foo":"bar"}


Using this, our database can be brought back up after having been torn down:

fn main() -> Result<()> {
    let mut db = Db::new("db_data")?;
    println!("value of abc is {:?}", db.get("abc"));
    db.set("abc", "def");
    println!("value of abc is {:?}", db.get("abc"));

    db.flush()?;
    Ok(())
}


Running this once produces:

value of abc is None
value of abc is Some("def")


and again produces:

value of abc is Some("def")
value of abc is Some("def")


Signifying that we indeed serialized our database to disk. This is a pretty poor design, though.

From a durability perspective, the most glaring problem is that we have no guarantee the code in flush will be run. If we unplug our server rack or even merely kill -9 our database process, flush will never get a chance to run. If such a thing were to happen, we would lose all of our writes since the previous flush (and that’s saying nothing about what happens if one of those things happens during a flush).

We can observe this loss of writes easily, simulating our program being killed via a panic!:

fn main() -> Result<()> {
    let mut db = Db::new("db_data")?;
    println!("value of abc is {:?}", db.get("abc"));
    db.set("abc", "def");
    println!("value of abc is {:?}", db.get("abc"));
    panic!("");

    // Never reached: the process dies before it gets a chance to flush.
    db.flush()?;
    Ok(())
}


Running it twice now produces, as you might expect:

value of abc is None
value of abc is Some("def")
<panic>

value of abc is None
value of abc is Some("def")
<panic>


This is kind of the fundamental no-no in systems that purport to provide durability: the system accepted the write both implicitly (by returning from the call to set without error), and explicitly (by serving that write as a later read), and yet that write was not durable, since we did not see it on the second run.

I find it helpful to be more precise about exactly what we mean when we say “durability.” Here’s some (nonstandard) terminology:

• A write is durable if any future read will see it.
• A write is observed if either:
  • the writer received an acknowledgement that the write succeeded (say, by the call to set returning), or
  • a subsequent read reflected the write (say, by a call to get returning the written value).
A database provides durability if every observed write is durable. It’s sometimes appropriate for a database to loosen this guarantee in the interest of performance, but for the purposes of this post we will stick with it.

One way we might (unsuccessfully) try to fix this is to simply flush after every write.

fn set(&mut self, k: &str, v: &str) {
    self.data.insert(k.to_owned(), v.to_owned());
    self.flush().unwrap();
}


Independent of the durability characteristics of this design, it’s untenable for larger databases since we’ll have to flush our entire dataset to disk on every write. So let’s step back and try a different fundamental design.

## Enter: Logs

There’s a correspondence that’s well known in database-land, taken as obvious but not often stated explicitly: the relationship between

• a data structure and
• the set of mutations which, when applied to some fixed starting point, result in that data structure.

This was once described to me as “a log is something of a universal Turing machine for data structures.” In our case, the state of our database is fully defined by the sequence of calls to set and delete. Thus, instead of storing the data structure directly, we can just write down the commands that produce it. What’s the simplest way to ensure we’ve stored all the commands we’ve been issued? Perhaps we could just, you know, write them all down?
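In miniature, the correspondence is just a fold: replaying the commands over an empty starting state yields the data structure. A sketch (the names here are mine, but the shape is the same as what we’ll build below):

```rust
use std::collections::HashMap;

#[derive(Clone)]
enum Command {
    Set(String, String),
    Delete(String),
}

// The data structure is fully determined by folding the mutation log
// over an empty starting state.
fn materialize(log: &[Command]) -> HashMap<String, String> {
    log.iter().fold(HashMap::new(), |mut state, cmd| {
        match cmd {
            Command::Set(k, v) => {
                state.insert(k.clone(), v.clone());
            }
            Command::Delete(k) => {
                state.remove(k);
            }
        }
        state
    })
}
```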

Here’s another implementation of a database. This time, rather than the in-memory representation of our data driving the system, we care only about the on-disk representation. When we receive a request to write data, we treat it as an opaque command and write it to disk.

#[derive(Debug)]
struct Db {
    log: File,
    fname: PathBuf,
}

#[derive(Serialize, Deserialize)]
enum Command<'a> {
    Set(&'a str, &'a str),
    Delete(&'a str),
}

impl Db {
    fn new<P>(f: P) -> Result<Self>
    where
        P: AsRef<Path>,
    {
        Ok(Db {
            log: OpenOptions::new().create(true).append(true).open(&f)?,
            fname: f.as_ref().to_path_buf(),
        })
    }

    fn apply_command(&mut self, command: &Command) -> Result<()> {
        self.log.write_all(&serde_json::to_vec(command)?)?;
        self.log.write_all(b"\n")?;
        Ok(())
    }

    fn set(&mut self, k: &str, v: &str) -> Result<()> {
        self.apply_command(&Command::Set(k, v))?;
        Ok(())
    }

    fn delete(&mut self, k: &str) -> Result<()> {
        self.apply_command(&Command::Delete(k))?;
        Ok(())
    }

    // ...
}


Now, if we run a series of inserts:

fn main() -> Result<()> {
    let mut db = Db::new("logfile")?;

    db.set("foo", "a")?;
    db.set("bar", "b")?;
    db.set("baz", "c")?;
    db.delete("bar")?;

    Ok(())
}


Checking the resulting logfile will show what we expect:

{"Set":["foo","a"]}
{"Set":["bar","b"]}
{"Set":["baz","c"]}
{"Delete":"bar"}


How do we service reads? Well, we have all the commands that have been issued so far, so we can just scan through them and track the value of the key we’re interested in:

fn get(&self, k: &str) -> Result<Option<String>> {
    // Requires std::io::{BufRead, BufReader}.
    let file = BufReader::new(File::open(&self.fname)?);
    let mut result = None;
    for line in file.lines() {
        match serde_json::from_str(&line?)? {
            Command::Set(new_k, v) => {
                if k == new_k {
                    result = Some(v.to_owned());
                }
            }
            Command::Delete(new_k) => {
                if k == new_k {
                    result = None;
                }
            }
        }
    }
    Ok(result)
}


Let’s check if it works:

fn main() -> Result<()> {
    let mut db = Db::new("logfile")?;

    println!("foo = {:?}", db.get("foo"));
    println!("bar = {:?}", db.get("bar"));
    println!("baz = {:?}", db.get("baz"));

    println!("\nperforming writes...\n");
    db.set("foo", "a")?;
    db.set("bar", "b")?;
    db.set("baz", "c")?;
    db.delete("bar")?;

    println!("foo = {:?}", db.get("foo"));
    println!("bar = {:?}", db.get("bar"));
    println!("baz = {:?}", db.get("baz"));

    Ok(())
}


This outputs:

foo = Ok(None)
bar = Ok(None)
baz = Ok(None)

performing writes...

foo = Ok(Some("a"))
bar = Ok(None)
baz = Ok(Some("c"))


If we run it again, we see that our writes have been persisted:

foo = Ok(Some("a"))
bar = Ok(None)
baz = Ok(Some("c"))

performing writes...

foo = Ok(Some("a"))
bar = Ok(None)
baz = Ok(Some("c"))


Now, since we’re writing out the commands as they come in, we don’t have to worry about a crash losing our results. By the time our call to set returns, it seems like we’ve definitely written the command to disk, right?

On Linux, you can induce a system reboot by writing a b to /proc/sysrq-trigger (if you’re going to try this you probably want to use a VM):

echo b > /proc/sysrq-trigger


We can sort of simulate an operating system level crash (or having the plug pulled on our server rack) by using this:

pub fn reboot_kernel() {
    File::create("/proc/sysrq-trigger")
        .unwrap()
        .write_all(b"b")
        .unwrap();
}


Let’s try running our test again, but with an OS-level crash.

fn main() -> Result<()> {
    let mut db = Db::new("logfile")?;

    println!("foo = {:?}", db.get("foo"));
    println!("bar = {:?}", db.get("bar"));
    println!("baz = {:?}", db.get("baz"));

    println!("\nperforming writes...\n");
    db.set("foo", "a")?;
    db.set("bar", "b")?;
    db.set("baz", "c")?;
    db.delete("bar")?;

    println!("foo = {:?}", db.get("foo"));
    println!("bar = {:?}", db.get("bar"));
    println!("baz = {:?}", db.get("baz"));

    reboot_kernel();

    Ok(())
}


Running this the first time (as root, obviously, to have the right to reboot the system) outputs what we’d expect:

foo = Ok(None)
bar = Ok(None)
baz = Ok(None)

performing writes...

foo = Ok(Some("a"))
bar = Ok(None)
baz = Ok(Some("c"))


Followed by a lack of interactivity from our VM as it dutifully reboots. Thankfully, we wrote all of our data to disk! Rerunning the program after a reboot however, our hopes are dashed:

foo = Ok(None)
bar = Ok(None)
baz = Ok(None)

performing writes...

foo = Ok(Some("a"))
bar = Ok(None)
baz = Ok(Some("c"))


Whoops!

What happened here? Had our writes been persisted, the first set of reads should have contained the values we wrote.

It turns out that writing to disk has quite high latency, and performing a lot of individual writes would be pretty slow if every one of them had to actually reach the disk. Because of this, operating systems typically buffer writes in an in-memory write-back cache. A write-back cache is one where updates are written into the cache and only synced to disk at some later point. This is in contrast to a write-through cache, where updates are written to both the in-memory cache and disk.

Most applications don’t care too much about this stuff, but for systems like a database that purport to do everything in their power to not lose a user’s data, this isn’t an appropriate design. For this reason, the fsync system call exists, which tells the operating system to sync all the outstanding data in a particular file to disk.

Rust exposes this system call via the File::sync_all method. If we call sync_all, when it returns successfully, the operating system promises that the data has been written to disk and will exist on disk even in the event of a hard crash like a power failure or OS crash. As I’ve tried to emphasize repeatedly, this is an oversimplification, and many things can go wrong depending on your filesystem configuration, but for today, let’s keep things simple and assume syncing is a reliable process.

If we add the appropriate call to sync_all

// ...
fn apply_command(&mut self, command: &Command) -> Result<()> {
    self.log.write_all(&serde_json::to_vec(command)?)?;
    self.log.write_all(b"\n")?;
    // New! //
    self.log.sync_all()?;
    //////////
    Ok(())
}
// ...


Our test from before now works: once we’ve observed the writes, even if we forcibly reboot the system, we can still observe them (note that the converse is not true: it’s possible for the writes to have been persisted despite not having been observed). Note that we might have to discard an incomplete command at the end of the log if we stopped before getting to fully write it.
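That discard step can be a small tweak to replay: treat a line that fails to parse as a torn final append and stop there. Here’s a sketch of the idea (with a simplified `set`/`del` line format standing in for the JSON encoding, to keep it dependency-free; the function name is mine):

```rust
use std::collections::HashMap;

// Sketch of crash-tolerant replay, assuming only the final append can be
// torn: replay complete records in order, and discard the log's tail at
// the first record that fails to parse.
fn replay_tolerant(log: &str) -> HashMap<String, String> {
    let mut data = HashMap::new();
    for line in log.lines() {
        let parts: Vec<&str> = line.split(' ').collect();
        match parts.as_slice() {
            ["set", k, v] => {
                data.insert(k.to_string(), v.to_string());
            }
            ["del", k] => {
                data.remove(*k);
            }
            // Anything else is assumed to be an interrupted final
            // append: discard it and stop replaying.
            _ => break,
        }
    }
    data
}
```

A complete record appearing *after* a malformed one would instead indicate real corruption rather than a torn tail, which is one of the places checksums earn their keep.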

This database is still subtly broken, however. Consider the following sequence of events.

• We do a write to the database.
• After calling write_all but before calling sync_all, the program gets kill -9’d. Now the kernel buffer contains our write, but stable storage does not.
• We bring the database back up and do a read. When we read logfile the kernel tells us what is in its buffer, rather than what is on disk, and correspondingly, the reader observes the value we wrote.
• We unplug the server. Now the data is lost from the kernel buffer and never makes it to stable storage.

This sounds like it might be a problem; let’s be scientific and see if we can observe it happening.

To make it easier to exercise this somewhat complicated sequence of events, I made a simple CLI:

fn main() -> Result<()> {
    let args: Vec<_> = std::env::args().collect();

    const LOGFILE: &str = ...;

    match args[1].as_str() {
        "get" => {
            let db = Db::new(LOGFILE)?;
            println!("{:?}", db.get(args[2].as_str()).unwrap());
        }
        "set-then-soft-crash" => {
            let mut db = Db::new(LOGFILE)?;
            db.set_unsynced(args[2].as_str(), args[3].as_str())?;
            panic!("i have been kill -9'd");
            db.sync_logfile();
        }
        "get-then-hard-crash" => {
            let db = Db::new(LOGFILE)?;
            println!("{:?}", db.get(args[2].as_str()).unwrap());
            reboot_kernel();
        }
        // ...
        _ => panic!("unhandled command"),
    }

    Ok(())
}

• get reads a value from the database,
• set-then-soft-crash writes a value without syncing it, which simulates the program crashing before it gets a chance to sync, and
• get-then-hard-crash reads a value then simulates an OS-level crash.

Let’s try this baby out.

# ./db get abc; ./db set-then-soft-crash abc xyz; ./db get-then-hard-crash abc
None
thread 'main' panicked at 'i have been kill -9'd', src/bin/with_panic.rs:119:13
note: run with RUST_BACKTRACE=1 environment variable to display a backtrace
Some("xyz")
<OS reboots>


That Some("xyz") at the end is evidence that we read the value of "abc" that we set. After a reboot, let’s try reading again:

# ./db get abc
None


This is a problem, because, calling back to our earlier definition, the value has been observed (from before the kernel rebooted), but it was not durable (as evidenced by its disappearance).

Cool, our hypothesis has been validated. This should be easy to fix: if we sync the log when we open it, we should guarantee that no unsynced reads are ever served:

impl Db {
    fn new<P>(f: P) -> Result<Self>
    where
        P: AsRef<Path>,
    {
        let log = OpenOptions::new().create(true).append(true).open(&f)?;
        // New!
        log.sync_all()?;
        //
        Ok(Db {
            log,
            fname: f.as_ref().to_path_buf(),
        })
    }

    // ...
}


And this works! This is a completely functional database that provides durability. Let’s run our test again:

# ./db get abc; ./db set-then-soft-crash abc xyz; ./db get-then-hard-crash abc
None
thread 'main' panicked at 'i have been kill -9'd', src/bin/with_panic.rs:119:13
note: run with RUST_BACKTRACE=1 environment variable to display a backtrace
Some("xyz")
<OS reboots>


After our reboot, we do indeed see:

# ./db get abc
Some("xyz")


Keep in mind that from a user’s perspective, it would have been OK to have either both values read be None or both values read be Some("xyz"). The process that did the write crashed, after all, so there’s no way to know if its write was actually successful or not without checking after the fact.

## A durable and fast(er) design

You might rightfully object to this design. Despite providing durability, it provides very few other desirable properties. The first obvious problem is that reads are extremely slow. There’s an easy fix for that: we can just graft our first solution onto this one by introducing an in-memory cache in front of the log. This cache is sometimes called a memtable.

First, change the definition of our Db:

struct Db {
    log: File,
    memtable: HashMap<String, String>,
}


Then we add a function to apply a command directly to the memtable:

fn apply_command_to_memtable(memtable: &mut HashMap<String, String>, cmd: &Command) {
    match cmd {
        Command::Set(k, v) => {
            memtable.insert((*k).to_owned(), (*v).to_owned());
        }
        Command::Delete(k) => {
            memtable.remove(*k);
        }
    }
}


When we apply a command to our log, we also apply it to the memtable:

fn apply_command(&mut self, command: &Command) -> Result<()> {
    self.log.write_all(&serde_json::to_vec(command)?)?;
    self.log.write_all(b"\n")?;
    self.log.sync_all()?;
    Self::apply_command_to_memtable(&mut self.memtable, command);
    Ok(())
}


And finally, now, when we open the log, we have to replay it to rebuild the memtable:

fn new<P>(f: P) -> Result<Self>
where
    P: AsRef<Path>,
{
    let log = OpenOptions::new().create(true).append(true).open(&f)?;
    log.sync_all()?;
    let memtable = Self::replay_log(&f)?;
    Ok(Db { log, memtable })
}

fn replay_log<P>(f: P) -> Result<HashMap<String, String>>
where
    P: AsRef<Path>,
{
    let mut result = HashMap::new();
    // Requires std::io::{BufRead, BufReader}.
    let file = BufReader::new(File::open(&f)?);
    for line in file.lines() {
        Self::apply_command_to_memtable(&mut result, &serde_json::from_str(line?.as_str())?);
    }
    Ok(result)
}


Our database now has reasonably fast reads along with durable writes. Since we redo the commands we’ve written to the log on startup, this style of logging is sometimes called “redo logging.”
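The updated get isn’t shown above, but with the memtable it becomes a plain map lookup. A sketch, reduced to just the memtable field (the log doesn’t participate in reads at all):

```rust
use std::collections::HashMap;

struct Db {
    memtable: HashMap<String, String>,
}

impl Db {
    // Reads are served entirely from the memtable, which holds the
    // fully-replayed state of the log.
    fn get(&self, k: &str) -> Option<String> {
        self.memtable.get(k).cloned()
    }
}
```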

## Takeaways so Far

I have two main takeaways I want to leave you with.

First, this stuff is subtle! It’s really, really hard to tell whether you have this right or not. I tend to take a pretty scientific approach to the code I write. I’m not really convinced a line of code is required unless I can observe the system not working without it. Working with guarantees that are only observable when your operating system crashes makes it very difficult to prove to yourself that code does the right thing.

Second, while Write-Ahead Log (WAL) is the more common term for this mechanism, I personally don’t really like it, since it implies that the logging is the afterthought, rather than the primary mechanism. Like, “Before I do my thing, I’m going to write it to the log.” That’s wrong! The easier way to think about this is that the MEMTABLE (or some other indexed structure) is the afterthought. It’s just the cache in front of where the real action is: the log.

## Next Steps

There are at least three more glaring issues with our database:

1. The size of our database is limited by memory: we can’t store more data than we can fit in a memtable, since it holds the entire state of the database.
2. Our log still grows without bound, and we have no way to ever shrink it.
3. Our throughput is quite bad.

(1) and (2) can both be addressed by periodically migrating data to an indexed, durable file called a sorted string table (SST). Following that thread will eventually lead us to re-deriving log-structured merge trees, which is a fun topic! We’re going deep on logging today, and so the issue we’re going to explore next is (3).

## Improving Throughput

If we test the latency of a single sync, on my laptop it’s about 23ms:

let mut file = File::create("foo").unwrap();
file.write_all("bar".as_bytes()).unwrap();
let t = Instant::now();
file.sync_all().unwrap();
println!("{}ms", t.elapsed().as_secs_f64() * 1000.0);


Which sounds like we should be able to get around 1000/23 = ~43 writes/second, which we can verify with this very unscientific benchmark:

let file = dir.path().to_path_buf().join("logfile");
let mut db = Db::new(&file)?;
let t = Instant::now();
let mut writes = 0;
while Instant::now().duration_since(t) < Duration::from_millis(10000) {
writes += 1;
db.set("foo", "bar").unwrap();
}
println!("performed {} writes in 10 seconds", writes);


Which, for me, outputs

performed 437 writes in 10 seconds


Which matches our expectations, and, for most databases, is not very exciting throughput.

How can we do better? The major bottleneck in our system is the syncing. There’s a number of strategies we can use here. The most obvious answer is that we can reduce our guarantees. Disabling syncs improves throughput quite a lot:

// self.log.sync_all()?;

performed 1285768 writes in 10 seconds


We don’t have to go that far. If we were ok with looser guarantees, we could also only sync every n writes, or every 100ms, or something, to bound the amount of data loss that’s possible.
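A sketch of the every-n-writes variant (BatchedLog is my name, not from the post): count appends and only pay for a sync when the counter rolls over, accepting that up to n-1 acknowledged writes can be lost in a crash.

```rust
use std::fs::File;
use std::io::Write;

// Sketch: sync only every `n` writes, trading durability of the most
// recent (up to n-1) acknowledged writes for throughput.
struct BatchedLog {
    file: File,
    unsynced: usize,
    n: usize,
}

impl BatchedLog {
    fn append(&mut self, record: &[u8]) -> std::io::Result<()> {
        self.file.write_all(record)?;
        self.file.write_all(b"\n")?;
        self.unsynced += 1;
        // Only every n-th append pays the fsync cost.
        if self.unsynced >= self.n {
            self.file.sync_all()?;
            self.unsynced = 0;
        }
        Ok(())
    }
}
```

A timer-based variant would instead sync whenever some interval (say, 100ms) has elapsed since the last sync.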

We could open the file multiple times and issue multiple write/fsyncs at once. This does work, but doesn’t really improve our throughput (at least on my MacBook) due to contention. If we want to improve our throughput, we need to find a way to turn multiple logical writes (Commands) into a single physical “batch.”

There are two main ways to do batching. One is to require users to construct batches themselves, which may be appropriate for some use cases. For other use cases, that may be onerous and it might make more sense for the database to handle construction of batches itself.
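The first flavor, where the caller hands us a whole batch, can be sketched like this (append_batch is my name for it, not an API from the post): write every record, then pay for a single sync.

```rust
use std::fs::File;
use std::io::Write;

// Sketch of a caller-constructed batch: many records, one sync.
// The fsync cost is amortized across the whole batch.
fn append_batch(log: &mut File, records: &[&str]) -> std::io::Result<()> {
    for r in records {
        log.write_all(r.as_bytes())?;
        log.write_all(b"\n")?;
    }
    // A single sync makes the entire batch durable at once.
    log.sync_all()
}
```

The second flavor, where the database assembles batches from concurrent writers itself, is what the rest of this post builds.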

## Batching Writes

The basic idea is that we will enforce having only a single write in flight at any time, and any writes that arrive while there’s an extant write will be put into a buffer to be written and synced once the current write finishes.

As an analogy, imagine a train that runs on a regular schedule between two points. Passengers who want to get from point A to point B will wait at point A (memory) for the train to come back, at which point it can take them all at once to point B (stable storage).

We’ll model multiple writes occurring at once as coming from multiple threads. A more idiomatic approach in Rust today might be to use a cooperative scheduler like tokio, but to keep things slightly more language-agnostic I’m just going to use threads.

We are going to implement this, but keep in mind that I have what you might call “just enough knowledge of concurrency to do some real damage.”

The design is roughly based off of RocksDB and Pebble, but simplified because we’re going to sacrifice some concurrency by not using a lock-free data structure for our memtable, and will be happy to just lock it with a Mutex.

For any particular batch, there will be a “leader” thread, which will be responsible for actually doing the write and sync calls. The first thread to try to write to a batch will become the leader. Any others will become followers and will wait to be notified by the leader that their write has been committed.

There are thus two states our database can be in when a thread attempts to do a write:

enum DbState {
    // There may or may not be an outstanding fsync, and there is currently no
    // leader.
    Pending {
        // This condition variable will allow us to wait for the previous batch
        // to finish committing before we go and commit our own.
        prev_batch_notif: Arc<(Mutex<bool>, std::sync::Condvar)>,
    },
    // Outstanding fsync, and there is a leader.
    PendingLeader {
        // If a new thread comes along and tries to write, it will stuff its
        // write into this buffer that the leader will use when it actually does
        // its write.
        writes: Vec<Command>,
        // This will tell us when the leader has finished writing and we can
        // safely return (informing the caller that their write has been
        // committed).
        batch_notif: Arc<(Mutex<bool>, std::sync::Condvar)>,
    },
}


Next, we’re going to wrap all the important stuff in our Db struct in an Arc<Mutex<...>> so that we can safely pass our Db between threads:

#[derive(Debug, Clone)]
struct Db {
    state: Arc<Mutex<DbState>>,
    log: Arc<Mutex<File>>,
    memtable: Arc<Mutex<HashMap<String, String>>>,
}


Now the process of doing a write is simply a matter of delicately managing the state machine.

To apply a command, we first lock the state variable:

fn apply_command(&mut self, command: &Command) -> Result<()> {
    let mut state = self.state.lock().unwrap();
    match &mut *state {


If we attempt to do a write and the database is in the Pending state, meaning we can become the leader, then we update the state and release our lock on the state variable to allow other writers to queue up:

        DbState::Pending { .. } => {
            // There's a pending batch, but no current leader. We shall
            // become the leader of the next batch.
            let done = Arc::new((Mutex::new(false), std::sync::Condvar::new()));
            // Ensure any incoming participants know that there is already a
            // leader, so that they join our batch rather than starting one.
            let notif = if let DbState::Pending { prev_batch_notif } = std::mem::replace(
                &mut *state,
                DbState::PendingLeader {
                    writes: vec![command.clone()],
                    batch_notif: done.clone(),
                },
            ) {
                prev_batch_notif
            } else {
                panic!("invalid");
            };
            drop(state);


Then we wait for the previous batch, if there is one, to finish:

            Self::wait_for(notif);
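Self::wait_for isn’t defined in these snippets; given the (Mutex<bool>, Condvar) pairing above, a minimal free-standing version might look like this (the loop re-checks the flag to guard against spurious wakeups):

```rust
use std::sync::{Arc, Condvar, Mutex};

// Block until the boolean paired with this condition variable is set to
// true by whoever is responsible for notifying us.
fn wait_for(notif: Arc<(Mutex<bool>, Condvar)>) {
    let (lock, cvar) = &*notif;
    let mut done = lock.lock().unwrap();
    while !*done {
        // wait atomically releases the lock and re-acquires it on wakeup.
        done = cvar.wait(done).unwrap();
    }
}
```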


Now we’re ready to commit our batch, so we change the state back to Pending, meaning any future writers will join the batch that comes after ours.

            let mut state = self.state.lock().unwrap();
            let writes = if let DbState::PendingLeader { writes, .. } = std::mem::replace(
                &mut *state,
                DbState::Pending {
                    prev_batch_notif: done.clone(),
                },
            ) {
                writes
            } else {
                panic!("expected to still be the leader");
            };


Then we write all of the commands in our batch to the log:

            let mut log = self.log.lock().unwrap();
            drop(state);
            for command in &writes {
                log.write_all(&serde_json::to_vec(command)?)?;
                log.write_all(b"\n")?;
            }
            log.sync_all()?;


Finally, we’re ready to make the writes visible, by writing them to the memtable.

            // Now we apply each command to the memtable:
            let mut memtable = self.memtable.lock().unwrap();
            for command in &writes {
                Self::apply_command_to_memtable(&mut *memtable, command);
            }


At this point, the batch is written and visible, so we can notify all of our followers that their write has been applied and they can safely return.

            *done.0.lock().unwrap() = true;
            done.1.notify_all();
        }


If we are not the leader, things are much simpler: we just add our write to the pending batch and wait for the leader to tell us that it’s been committed.

        DbState::PendingLeader {
            writes,
            batch_notif,
        } => {
            writes.push(command.clone());
            let batch_notif = batch_notif.clone();
            drop(state);
            Self::wait_for(batch_notif);
        }
    }
    Ok(())
}


Running this across 8 threads, we do manage to improve our throughput:

we did 2353 writes


Which you’d probably still angrily call your Oracle vendor over in a production system, but is a fun improvement for a toy project.

This post was a very light introduction to log-based durability, and we’ve only scratched the surface. In particular, none of these implementations is totally safe: they don’t detect or recover from filesystem errors whatsoever, to say nothing of the myriad things that can go quietly wrong. There’s also a lot of room to make this much faster.


This post (even moreso than many others) was possible due to help from Bilal Akhtar. Also Rich, Forte, Ruchir, Ilia, Madhav, and Jordan. All mistakes are my own!