diff options
author | Alex Auvolat <alex@adnab.me> | 2022-07-08 10:39:41 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2022-07-08 10:39:41 +0200 |
commit | d1cf1a0fa6952e84874f36f1dc66e4a978959d8f (patch) | |
tree | f65135f725b9f22ac2eac54e5023ab6a548549ec /src | |
parent | 0f660b086c23d13c91b0c55fd4d43017a09c1f4b (diff) | |
download | garage-d1cf1a0fa6952e84874f36f1dc66e4a978959d8f.tar.gz garage-d1cf1a0fa6952e84874f36f1dc66e4a978959d8f.zip |
Rename WorkerStatus to WorkerState
because it's a state in a state machine
Diffstat (limited to 'src')
-rw-r--r-- | src/block/manager.rs | 10 | ||||
-rw-r--r-- | src/block/repair.rs | 32 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 12 | ||||
-rw-r--r-- | src/garage/repair/online.rs | 16 | ||||
-rw-r--r-- | src/model/index_counter.rs | 18 | ||||
-rw-r--r-- | src/table/gc.rs | 12 | ||||
-rw-r--r-- | src/table/merkle.rs | 18 | ||||
-rw-r--r-- | src/table/sync.rs | 14 | ||||
-rw-r--r-- | src/util/background/job_worker.rs | 12 | ||||
-rw-r--r-- | src/util/background/mod.rs | 4 | ||||
-rw-r--r-- | src/util/background/worker.rs | 54 | ||||
-rw-r--r-- | src/util/tranquilizer.rs | 8 |
12 files changed, 105 insertions, 105 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 17d4a72d..e54fb992 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -776,16 +776,16 @@ impl Worker for ResyncWorker { async fn work( &mut self, _must_exit: &mut watch::Receiver<bool>, - ) -> Result<WorkerStatus, Error> { + ) -> Result<WorkerState, Error> { self.tranquilizer.reset(); match self.manager.resync_iter().await { Ok(ResyncIterResult::BusyDidSomething) => Ok(self .tranquilizer .tranquilize_worker(self.manager.background_tranquility)), - Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerStatus::Busy), + Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy), Ok(ResyncIterResult::IdleFor(delay)) => { self.next_delay = delay; - Ok(WorkerStatus::Idle) + Ok(WorkerState::Idle) } Err(e) => { // The errors that we have here are only Sled errors @@ -799,12 +799,12 @@ impl Worker for ResyncWorker { } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { select! { _ = tokio::time::sleep(self.next_delay) => (), _ = self.manager.resync_notify.notified() => (), }; - WorkerStatus::Busy + WorkerState::Busy } } diff --git a/src/block/repair.rs b/src/block/repair.rs index 26d1bd8f..cd5afe44 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -65,7 +65,7 @@ impl Worker for RepairWorker { async fn work( &mut self, _must_exit: &mut watch::Receiver<bool>, - ) -> Result<WorkerStatus, Error> { + ) -> Result<WorkerState, Error> { match self.block_iter.as_mut() { None => { // Phase 1: Repair blocks from RC table. @@ -101,7 +101,7 @@ impl Worker for RepairWorker { if batch_of_hashes.is_empty() { // move on to phase 2 self.block_iter = Some(BlockStoreIterator::new(&self.manager)); - return Ok(WorkerStatus::Busy); + return Ok(WorkerState::Busy); } for hash in batch_of_hashes.into_iter() { @@ -109,7 +109,7 @@ impl Worker for RepairWorker { self.next_start = Some(hash) } - Ok(WorkerStatus::Busy) + Ok(WorkerState::Busy) } Some(bi) => { // Phase 2: Repair blocks actually on disk @@ -118,15 +118,15 @@ impl Worker for RepairWorker { // so that we can offload them if necessary and then delete them locally. if let Some(hash) = bi.next().await? { self.manager.put_to_resync(&hash, Duration::from_secs(0))?; - Ok(WorkerStatus::Busy) + Ok(WorkerState::Busy) } else { - Ok(WorkerStatus::Done) + Ok(WorkerState::Done) } } } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { unreachable!() } } @@ -282,10 +282,10 @@ impl Worker for ScrubWorker { async fn work( &mut self, _must_exit: &mut watch::Receiver<bool>, - ) -> Result<WorkerStatus, Error> { + ) -> Result<WorkerState, Error> { match self.rx_cmd.try_recv() { Ok(cmd) => self.handle_cmd(cmd).await, - Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerStatus::Done), + Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerState::Done), Err(mpsc::error::TryRecvError::Empty) => (), }; @@ -310,16 +310,16 @@ impl Worker for ScrubWorker { self.persister.save_async(&self.persisted).await?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); - Ok(WorkerStatus::Idle) + Ok(WorkerState::Idle) } } - _ => Ok(WorkerStatus::Idle), + _ => Ok(WorkerState::Idle), } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { let (wait_until, command) = match &self.work { - ScrubWorkerState::Running(_) => return WorkerStatus::Busy, + ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), ScrubWorkerState::Finished => ( self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64, @@ -330,7 +330,7 @@ impl Worker for ScrubWorker { let now = now_msec(); if now >= wait_until { self.handle_cmd(command).await; - return WorkerStatus::Busy; + return WorkerState::Busy; } let delay = Duration::from_millis(wait_until - now); select! { @@ -338,13 +338,13 @@ impl Worker for ScrubWorker { cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd { self.handle_cmd(cmd).await; } else { - return WorkerStatus::Done; + return WorkerState::Done; } } match &self.work { - ScrubWorkerState::Running(_) => WorkerStatus::Busy, - _ => WorkerStatus::Idle, + ScrubWorkerState::Running(_) => WorkerState::Busy, + _ => WorkerState::Idle, } } } diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 8be56138..396938ae 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -245,10 +245,10 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { let mut wi = wi.into_iter().collect::<Vec<_>>(); wi.sort_by_key(|(tid, info)| { ( - match info.status { - WorkerStatus::Busy | WorkerStatus::Throttled(_) => 0, - WorkerStatus::Idle => 1, - WorkerStatus::Done => 2, + match info.state { + WorkerState::Busy | WorkerState::Throttled(_) => 0, + WorkerState::Idle => 1, + WorkerState::Done => 2, }, *tid, ) @@ -256,14 +256,14 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) { let mut table = vec![]; for (tid, info) in wi.iter() { - if wlo.busy && !matches!(info.status, WorkerStatus::Busy | WorkerStatus::Throttled(_)) { + if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) { continue; } if wlo.errors && info.errors == 0 { continue; } - table.push(format!("{}\t{}\t{}", tid, info.status, info.name)); + table.push(format!("{}\t{}\t{}", tid, info.state, info.name)); if let Some(i) = &info.info { table.push(format!("\t\t {}", i)); } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index eeb9cea3..160ce8f8 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -92,7 +92,7 @@ impl Worker for RepairVersionsWorker { async fn work( &mut self, _must_exit: &mut watch::Receiver<bool>, - ) -> Result<WorkerStatus, Error> { + ) -> Result<WorkerState, Error> { let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? { Some((k, v)) => { self.pos = k; @@ -100,7 +100,7 @@ impl Worker for RepairVersionsWorker { } None => { info!("repair_versions: finished, done {}", self.counter); - return Ok(WorkerStatus::Done); + return Ok(WorkerState::Done); } }; @@ -134,10 +134,10 @@ impl Worker for RepairVersionsWorker { } } - Ok(WorkerStatus::Busy) + Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { unreachable!() } } @@ -173,7 +173,7 @@ impl Worker for RepairBlockrefsWorker { async fn work( &mut self, _must_exit: &mut watch::Receiver<bool>, - ) -> Result<WorkerStatus, Error> { + ) -> Result<WorkerState, Error> { let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { Some((k, v)) => { self.pos = k; @@ -181,7 +181,7 @@ impl Worker for RepairBlockrefsWorker { } None => { info!("repair_block_ref: finished, done {}", self.counter); - return Ok(WorkerStatus::Done); + return Ok(WorkerState::Done); } }; @@ -212,10 +212,10 @@ impl Worker for RepairBlockrefsWorker { } } - Ok(WorkerStatus::Busy) + Ok(WorkerState::Busy) } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { unreachable!() } } diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 9d5aa955..26833390 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -415,7 +415,7 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { } } - async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> { + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { // This loop batches updates to counters to be sent all at once. // They are sent once the propagate_rx channel has been emptied (or is closed). let closed = loop { @@ -435,7 +435,7 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { self.errors += 1; if self.errors >= 2 && *must_exit.borrow() { error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e); - return Ok(WorkerStatus::Done); + return Ok(WorkerState::Done); } // Propagate error up to worker manager, it will log it, increment a counter, // and sleep for a certain delay (with exponential backoff), waiting for @@ -448,23 +448,23 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> { self.errors = 0; } - return Ok(WorkerStatus::Busy); + return Ok(WorkerState::Busy); } else if closed { - return Ok(WorkerStatus::Done); + return Ok(WorkerState::Done); } else { - return Ok(WorkerStatus::Idle); + return Ok(WorkerState::Idle); } } - async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState { match self.propagate_rx.recv().await { Some((pk, sk, counters)) => { self.add_ent(pk, sk, counters); - WorkerStatus::Busy + WorkerState::Busy } None => match self.buf.is_empty() { - false => WorkerStatus::Busy, - true => WorkerStatus::Done, + false => WorkerState::Busy, + true => WorkerState::Done, }, } } diff --git a/src/table/gc.rs b/src/table/gc.rs index 0899d5e5..9ffae184 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -347,22 +347,22 @@ where async fn work( &mut self, _must_exit: &mut watch::Receiver<bool>, - ) -> Result<WorkerStatus, Error> { + ) -> Result<WorkerState, Error> { match self.gc.gc_loop_iter().await? { - None => Ok(WorkerStatus::Busy), + None => Ok(WorkerState::Busy), Some(delay) => { self.wait_delay = delay; - Ok(WorkerStatus::Idle) + Ok(WorkerState::Idle) } } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { if *must_exit.borrow() { - return WorkerStatus::Done; + return WorkerState::Done; } tokio::time::sleep(self.wait_delay).await; - WorkerStatus::Busy + WorkerState::Busy } } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 21186220..ca5891a7 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -82,12 +82,12 @@ where ret } - fn updater_loop_iter(&self) -> Result<WorkerStatus, Error> { + fn updater_loop_iter(&self) -> Result<WorkerState, Error> { if let Some((key, valhash)) = self.data.merkle_todo.first()? { self.update_item(&key, &valhash)?; - Ok(WorkerStatus::Busy) + Ok(WorkerState::Busy) } else { - Ok(WorkerStatus::Idle) + Ok(WorkerState::Idle) } } @@ -325,27 +325,27 @@ where async fn work( &mut self, _must_exit: &mut watch::Receiver<bool>, - ) -> Result<WorkerStatus, Error> { + ) -> Result<WorkerState, Error> { let updater = self.0.clone(); tokio::task::spawn_blocking(move || { for _i in 0..100 { let s = updater.updater_loop_iter(); - if !matches!(s, Ok(WorkerStatus::Busy)) { + if !matches!(s, Ok(WorkerState::Busy)) { return s; } } - Ok(WorkerStatus::Busy) + Ok(WorkerState::Busy) }) .await .unwrap() } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { if *must_exit.borrow() { - return WorkerStatus::Done; + return WorkerState::Done; } tokio::time::sleep(Duration::from_secs(10)).await; - WorkerStatus::Busy + WorkerState::Busy } } diff --git a/src/table/sync.rs b/src/table/sync.rs index a7e1994c..b3756a5e 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -586,18 +586,18 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor } } - async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> { + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { if let Some(partition) = self.pop_task() { self.syncer.sync_partition(&partition, must_exit).await?; - Ok(WorkerStatus::Busy) + Ok(WorkerState::Busy) } else { - Ok(WorkerStatus::Idle) + Ok(WorkerState::Idle) } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { if *must_exit.borrow() { - return WorkerStatus::Done; + return WorkerState::Done; } select! { s = self.add_full_sync_rx.recv() => { @@ -619,8 +619,8 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor } } match self.todo.is_empty() { - false => WorkerStatus::Busy, - true => WorkerStatus::Idle, + false => WorkerState::Busy, + true => WorkerState::Idle, } } } diff --git a/src/util/background/job_worker.rs b/src/util/background/job_worker.rs index fcdac582..6754382a 100644 --- a/src/util/background/job_worker.rs +++ b/src/util/background/job_worker.rs @@ -24,17 +24,17 @@ impl Worker for JobWorker { async fn work( &mut self, _must_exit: &mut watch::Receiver<bool>, - ) -> Result<WorkerStatus, Error> { + ) -> Result<WorkerState, Error> { match self.next_job.take() { - None => return Ok(WorkerStatus::Idle), + None => return Ok(WorkerState::Idle), Some(job) => { job.await?; - Ok(WorkerStatus::Busy) + Ok(WorkerState::Busy) } } } - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus { + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState { loop { match self.job_chan.lock().await.recv().await { Some((job, cancellable)) => { @@ -42,9 +42,9 @@ impl Worker for JobWorker { continue; } self.next_job = Some(job); - return WorkerStatus::Busy; + return WorkerState::Busy; } - None => return WorkerStatus::Done, + None => return WorkerState::Done, } } } diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 636b9c13..619f5068 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -14,7 +14,7 @@ use tokio::sync::{mpsc, watch, Mutex}; use crate::error::Error; use worker::WorkerProcessor; -pub use worker::{Worker, WorkerStatus}; +pub use worker::{Worker, WorkerState}; pub(crate) type JobOutput = Result<(), Error>; pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>; @@ -30,7 +30,7 @@ pub struct BackgroundRunner { pub struct WorkerInfo { pub name: String, pub info: Option<String>, - pub status: WorkerStatus, + pub state: WorkerState, pub errors: usize, pub consecutive_errors: usize, pub last_error: Option<(String, u64)>, diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index aadc677f..7f573a07 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -16,20 +16,20 @@ use crate::error::Error; use crate::time::now_msec; #[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)] -pub enum WorkerStatus { +pub enum WorkerState { Busy, Throttled(f32), Idle, Done, } -impl std::fmt::Display for WorkerStatus { +impl std::fmt::Display for WorkerState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - WorkerStatus::Busy => write!(f, "Busy"), - WorkerStatus::Throttled(t) => write!(f, "Thr:{:.3}", t), - WorkerStatus::Idle => write!(f, "Idle"), - WorkerStatus::Done => write!(f, "Done"), + WorkerState::Busy => write!(f, "Busy"), + WorkerState::Throttled(t) => write!(f, "Thr:{:.3}", t), + WorkerState::Idle => write!(f, "Idle"), + WorkerState::Done => write!(f, "Done"), } } } @@ -43,18 +43,18 @@ pub trait Worker: Send { } /// Work: do a basic unit of work, if one is available (otherwise, should return - /// WorkerStatus::Idle immediately). We will do our best to not interrupt this future in the + /// WorkerState::Idle immediately). We will do our best to not interrupt this future in the /// middle of processing, it will only be interrupted at the last minute when Garage is trying /// to exit and this hasn't returned yet. This function may return an error to indicate that /// its unit of work could not be processed due to an error: the error will be logged and /// .work() will be called again after a short delay. - async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>; + async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>; /// Wait for work: await for some task to become available. This future can be interrupted in /// the middle for any reason. This future doesn't have to await on must_exit.changed(), we /// are doing it for you. Therefore it only receives a read refernce to must_exit which allows /// it to check if we are exiting. - async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus; + async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState; } pub(crate) struct WorkerProcessor { @@ -100,7 +100,7 @@ impl WorkerProcessor { stop_signal, stop_signal_worker, worker: new_worker, - status: WorkerStatus::Busy, + state: WorkerState::Busy, errors: 0, consecutive_errors: 0, last_error: None, @@ -113,13 +113,13 @@ impl WorkerProcessor { } worker = await_next_worker => { if let Some(mut worker) = worker { - trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.status); + trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.state); // Save worker info let mut wi = self.worker_info.lock().unwrap(); match wi.get_mut(&worker.task_id) { Some(i) => { - i.status = worker.status; + i.state = worker.state; i.info = worker.worker.info(); i.errors = worker.errors; i.consecutive_errors = worker.consecutive_errors; @@ -130,7 +130,7 @@ impl WorkerProcessor { None => { wi.insert(worker.task_id, WorkerInfo { name: worker.worker.name(), - status: worker.status, + state: worker.state, info: worker.worker.info(), errors: worker.errors, consecutive_errors: worker.consecutive_errors, @@ -139,7 +139,7 @@ impl WorkerProcessor { } } - if worker.status == WorkerStatus::Done { + if worker.state == WorkerState::Done { info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id); } else { workers.push(async move { @@ -157,14 +157,14 @@ impl WorkerProcessor { let drain_half_time = Instant::now() + Duration::from_secs(5); let drain_everything = async move { while let Some(mut worker) = workers.next().await { - if worker.status == WorkerStatus::Done { + if worker.state == WorkerState::Done { info!( "Worker {} (TID {}) exited", worker.worker.name(), worker.task_id ); } else if Instant::now() > drain_half_time { - warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.status); + warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state); } else { workers.push( async move { @@ -193,7 +193,7 @@ struct WorkerHandler { stop_signal: watch::Receiver<bool>, stop_signal_worker: watch::Receiver<bool>, worker: Box<dyn Worker>, - status: WorkerStatus, + state: WorkerState, errors: usize, consecutive_errors: usize, last_error: Option<(String, u64)>, @@ -201,10 +201,10 @@ struct WorkerHandler { impl WorkerHandler { async fn step(&mut self) { - match self.status { - WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await { + match self.state { + WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await { Ok(s) => { - self.status = s; + self.state = s; self.consecutive_errors = 0; } Err(e) => { @@ -219,12 +219,12 @@ impl WorkerHandler { self.last_error = Some((format!("{}", e), now_msec())); // Sleep a bit so that error won't repeat immediately, exponential backoff // strategy (min 1sec, max ~60sec) - self.status = WorkerStatus::Throttled( + self.state = WorkerState::Throttled( (1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32), ); } }, - WorkerStatus::Throttled(delay) => { + WorkerState::Throttled(delay) => { // Sleep for given delay and go back to busy state if !*self.stop_signal.borrow() { select! { @@ -232,13 +232,13 @@ impl WorkerHandler { _ = self.stop_signal.changed() => (), } } - self.status = WorkerStatus::Busy; + self.state = WorkerState::Busy; } - WorkerStatus::Idle => { + WorkerState::Idle => { if *self.stop_signal.borrow() { select! { new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { - self.status = new_st; + self.state = new_st; } _ = tokio::time::sleep(Duration::from_secs(1)) => { // stay in Idle state @@ -247,7 +247,7 @@ impl WorkerHandler { } else { select! { new_st = self.worker.wait_for_work(&self.stop_signal_worker) => { - self.status = new_st; + self.state = new_st; } _ = self.stop_signal.changed() => { // stay in Idle state @@ -255,7 +255,7 @@ impl WorkerHandler { } } } - WorkerStatus::Done => unreachable!(), + WorkerState::Done => unreachable!(), } } } diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs index 9c796f8b..fdb2918b 100644 --- a/src/util/tranquilizer.rs +++ b/src/util/tranquilizer.rs @@ -3,7 +3,7 @@ use std::time::{Duration, Instant}; use tokio::time::sleep; -use crate::background::WorkerStatus; +use crate::background::WorkerState; /// A tranquilizer is a helper object that is used to make /// background operations not take up too much time. @@ -61,10 +61,10 @@ impl Tranquilizer { } #[must_use] - pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerStatus { + pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerState { match self.tranquilize_internal(tranquility) { - Some(delay) => WorkerStatus::Throttled(delay.as_secs_f32()), - None => WorkerStatus::Busy, + Some(delay) => WorkerState::Throttled(delay.as_secs_f32()), + None => WorkerState::Busy, } } |