diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-19 20:52:20 +0000 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-19 20:52:20 +0000 |
commit | 04acaea231a9af77e5ca05068336f4492fe32ac0 (patch) | |
tree | 412a346fed1a24b861a90c9a3e527e46f050ffa0 /src | |
parent | 5ae32972efaba357ecc0027fe852d710b16b6d0e (diff) | |
download | garage-04acaea231a9af77e5ca05068336f4492fe32ac0.tar.gz garage-04acaea231a9af77e5ca05068336f4492fe32ac0.zip |
Don't do version & block_ref updates in background on deletion
Diffstat (limited to 'src')
-rw-r--r-- | src/block_ref_table.rs | 12 | ||||
-rw-r--r-- | src/bucket_table.rs | 5 | ||||
-rw-r--r-- | src/object_table.rs | 37 | ||||
-rw-r--r-- | src/table.rs | 6 | ||||
-rw-r--r-- | src/version_table.rs | 31 |
5 files changed, 45 insertions, 46 deletions
diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs index 17efa155..6a256aa3 100644 --- a/src/block_ref_table.rs +++ b/src/block_ref_table.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use crate::background::*; use crate::data::*; +use crate::error::Error; use crate::table::*; use crate::block::*; @@ -47,20 +48,17 @@ impl TableSchema for BlockRefTable { type E = BlockRef; type Filter = (); - async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { + async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { let block = &old.as_ref().or(new.as_ref()).unwrap().block; let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false); let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false); if is_after && !was_before { - if let Err(e) = self.block_manager.block_incref(block) { - eprintln!("Failed to incref block {:?}: {}", block, e); - } + self.block_manager.block_incref(block)?; } if was_before && !is_after { - if let Err(e) = self.block_manager.block_decref(block) { - eprintln!("Failed to decref block {:?}: {}", block, e); - } + self.block_manager.block_decref(block)?; } + Ok(()) } fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { diff --git a/src/bucket_table.rs b/src/bucket_table.rs index be7dd348..5604049c 100644 --- a/src/bucket_table.rs +++ b/src/bucket_table.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use crate::error::Error; use crate::table::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] @@ -71,7 +72,9 @@ impl TableSchema for BucketTable { type E = Bucket; type Filter = (); - async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {} + async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> { + Ok(()) + } fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { !entry.deleted diff --git a/src/object_table.rs b/src/object_table.rs index 59ce3b7f..0d0de146 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use crate::background::BackgroundRunner; use crate::data::*; +use crate::error::Error; use crate::table::*; use crate::table_sharded::*; @@ -101,30 +102,28 @@ impl TableSchema for ObjectTable { type E = Object; type Filter = (); - async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { + async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { let version_table = self.version_table.clone(); if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of old versions - self.background.spawn(async move { - for v in old_v.versions.iter() { - if new_v - .versions - .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) - .is_err() - { - let deleted_version = Version { - uuid: v.uuid.clone(), - deleted: true, - blocks: vec![], - bucket: old_v.bucket.clone(), - key: old_v.key.clone(), - }; - version_table.insert(&deleted_version).await?; - } + for v in old_v.versions.iter() { + if new_v + .versions + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) + .is_err() + { + let deleted_version = Version { + uuid: v.uuid.clone(), + deleted: true, + blocks: vec![], + bucket: old_v.bucket.clone(), + key: old_v.key.clone(), + }; + version_table.insert(&deleted_version).await?; } - Ok(()) - }); + } } + Ok(()) } fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { diff --git a/src/table.rs b/src/table.rs index 80364d17..d9b505c9 100644 --- a/src/table.rs +++ b/src/table.rs @@ -105,7 +105,7 @@ pub trait TableSchema: Send + Sync { type E: Entry<Self::P, Self::S>; type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; - async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>); + async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>; fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true } @@ -469,7 +469,7 @@ where epidemic_propagate.push(new_entry.clone()); } - self.instance.updated(old_entry, Some(new_entry)).await; + self.instance.updated(old_entry, Some(new_entry)).await?; self.system .background .spawn(syncer.clone().invalidate(tree_key)); @@ -497,7 +497,7 @@ where } if let Some(old_val) = self.store.remove(&key)? { let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?; - self.instance.updated(Some(old_entry), None).await; + self.instance.updated(Some(old_entry), None).await?; self.system .background .spawn(syncer.clone().invalidate(key.to_vec())); diff --git a/src/version_table.rs b/src/version_table.rs index dfd27812..230b7f1c 100644 --- a/src/version_table.rs +++ b/src/version_table.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use crate::background::BackgroundRunner; use crate::data::*; +use crate::error::Error; use crate::table::*; use crate::table_sharded::*; @@ -67,26 +68,24 @@ impl TableSchema for VersionTable { type E = Version; type Filter = (); - async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { + async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { let block_ref_table = self.block_ref_table.clone(); if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks - self.background.spawn(async move { - if new_v.deleted && !old_v.deleted { - let deleted_block_refs = old_v - .blocks - .iter() - .map(|vb| BlockRef { - block: vb.hash.clone(), - version: old_v.uuid.clone(), - deleted: true, - }) - .collect::<Vec<_>>(); - block_ref_table.insert_many(&deleted_block_refs[..]).await?; - } - Ok(()) - }); + if new_v.deleted && !old_v.deleted { + let deleted_block_refs = old_v + .blocks + .iter() + .map(|vb| BlockRef { + block: vb.hash.clone(), + version: old_v.uuid.clone(), + deleted: true, + }) + .collect::<Vec<_>>(); + block_ref_table.insert_many(&deleted_block_refs[..]).await?; + } } + Ok(()) } fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { |