From dfc131850a09e7ceacfa98315adbef156e07e9ca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Dec 2022 15:25:29 +0100 Subject: Simplified and more aggressive worker exit logic --- src/block/repair.rs | 4 +-- src/block/resync.rs | 2 +- src/garage/repair/online.rs | 4 +-- src/table/gc.rs | 5 +-- src/table/merkle.rs | 5 +-- src/table/queue.rs | 5 +-- src/table/sync.rs | 5 +-- src/util/background/worker.rs | 73 ++++++++++++++----------------------------- 8 files changed, 32 insertions(+), 71 deletions(-) diff --git a/src/block/repair.rs b/src/block/repair.rs index 1878027e..f5515d4e 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -148,7 +148,7 @@ impl Worker for RepairWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } @@ -341,7 +341,7 @@ impl Worker for ScrubWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { let (wait_until, command) = match &self.work { ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), diff --git a/src/block/resync.rs b/src/block/resync.rs index 8231b55d..51bb9846 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -540,7 +540,7 @@ impl Worker for ResyncWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { while self.index >= self.manager.resync.persisted.load().n_workers { self.manager.resync.notify.notified().await } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 4b4118a8..cd7de49b 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -136,7 +136,7 @@ impl Worker for RepairVersionsWorker { Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } @@ -214,7 +214,7 @@ impl Worker for RepairBlockrefsWorker { Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 1fc16364..90594fba 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -348,10 +348,7 @@ where } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { tokio::time::sleep(self.wait_delay).await; WorkerState::Busy } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 0fe7d2cb..736354fa 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -340,10 +340,7 @@ where .unwrap() } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { _ = tokio::time::sleep(Duration::from_secs(60)) => (), _ = self.0.data.merkle_todo_notify.notified() => (), diff --git a/src/table/queue.rs b/src/table/queue.rs index 3671ea7d..860f20d3 100644 --- a/src/table/queue.rs +++ b/src/table/queue.rs @@ -71,10 +71,7 @@ where Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { _ = tokio::time::sleep(Duration::from_secs(600)) => (), _ = self.0.data.insert_queue_notify.notified() => (), diff --git a/src/table/sync.rs b/src/table/sync.rs index 1e7618ca..d6d272ab 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -593,10 +593,7 @@ impl Worker for SyncWor } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState { - if *must_exit.borrow() { - return WorkerState::Done; - } + async fn wait_for_work(&mut self) -> WorkerState { select! { s = self.add_full_sync_rx.recv() => { if let Some(()) = s { diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index 7e9da7f8..8165e2cb 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use async_trait::async_trait; use futures::future::*; @@ -14,6 +14,10 @@ use crate::background::{WorkerInfo, WorkerStatus}; use crate::error::Error; use crate::time::now_msec; +// All workers that haven't exited for this time after an exit signal was recieved +// will be interrupted in the middle of whatever they are doing. +const EXIT_DEADLINE: Duration = Duration::from_secs(8); + #[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)] pub enum WorkerState { Busy, @@ -50,10 +54,8 @@ pub trait Worker: Send { async fn work(&mut self, must_exit: &mut watch::Receiver) -> Result; /// Wait for work: await for some task to become available. This future can be interrupted in - /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we - /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows - /// it to check if we are exiting. - async fn wait_for_work(&mut self, must_exit: &watch::Receiver) -> WorkerState; + /// the middle for any reason, for example if an interrupt signal was recieved. + async fn wait_for_work(&mut self) -> WorkerState; } pub(crate) struct WorkerProcessor { @@ -93,11 +95,9 @@ impl WorkerProcessor { let task_id = next_task_id; next_task_id += 1; let stop_signal = self.stop_signal.clone(); - let stop_signal_worker = self.stop_signal.clone(); let mut worker = WorkerHandler { task_id, stop_signal, - stop_signal_worker, worker: new_worker, state: WorkerState::Busy, errors: 0, @@ -153,26 +153,14 @@ impl WorkerProcessor { } // We are exiting, drain everything - let drain_half_time = Instant::now() + Duration::from_secs(5); let drain_everything = async move { - while let Some(mut worker) = workers.next().await { - if worker.state == WorkerState::Done { - info!( - "Worker {} (TID {}) exited", - worker.worker.name(), - worker.task_id - ); - } else if Instant::now() > drain_half_time { - warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state); - } else { - workers.push( - async move { - worker.step().await; - worker - } - .boxed(), - ); - } + while let Some(worker) = workers.next().await { + info!( + "Worker {} (TID {}) exited (last state: {:?})", + worker.worker.name(), + worker.task_id, + worker.state + ); } }; @@ -180,7 +168,7 @@ impl WorkerProcessor { _ = drain_everything => { info!("All workers exited peacefully \\o/"); } - _ = tokio::time::sleep(Duration::from_secs(9)) => { + _ = tokio::time::sleep(EXIT_DEADLINE) => { error!("Some workers could not exit in time, we are cancelling some things in the middle"); } } @@ -190,7 +178,6 @@ impl WorkerProcessor { struct WorkerHandler { task_id: usize, stop_signal: watch::Receiver, - stop_signal_worker: watch::Receiver, worker: Box, state: WorkerState, errors: usize, @@ -225,33 +212,19 @@ impl WorkerHandler { }, WorkerState::Throttled(delay) => { // Sleep for given delay and go back to busy state - if !*self.stop_signal.borrow() { - select! { - _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (), - _ = self.stop_signal.changed() => (), + select! { + _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => { + self.state = WorkerState::Busy; } + _ = self.stop_signal.changed() => (), } - self.state = WorkerState::Busy; } WorkerState::Idle => { - if *self.stop_signal.borrow() { - select! { - new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { - self.state = new_st; - } - _ = tokio::time::sleep(Duration::from_secs(1)) => { - // stay in Idle state - } - } - } else { - select! { - new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { - self.state = new_st; - } - _ = self.stop_signal.changed() => { - // stay in Idle state - } + select! { + new_st = self.worker.wait_for_work() => { + self.state = new_st; } + _ = self.stop_signal.changed() => (), } } WorkerState::Done => unreachable!(), -- cgit v1.2.3