aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rpc/system.rs29
-rw-r--r--src/table/table.rs15
-rw-r--r--src/util/background/mod.rs16
3 files changed, 23 insertions, 37 deletions
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 3b321a7d..f03df509 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -21,7 +21,6 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
-use garage_util::background::{self};
use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
@@ -622,11 +621,7 @@ impl System {
if info.cluster_layout_version > local_info.cluster_layout_version
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
{
- let self2 = self.clone();
- background::spawn(async move {
- self2.pull_cluster_layout(from).await;
- Ok(())
- });
+ tokio::spawn(self.clone().pull_cluster_layout(from));
}
self.node_status
@@ -668,16 +663,18 @@ impl System {
drop(update_ring);
let self2 = self.clone();
- background::spawn(async move {
- self2
+ tokio::spawn(async move {
+ if let Err(e) = self2
.rpc
.broadcast(
&self2.system_endpoint,
SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
- .await?;
- Ok(())
+ .await
+ {
+ warn!("Error while broadcasting new cluster layout: {}", e);
+ }
});
self.save_cluster_layout().await?;
@@ -766,12 +763,12 @@ impl System {
}
for (node_id, node_addr) in ping_list {
- background::spawn(
- self.netapp
- .clone()
- .try_connect(node_addr, node_id)
- .map(move |r| r.err_context(connect_error_message(node_addr, node_id))),
- );
+ let self2 = self.clone();
+ tokio::spawn(async move {
+ if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await {
+ error!("{}\n{}", connect_error_message(node_addr, node_id), e);
+ }
+ });
}
}
diff --git a/src/table/table.rs b/src/table/table.rs
index 4d93102e..bbcd5971 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -14,7 +14,7 @@ use opentelemetry::{
use garage_db as db;
-use garage_util::background::{self, BackgroundRunner};
+use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;
@@ -275,7 +275,11 @@ where
if not_all_same {
let self2 = self.clone();
let ent2 = ret_entry.clone();
- background::spawn(async move { self2.repair_on_read(&who[..], ent2).await });
+ tokio::spawn(async move {
+ if let Err(e) = self2.repair_on_read(&who[..], ent2).await {
+ warn!("Error doing repair on read: {}", e);
+ }
+ });
}
}
@@ -372,11 +376,12 @@ where
.into_iter()
.map(|k| ret.get(&k).unwrap().clone())
.collect::<Vec<_>>();
- background::spawn(async move {
+ tokio::spawn(async move {
for v in to_repair {
- self2.repair_on_read(&who[..], v).await?;
+ if let Err(e) = self2.repair_on_read(&who[..], v).await {
+ warn!("Error doing repair on read: {}", e);
+ }
}
- Ok(())
});
}
diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs
index 0bb4fb67..41b48e93 100644
--- a/src/util/background/mod.rs
+++ b/src/util/background/mod.rs
@@ -2,20 +2,15 @@
pub mod worker;
-use core::future::Future;
-
use std::collections::HashMap;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
-use crate::error::Error;
use worker::WorkerProcessor;
pub use worker::{Worker, WorkerState};
-pub(crate) type JobOutput = Result<(), Error>;
-
/// Job runner for futures and async functions
pub struct BackgroundRunner {
send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
@@ -77,14 +72,3 @@ impl BackgroundRunner {
.expect("Could not put worker in queue");
}
}
-
-pub fn spawn<T>(job: T)
-where
- T: Future<Output = JobOutput> + Send + 'static,
-{
- tokio::spawn(async move {
- if let Err(e) = job.await {
- error!("{}", e);
- }
- });
-}