From bf25c95fe2fda4ded2e3ca14499e3991e7243532 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 23 Feb 2021 20:25:15 +0100 Subject: Make updated() be a sync function that doesn't fail --- src/table/schema.rs | 10 ++++++---- src/table/table.rs | 6 +++--- src/table/table_sync.rs | 10 ++-------- 3 files changed, 11 insertions(+), 15 deletions(-) (limited to 'src/table') diff --git a/src/table/schema.rs b/src/table/schema.rs index 61deb3c1..edd04000 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -1,8 +1,6 @@ -use async_trait::async_trait; use serde::{Deserialize, Serialize}; use garage_util::data::*; -use garage_util::error::Error; pub trait PartitionKey { fn hash(&self) -> Hash; @@ -45,7 +43,6 @@ pub trait Entry: fn merge(&mut self, other: &Self); } -#[async_trait] pub trait TableSchema: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; @@ -58,7 +55,12 @@ pub trait TableSchema: Send + Sync { None } - async fn updated(&self, old: Option, new: Option) -> Result<(), Error>; + // Updated triggers some stuff downstream, but it is not supposed to block or fail, + // as the update itself is an unchangeable fact that will never go back + // due to CRDT logic. Typically errors in propagation of info should be logged + // to stderr. + fn updated(&self, _old: Option, _new: Option) {} + fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool { true } diff --git a/src/table/table.rs b/src/table/table.rs index 018426c4..737ed589 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -414,7 +414,7 @@ where epidemic_propagate.push(new_entry.clone()); } - self.instance.updated(old_entry, Some(new_entry)).await?; + self.instance.updated(old_entry, Some(new_entry)); syncer.invalidate(&tree_key[..]); } } @@ -429,7 +429,7 @@ where Ok(()) } - pub(crate) async fn delete_if_equal( + pub(crate) fn delete_if_equal( self: &Arc, k: &[u8], v: &[u8], @@ -445,7 +445,7 @@ where })?; if removed { let old_entry = self.decode_entry(v)?; - self.instance.updated(Some(old_entry), None).await?; + self.instance.updated(Some(old_entry), None); self.syncer.load_full().unwrap().invalidate(k); } Ok(removed) diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 11b1c211..b81dad86 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -348,14 +348,8 @@ where } // All remote nodes have written those items, now we can delete them locally - for was_removed in join_all( - items - .iter() - .map(|(k, v)| self.table.delete_if_equal(&k[..], &v[..])), - ) - .await - { - was_removed?; + for (k, v) in items.iter() { + self.table.delete_if_equal(&k[..], &v[..])?; } Ok(()) -- cgit v1.2.3