aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-02-23 20:25:15 +0100
committerAlex Auvolat <alex@adnab.me>2021-02-23 20:25:15 +0100
commitbf25c95fe2fda4ded2e3ca14499e3991e7243532 (patch)
tree889ced32b11186e69caba5af740b26de3e3b0659 /src/table
parent28bc967c837c38ba416d9b19fd1ae96cbb292074 (diff)
downloadgarage-bf25c95fe2fda4ded2e3ca14499e3991e7243532.tar.gz
garage-bf25c95fe2fda4ded2e3ca14499e3991e7243532.zip
Make updated() be a sync function that doesn't fail
Diffstat (limited to 'src/table')
-rw-r--r--src/table/schema.rs10
-rw-r--r--src/table/table.rs6
-rw-r--r--src/table/table_sync.rs10
3 files changed, 11 insertions, 15 deletions
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 61deb3c1..edd04000 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -1,8 +1,6 @@
-use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use garage_util::data::*;
-use garage_util::error::Error;
pub trait PartitionKey {
fn hash(&self) -> Hash;
@@ -45,7 +43,6 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
fn merge(&mut self, other: &Self);
}
-#[async_trait]
pub trait TableSchema: Send + Sync {
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
@@ -58,7 +55,12 @@ pub trait TableSchema: Send + Sync {
None
}
- async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>;
+ // Updated triggers some stuff downstream, but it is not supposed to block or fail,
+ // as the update itself is an unchangeable fact that will never go back
+ // due to CRDT logic. Typically errors in propagation of info should be logged
+ // to stderr.
+ fn updated(&self, _old: Option<Self::E>, _new: Option<Self::E>) {}
+
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
true
}
diff --git a/src/table/table.rs b/src/table/table.rs
index 018426c4..737ed589 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -414,7 +414,7 @@ where
epidemic_propagate.push(new_entry.clone());
}
- self.instance.updated(old_entry, Some(new_entry)).await?;
+ self.instance.updated(old_entry, Some(new_entry));
syncer.invalidate(&tree_key[..]);
}
}
@@ -429,7 +429,7 @@ where
Ok(())
}
- pub(crate) async fn delete_if_equal(
+ pub(crate) fn delete_if_equal(
self: &Arc<Self>,
k: &[u8],
v: &[u8],
@@ -445,7 +445,7 @@ where
})?;
if removed {
let old_entry = self.decode_entry(v)?;
- self.instance.updated(Some(old_entry), None).await?;
+ self.instance.updated(Some(old_entry), None);
self.syncer.load_full().unwrap().invalidate(k);
}
Ok(removed)
diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs
index 11b1c211..b81dad86 100644
--- a/src/table/table_sync.rs
+++ b/src/table/table_sync.rs
@@ -348,14 +348,8 @@ where
}
// All remote nodes have written those items, now we can delete them locally
- for was_removed in join_all(
- items
- .iter()
- .map(|(k, v)| self.table.delete_if_equal(&k[..], &v[..])),
- )
- .await
- {
- was_removed?;
+ for (k, v) in items.iter() {
+ self.table.delete_if_equal(&k[..], &v[..])?;
}
Ok(())