diff options
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r-- | src/table/merkle.rs | 61 |
1 files changed, 27 insertions, 34 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs index a5c29723..e86d0251 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -3,12 +3,14 @@ use std::time::Duration; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use tokio::select; use tokio::sync::watch; use garage_db as db; use garage_util::background::*; use garage_util::data::*; +use garage_util::encode::{nonversioned_decode, nonversioned_encode}; use garage_util::error::Error; use garage_rpc::ring::*; @@ -64,22 +66,18 @@ pub enum MerkleNode { Leaf(Vec<u8>, Hash), } -impl<F, R> MerkleUpdater<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ - pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> { - let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); +impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> { + pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> { + let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]); - let ret = Arc::new(Self { + Arc::new(Self { data, empty_node_hash, - }); - - background.spawn_worker(MerkleWorker(ret.clone())); + }) + } - ret + pub(crate) fn spawn_workers(self: &Arc<Self>, background: &BackgroundRunner) { + background.spawn_worker(MerkleWorker(self.clone())); } fn updater_loop_iter(&self) -> Result<WorkerState, Error> { @@ -276,7 +274,7 @@ where tx.remove(&self.data.merkle_tree, k.encode())?; Ok(self.empty_node_hash) } else { - let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?; + let vby = nonversioned_encode(v).map_err(|e| db::TxError::Abort(e.into()))?; let rethash = blake2sum(&vby[..]); tx.insert(&self.data.merkle_tree, k.encode(), vby)?; Ok(rethash) @@ -293,32 +291,27 @@ where Ok(self.data.merkle_tree.len()?) } + pub fn merkle_tree_fast_len(&self) -> Result<Option<usize>, Error> { + Ok(self.data.merkle_tree.fast_len()?) + } + pub fn todo_len(&self) -> Result<usize, Error> { Ok(self.data.merkle_todo.len()?) } } -struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>) -where - F: TableSchema + 'static, - R: TableReplication + 'static; +struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>); #[async_trait] -impl<F, R> Worker for MerkleWorker<F, R> -where - F: TableSchema + 'static, - R: TableReplication + 'static, -{ +impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> { fn name(&self) -> String { - format!("{} Merkle tree updater", F::TABLE_NAME) + format!("{} Merkle", F::TABLE_NAME) } - fn info(&self) -> Option<String> { - let l = self.0.todo_len().unwrap_or(0); - if l > 0 { - Some(format!("{} items in queue", l)) - } else { - None + fn status(&self) -> WorkerStatus { + WorkerStatus { + queue_length: Some(self.0.todo_len().unwrap_or(0) as u64), + ..Default::default() } } @@ -337,11 +330,11 @@ where .unwrap() } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; + async fn wait_for_work(&mut self) -> WorkerState { + select! { + _ = tokio::time::sleep(Duration::from_secs(60)) => (), + _ = self.0.data.merkle_todo_notify.notified() => (), } - tokio::time::sleep(Duration::from_secs(10)).await; WorkerState::Busy } } @@ -372,7 +365,7 @@ impl MerkleNode { fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> { match ent { None => Ok(MerkleNode::Empty), - Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?), + Some(v) => Ok(nonversioned_decode::<MerkleNode>(&v[..])?), } } |