diff options
-rw-r--r-- | src/table/data.rs | 3 | ||||
-rw-r--r-- | src/table/gc.rs | 150 |
2 files changed, 130 insertions, 23 deletions
diff --git a/src/table/data.rs b/src/table/data.rs index ffd494d5..13ec62bf 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -12,6 +12,7 @@ use garage_util::error::*; use garage_rpc::system::System; use crate::crdt::Crdt; +use crate::gc::GcTodoEntry; use crate::replication::*; use crate::schema::*; @@ -176,7 +177,7 @@ where let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); let nodes = self.replication.write_nodes(&pk_hash); if nodes.first() == Some(&self.system.id) { - self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; + GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?; } } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 9b3d60ff..cbd91b3a 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -12,7 +12,7 @@ use futures_util::future::*; use tokio::sync::watch; use garage_util::data::*; -use garage_util::error::Error; +use garage_util::error::*; use garage_rpc::system::System; use garage_rpc::*; @@ -24,7 +24,7 @@ use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); -pub struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> { +pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> { system: Arc<System>, data: Arc<TableData<F, R>>, @@ -94,31 +94,45 @@ where let mut entries = vec![]; let mut excluded = vec![]; - for item in self.data.gc_todo.iter() { - let (k, vhash) = item?; + // List entries in the GC todo list + // These entries are put there when a tombstone is inserted in the table + // This is detected and done in data.rs in update_entry + for entry_kv in self.data.gc_todo.iter() { + let (k, vhash) = entry_kv?; + let mut todo_entry = GcTodoEntry::parse(&k, &vhash); let vhash = Hash::try_from(&vhash[..]).unwrap(); - let v_opt = self + // Check if the tombstone is still the current value of the entry. + // If not, we don't actually want to GC it, and we will remove it + // from the gc_todo table later (below). + todo_entry.value = self .data .store .get(&k[..])? - .filter(|v| blake2sum(&v[..]) == vhash); + .filter(|v| blake2sum(&v[..]) == vhash) + .map(|v| v.to_vec()); - if let Some(v) = v_opt { - entries.push((ByteBuf::from(k.to_vec()), vhash, ByteBuf::from(v.to_vec()))); + if todo_entry.value.is_some() { + entries.push(todo_entry); if entries.len() >= TABLE_GC_BATCH_SIZE { break; } } else { - excluded.push((k, vhash)); + excluded.push(todo_entry); } } - for (k, vhash) in excluded { - self.todo_remove_if_equal(&k[..], vhash)?; + // Remove from gc_todo entries for tombstones where we have + // detected that the current value has changed and + // is no longer a tombstone. + for entry in excluded { + entry.remove_if_equal(&self.data.gc_todo)?; } + // Remaining in `entries` is the list of entries we want to GC, + // and for which they are still currently tombstones in the table. + if entries.is_empty() { // Nothing to do in this iteration return Ok(false); @@ -126,9 +140,17 @@ where debug!("({}) GC: doing {} items", self.data.name, entries.len()); + // Split entries to GC by the set of nodes on which they are stored. + // Here we call them partitions but they are not exactly + // the same as partitions as defined in the ring: those partitions + // are defined by the first 8 bits of the hash, but two of these + // partitions can be stored on the same set of nodes. + // Here we detect when entries are stored on the same set of nodes: + // even if they are not in the same 8-bit partition, we can still + // handle them together. let mut partitions = HashMap::new(); - for (k, vhash, v) in entries { - let pkh = Hash::try_from(&k[..32]).unwrap(); + for entry in entries { + let pkh = Hash::try_from(&entry.key[..32]).unwrap(); let mut nodes = self.data.replication.write_nodes(&pkh); nodes.retain(|x| *x != self.system.id); nodes.sort(); @@ -136,9 +158,12 @@ where if !partitions.contains_key(&nodes) { partitions.insert(nodes.clone(), vec![]); } - partitions.get_mut(&nodes).unwrap().push((k, vhash, v)); + partitions.get_mut(&nodes).unwrap().push(entry); } + // For each set of nodes that contains some items, + // ensure they are aware of the tombstone status, and once they + // are, instruct them to delete the entries. let resps = join_all( partitions .into_iter() @@ -146,6 +171,8 @@ where ) .await; + // Collect errors and return a single error value even if several + // errors occurred. let mut errs = vec![]; for resp in resps { if let Err(e) = resp { @@ -162,23 +189,40 @@ where .collect::<Vec<_>>() .join(", "), )) + .err_context("in try_send_and_delete:") } } async fn try_send_and_delete( &self, nodes: Vec<Uuid>, - items: Vec<(ByteBuf, Hash, ByteBuf)>, + items: Vec<GcTodoEntry>, ) -> Result<(), Error> { let n_items = items.len(); + // Strategy: we first send all of the values to the remote nodes, + // to ensure that they are aware of the tombstone state. + // (if they have a newer state that overrides the tombstone, that's fine). + // Second, once everyone is at least at the tombstone state, + // we instruct everyone to delete the tombstone IF that is still their current state. + // If they are now at a different state, it means that that state overrides the + // tombstone in the CRDT lattice, and it will be propagated back to us at some point + // (either just a regular update that hasn't reached us yet, or later when the + // table is synced). + // Here, we store in updates all of the tombstones to send for step 1, + // and in deletes the list of keys and hashes of value for step 2. let mut updates = vec![]; let mut deletes = vec![]; - for (k, vhash, v) in items { - updates.push(v); - deletes.push((k, vhash)); + for item in items { + updates.push(ByteBuf::from(item.value.unwrap())); + deletes.push((ByteBuf::from(item.key), item.value_hash)); } + // Step 1: ensure everyone is at least at tombstone in CRDT lattice + // Here the quorum is nodes.len(): we cannot tolerate even a single failure, + // otherwise old values before the tombstone might come back in the data. + // GC'ing is not a critical function of the system, so it's not a big + // deal if we can't do it right now. self.system .rpc .try_call_many( @@ -189,13 +233,18 @@ where .with_quorum(nodes.len()) .with_timeout(TABLE_GC_RPC_TIMEOUT), ) - .await?; + .await + .err_context("GC: send tombstones")?; info!( "({}) GC: {} items successfully pushed, will try to delete.", self.data.name, n_items ); + // Step 2: delete tombstones everywhere. + // Here we also fail if even a single node returns a failure: + // it means that the garbage collection wasn't completed and has + // to be retried later. self.system .rpc .try_call_many( @@ -206,11 +255,17 @@ where .with_quorum(nodes.len()) .with_timeout(TABLE_GC_RPC_TIMEOUT), ) - .await?; + .await + .err_context("GC: remote delete tombstones")?; + // GC has been successfull for all of these entries. + // We now remove them all from our local table and from the GC todo list. for (k, vhash) in deletes { - self.data.delete_if_equal_hash(&k[..], vhash)?; - self.todo_remove_if_equal(&k[..], vhash)?; + self.data + .delete_if_equal_hash(&k[..], vhash) + .err_context("GC: local delete tombstones")?; + self.todo_remove_if_equal(&k[..], vhash) + .err_context("GC: remove from todo list after successfull GC")?; } Ok(()) @@ -248,3 +303,54 @@ where } } } + +/// An entry stored in the gc_todo Sled tree associated with the table +/// Contains helper function for parsing, saving, and removing +/// such entry in Sled +pub(crate) struct GcTodoEntry { + key: Vec<u8>, + value_hash: Hash, + value: Option<Vec<u8>>, +} + +impl GcTodoEntry { + /// Creates a new GcTodoEntry (not saved in Sled) from its components: + /// the key of an entry in the table, and the hash of the associated + /// serialized value + pub(crate) fn new(key: Vec<u8>, value_hash: Hash) -> Self { + Self { + key, + value_hash, + value: None, + } + } + + /// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree + pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self { + Self { + key: sled_k.to_vec(), + value_hash: Hash::try_from(sled_v).unwrap(), + value: None, + } + } + + /// Saves the GcTodoEntry in the gc_todo tree + pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { + gc_todo_tree.insert(&self.key[..], self.value_hash.as_slice())?; + Ok(()) + } + + /// Removes the GcTodoEntry from the gc_todo tree if the + /// hash of the serialized value is the same here as in the tree. + /// This is usefull to remove a todo entry only under the condition + /// that it has not changed since the time it was read, i.e. + /// what we have to do is still the same + pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { + let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>( + &self.key[..], + Some(self.value_hash), + None, + )?; + Ok(()) + } +} |