aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/table/data.rs3
-rw-r--r--src/table/gc.rs150
2 files changed, 130 insertions, 23 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index ffd494d5..13ec62bf 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -12,6 +12,7 @@ use garage_util::error::*;
use garage_rpc::system::System;
use crate::crdt::Crdt;
+use crate::gc::GcTodoEntry;
use crate::replication::*;
use crate::schema::*;
@@ -176,7 +177,7 @@ where
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
let nodes = self.replication.write_nodes(&pk_hash);
if nodes.first() == Some(&self.system.id) {
- self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
+ GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
}
}
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 9b3d60ff..cbd91b3a 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -12,7 +12,7 @@ use futures_util::future::*;
use tokio::sync::watch;
use garage_util::data::*;
-use garage_util::error::Error;
+use garage_util::error::*;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -24,7 +24,7 @@ use crate::schema::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
-pub struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
+pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> {
system: Arc<System>,
data: Arc<TableData<F, R>>,
@@ -94,31 +94,45 @@ where
let mut entries = vec![];
let mut excluded = vec![];
- for item in self.data.gc_todo.iter() {
- let (k, vhash) = item?;
+ // List entries in the GC todo list
+ // These entries are put there when a tombstone is inserted in the table
+ // This is detected and done in data.rs in update_entry
+ for entry_kv in self.data.gc_todo.iter() {
+ let (k, vhash) = entry_kv?;
+ let mut todo_entry = GcTodoEntry::parse(&k, &vhash);
let vhash = Hash::try_from(&vhash[..]).unwrap();
- let v_opt = self
+ // Check if the tombstone is still the current value of the entry.
+ // If not, we don't actually want to GC it, and we will remove it
+ // from the gc_todo table later (below).
+ todo_entry.value = self
.data
.store
.get(&k[..])?
- .filter(|v| blake2sum(&v[..]) == vhash);
+ .filter(|v| blake2sum(&v[..]) == vhash)
+ .map(|v| v.to_vec());
- if let Some(v) = v_opt {
- entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec())));
+ if todo_entry.value.is_some() {
+ entries.push(todo_entry);
if entries.len() >= TABLE_GC_BATCH_SIZE {
break;
}
} else {
- excluded.push((k, vhash));
+ excluded.push(todo_entry);
}
}
- for (k, vhash) in excluded {
- self.todo_remove_if_equal(&k[..], vhash)?;
+ // Remove from gc_todo entries for tombstones where we have
+ // detected that the current value has changed and
+ // is no longer a tombstone.
+ for entry in excluded {
+ entry.remove_if_equal(&self.data.gc_todo)?;
}
+ // Remaining in `entries` is the list of entries we want to GC,
+ // and which are still currently tombstones in the table.
+
if entries.is_empty() {
// Nothing to do in this iteration
return Ok(false);
@@ -126,9 +140,17 @@ where
debug!("({}) GC: doing {} items", self.data.name, entries.len());
+ // Split entries to GC by the set of nodes on which they are stored.
+ // Here we call them partitions but they are not exactly
+ // the same as partitions as defined in the ring: those partitions
+ // are defined by the first 8 bits of the hash, but two of these
+ // partitions can be stored on the same set of nodes.
+ // Here we detect when entries are stored on the same set of nodes:
+ // even if they are not in the same 8-bit partition, we can still
+ // handle them together.
let mut partitions = HashMap::new();
- for (k, vhash, v) in entries {
- let pkh = Hash::try_from(&k[..32]).unwrap();
+ for entry in entries {
+ let pkh = Hash::try_from(&entry.key[..32]).unwrap();
let mut nodes = self.data.replication.write_nodes(&pkh);
nodes.retain(|x| *x != self.system.id);
nodes.sort();
@@ -136,9 +158,12 @@ where
if !partitions.contains_key(&nodes) {
partitions.insert(nodes.clone(), vec![]);
}
- partitions.get_mut(&nodes).unwrap().push((k, vhash, v));
+ partitions.get_mut(&nodes).unwrap().push(entry);
}
+ // For each set of nodes that contains some items,
+ // ensure they are aware of the tombstone status, and once they
+ // are, instruct them to delete the entries.
let resps = join_all(
partitions
.into_iter()
@@ -146,6 +171,8 @@ where
)
.await;
+ // Collect errors and return a single error value even if several
+ // errors occurred.
let mut errs = vec![];
for resp in resps {
if let Err(e) = resp {
@@ -162,23 +189,40 @@ where
.collect::<Vec<_>>()
.join(", "),
))
+ .err_context("in try_send_and_delete:")
}
}
async fn try_send_and_delete(
&self,
nodes: Vec<Uuid>,
- items: Vec<(ByteBuf, Hash, ByteBuf)>,
+ items: Vec<GcTodoEntry>,
) -> Result<(), Error> {
let n_items = items.len();
+ // Strategy: we first send all of the values to the remote nodes,
+ // to ensure that they are aware of the tombstone state.
+ // (if they have a newer state that overrides the tombstone, that's fine).
+ // Second, once everyone is at least at the tombstone state,
+ // we instruct everyone to delete the tombstone IF that is still their current state.
+ // If they are now at a different state, it means that that state overrides the
+ // tombstone in the CRDT lattice, and it will be propagated back to us at some point
+ // (either just a regular update that hasn't reached us yet, or later when the
+ // table is synced).
+ // Here, we store in updates all of the tombstones to send for step 1,
+ // and in deletes the list of keys and hashes of value for step 2.
let mut updates = vec![];
let mut deletes = vec![];
- for (k, vhash, v) in items {
- updates.push(v);
- deletes.push((k, vhash));
+ for item in items {
+ updates.push(ByteBuf::from(item.value.unwrap()));
+ deletes.push((ByteBuf::from(item.key), item.value_hash));
}
+ // Step 1: ensure everyone is at least at tombstone in CRDT lattice
+ // Here the quorum is nodes.len(): we cannot tolerate even a single failure,
+ // otherwise old values before the tombstone might come back in the data.
+ // GC'ing is not a critical function of the system, so it's not a big
+ // deal if we can't do it right now.
self.system
.rpc
.try_call_many(
@@ -189,13 +233,18 @@ where
.with_quorum(nodes.len())
.with_timeout(TABLE_GC_RPC_TIMEOUT),
)
- .await?;
+ .await
+ .err_context("GC: send tombstones")?;
info!(
"({}) GC: {} items successfully pushed, will try to delete.",
self.data.name, n_items
);
+ // Step 2: delete tombstones everywhere.
+ // Here we also fail if even a single node returns a failure:
+ // it means that the garbage collection wasn't completed and has
+ // to be retried later.
self.system
.rpc
.try_call_many(
@@ -206,11 +255,17 @@ where
.with_quorum(nodes.len())
.with_timeout(TABLE_GC_RPC_TIMEOUT),
)
- .await?;
+ .await
+ .err_context("GC: remote delete tombstones")?;
+ // GC has been successful for all of these entries.
+ // We now remove them all from our local table and from the GC todo list.
for (k, vhash) in deletes {
- self.data.delete_if_equal_hash(&k[..], vhash)?;
- self.todo_remove_if_equal(&k[..], vhash)?;
+ self.data
+ .delete_if_equal_hash(&k[..], vhash)
+ .err_context("GC: local delete tombstones")?;
+ self.todo_remove_if_equal(&k[..], vhash)
+ .err_context("GC: remove from todo list after successfull GC")?;
}
Ok(())
@@ -248,3 +303,54 @@ where
}
}
}
+
+/// An entry stored in the gc_todo Sled tree associated with the table
+/// Contains helper function for parsing, saving, and removing
+/// such entry in Sled
+pub(crate) struct GcTodoEntry {
+ key: Vec<u8>,
+ value_hash: Hash,
+ value: Option<Vec<u8>>,
+}
+
+impl GcTodoEntry {
+ /// Creates a new GcTodoEntry (not saved in Sled) from its components:
+ /// the key of an entry in the table, and the hash of the associated
+ /// serialized value
+ pub(crate) fn new(key: Vec<u8>, value_hash: Hash) -> Self {
+ Self {
+ key,
+ value_hash,
+ value: None,
+ }
+ }
+
+ /// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree
+ pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self {
+ Self {
+ key: sled_k.to_vec(),
+ value_hash: Hash::try_from(sled_v).unwrap(),
+ value: None,
+ }
+ }
+
+ /// Saves the GcTodoEntry in the gc_todo tree
+ pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
+ gc_todo_tree.insert(&self.key[..], self.value_hash.as_slice())?;
+ Ok(())
+ }
+
+ /// Removes the GcTodoEntry from the gc_todo tree if the
+ /// hash of the serialized value is the same here as in the tree.
+ /// This is useful to remove a todo entry only under the condition
+ /// that it has not changed since the time it was read, i.e.
+ /// what we have to do is still the same
+ pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> {
+ let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>(
+ &self.key[..],
+ Some(self.value_hash),
+ None,
+ )?;
+ Ok(())
+ }
+}