diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/model/block_ref_table.rs | 14 | ||||
-rw-r--r-- | src/model/bucket_table.rs | 7 | ||||
-rw-r--r-- | src/model/key_table.rs | 8 | ||||
-rw-r--r-- | src/model/object_table.rs | 53 | ||||
-rw-r--r-- | src/model/version_table.rs | 37 | ||||
-rw-r--r-- | src/table/schema.rs | 10 | ||||
-rw-r--r-- | src/table/table.rs | 6 | ||||
-rw-r--r-- | src/table/table_sync.rs | 10 |
8 files changed, 62 insertions, 83 deletions
diff --git a/src/model/block_ref_table.rs b/src/model/block_ref_table.rs index 5a7d9aa1..9ab67737 100644 --- a/src/model/block_ref_table.rs +++ b/src/model/block_ref_table.rs @@ -1,10 +1,8 @@ -use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::sync::Arc; use garage_util::background::*; use garage_util::data::*; -use garage_util::error::Error; use garage_table::*; @@ -42,24 +40,26 @@ pub struct BlockRefTable { pub block_manager: Arc<BlockManager>, } -#[async_trait] impl TableSchema for BlockRefTable { type P = Hash; type S = UUID; type E = BlockRef; type Filter = DeletedFilter; - async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { + fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { let block = &old.as_ref().or(new.as_ref()).unwrap().block; let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false); let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false); if is_after && !was_before { - self.block_manager.block_incref(block)?; + if let Err(e) = self.block_manager.block_incref(block) { + warn!("block_incref failed for block {:?}: {}", block, e); + } } if was_before && !is_after { - self.block_manager.block_decref(block)?; + if let Err(e) = self.block_manager.block_decref(block) { + warn!("block_decref failed for block {:?}: {}", block, e); + } } - Ok(()) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index a101555f..af42d551 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,10 +1,8 @@ -use async_trait::async_trait; use serde::{Deserialize, Serialize}; use garage_table::crdt::CRDT; use garage_table::*; -use garage_util::error::Error; use crate::key_table::PermissionSet; @@ -100,17 +98,12 @@ impl Entry<EmptyKey, String> for Bucket { pub struct BucketTable; -#[async_trait] impl TableSchema for BucketTable { type P = EmptyKey; type S = String; type E = Bucket; type Filter = DeletedFilter; - async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> { - Ok(()) - } - fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { filter.apply(entry.is_deleted()) } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 20da3cc6..5942df75 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -1,11 +1,8 @@ -use async_trait::async_trait; use serde::{Deserialize, Serialize}; use garage_table::crdt::CRDT; use garage_table::*; -use garage_util::error::Error; - use model010::key_table as prev; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] @@ -92,17 +89,12 @@ impl Entry<EmptyKey, String> for Key { pub struct KeyTable; -#[async_trait] impl TableSchema for KeyTable { type P = EmptyKey; type S = String; type E = Key; type Filter = DeletedFilter; - async fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) -> Result<(), Error> { - Ok(()) - } - fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { filter.apply(entry.deleted.get()) } diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 929b63f0..16cce72c 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -1,11 +1,9 @@ -use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::sync::Arc; use garage_util::background::BackgroundRunner; use garage_util::data::*; -use garage_util::error::Error; use garage_table::table_sharded::*; use garage_table::*; @@ -191,41 +189,42 @@ pub struct ObjectTable { pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, } -#[async_trait] impl TableSchema for ObjectTable { type P = String; type S = String; type E = Object; type Filter = DeletedFilter; - async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { + fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { let version_table = self.version_table.clone(); - if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of old versions - for v in old_v.versions.iter() { - let newly_deleted = match new_v - .versions - .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) - { - Err(_) => true, - Ok(i) => { - new_v.versions[i].state == ObjectVersionState::Aborted - && v.state != ObjectVersionState::Aborted + self.background.spawn(async move { + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of old versions + for v in old_v.versions.iter() { + let newly_deleted = match new_v + .versions + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) + { + Err(_) => true, + Ok(i) => { + new_v.versions[i].state == ObjectVersionState::Aborted + && v.state != ObjectVersionState::Aborted + } + }; + if newly_deleted { + let deleted_version = Version::new( + v.uuid, + old_v.bucket.clone(), + old_v.key.clone(), + true, + vec![], + ); + version_table.insert(&deleted_version).await?; } - }; - if newly_deleted { - let deleted_version = Version::new( - v.uuid, - old_v.bucket.clone(), - old_v.key.clone(), - true, - vec![], - ); - version_table.insert(&deleted_version).await?; } } - } - Ok(()) + Ok(()) + }) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 0d831998..cf9fbe98 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -1,10 +1,8 @@ -use async_trait::async_trait; use serde::{Deserialize, Serialize}; use std::sync::Arc; use garage_util::background::BackgroundRunner; use garage_util::data::*; -use garage_util::error::Error; use garage_table::table_sharded::*; use garage_table::*; @@ -112,31 +110,32 @@ pub struct VersionTable { pub block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>, } -#[async_trait] impl TableSchema for VersionTable { type P = Hash; type S = EmptyKey; type E = Version; type Filter = DeletedFilter; - async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error> { + fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { let block_ref_table = self.block_ref_table.clone(); - if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of version blocks - if new_v.deleted && !old_v.deleted { - let deleted_block_refs = old_v - .blocks - .iter() - .map(|vb| BlockRef { - block: vb.hash, - version: old_v.uuid, - deleted: true, - }) - .collect::<Vec<_>>(); - block_ref_table.insert_many(&deleted_block_refs[..]).await?; + self.background.spawn(async move { + if let (Some(old_v), Some(new_v)) = (old, new) { + // Propagate deletion of version blocks + if new_v.deleted && !old_v.deleted { + let deleted_block_refs = old_v + .blocks + .iter() + .map(|vb| BlockRef { + block: vb.hash, + version: old_v.uuid, + deleted: true, + }) + .collect::<Vec<_>>(); + block_ref_table.insert_many(&deleted_block_refs[..]).await?; + } } - } - Ok(()) + Ok(()) + }) } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { diff --git a/src/table/schema.rs b/src/table/schema.rs index 61deb3c1..edd04000 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -1,8 +1,6 @@ -use async_trait::async_trait; use serde::{Deserialize, Serialize}; use garage_util::data::*; -use garage_util::error::Error; pub trait PartitionKey { fn hash(&self) -> Hash; @@ -45,7 +43,6 @@ pub trait Entry<P: PartitionKey, S: SortKey>: fn merge(&mut self, other: &Self); } -#[async_trait] pub trait TableSchema: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; @@ -58,7 +55,12 @@ pub trait TableSchema: Send + Sync { None } - async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>; + // Updated triggers some stuff downstream, but it is not supposed to block or fail, + // as the update itself is an unchangeable fact that will never go back + // due to CRDT logic. Typically errors in propagation of info should be logged + // to stderr. + fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {} + fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true } diff --git a/src/table/table.rs b/src/table/table.rs index 018426c4..737ed589 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -414,7 +414,7 @@ where epidemic_propagate.push(new_entry.clone()); } - self.instance.updated(old_entry, Some(new_entry)).await?; + self.instance.updated(old_entry, Some(new_entry)); syncer.invalidate(&tree_key[..]); } } @@ -429,7 +429,7 @@ where Ok(()) } - pub(crate) async fn delete_if_equal( + pub(crate) fn delete_if_equal( self: &Arc<Self>, k: &[u8], v: &[u8], @@ -445,7 +445,7 @@ where })?; if removed { let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(old_entry), None).await?; + self.instance.updated(Some(old_entry), None); self.syncer.load_full().unwrap().invalidate(k); } Ok(removed) diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 11b1c211..b81dad86 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -348,14 +348,8 @@ where } // All remote nodes have written those items, now we can delete them locally - for was_removed in join_all( - items - .iter() - .map(|(k, v)| self.table.delete_if_equal(&k[..], &v[..])), - ) - .await - { - was_removed?; + for (k, v) in items.iter() { + self.table.delete_if_equal(&k[..], &v[..])?; } Ok(()) |