aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-06-24 10:31:11 +0200
committerAlex Auvolat <alex@adnab.me>2022-06-24 10:31:11 +0200
commit95ffba343f14d7274e08099b9aca5a85da2259ed (patch)
treef6a965fcce0a11dbab951e13acef24ab8ab7e9d9
parent59b43914d4a9ae9a50ae79fee61b1a46bff941f9 (diff)
downloadgarage-95ffba343f14d7274e08099b9aca5a85da2259ed.tar.gz
garage-95ffba343f14d7274e08099b9aca5a85da2259ed.zip
Error reporting
-rw-r--r--Cargo.lock188
-rw-r--r--Makefile11
-rw-r--r--src/garage/Cargo.toml1
-rw-r--r--src/garage/admin.rs20
-rw-r--r--src/garage/cli/cmd.rs8
-rw-r--r--src/garage/cli/structs.rs15
-rw-r--r--src/garage/cli/util.rs42
-rw-r--r--src/model/index_counter.rs6
-rw-r--r--src/table/gc.rs5
-rw-r--r--src/table/merkle.rs5
-rw-r--r--src/table/sync.rs5
-rw-r--r--src/util/background/mod.rs2
-rw-r--r--src/util/background/worker.rs15
13 files changed, 274 insertions, 49 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 007e3b4e..8de73002 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -95,6 +95,15 @@ dependencies = [
[[package]]
name = "autocfg"
+version = "0.1.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0dde43e75fd43e8a1bf86103336bc699aa8d17ad1be60c76c0bdfd4828e19b78"
+dependencies = [
+ "autocfg 1.1.0",
+]
+
+[[package]]
+name = "autocfg"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
@@ -543,7 +552,7 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c"
dependencies = [
- "autocfg",
+ "autocfg 1.1.0",
"cfg-if 1.0.0",
"crossbeam-utils 0.8.8",
"lazy_static",
@@ -985,6 +994,7 @@ dependencies = [
"sha2",
"static_init",
"structopt",
+ "timeago",
"tokio",
"toml",
"tracing",
@@ -1631,7 +1641,7 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223"
dependencies = [
- "autocfg",
+ "autocfg 1.1.0",
"hashbrown",
]
@@ -1654,6 +1664,16 @@ dependencies = [
]
[[package]]
+name = "isolang"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "265ef164908329e47e753c769b14cbb27434abf0c41984dca201484022f09ce5"
+dependencies = [
+ "phf",
+ "phf_codegen",
+]
+
+[[package]]
name = "itertools"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1977,7 +1997,7 @@ version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
- "autocfg",
+ "autocfg 1.1.0",
]
[[package]]
@@ -2139,7 +2159,7 @@ version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
- "autocfg",
+ "autocfg 1.1.0",
"num-traits",
]
@@ -2149,7 +2169,7 @@ version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
- "autocfg",
+ "autocfg 1.1.0",
]
[[package]]
@@ -2218,7 +2238,7 @@ version = "0.9.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb"
dependencies = [
- "autocfg",
+ "autocfg 1.1.0",
"cc",
"libc",
"openssl-src",
@@ -2389,6 +2409,44 @@ dependencies = [
]
[[package]]
+name = "phf"
+version = "0.7.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b3da44b85f8e8dfaec21adae67f95d93244b2ecf6ad2a692320598dcc8e6dd18"
+dependencies = [
+ "phf_shared",
+]
+
+[[package]]
+name = "phf_codegen"
+version = "0.7.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e"
+dependencies = [
+ "phf_generator",
+ "phf_shared",
+]
+
+[[package]]
+name = "phf_generator"
+version = "0.7.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "09364cc93c159b8b06b1f4dd8a4398984503483891b0c26b867cf431fb132662"
+dependencies = [
+ "phf_shared",
+ "rand 0.6.5",
+]
+
+[[package]]
+name = "phf_shared"
+version = "0.7.24"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "234f71a15de2288bcb7e3b6515828d22af7ec8598ee6d24c3b526fa0a80b67a0"
+dependencies = [
+ "siphasher",
+]
+
+[[package]]
name = "pin-project"
version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2644,17 +2702,46 @@ dependencies = [
[[package]]
name = "rand"
+version = "0.6.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca"
+dependencies = [
+ "autocfg 0.1.8",
+ "libc",
+ "rand_chacha 0.1.1",
+ "rand_core 0.4.2",
+ "rand_hc",
+ "rand_isaac",
+ "rand_jitter",
+ "rand_os",
+ "rand_pcg",
+ "rand_xorshift",
+ "winapi",
+]
+
+[[package]]
+name = "rand"
version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
- "rand_chacha",
+ "rand_chacha 0.3.1",
"rand_core 0.6.3",
]
[[package]]
name = "rand_chacha"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef"
+dependencies = [
+ "autocfg 0.1.8",
+ "rand_core 0.3.1",
+]
+
+[[package]]
+name = "rand_chacha"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
@@ -2688,6 +2775,77 @@ dependencies = [
]
[[package]]
+name = "rand_hc"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b40677c7be09ae76218dc623efbf7b18e34bced3f38883af07bb75630a21bc4"
+dependencies = [
+ "rand_core 0.3.1",
+]
+
+[[package]]
+name = "rand_isaac"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ded997c9d5f13925be2a6fd7e66bf1872597f759fd9dd93513dd7e92e5a5ee08"
+dependencies = [
+ "rand_core 0.3.1",
+]
+
+[[package]]
+name = "rand_jitter"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1166d5c91dc97b88d1decc3285bb0a99ed84b05cfd0bc2341bdf2d43fc41e39b"
+dependencies = [
+ "libc",
+ "rand_core 0.4.2",
+ "winapi",
+]
+
+[[package]]
+name = "rand_os"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071"
+dependencies = [
+ "cloudabi",
+ "fuchsia-cprng",
+ "libc",
+ "rand_core 0.4.2",
+ "rdrand",
+ "winapi",
+]
+
+[[package]]
+name = "rand_pcg"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44"
+dependencies = [
+ "autocfg 0.1.8",
+ "rand_core 0.4.2",
+]
+
+[[package]]
+name = "rand_xorshift"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c"
+dependencies = [
+ "rand_core 0.3.1",
+]
+
+[[package]]
+name = "rdrand"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
+dependencies = [
+ "rand_core 0.3.1",
+]
+
+[[package]]
name = "redox_syscall"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3110,6 +3268,12 @@ dependencies = [
]
[[package]]
+name = "siphasher"
+version = "0.2.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b8de496cf83d4ed58b6be86c3a275b8602f6ffe98d3024a869e124147a9a3ac"
+
+[[package]]
name = "slab"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3358,6 +3522,16 @@ dependencies = [
]
[[package]]
+name = "timeago"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6ec32dde57efb15c035ac074118d7f32820451395f28cb0524a01d4e94983b26"
+dependencies = [
+ "chrono",
+ "isolang",
+]
+
+[[package]]
name = "tinyvec"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Makefile b/Makefile
index eeeffedb..406230c9 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-.PHONY: doc all release shell
+.PHONY: doc all release shell run1 run2 run3
all:
clear; cargo build --all-features
@@ -11,3 +11,12 @@ release:
shell:
nix-shell
+
+run1:
+ RUST_LOG=garage=debug ./target/debug/garage -c tmp/config.1.toml server
+
+run2:
+ RUST_LOG=garage=debug ./target/debug/garage -c tmp/config.2.toml server
+
+run3:
+ RUST_LOG=garage=debug ./target/debug/garage -c tmp/config.3.toml server
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index 7678ea26..8948e750 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -32,6 +32,7 @@ garage_web = { version = "0.7.0", path = "../web" }
bytes = "1.0"
bytesize = "1.1"
+timeago = "0.3"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4"
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index c98f5142..9c6a0c57 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -5,7 +5,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use garage_util::background::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
@@ -49,7 +48,10 @@ pub enum AdminRpc {
},
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
- WorkerList(HashMap<usize, garage_util::background::WorkerInfo>),
+ WorkerList(
+ HashMap<usize, garage_util::background::WorkerInfo>,
+ WorkerListOpt,
+ ),
}
impl Rpc for AdminRpc {
@@ -830,19 +832,9 @@ impl AdminRpcHandler {
async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> {
match opt.cmd {
- WorkerCmd::List { busy } => {
+ WorkerCmd::List { opt } => {
let workers = self.garage.background.get_worker_info();
- let workers = if busy {
- workers
- .into_iter()
- .filter(|(_, w)| {
- matches!(w.status, WorkerStatus::Busy | WorkerStatus::Throttled(_))
- })
- .collect()
- } else {
- workers
- };
- Ok(AdminRpc::WorkerList(workers))
+ Ok(AdminRpc::WorkerList(workers, opt))
}
}
}
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 38a58b76..1aa2c2ff 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -1,4 +1,5 @@
use std::collections::HashSet;
+use std::time::Duration;
use garage_util::error::*;
use garage_util::formater::format_table;
@@ -101,6 +102,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) {
if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) {
+ let tf = timeago::Formatter::new();
failed_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = adv.id,
@@ -111,7 +113,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
capacity = cfg.capacity_string(),
last_seen = adv
.last_seen_secs_ago
- .map(|s| format!("{}s ago", s))
+ .map(|s| tf.convert(Duration::from_secs(s)))
.unwrap_or_else(|| "never seen".into()),
));
}
@@ -183,8 +185,8 @@ pub async fn cmd_admin(
AdminRpc::KeyInfo(key, rb) => {
print_key_info(&key, &rb);
}
- AdminRpc::WorkerList(wi) => {
- print_worker_info(wi);
+ AdminRpc::WorkerList(wi, wlo) => {
+ print_worker_info(wi, wlo);
}
r => {
error!("Unexpected response: {:?}", r);
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index f6b2d197..c1ee32ab 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -476,8 +476,17 @@ pub enum WorkerCmd {
/// List all workers on Garage node
#[structopt(name = "list")]
List {
- /// Show only busy workers
- #[structopt(short = "b", long = "busy")]
- busy: bool,
+ #[structopt(flatten)]
+ opt: WorkerListOpt,
},
}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
+pub struct WorkerListOpt {
+ /// Show only busy workers
+ #[structopt(short = "b", long = "busy")]
+ pub busy: bool,
+ /// Show only workers with errors
+ #[structopt(short = "e", long = "errors")]
+ pub errors: bool,
+}
diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs
index ddb353f9..fc5a9932 100644
--- a/src/garage/cli/util.rs
+++ b/src/garage/cli/util.rs
@@ -1,15 +1,19 @@
use std::collections::HashMap;
+use std::time::Duration;
use garage_util::background::*;
use garage_util::crdt::*;
use garage_util::data::Uuid;
use garage_util::error::*;
use garage_util::formater::format_table;
+use garage_util::time::*;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
+use crate::cli::structs::WorkerListOpt;
+
pub fn print_bucket_list(bl: Vec<Bucket>) {
println!("List of buckets:");
@@ -237,7 +241,7 @@ pub fn find_matching_node(
}
}
-pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>) {
+pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
let mut wi = wi.into_iter().collect::<Vec<_>>();
wi.sort_by_key(|(tid, info)| {
(
@@ -252,23 +256,39 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>) {
let mut table = vec![];
for (tid, info) in wi.iter() {
+ if wlo.busy && !matches!(info.status, WorkerStatus::Busy | WorkerStatus::Throttled(_)) {
+ continue;
+ }
+ if wlo.errors && info.errors == 0 {
+ continue;
+ }
+
table.push(format!("{}\t{:?}\t{}", tid, info.status, info.name));
if let Some(i) = &info.info {
- table.push(format!("\t\t{}", i));
+ table.push(format!("\t\t {}", i));
}
+ let tf = timeago::Formatter::new();
+ let (err_ago, err_msg) = info
+ .last_error
+ .as_ref()
+ .map(|(m, t)| {
+ (
+ tf.convert(Duration::from_millis(now_msec() - t)),
+ m.as_str(),
+ )
+ })
+ .unwrap_or(("(?) ago".into(), "(?)"));
if info.consecutive_errors > 0 {
table.push(format!(
- "\t\t{} CONSECUTIVE ERRORS ({} total), last: {}",
- info.consecutive_errors,
- info.errors,
- info.last_error.as_deref().unwrap_or("(?)")
+ "\t\t {} consecutive errors ({} total), last {}",
+ info.consecutive_errors, info.errors, err_ago,
));
+ table.push(format!("\t\t {}", err_msg));
} else if info.errors > 0 {
- table.push(format!(
- "\t\t{} errors, last: {}",
- info.errors,
- info.last_error.as_deref().unwrap_or("(?)")
- ));
+ table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,));
+ if wlo.errors {
+ table.push(format!("\t\t {}", err_msg));
+ }
}
}
format_table(table);
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 9e29b421..474ec12c 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -408,7 +408,11 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
}
fn info(&self) -> Option<String> {
- Some(format!("{} items in queue", self.buf.len()))
+ if !self.buf.is_empty() {
+ Some(format!("{} items in queue", self.buf.len()))
+ } else {
+ None
+ }
}
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> {
diff --git a/src/table/gc.rs b/src/table/gc.rs
index d088a11c..0899d5e5 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -357,7 +357,10 @@ where
}
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ if *must_exit.borrow() {
+ return WorkerStatus::Done;
+ }
tokio::time::sleep(self.wait_delay).await;
WorkerStatus::Busy
}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 06d131cb..4c84933a 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -329,7 +329,10 @@ where
self.0.updater_loop_iter()
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ if *must_exit.borrow() {
+ return WorkerStatus::Done;
+ }
tokio::time::sleep(Duration::from_secs(10)).await;
WorkerStatus::Busy
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index a331ec75..a7e1994c 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -595,7 +595,10 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
}
}
- async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
+ if *must_exit.borrow() {
+ return WorkerStatus::Done;
+ }
select! {
s = self.add_full_sync_rx.recv() => {
if let Some(()) = s {
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index f7e15b80..636b9c13 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -33,7 +33,7 @@ pub struct WorkerInfo {
pub status: WorkerStatus,
pub errors: usize,
pub consecutive_errors: usize,
- pub last_error: Option<String>,
+ pub last_error: Option<(String, u64)>,
}
impl BackgroundRunner {
diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs
index e4a04250..c08a0aaa 100644
--- a/src/util/background/worker.rs
+++ b/src/util/background/worker.rs
@@ -13,6 +13,7 @@ use tracing::*;
use crate::background::WorkerInfo;
use crate::error::Error;
+use crate::time::now_msec;
#[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)]
pub enum WorkerStatus {
@@ -167,7 +168,7 @@ impl WorkerProcessor {
select! {
_ = drain_everything => {
- info!("All workers exited in time \\o/");
+ info!("All workers exited peacefully \\o/");
}
_ = tokio::time::sleep(Duration::from_secs(9)) => {
error!("Some workers could not exit in time, we are cancelling some things in the middle");
@@ -176,7 +177,6 @@ impl WorkerProcessor {
}
}
-// TODO add tranquilizer
struct WorkerHandler {
task_id: usize,
stop_signal: watch::Receiver<bool>,
@@ -185,7 +185,7 @@ struct WorkerHandler {
status: WorkerStatus,
errors: usize,
consecutive_errors: usize,
- last_error: Option<String>,
+ last_error: Option<(String, u64)>,
}
impl WorkerHandler {
@@ -205,7 +205,7 @@ impl WorkerHandler {
);
self.errors += 1;
self.consecutive_errors += 1;
- self.last_error = Some(format!("{}", e));
+ self.last_error = Some((format!("{}", e), now_msec()));
// Sleep a bit so that error won't repeat immediately, exponential backoff
// strategy (min 1sec, max ~60sec)
self.status = WorkerStatus::Throttled(
@@ -215,7 +215,12 @@ impl WorkerHandler {
},
WorkerStatus::Throttled(delay) => {
// Sleep for given delay and go back to busy state
- tokio::time::sleep(Duration::from_secs_f32(delay)).await;
+ if !*self.stop_signal.borrow() {
+ select! {
+ _ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (),
+ _ = self.stop_signal.changed() => (),
+ }
+ }
self.status = WorkerStatus::Busy;
}
WorkerStatus::Idle => {