aboutsummaryrefslogtreecommitdiff
path: root/src/table/merkle.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r--src/table/merkle.rs61
1 files changed, 27 insertions, 34 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index a5c29723..e86d0251 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -3,12 +3,14 @@ use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use tokio::select;
use tokio::sync::watch;
use garage_db as db;
use garage_util::background::*;
use garage_util::data::*;
+use garage_util::encode::{nonversioned_decode, nonversioned_encode};
use garage_util::error::Error;
use garage_rpc::ring::*;
@@ -64,22 +66,18 @@ pub enum MerkleNode {
Leaf(Vec<u8>, Hash),
}
-impl<F, R> MerkleUpdater<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
- pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> {
- let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
+impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
+ pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
+ let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]);
- let ret = Arc::new(Self {
+ Arc::new(Self {
data,
empty_node_hash,
- });
-
- background.spawn_worker(MerkleWorker(ret.clone()));
+ })
+ }
- ret
+ pub(crate) fn spawn_workers(self: &Arc<Self>, background: &BackgroundRunner) {
+ background.spawn_worker(MerkleWorker(self.clone()));
}
fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
@@ -276,7 +274,7 @@ where
tx.remove(&self.data.merkle_tree, k.encode())?;
Ok(self.empty_node_hash)
} else {
- let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?;
+ let vby = nonversioned_encode(v).map_err(|e| db::TxError::Abort(e.into()))?;
let rethash = blake2sum(&vby[..]);
tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
Ok(rethash)
@@ -293,32 +291,27 @@ where
Ok(self.data.merkle_tree.len()?)
}
+ pub fn merkle_tree_fast_len(&self) -> Result<Option<usize>, Error> {
+ Ok(self.data.merkle_tree.fast_len()?)
+ }
+
pub fn todo_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_todo.len()?)
}
}
-struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static;
+struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>);
#[async_trait]
-impl<F, R> Worker for MerkleWorker<F, R>
-where
- F: TableSchema + 'static,
- R: TableReplication + 'static,
-{
+impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> {
fn name(&self) -> String {
- format!("{} Merkle tree updater", F::TABLE_NAME)
+ format!("{} Merkle", F::TABLE_NAME)
}
- fn info(&self) -> Option<String> {
- let l = self.0.todo_len().unwrap_or(0);
- if l > 0 {
- Some(format!("{} items in queue", l))
- } else {
- None
+ fn status(&self) -> WorkerStatus {
+ WorkerStatus {
+ queue_length: Some(self.0.todo_len().unwrap_or(0) as u64),
+ ..Default::default()
}
}
@@ -337,11 +330,11 @@ where
.unwrap()
}
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
- if *must_exit.borrow() {
- return WorkerState::Done;
+ async fn wait_for_work(&mut self) -> WorkerState {
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(60)) => (),
+ _ = self.0.data.merkle_todo_notify.notified() => (),
}
- tokio::time::sleep(Duration::from_secs(10)).await;
WorkerState::Busy
}
}
@@ -372,7 +365,7 @@ impl MerkleNode {
fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> {
match ent {
None => Ok(MerkleNode::Empty),
- Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
+ Some(v) => Ok(nonversioned_decode::<MerkleNode>(&v[..])?),
}
}