aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/error.rs9
-rw-r--r--src/object_table.rs11
-rw-r--r--src/table.rs55
3 files changed, 57 insertions, 18 deletions
diff --git a/src/error.rs b/src/error.rs
index 9cbbd4f7..8b2fc419 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -54,3 +54,12 @@ impl Error {
}
}
}
+
+impl From<sled::TransactionError<Error>> for Error {
+ fn from(e: sled::TransactionError<Error>) -> Error {
+ match e {
+ sled::TransactionError::Abort(x) => x,
+ sled::TransactionError::Storage(x) => Error::Sled(x),
+ }
+ }
+}
diff --git a/src/object_table.rs b/src/object_table.rs
index 626c00b2..092dddf8 100644
--- a/src/object_table.rs
+++ b/src/object_table.rs
@@ -47,20 +47,25 @@ impl Entry<String, String> for Object {
&self.key
}
- fn merge(&mut self, other: &Self) {
+ fn merge(&mut self, other: &Self) -> bool {
+ let mut has_change = false;
+
for other_v in other.versions.iter() {
match self.versions.binary_search_by(|v| (v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid))) {
Ok(i) => {
let mut v = &mut self.versions[i];
if other_v.size > v.size {
v.size = other_v.size;
+ has_change = true;
}
- if other_v.is_complete {
+ if other_v.is_complete && !v.is_complete {
v.is_complete = true;
+ has_change = true;
}
}
Err(i) => {
self.versions.insert(i, other_v.clone());
+ has_change = true;
}
}
}
@@ -73,6 +78,8 @@ impl Entry<String, String> for Object {
if let Some(last_vi) = last_complete {
self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>();
}
+
+ has_change
}
}
diff --git a/src/table.rs b/src/table.rs
index 6a72c8bc..6d309967 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -76,7 +76,7 @@ pub trait Entry<P: PartitionKey, S: SortKey>: Clone + Serialize + for<'de> Deser
fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S;
- fn merge(&mut self, other: &Self);
+ fn merge(&mut self, other: &Self) -> bool;
}
#[derive(Clone, Serialize, Deserialize)]
@@ -152,13 +152,17 @@ impl<F: TableFormat + 'static> Table<F> {
.await?;
let mut ret = None;
+ let mut not_all_same = false;
for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v) = value {
ret = match ret {
None => Some(v),
Some(mut x) => {
- x.merge(&v);
+ let updated = x.merge(&v);
+ if updated {
+ not_all_same = true;
+ }
Some(x)
}
}
@@ -167,6 +171,16 @@ impl<F: TableFormat + 'static> Table<F> {
return Err(Error::Message(format!("Invalid return value to read")));
}
}
+ if let Some(ret_entry) = &ret {
+ if not_all_same {
+ // Repair on read
+ let _: Result<_, _> = self.rpc_try_call_many(
+ &who[..],
+ &TableRPC::<F>::Update(vec![ret_entry.clone()]),
+ who.len())
+ .await;
+ }
+ }
Ok(ret)
}
@@ -221,22 +235,31 @@ impl<F: TableFormat + 'static> Table<F> {
}
async fn handle_update(&self, mut entries: Vec<F::E>) -> Result<(), Error> {
- for mut entry in entries.drain(..) {
- let tree_key = self.tree_key(entry.partition_key(), entry.sort_key());
-
- let old_val = match self.store.get(&tree_key)? {
- Some(prev_bytes) => {
- let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)?;
- entry.merge(&old_entry);
- Some(old_entry)
- }
- None => None
- };
+ for update in entries.drain(..) {
+ let tree_key = self.tree_key(update.partition_key(), update.sort_key());
+
+ let (old_entry, new_entry) = self.store.transaction(|db| {
+ let mut new_entry = update.clone();
+
+ let old_entry = match db.get(&tree_key)? {
+ Some(prev_bytes) => {
+ let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes)
+ .map_err(Error::RMPDecode)
+ .map_err(sled::ConflictableTransactionError::Abort)?;
+ new_entry.merge(&old_entry);
+ Some(old_entry)
+ }
+ None => None
+ };
- let new_bytes = rmp_to_vec_all_named(&entry)?;
- self.store.insert(&tree_key, new_bytes)?;
+ let new_bytes = rmp_to_vec_all_named(&new_entry)
+ .map_err(Error::RMPEncode)
+ .map_err(sled::ConflictableTransactionError::Abort)?;
+ db.insert(tree_key.clone(), new_bytes)?;
+ Ok((old_entry, new_entry))
+ })?;
- self.instance.updated(old_val.as_ref(), &entry).await;
+ self.instance.updated(old_entry.as_ref(), &new_entry).await;
}
Ok(())
}