From dfc131850a09e7ceacfa98315adbef156e07e9ca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 15:25:29 +0100 Subject: Simplified and more aggressive worker exit logic --- src/table/gc.rs | 5 +---- src/table/merkle.rs | 5 +---- src/table/queue.rs | 5 +---- src/table/sync.rs | 5 +---- 4 files changed, 4 insertions(+), 16 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index 1fc16364..90594fba 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -348,10 +348,7 @@ where } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { tokio::time::sleep(self.wait_delay).await; WorkerState::Busy } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 0fe7d2cb..736354fa 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -340,10 +340,7 @@ where .unwrap() } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { _ = tokio::time::sleep(Duration::from_secs(60)) => (), _ = self.0.data.merkle_todo_notify.notified() => (), diff --git a/src/table/queue.rs b/src/table/queue.rs index 3671ea7d..860f20d3 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -71,10 +71,7 @@ where Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { _ = tokio::time::sleep(Duration::from_secs(600)) => (), _ = self.0.data.insert_queue_notify.notified() => (), diff --git a/src/table/sync.rs b/src/table/sync.rs index 1e7618ca..d6d272ab 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -593,10 +593,7 @@ impl Worker for SyncWor } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { s = self.add_full_sync_rx.recv() => { if let Some(()) = s { -- cgit v1.2.3