diff options
-rw-r--r-- | src/rpc/system.rs | 29 | ||||
-rw-r--r-- | src/table/table.rs | 15 | ||||
-rw-r--r-- | src/util/background/mod.rs | 16 |
3 files changed, 23 insertions, 37 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 3b321a7d..f03df509 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -21,7 +21,6 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::util::parse_and_resolve_peer_addr_async; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; -use garage_util::background::{self}; use garage_util::config::Config; #[cfg(feature = "kubernetes-discovery")] use garage_util::config::KubernetesDiscoveryConfig; @@ -622,11 +621,7 @@ impl System { if info.cluster_layout_version > local_info.cluster_layout_version || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash { - let self2 = self.clone(); - background::spawn(async move { - self2.pull_cluster_layout(from).await; - Ok(()) - }); + tokio::spawn(self.clone().pull_cluster_layout(from)); } self.node_status @@ -668,16 +663,18 @@ impl System { drop(update_ring); let self2 = self.clone(); - background::spawn(async move { - self2 + tokio::spawn(async move { + if let Err(e) = self2 .rpc .broadcast( &self2.system_endpoint, SystemRpc::AdvertiseClusterLayout(layout), RequestStrategy::with_priority(PRIO_HIGH), ) - .await?; - Ok(()) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); + } }); self.save_cluster_layout().await?; @@ -766,12 +763,12 @@ impl System { } for (node_id, node_addr) in ping_list { - background::spawn( - self.netapp - .clone() - .try_connect(node_addr, node_id) - .map(move |r| r.err_context(connect_error_message(node_addr, node_id))), - ); + let self2 = self.clone(); + tokio::spawn(async move { + if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await { + error!("{}\n{}", connect_error_message(node_addr, node_id), e); + } + }); } } diff --git a/src/table/table.rs b/src/table/table.rs index 4d93102e..bbcd5971 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -14,7 +14,7 @@ use opentelemetry::{ use garage_db as db; -use garage_util::background::{self, BackgroundRunner}; +use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; use garage_util::metrics::RecordDuration; @@ -275,7 +275,11 @@ where if not_all_same { let self2 = self.clone(); let ent2 = ret_entry.clone(); - background::spawn(async move { self2.repair_on_read(&who[..], ent2).await }); + tokio::spawn(async move { + if let Err(e) = self2.repair_on_read(&who[..], ent2).await { + warn!("Error doing repair on read: {}", e); + } + }); } } @@ -372,11 +376,12 @@ where .into_iter() .map(|k| ret.get(&k).unwrap().clone()) .collect::<Vec<_>>(); - background::spawn(async move { + tokio::spawn(async move { for v in to_repair { - self2.repair_on_read(&who[..], v).await?; + if let Err(e) = self2.repair_on_read(&who[..], v).await { + warn!("Error doing repair on read: {}", e); + } } - Ok(()) }); } diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 0bb4fb67..41b48e93 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -2,20 +2,15 @@ pub mod worker; -use core::future::Future; - use std::collections::HashMap; use std::sync::Arc; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; -use crate::error::Error; use worker::WorkerProcessor; pub use worker::{Worker, WorkerState}; -pub(crate) type JobOutput = Result<(), Error>; - /// Job runner for futures and async functions pub struct BackgroundRunner { send_worker: mpsc::UnboundedSender<Box<dyn Worker>>, @@ -77,14 +72,3 @@ impl BackgroundRunner { .expect("Could not put worker in queue"); } } - -pub fn spawn<T>(job: T) -where - T: Future<Output = JobOutput> + Send + 'static, -{ - tokio::spawn(async move { - if let Err(e) = job.await { - error!("{}", e); - } - }); -} |