aboutsummaryrefslogtreecommitdiff
path: root/src/table/merkle.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/table/merkle.rs')
-rw-r--r--src/table/merkle.rs21
1 files changed, 11 insertions, 10 deletions
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index e977bfb5..736354fa 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
+use tokio::select;
use tokio::sync::watch;
use garage_db as db;
@@ -69,17 +70,17 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
- pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> {
+ pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
- let ret = Arc::new(Self {
+ Arc::new(Self {
data,
empty_node_hash,
- });
-
- background.spawn_worker(MerkleWorker(ret.clone()));
+ })
+ }
- ret
+ pub(crate) fn spawn_workers(self: &Arc<Self>, background: &BackgroundRunner) {
+ background.spawn_worker(MerkleWorker(self.clone()));
}
fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
@@ -339,11 +340,11 @@ where
.unwrap()
}
- async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
- if *must_exit.borrow() {
- return WorkerState::Done;
+ async fn wait_for_work(&mut self) -> WorkerState {
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs(60)) => (),
+ _ = self.0.data.merkle_todo_notify.notified() => (),
}
- tokio::time::sleep(Duration::from_secs(10)).await;
WorkerState::Busy
}
}