diff options
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r-- | src/table/merkle.rs | 54 |
1 files changed, 29 insertions, 25 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 93bf7e47..4b0b44ce 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -4,11 +4,10 @@ use std::time::Duration; use futures::select; use futures_util::future::*; use serde::{Deserialize, Serialize}; -use sled::transaction::{ - ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, -}; use tokio::sync::watch; +use garage_db as db; + use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; @@ -90,7 +89,8 @@ where async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { while !*must_exit.borrow() { - if let Some(x) = self.data.merkle_todo.iter().next() { + if let Some(x) = self.data.merkle_todo.iter().unwrap().next() { + // TODO unwrap to remove match x { Ok((key, valhash)) => { if let Err(e) = self.update_item(&key[..], &valhash[..]) { @@ -137,13 +137,18 @@ where }; self.data .merkle_tree + .db() .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; - let deleted = self - .data - .merkle_todo - .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)? - .is_ok(); + let deleted = self.data.merkle_todo.db().transaction(|tx| { + let old_val = tx.get(&self.data.merkle_todo, k)?; + if old_val == Some(vhash_by.into()) { + tx.remove(&self.data.merkle_todo, k)?; + tx.commit(true) + } else { + tx.commit(false) + } + })?; if !deleted { debug!( @@ -157,12 +162,12 @@ where fn update_item_rec( &self, - tx: &TransactionalTree, + tx: db::Transaction<'_>, k: &[u8], khash: &Hash, key: &MerkleNodeKey, new_vhash: Option<Hash>, - ) -> ConflictableTransactionResult<Option<Hash>, Error> { + ) -> db::TxResult<Option<Hash>, Error> { let i = key.prefix.len(); // Read node at current position (defined by the prefix stored in key) @@ -203,7 +208,7 @@ where } MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)), x @ MerkleNode::Leaf(_, _) => { - tx.remove(key_sub.encode())?; + tx.remove(&self.data.merkle_tree, key_sub.encode())?; Some(x) } } @@ -283,28 +288,27 @@ where fn read_node_txn( &self, - tx: &TransactionalTree, + tx: db::Transaction<'_>, k: &MerkleNodeKey, - ) -> ConflictableTransactionResult<MerkleNode, Error> { - let ent = tx.get(k.encode())?; - MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort) + ) -> db::TxResult<MerkleNode, Error> { + let ent = tx.get(&self.data.merkle_tree, k.encode())?; + MerkleNode::decode_opt(ent).map_err(db::TxError::Abort) } fn put_node_txn( &self, - tx: &TransactionalTree, + tx: db::Transaction<'_>, k: &MerkleNodeKey, v: &MerkleNode, - ) -> ConflictableTransactionResult<Hash, Error> { + ) -> db::TxResult<Hash, Error> { trace!("Put Merkle node: {:?} => {:?}", k, v); if *v == MerkleNode::Empty { - tx.remove(k.encode())?; + tx.remove(&self.data.merkle_tree, k.encode())?; Ok(self.empty_node_hash) } else { - let vby = rmp_to_vec_all_named(v) - .map_err(|e| ConflictableTransactionError::Abort(e.into()))?; + let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?; let rethash = blake2sum(&vby[..]); - tx.insert(k.encode(), vby)?; + tx.insert(&self.data.merkle_tree, k.encode(), vby)?; Ok(rethash) } } @@ -316,11 +320,11 @@ where } pub fn merkle_tree_len(&self) -> usize { - self.data.merkle_tree.len() + self.data.merkle_tree.len().unwrap() // TODO fix unwrap } pub fn todo_len(&self) -> usize { - self.data.merkle_todo.len() + self.data.merkle_todo.len().unwrap() // TODO fix unwrap } } @@ -347,7 +351,7 @@ impl MerkleNodeKey { } impl MerkleNode { - fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> { + fn decode_opt(ent: Option<db::Value<'_>>) -> Result<Self, Error> { match ent { None => Ok(MerkleNode::Empty), Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), |