aboutsummaryrefslogtreecommitdiff
path: root/src/table.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-09 20:58:39 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-09 20:58:39 +0200
commita3eb88e6013e70238e7ddd66b4644f138b3d1b93 (patch)
tree850f76d80d1a609d699040e651e569645604e6ab /src/table.rs
parent1d786c2c663ac6f6e3e3ef52accd6e9eca049988 (diff)
downloadgarage-a3eb88e6013e70238e7ddd66b4644f138b3d1b93.tar.gz
garage-a3eb88e6013e70238e7ddd66b4644f138b3d1b93.zip
Locally, transactions
Diffstat (limited to 'src/table.rs')
-rw-r--r--src/table.rs55
1 files changed, 39 insertions, 16 deletions
diff --git a/src/table.rs b/src/table.rs
index 6a72c8bc..6d309967 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -76,7 +76,7 @@ pub trait Entry<P: PartitionKey, S: SortKey>: Clone + Serialize + for<'de> Deser
fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S;
- fn merge(&mut self, other: &Self);
+ fn merge(&mut self, other: &Self) -> bool;
}
#[derive(Clone, Serialize, Deserialize)]
@@ -152,13 +152,17 @@ impl<F: TableFormat + 'static> Table<F> {
.await?;
let mut ret = None;
+ let mut not_all_same = false;
for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v) = value {
ret = match ret {
None => Some(v),
Some(mut x) => {
- x.merge(&v);
+ let updated = x.merge(&v);
+ if updated {
+ not_all_same = true;
+ }
Some(x)
}
}
@@ -167,6 +171,16 @@ impl<F: TableFormat + 'static> Table<F> {
return Err(Error::Message(format!("Invalid return value to read")));
}
}
+ if let Some(ret_entry) = &ret {
+ if not_all_same {
+ // Repair on read
+ let _: Result<_, _> = self.rpc_try_call_many(
+ &who[..],
+ &TableRPC::<F>::Update(vec![ret_entry.clone()]),
+ who.len())
+ .await;
+ }
+ }
Ok(ret)
}
@@ -221,22 +235,31 @@ impl<F: TableFormat + 'static> Table<F> {
}
async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> {
- for mut entry in entries.drain(..) {
- let tree_key = self.tree_key(entry.partition_key(), entry.sort_key());
-
- let old_val = match self.store.get(&tree_key)? {
- Some(prev_bytes) => {
- let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)?;
- entry.merge(&old_entry);
- Some(old_entry)
- }
- None => None
- };
+ for update in entries.drain(..) {
+ let tree_key = self.tree_key(update.partition_key(), update.sort_key());
+
+ let (old_entry, new_entry) = self.store.transaction(|db| {
+ let mut new_entry = update.clone();
+
+ let old_entry = match db.get(&tree_key)? {
+ Some(prev_bytes) => {
+ let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)
+ .map_err(Error::RMPDecode)
+ .map_err(sled::ConflictableTransactionError::Abort)?;
+ new_entry.merge(&old_entry);
+ Some(old_entry)
+ }
+ None => None
+ };
- let new_bytes = rmp_to_vec_all_named(&entry)?;
- self.store.insert(&tree_key, new_bytes)?;
+ let new_bytes = rmp_to_vec_all_named(&new_entry)
+ .map_err(Error::RMPEncode)
+ .map_err(sled::ConflictableTransactionError::Abort)?;
+ db.insert(tree_key.clone(), new_bytes)?;
+ Ok((old_entry, new_entry))
+ })?;
- self.instance.updated(old_val.as_ref(), &entry).await;
+ self.instance.updated(old_entry.as_ref(), &new_entry).await;
}
Ok(())
}