aboutsummaryrefslogtreecommitdiff
path: root/src/table/merkle.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r--src/table/merkle.rs54
1 files changed, 29 insertions, 25 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 93bf7e47..4b0b44ce 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -4,11 +4,10 @@ use std::time::Duration;
use futures::select;
use futures_util::future::*;
use serde::{Deserialize, Serialize};
-use sled::transaction::{
- ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
-};
use tokio::sync::watch;
+use garage_db as db;
+
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
@@ -90,7 +89,8 @@ where
async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
- if let Some(x) = self.data.merkle_todo.iter().next() {
+ if let Some(x) = self.data.merkle_todo.iter().unwrap().next() {
+ // TODO unwrap to remove
match x {
Ok((key, valhash)) => {
if let Err(e) = self.update_item(&key[..], &valhash[..]) {
@@ -137,13 +137,18 @@ where
};
self.data
.merkle_tree
+ .db()
.transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
- let deleted = self
- .data
- .merkle_todo
- .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)?
- .is_ok();
+ let deleted = self.data.merkle_todo.db().transaction(|tx| {
+ let old_val = tx.get(&self.data.merkle_todo, k)?;
+ if old_val == Some(vhash_by.into()) {
+ tx.remove(&self.data.merkle_todo, k)?;
+ tx.commit(true)
+ } else {
+ tx.commit(false)
+ }
+ })?;
if !deleted {
debug!(
@@ -157,12 +162,12 @@ where
fn update_item_rec(
&self,
- tx: &TransactionalTree,
+ tx: db::Transaction<'_>,
k: &[u8],
khash: &Hash,
key: &MerkleNodeKey,
new_vhash: Option<Hash>,
- ) -> ConflictableTransactionResult<Option<Hash>, Error> {
+ ) -> db::TxResult<Option<Hash>, Error> {
let i = key.prefix.len();
// Read node at current position (defined by the prefix stored in key)
@@ -203,7 +208,7 @@ where
}
MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
x @ MerkleNode::Leaf(_, _) => {
- tx.remove(key_sub.encode())?;
+ tx.remove(&self.data.merkle_tree, key_sub.encode())?;
Some(x)
}
}
@@ -283,28 +288,27 @@ where
fn read_node_txn(
&self,
- tx: &TransactionalTree,
+ tx: db::Transaction<'_>,
k: &MerkleNodeKey,
- ) -> ConflictableTransactionResult<MerkleNode, Error> {
- let ent = tx.get(k.encode())?;
- MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort)
+ ) -> db::TxResult<MerkleNode, Error> {
+ let ent = tx.get(&self.data.merkle_tree, k.encode())?;
+ MerkleNode::decode_opt(ent).map_err(db::TxError::Abort)
}
fn put_node_txn(
&self,
- tx: &TransactionalTree,
+ tx: db::Transaction<'_>,
k: &MerkleNodeKey,
v: &MerkleNode,
- ) -> ConflictableTransactionResult<Hash, Error> {
+ ) -> db::TxResult<Hash, Error> {
trace!("Put Merkle node: {:?} => {:?}", k, v);
if *v == MerkleNode::Empty {
- tx.remove(k.encode())?;
+ tx.remove(&self.data.merkle_tree, k.encode())?;
Ok(self.empty_node_hash)
} else {
- let vby = rmp_to_vec_all_named(v)
- .map_err(|e| ConflictableTransactionError::Abort(e.into()))?;
+ let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?;
let rethash = blake2sum(&vby[..]);
- tx.insert(k.encode(), vby)?;
+ tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
Ok(rethash)
}
}
@@ -316,11 +320,11 @@ where
}
pub fn merkle_tree_len(&self) -> usize {
- self.data.merkle_tree.len()
+ self.data.merkle_tree.len().unwrap() // TODO fix unwrap
}
pub fn todo_len(&self) -> usize {
- self.data.merkle_todo.len()
+ self.data.merkle_todo.len().unwrap() // TODO fix unwrap
}
}
@@ -347,7 +351,7 @@ impl MerkleNodeKey {
}
impl MerkleNode {
- fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> {
+ fn decode_opt(ent: Option<db::Value<'_>>) -> Result<Self, Error> {
match ent {
None => Ok(MerkleNode::Empty),
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),