diff options
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r-- | src/table/merkle.rs | 150 |
1 files changed, 86 insertions, 64 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 93bf7e47..a5c29723 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -1,15 +1,13 @@ use std::sync::Arc; use std::time::Duration; -use futures::select; -use futures_util::future::*; +use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use sled::transaction::{ - ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree, -}; use tokio::sync::watch; -use garage_util::background::BackgroundRunner; +use garage_db as db; + +use garage_util::background::*; use garage_util::data::*; use garage_util::error::Error; @@ -79,43 +77,17 @@ where empty_node_hash, }); - let ret2 = ret.clone(); - background.spawn_worker( - format!("Merkle tree updater for {}", F::TABLE_NAME), - |must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit), - ); + background.spawn_worker(MerkleWorker(ret.clone())); ret } - async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { - while !*must_exit.borrow() { - if let Some(x) = self.data.merkle_todo.iter().next() { - match x { - Ok((key, valhash)) => { - if let Err(e) = self.update_item(&key[..], &valhash[..]) { - warn!( - "({}) Error while updating Merkle tree item: {}", - F::TABLE_NAME, - e - ); - } - } - Err(e) => { - warn!( - "({}) Error while iterating on Merkle todo tree: {}", - F::TABLE_NAME, - e - ); - tokio::time::sleep(Duration::from_secs(10)).await; - } - } - } else { - select! { - _ = self.data.merkle_todo_notify.notified().fuse() => {}, - _ = must_exit.changed().fuse() => {}, - } - } + fn updater_loop_iter(&self) -> Result<WorkerState, Error> { + if let Some((key, valhash)) = self.data.merkle_todo.first()? { + self.update_item(&key, &valhash)?; + Ok(WorkerState::Busy) + } else { + Ok(WorkerState::Idle) } } @@ -137,13 +109,16 @@ where }; self.data .merkle_tree - .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; + .db() + .transaction(|mut tx| self.update_item_rec(&mut tx, k, &khash, &key, new_vhash))?; - let deleted = self - .data - .merkle_todo - .compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)? - .is_ok(); + let deleted = self.data.merkle_todo.db().transaction(|mut tx| { + let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by); + if remove { + tx.remove(&self.data.merkle_todo, k)?; + } + Ok(remove) + })?; if !deleted { debug!( @@ -157,12 +132,12 @@ where fn update_item_rec( &self, - tx: &TransactionalTree, + tx: &mut db::Transaction<'_>, k: &[u8], khash: &Hash, key: &MerkleNodeKey, new_vhash: Option<Hash>, - ) -> ConflictableTransactionResult<Option<Hash>, Error> { + ) -> db::TxResult<Option<Hash>, Error> { let i = key.prefix.len(); // Read node at current position (defined by the prefix stored in key) @@ -203,7 +178,7 @@ where } MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)), x @ MerkleNode::Leaf(_, _) => { - tx.remove(key_sub.encode())?; + tx.remove(&self.data.merkle_tree, key_sub.encode())?; Some(x) } } @@ -283,28 +258,27 @@ where fn read_node_txn( &self, - tx: &TransactionalTree, + tx: &mut db::Transaction<'_>, k: &MerkleNodeKey, - ) -> ConflictableTransactionResult<MerkleNode, Error> { - let ent = tx.get(k.encode())?; - MerkleNode::decode_opt(ent).map_err(ConflictableTransactionError::Abort) + ) -> db::TxResult<MerkleNode, Error> { + let ent = tx.get(&self.data.merkle_tree, k.encode())?; + MerkleNode::decode_opt(&ent).map_err(db::TxError::Abort) } fn put_node_txn( &self, - tx: &TransactionalTree, + tx: &mut db::Transaction<'_>, k: &MerkleNodeKey, v: &MerkleNode, - ) -> ConflictableTransactionResult<Hash, Error> { + ) -> db::TxResult<Hash, Error> { trace!("Put Merkle node: {:?} => {:?}", k, v); if *v == MerkleNode::Empty { - tx.remove(k.encode())?; + tx.remove(&self.data.merkle_tree, k.encode())?; Ok(self.empty_node_hash) } else { - let vby = rmp_to_vec_all_named(v) - .map_err(|e| ConflictableTransactionError::Abort(e.into()))?; + let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?; let rethash = blake2sum(&vby[..]); - tx.insert(k.encode(), vby)?; + tx.insert(&self.data.merkle_tree, k.encode(), vby)?; Ok(rethash) } } @@ -312,15 +286,63 @@ where // Access a node in the Merkle tree, used by the sync protocol pub(crate) fn read_node(&self, k: &MerkleNodeKey) -> Result<MerkleNode, Error> { let ent = self.data.merkle_tree.get(k.encode())?; - MerkleNode::decode_opt(ent) + MerkleNode::decode_opt(&ent) + } + + pub fn merkle_tree_len(&self) -> Result<usize, Error> { + Ok(self.data.merkle_tree.len()?) } - pub fn merkle_tree_len(&self) -> usize { - self.data.merkle_tree.len() + pub fn todo_len(&self) -> Result<usize, Error> { + Ok(self.data.merkle_todo.len()?) } +} + +struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>) +where + F: TableSchema + 'static, + R: TableReplication + 'static; - pub fn todo_len(&self) -> usize { - self.data.merkle_todo.len() +#[async_trait] +impl<F, R> Worker for MerkleWorker<F, R> +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + fn name(&self) -> String { + format!("{} Merkle tree updater", F::TABLE_NAME) + } + + fn info(&self) -> Option<String> { + let l = self.0.todo_len().unwrap_or(0); + if l > 0 { + Some(format!("{} items in queue", l)) + } else { + None + } + } + + async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { + let updater = self.0.clone(); + tokio::task::spawn_blocking(move || { + for _i in 0..100 { + let s = updater.updater_loop_iter(); + if !matches!(s, Ok(WorkerState::Busy)) { + return s; + } + } + Ok(WorkerState::Busy) + }) + .await + .unwrap() + } + + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { + if *must_exit.borrow() { + return WorkerState::Done; + } + tokio::time::sleep(Duration::from_secs(10)).await; + WorkerState::Busy } } @@ -347,7 +369,7 @@ impl MerkleNodeKey { } impl MerkleNode { - fn decode_opt(ent: Option<sled::IVec>) -> Result<Self, Error> { + fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> { match ent { None => Ok(MerkleNode::Empty), Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), |