diff options
-rw-r--r-- | src/model/block.rs | 10 | ||||
-rw-r--r-- | src/table/data.rs | 2 | ||||
-rw-r--r-- | src/table/gc.rs | 108 |
3 files changed, 76 insertions, 44 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 3f40aaab..8b1919bb 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -1,3 +1,4 @@ +use std::convert::TryInto; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -422,7 +423,7 @@ impl BlockManager { async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> { if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { - let time_msec = u64_from_be_bytes(&time_bytes[0..8]); + let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let now = now_msec(); if now >= time_msec { let hash = Hash::try_from(&hash_bytes[..]).unwrap(); @@ -705,13 +706,6 @@ impl BlockManagerLocked { } } -fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 { - assert!(bytes.as_ref().len() == 8); - let mut x8 = [0u8; 8]; - x8.copy_from_slice(bytes.as_ref()); - u64::from_be_bytes(x8) -} - /// Describes the state of the reference counter for a block #[derive(Clone, Copy, Debug)] enum RcEntry { diff --git a/src/table/data.rs b/src/table/data.rs index 13ec62bf..fb0b6d02 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -55,7 +55,7 @@ where .expect("Unable to open DB Merkle TODO tree"); let gc_todo = db - .open_tree(&format!("{}:gc_todo", name)) + .open_tree(&format!("{}:gc_todo_v2", name)) .expect("Unable to open DB tree"); Arc::new(Self { diff --git a/src/table/gc.rs b/src/table/gc.rs index cbd91b3a..98d7c95d 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; @@ -13,6 +14,7 @@ use tokio::sync::watch; use garage_util::data::*; use garage_util::error::*; +use garage_util::time::*; use garage_rpc::system::System; use garage_rpc::*; @@ -24,6 +26,11 @@ use crate::schema::*; const TABLE_GC_BATCH_SIZE: usize = 1024; const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30); +// GC delay for table entries: 1 day (24 hours) +// (the delay before the entry is added in the GC todo list +// and the moment the garbage collection actually happens) +const TABLE_GC_DELAY: Duration = Duration::from_secs(24 * 3600); + pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'static> { system: Arc<System>, data: Arc<TableData<F, R>>, @@ -72,35 +79,49 @@ where async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { while !*must_exit.borrow() { match self.gc_loop_iter().await { - Ok(true) => { + Ok(None) => { // Stuff was done, loop immediately - continue; } - Ok(false) => { - // Nothing was done, sleep for some time (below) + Ok(Some(wait_delay)) => { + // Nothing was done, wait specified delay. + select! { + _ = tokio::time::sleep(wait_delay).fuse() => {}, + _ = must_exit.changed().fuse() => {}, + } } Err(e) => { warn!("({}) Error doing GC: {}", self.data.name, e); } } - select! { - _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } } } - async fn gc_loop_iter(&self) -> Result<bool, Error> { + async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> { + let now = now_msec(); + let mut entries = vec![]; let mut excluded = vec![]; // List entries in the GC todo list // These entries are put there when a tombstone is inserted in the table - // This is detected and done in data.rs in update_entry + // (see update_entry in data.rs) for entry_kv in self.data.gc_todo.iter() { let (k, vhash) = entry_kv?; let mut todo_entry = GcTodoEntry::parse(&k, &vhash); + if todo_entry.deletion_time() > now { + if entries.is_empty() && excluded.is_empty() { + // If the earliest entry in the todo list shouldn't yet be processed, + // return a duration to wait in the loop + return Ok(Some(Duration::from_millis( + todo_entry.deletion_time() - now, + ))); + } else { + // Otherwise we have some entries to process, do a normal iteration. + break; + } + } + let vhash = Hash::try_from(&vhash[..]).unwrap(); // Check if the tombstone is still the current value of the entry. @@ -134,8 +155,9 @@ where // and for which they are still currently tombstones in the table. if entries.is_empty() { - // Nothing to do in this iteration - return Ok(false); + // Nothing to do in this iteration (no entries present) + // Wait for a default delay of 60 seconds + return Ok(Some(Duration::from_secs(60))); } debug!("({}) GC: doing {} items", self.data.name, entries.len()); @@ -181,7 +203,7 @@ where } if errs.is_empty() { - Ok(true) + Ok(None) } else { Err(Error::Message( errs.into_iter() @@ -189,19 +211,20 @@ where .collect::<Vec<_>>() .join(", "), )) - .err_context("in try_send_and_delete:") + .err_context("in try_send_and_delete in table GC:") } } async fn try_send_and_delete( &self, nodes: Vec<Uuid>, - items: Vec<GcTodoEntry>, + mut items: Vec<GcTodoEntry>, ) -> Result<(), Error> { let n_items = items.len(); // Strategy: we first send all of the values to the remote nodes, - // to ensure that they are aware of the tombstone state. + // to ensure that they are aware of the tombstone state, + // and that the previous state was correctly overwritten // (if they have a newer state that overrides the tombstone, that's fine). // Second, once everyone is at least at the tombstone state, // we instruct everyone to delete the tombstone IF that is still their current state. @@ -209,13 +232,14 @@ where // tombstone in the CRDT lattice, and it will be propagated back to us at some point // (either just a regular update that hasn't reached us yet, or later when the // table is synced). + // Here, we store in updates all of the tombstones to send for step 1, // and in deletes the list of keys and hashes of value for step 2. let mut updates = vec![]; let mut deletes = vec![]; - for item in items { - updates.push(ByteBuf::from(item.value.unwrap())); - deletes.push((ByteBuf::from(item.key), item.value_hash)); + for item in items.iter_mut() { + updates.push(ByteBuf::from(item.value.take().unwrap())); + deletes.push((ByteBuf::from(item.key.clone()), item.value_hash)); } // Step 1: ensure everyone is at least at tombstone in CRDT lattice @@ -250,7 +274,7 @@ where .try_call_many( &self.endpoint, &nodes[..], - GcRpc::DeleteIfEqualHash(deletes.clone()), + GcRpc::DeleteIfEqualHash(deletes), RequestStrategy::with_priority(PRIO_BACKGROUND) .with_quorum(nodes.len()) .with_timeout(TABLE_GC_RPC_TIMEOUT), @@ -260,24 +284,16 @@ where // GC has been successfull for all of these entries. // We now remove them all from our local table and from the GC todo list. - for (k, vhash) in deletes { + for item in items { self.data - .delete_if_equal_hash(&k[..], vhash) + .delete_if_equal_hash(&item.key[..], item.value_hash) .err_context("GC: local delete tombstones")?; - self.todo_remove_if_equal(&k[..], vhash) + item.remove_if_equal(&self.data.gc_todo) .err_context("GC: remove from todo list after successfull GC")?; } Ok(()) } - - fn todo_remove_if_equal(&self, key: &[u8], vhash: Hash) -> Result<(), Error> { - let _ = self - .data - .gc_todo - .compare_and_swap::<_, _, Vec<u8>>(key, Some(vhash), None)?; - Ok(()) - } } #[async_trait] @@ -295,7 +311,6 @@ where GcRpc::DeleteIfEqualHash(items) => { for (key, vhash) in items.iter() { self.data.delete_if_equal_hash(&key[..], *vhash)?; - self.todo_remove_if_equal(&key[..], *vhash)?; } Ok(GcRpc::Ok) } @@ -307,7 +322,16 @@ where /// An entry stored in the gc_todo Sled tree associated with the table /// Contains helper function for parsing, saving, and removing /// such entry in Sled +/// +/// Format of an entry: +/// - key = 8 bytes: timestamp of tombstone +/// (used to implement GC delay) +/// n bytes: key in the main data table +/// - value = hash of the table entry to delete (the tombstone) +/// for verification purpose, because we don't want to delete +/// things that aren't tombstones pub(crate) struct GcTodoEntry { + tombstone_timestamp: u64, key: Vec<u8>, value_hash: Hash, value: Option<Vec<u8>>, @@ -319,6 +343,7 @@ impl GcTodoEntry { /// serialized value pub(crate) fn new(key: Vec<u8>, value_hash: Hash) -> Self { Self { + tombstone_timestamp: now_msec(), key, value_hash, value: None, @@ -328,7 +353,8 @@ impl GcTodoEntry { /// Parses a GcTodoEntry from a (k, v) pair stored in the gc_todo tree pub(crate) fn parse(sled_k: &[u8], sled_v: &[u8]) -> Self { Self { - key: sled_k.to_vec(), + tombstone_timestamp: u64::from_be_bytes(sled_k[0..8].try_into().unwrap()), + key: sled_k[8..].to_vec(), value_hash: Hash::try_from(sled_v).unwrap(), value: None, } @@ -336,7 +362,7 @@ impl GcTodoEntry { /// Saves the GcTodoEntry in the gc_todo tree pub(crate) fn save(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { - gc_todo_tree.insert(&self.key[..], self.value_hash.as_slice())?; + gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; Ok(()) } @@ -347,10 +373,22 @@ impl GcTodoEntry { /// what we have to do is still the same pub(crate) fn remove_if_equal(&self, gc_todo_tree: &sled::Tree) -> Result<(), Error> { let _ = gc_todo_tree.compare_and_swap::<_, _, Vec<u8>>( - &self.key[..], + &self.todo_table_key()[..], Some(self.value_hash), None, )?; Ok(()) } + + fn todo_table_key(&self) -> Vec<u8> { + [ + &u64::to_be_bytes(self.tombstone_timestamp)[..], + &self.key[..], + ] + .concat() + } + + fn deletion_time(&self) -> u64 { + self.tombstone_timestamp + TABLE_GC_DELAY.as_millis() as u64 + } } |