From 83c8467e23c1f531ae233766d5dc7244afe57f08 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 11:58:06 +0100 Subject: Proper queueing for delayed inserts, now backed to disk --- src/table/merkle.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'src/table/merkle.rs') diff --git a/src/table/merkle.rs b/src/table/merkle.rs index e977bfb5..bcf9f9d7 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -3,6 +3,7 @@ use std::time::Duration; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use tokio::select; use tokio::sync::watch; use garage_db as db; @@ -343,7 +344,10 @@ where if *must_exit.borrow() { return WorkerState::Done; } - tokio::time::sleep(Duration::from_secs(10)).await; + select! { + _ = tokio::time::sleep(Duration::from_secs(60)) => (), + _ = self.0.data.merkle_todo_notify.notified() => (), + } WorkerState::Busy } } -- cgit v1.2.3 From 2183518edccadef47cdeaf6476033b52d8832d6e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 12:28:07 +0100 Subject: Spawn all background workers in a separate step --- src/table/merkle.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src/table/merkle.rs') diff --git a/src/table/merkle.rs b/src/table/merkle.rs index bcf9f9d7..0fe7d2cb 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -70,17 +70,17 @@ where F: TableSchema + 'static, R: TableReplication + 'static, { - pub(crate) fn launch(background: &BackgroundRunner, data: Arc>) -> Arc { + pub(crate) fn new(data: Arc>) -> Arc { let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]); - let ret = Arc::new(Self { + Arc::new(Self { data, empty_node_hash, - }); - - background.spawn_worker(MerkleWorker(ret.clone())); + }) + } - ret + pub(crate) fn spawn_workers(self: &Arc, background: &BackgroundRunner) { + background.spawn_worker(MerkleWorker(self.clone())); } fn updater_loop_iter(&self) -> Result { -- cgit v1.2.3 From dfc131850a09e7ceacfa98315adbef156e07e9ca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 15:25:29 +0100 Subject: Simplified and more aggressive worker exit logic --- src/table/merkle.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'src/table/merkle.rs') diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 0fe7d2cb..736354fa 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -340,10 +340,7 @@ where .unwrap() } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { _ = tokio::time::sleep(Duration::from_secs(60)) => (), _ = self.0.data.merkle_todo_notify.notified() => (), -- cgit v1.2.3