author     Alex Auvolat <alex@adnab.me>    2021-02-23 19:59:43 +0100
committer  Alex Auvolat <alex@adnab.me>    2021-02-23 19:59:43 +0100
commit     28bc967c837c38ba416d9b19fd1ae96cbb292074
tree       6d2a7c704776a86e0c92bec4f1417111379993c1
parent     55156cca9df6b6b9a117f5a7105c8ba6ded34f83
Correctly handle deletions due to offloading
-rw-r--r--  src/table/table.rs       44
-rw-r--r--  src/table/table_sync.rs  36

2 files changed, 38 insertions, 42 deletions
```diff
diff --git a/src/table/table.rs b/src/table/table.rs
index 31241530..018426c4 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -27,7 +27,7 @@ pub struct Table<F: TableSchema, R: TableReplication> {
     pub replication: R,

     pub name: String,
-    pub rpc_client: Arc<RpcClient<TableRPC<F>>>,
+    pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>,

     pub system: Arc<System>,
     pub store: sled::Tree,
@@ -35,7 +35,7 @@ pub struct Table<F: TableSchema, R: TableReplication> {
 }

 #[derive(Serialize, Deserialize)]
-pub enum TableRPC<F: TableSchema> {
+pub(crate) enum TableRPC<F: TableSchema> {
     Ok,

     ReadEntry(F::P, F::S),
@@ -415,9 +415,7 @@ where
             }

             self.instance.updated(old_entry, Some(new_entry)).await?;

-            self.system
-                .background
-                .spawn_cancellable(syncer.clone().invalidate(tree_key));
+            syncer.invalidate(&tree_key[..]);
         }
     }
@@ -431,26 +429,26 @@ where
         Ok(())
     }

-    pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
-        let syncer = self.syncer.load_full().unwrap();
-
-        debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
-        let mut count: usize = 0;
-        while let Some((key, _value)) = self.store.get_lt(end.as_slice())? {
-            if key.as_ref() < begin.as_slice() {
-                break;
-            }
-            if let Some(old_val) = self.store.remove(&key)? {
-                let old_entry = self.decode_entry(&old_val)?;
-                self.instance.updated(Some(old_entry), None).await?;
-                self.system
-                    .background
-                    .spawn_cancellable(syncer.clone().invalidate(key.to_vec()));
-                count += 1;
+    pub(crate) async fn delete_if_equal(
+        self: &Arc<Self>,
+        k: &[u8],
+        v: &[u8],
+    ) -> Result<bool, Error> {
+        let removed = self.store.transaction(|txn| {
+            if let Some(cur_v) = self.store.get(k)? {
+                if cur_v == v {
+                    txn.remove(v)?;
+                    return Ok(true);
+                }
             }
+            Ok(false)
+        })?;
+        if removed {
+            let old_entry = self.decode_entry(v)?;
+            self.instance.updated(Some(old_entry), None).await?;
+            self.syncer.load_full().unwrap().invalidate(k);
         }
-        debug!("({}) {} entries deleted", self.name, count);
-        Ok(())
+        Ok(removed)
     }

     fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
index 1a1b328b..11b1c211 100644
--- a/src/table/table_sync.rs
+++ b/src/table/table_sync.rs
@@ -29,14 +29,14 @@ pub struct TableSyncer<F: TableSchema, R: TableReplication> {
 }

 #[derive(Serialize, Deserialize)]
-pub enum SyncRPC {
+pub(crate) enum SyncRPC {
     GetRootChecksumRange(Hash, Hash),
     RootChecksumRange(SyncRange),
     Checksums(Vec<RangeChecksum>),
     Difference(Vec<SyncRange>, Vec<Arc<ByteBuf>>),
 }

-pub struct SyncTodo {
+struct SyncTodo {
     todo: Vec<TodoPartition>,
 }

@@ -60,7 +60,7 @@ struct TodoPartition {
 // (ranges of level 0 do not exist and their hash is simply the hash of the first item >= begin)
 // See RangeChecksum for the struct that stores this information.
 #[derive(Hash, PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
-pub struct SyncRange {
+pub(crate) struct SyncRange {
     begin: Vec<u8>,
     end: Vec<u8>,
     level: usize,
@@ -81,7 +81,7 @@ impl std::cmp::Ord for SyncRange {
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct RangeChecksum {
+pub(crate) struct RangeChecksum {
     bounds: SyncRange,
     children: Vec<(SyncRange, Hash)>,
     found_limit: Option<Vec<u8>>,
@@ -91,7 +91,7 @@ pub struct RangeChecksum {
 }
 #[derive(Debug, Clone)]
-pub struct RangeChecksumCache {
+struct RangeChecksumCache {
     hash: Option<Hash>, // None if no children
     found_limit: Option<Vec<u8>>,
     time: Instant,
 }
@@ -102,7 +102,7 @@ where
     F: TableSchema + 'static,
     R: TableReplication + 'static,
 {
-    pub async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
+    pub(crate) async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
         let todo = SyncTodo { todo: Vec::new() };
         let syncer = Arc::new(TableSyncer {
             table: table.clone(),
@@ -348,15 +348,14 @@ where
         }

         // All remote nodes have written those items, now we can delete them locally
-        for (k, v) in items.iter() {
-            self.table.store.transaction(|tx_db| {
-                if let Some(curv) = tx_db.get(k)? {
-                    if curv == &v[..] {
-                        tx_db.remove(&k[..])?;
-                    }
-                }
-                Ok(())
-            })?;
+        for was_removed in join_all(
+            items
+                .iter()
+                .map(|(k, v)| self.table.delete_if_equal(&k[..], &v[..])),
+        )
+        .await
+        {
+            was_removed?;
         }
         Ok(())
     }
@@ -642,7 +641,7 @@ where
         }
     }

-    pub async fn handle_rpc(
+    pub(crate) async fn handle_rpc(
         self: &Arc<Self>,
         message: &SyncRPC,
         mut must_exit: watch::Receiver<bool>,
@@ -738,7 +737,7 @@ where
         Ok(SyncRPC::Difference(ret_ranges, ret_items))
     }

-    pub async fn invalidate(self: Arc<Self>, item_key: Vec<u8>) -> Result<(), Error> {
+    pub(crate) fn invalidate(self: &Arc<Self>, item_key: &[u8]) {
         for i in 1..MAX_DEPTH {
             let needle = SyncRange {
                 begin: item_key.to_vec(),
@@ -747,14 +746,13 @@ where
             };
             let mut cache = self.cache[i].lock().unwrap();
             if let Some(cache_entry) = cache.range(..=needle).rev().next() {
-                if cache_entry.0.begin <= item_key && cache_entry.0.end > item_key {
+                if cache_entry.0.begin[..] <= *item_key && cache_entry.0.end[..] > *item_key {
                     let index = cache_entry.0.clone();
                     drop(cache_entry);
                     cache.remove(&index);
                 }
             }
         }
-        Ok(())
     }
 }
```
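
The heart of the change is `delete_if_equal`: instead of deleting a whole key range in the background, an offloaded item is only removed if it still holds exactly the value that was pushed to the remote nodes, so a concurrent local update cannot be lost. Below is a minimal, self-contained sketch of that compare-and-delete pattern against a bare `sled::Tree`; the schema decoding, the `updated()` callback and the syncer invalidation from the real `Table::delete_if_equal` are left out. The sketch reads and removes through the transactional handle and removes by key `k`, whereas the committed hunk reads via `self.store.get(k)` and passes `v` to `txn.remove()`.

```rust
use sled::transaction::TransactionError;

/// Remove `k` only if it still holds exactly the value `v`.
/// Returns Ok(true) when this call removed the entry.
fn delete_if_equal(tree: &sled::Tree, k: &[u8], v: &[u8]) -> Result<bool, TransactionError> {
    tree.transaction(|txn| {
        if let Some(cur_v) = txn.get(k)? {
            if cur_v == v {
                // Value unchanged since it was offloaded: safe to drop it.
                txn.remove(k)?;
                return Ok(true);
            }
        }
        // Key absent or concurrently overwritten: keep whatever is there.
        Ok(false)
    })
}

fn main() {
    let db = sled::Config::new().temporary(true).open().unwrap();
    let tree = db.open_tree("items").unwrap();
    tree.insert(b"key", &b"value"[..]).unwrap();

    // Value still matches: the entry is removed.
    assert!(delete_if_equal(&tree, b"key", b"value").unwrap());
    // Already gone, so the second call is a no-op.
    assert!(!delete_if_equal(&tree, b"key", b"value").unwrap());
}
```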
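
On the sync side, the per-item deletions are now issued concurrently and the results are checked afterwards. Here is a stand-alone sketch of that join-then-propagate pattern, with a hypothetical `delete_one` standing in for `Table::delete_if_equal`; it only needs the `futures` crate.

```rust
use futures::future::join_all;

// Stand-in for Table::delete_if_equal: any async, fallible per-item deletion.
async fn delete_one(key: u32) -> Result<bool, String> {
    Ok(key % 2 == 0)
}

// Issue all deletions concurrently, then walk the results and propagate the
// first error, mirroring the offloading path in table_sync.rs.
async fn delete_batch(keys: &[u32]) -> Result<usize, String> {
    let mut removed = 0;
    for result in join_all(keys.iter().map(|k| delete_one(*k))).await {
        if result? {
            removed += 1;
        }
    }
    Ok(removed)
}

fn main() {
    let removed = futures::executor::block_on(delete_batch(&[1, 2, 3, 4]));
    assert_eq!(removed, Ok(2));
}
```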
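
Finally, `invalidate` becomes a plain synchronous call: for each level it looks up the last cached checksum range starting at or before the item key and evicts it if that range actually contains the key, so the checksum is recomputed on the next sync pass. A simplified sketch of that lookup, with the cache reduced to a `BTreeMap` from a range's begin key to its end key (the real cache is keyed by the full `SyncRange`, one mutex-guarded map per level):

```rust
use std::collections::BTreeMap;

/// Evict the cached range whose [begin, end) interval contains `item_key`, if any.
/// Only the last range starting at or before `item_key` can contain it.
fn invalidate(cache: &mut BTreeMap<Vec<u8>, Vec<u8>>, item_key: &[u8]) {
    let candidate = cache
        .range(..=item_key.to_vec())
        .next_back()
        // Keep the candidate only if its end bound extends past item_key.
        .filter(|(_begin, end)| end[..] > *item_key)
        .map(|(begin, _end)| begin.to_vec());
    if let Some(begin) = candidate {
        cache.remove(&begin);
    }
}

fn main() {
    let mut cache = BTreeMap::new();
    cache.insert(b"a".to_vec(), b"m".to_vec()); // covers keys in ["a", "m")
    cache.insert(b"m".to_vec(), b"z".to_vec()); // covers keys in ["m", "z")

    invalidate(&mut cache, b"c"); // falls in ["a", "m"): that entry is evicted
    assert!(!cache.contains_key(&b"a".to_vec()));
    assert!(cache.contains_key(&b"m".to_vec()));
}
```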