diff options
Diffstat (limited to 'src/table.rs')
-rw-r--r-- | src/table.rs | 55 |
1 files changed, 39 insertions, 16 deletions
diff --git a/src/table.rs b/src/table.rs index 6a72c8bc..6d309967 100644 --- a/src/table.rs +++ b/src/table.rs @@ -76,7 +76,7 @@ pub trait Entry<P: PartitionKey, S: SortKey>: Clone + Serialize + for<'de> Deser fn partition_key(&self) -> &P; fn sort_key(&self) -> &S; - fn merge(&mut self, other: &Self); + fn merge(&mut self, other: &Self) -> bool; } #[derive(Clone, Serialize, Deserialize)] @@ -152,13 +152,17 @@ impl<F: TableFormat + 'static> Table<F> { .await?; let mut ret = None; + let mut not_all_same = false; for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v) = value { ret = match ret { None => Some(v), Some(mut x) => { - x.merge(&v); + let updated = x.merge(&v); + if updated { + not_all_same = true; + } Some(x) } } @@ -167,6 +171,16 @@ impl<F: TableFormat + 'static> Table<F> { return Err(Error::Message(format!("Invalid return value to read"))); } } + if let Some(ret_entry) = &ret { + if not_all_same { + // Repair on read + let _: Result<_, _> = self.rpc_try_call_many( + &who[..], + &TableRPC::<F>::Update(vec![ret_entry.clone()]), + who.len()) + .await; + } + } Ok(ret) } @@ -221,22 +235,31 @@ impl<F: TableFormat + 'static> Table<F> { } async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> { - for mut entry in entries.drain(..) { - let tree_key = self.tree_key(entry.partition_key(), entry.sort_key()); - - let old_val = match self.store.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)?; - entry.merge(&old_entry); - Some(old_entry) - } - None => None - }; + for update in entries.drain(..) { + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + + let (old_entry, new_entry) = self.store.transaction(|db| { + let mut new_entry = update.clone(); + + let old_entry = match db.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes) + .map_err(Error::RMPDecode) + .map_err(sled::ConflictableTransactionError::Abort)?; + new_entry.merge(&old_entry); + Some(old_entry) + } + None => None + }; - let new_bytes = rmp_to_vec_all_named(&entry)?; - self.store.insert(&tree_key, new_bytes)?; + let new_bytes = rmp_to_vec_all_named(&new_entry) + .map_err(Error::RMPEncode) + .map_err(sled::ConflictableTransactionError::Abort)?; + db.insert(tree_key.clone(), new_bytes)?; + Ok((old_entry, new_entry)) + })?; - self.instance.updated(old_val.as_ref(), &entry).await; + self.instance.updated(old_entry.as_ref(), &new_entry).await; } Ok(()) } |