diff options
author | Alex Auvolat <alex@adnab.me> | 2021-02-23 19:59:43 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-02-23 19:59:43 +0100 |
commit | 28bc967c837c38ba416d9b19fd1ae96cbb292074 (patch) | |
tree | 6d2a7c704776a86e0c92bec4f1417111379993c1 /src/table/table.rs | |
parent | 55156cca9df6b6b9a117f5a7105c8ba6ded34f83 (diff) | |
download | garage-28bc967c837c38ba416d9b19fd1ae96cbb292074.tar.gz garage-28bc967c837c38ba416d9b19fd1ae96cbb292074.zip |
Handle correctly deletion dues to offloading
Diffstat (limited to 'src/table/table.rs')
-rw-r--r-- | src/table/table.rs | 44 |
1 files changed, 21 insertions, 23 deletions
diff --git a/src/table/table.rs b/src/table/table.rs index 31241530..018426c4 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -27,7 +27,7 @@ pub struct Table<F: TableSchema, R: TableReplication> { pub replication: R, pub name: String, - pub rpc_client: Arc<RpcClient<TableRPC<F>>>, + pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>, pub system: Arc<System>, pub store: sled::Tree, @@ -35,7 +35,7 @@ pub struct Table<F: TableSchema, R: TableReplication> { } #[derive(Serialize, Deserialize)] -pub enum TableRPC<F: TableSchema> { +pub(crate) enum TableRPC<F: TableSchema> { Ok, ReadEntry(F::P, F::S), @@ -415,9 +415,7 @@ where } self.instance.updated(old_entry, Some(new_entry)).await?; - self.system - .background - .spawn_cancellable(syncer.clone().invalidate(tree_key)); + syncer.invalidate(&tree_key[..]); } } @@ -431,26 +429,26 @@ where Ok(()) } - pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> { - let syncer = self.syncer.load_full().unwrap(); - - debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end); - let mut count: usize = 0; - while let Some((key, _value)) = self.store.get_lt(end.as_slice())? { - if key.as_ref() < begin.as_slice() { - break; - } - if let Some(old_val) = self.store.remove(&key)? { - let old_entry = self.decode_entry(&old_val)?; - self.instance.updated(Some(old_entry), None).await?; - self.system - .background - .spawn_cancellable(syncer.clone().invalidate(key.to_vec())); - count += 1; + pub(crate) async fn delete_if_equal( + self: &Arc<Self>, + k: &[u8], + v: &[u8], + ) -> Result<bool, Error> { + let removed = self.store.transaction(|txn| { + if let Some(cur_v) = self.store.get(k)? { + if cur_v == v { + txn.remove(v)?; + return Ok(true); + } } + Ok(false) + })?; + if removed { + let old_entry = self.decode_entry(v)?; + self.instance.updated(Some(old_entry), None).await?; + self.syncer.load_full().unwrap().invalidate(k); } - debug!("({}) {} entries deleted", self.name, count); - Ok(()) + Ok(removed) } fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { |