diff options
-rw-r--r-- | src/error.rs | 9 | ||||
-rw-r--r-- | src/object_table.rs | 11 | ||||
-rw-r--r-- | src/table.rs | 55 |
3 files changed, 57 insertions, 18 deletions
diff --git a/src/error.rs b/src/error.rs index 9cbbd4f7..8b2fc419 100644 --- a/src/error.rs +++ b/src/error.rs @@ -54,3 +54,12 @@ impl Error { } } } + +impl From<sled::TransactionError<Error>> for Error { + fn from(e: sled::TransactionError<Error>) -> Error { + match e { + sled::TransactionError::Abort(x) => x, + sled::TransactionError::Storage(x) => Error::Sled(x), + } + } +} diff --git a/src/object_table.rs b/src/object_table.rs index 626c00b2..092dddf8 100644 --- a/src/object_table.rs +++ b/src/object_table.rs @@ -47,20 +47,25 @@ impl Entry<String, String> for Object { &self.key } - fn merge(&mut self, other: &Self) { + fn merge(&mut self, other: &Self) -> bool { + let mut has_change = false; + for other_v in other.versions.iter() { match self.versions.binary_search_by(|v| (v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid))) { Ok(i) => { let mut v = &mut self.versions[i]; if other_v.size > v.size { v.size = other_v.size; + has_change = true; } - if other_v.is_complete { + if other_v.is_complete && !v.is_complete { v.is_complete = true; + has_change = true; } } Err(i) => { self.versions.insert(i, other_v.clone()); + has_change = true; } } } @@ -73,6 +78,8 @@ impl Entry<String, String> for Object { if let Some(last_vi) = last_complete { self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>(); } + + has_change } } diff --git a/src/table.rs b/src/table.rs index 6a72c8bc..6d309967 100644 --- a/src/table.rs +++ b/src/table.rs @@ -76,7 +76,7 @@ pub trait Entry<P: PartitionKey, S: SortKey>: Clone + Serialize + for<'de> Deser fn partition_key(&self) -> &P; fn sort_key(&self) -> &S; - fn merge(&mut self, other: &Self); + fn merge(&mut self, other: &Self) -> bool; } #[derive(Clone, Serialize, Deserialize)] @@ -152,13 +152,17 @@ impl<F: TableFormat + 'static> Table<F> { .await?; let mut ret = None; + let mut not_all_same = false; for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v) = value { ret = match ret { None => Some(v), Some(mut x) => { - x.merge(&v); + let updated = x.merge(&v); + if updated { + not_all_same = true; + } Some(x) } } @@ -167,6 +171,16 @@ impl<F: TableFormat + 'static> Table<F> { return Err(Error::Message(format!("Invalid return value to read"))); } } + if let Some(ret_entry) = &ret { + if not_all_same { + // Repair on read + let _: Result<_, _> = self.rpc_try_call_many( + &who[..], + &TableRPC::<F>::Update(vec![ret_entry.clone()]), + who.len()) + .await; + } + } Ok(ret) } @@ -221,22 +235,31 @@ impl<F: TableFormat + 'static> Table<F> { } async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> { - for mut entry in entries.drain(..) { - let tree_key = self.tree_key(entry.partition_key(), entry.sort_key()); - - let old_val = match self.store.get(&tree_key)? { - Some(prev_bytes) => { - let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)?; - entry.merge(&old_entry); - Some(old_entry) - } - None => None - }; + for update in entries.drain(..) { + let tree_key = self.tree_key(update.partition_key(), update.sort_key()); + + let (old_entry, new_entry) = self.store.transaction(|db| { + let mut new_entry = update.clone(); + + let old_entry = match db.get(&tree_key)? { + Some(prev_bytes) => { + let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes) + .map_err(Error::RMPDecode) + .map_err(sled::ConflictableTransactionError::Abort)?; + new_entry.merge(&old_entry); + Some(old_entry) + } + None => None + }; - let new_bytes = rmp_to_vec_all_named(&entry)?; - self.store.insert(&tree_key, new_bytes)?; + let new_bytes = rmp_to_vec_all_named(&new_entry) + .map_err(Error::RMPEncode) + .map_err(sled::ConflictableTransactionError::Abort)?; + db.insert(tree_key.clone(), new_bytes)?; + Ok((old_entry, new_entry)) + })?; - self.instance.updated(old_val.as_ref(), &entry).await; + self.instance.updated(old_entry.as_ref(), &new_entry).await; } Ok(()) } |