aboutsummaryrefslogtreecommitdiff
path: root/src/table/table.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-02-23 19:59:43 +0100
committerAlex Auvolat <alex@adnab.me>2021-02-23 19:59:43 +0100
commit28bc967c837c38ba416d9b19fd1ae96cbb292074 (patch)
tree6d2a7c704776a86e0c92bec4f1417111379993c1 /src/table/table.rs
parent55156cca9df6b6b9a117f5a7105c8ba6ded34f83 (diff)
downloadgarage-28bc967c837c38ba416d9b19fd1ae96cbb292074.tar.gz
garage-28bc967c837c38ba416d9b19fd1ae96cbb292074.zip
Handle correctly deletion dues to offloading
Diffstat (limited to 'src/table/table.rs')
-rw-r--r--src/table/table.rs44
1 files changed, 21 insertions, 23 deletions
diff --git a/src/table/table.rs b/src/table/table.rs
index 31241530..018426c4 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -27,7 +27,7 @@ pub struct Table<F: TableSchema, R: TableReplication> {
pub replication: R,
pub name: String,
- pub rpc_client: Arc<RpcClient<TableRPC<F>>>,
+ pub(crate) rpc_client: Arc<RpcClient<TableRPC<F>>>,
pub system: Arc<System>,
pub store: sled::Tree,
@@ -35,7 +35,7 @@ pub struct Table<F: TableSchema, R: TableReplication> {
}
#[derive(Serialize, Deserialize)]
-pub enum TableRPC<F: TableSchema> {
+pub(crate) enum TableRPC<F: TableSchema> {
Ok,
ReadEntry(F::P, F::S),
@@ -415,9 +415,7 @@ where
}
self.instance.updated(old_entry, Some(new_entry)).await?;
- self.system
- .background
- .spawn_cancellable(syncer.clone().invalidate(tree_key));
+ syncer.invalidate(&tree_key[..]);
}
}
@@ -431,26 +429,26 @@ where
Ok(())
}
- pub async fn delete_range(&self, begin: &Hash, end: &Hash) -> Result<(), Error> {
- let syncer = self.syncer.load_full().unwrap();
-
- debug!("({}) Deleting range {:?} - {:?}", self.name, begin, end);
- let mut count: usize = 0;
- while let Some((key, _value)) = self.store.get_lt(end.as_slice())? {
- if key.as_ref() < begin.as_slice() {
- break;
- }
- if let Some(old_val) = self.store.remove(&key)? {
- let old_entry = self.decode_entry(&old_val)?;
- self.instance.updated(Some(old_entry), None).await?;
- self.system
- .background
- .spawn_cancellable(syncer.clone().invalidate(key.to_vec()));
- count += 1;
+ pub(crate) async fn delete_if_equal(
+ self: &Arc<Self>,
+ k: &[u8],
+ v: &[u8],
+ ) -> Result<bool, Error> {
+ let removed = self.store.transaction(|txn| {
+ if let Some(cur_v) = self.store.get(k)? {
+ if cur_v == v {
+ txn.remove(v)?;
+ return Ok(true);
+ }
}
+ Ok(false)
+ })?;
+ if removed {
+ let old_entry = self.decode_entry(v)?;
+ self.instance.updated(Some(old_entry), None).await?;
+ self.syncer.load_full().unwrap().invalidate(k);
}
- debug!("({}) {} entries deleted", self.name, count);
- Ok(())
+ Ok(removed)
}
fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> {