aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-15 20:09:44 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-15 20:09:44 +0100
commit4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31 (patch)
tree03da6808b6f0391293ab5a252d5efd1328acdefc /src
parent667e4e72a8e64a094d57ceeb6442cef08f1ef0e1 (diff)
downloadgarage-4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31.tar.gz
garage-4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31.zip
Refactor block resync loop; make workers infaillible
Diffstat (limited to 'src')
-rw-r--r--src/garage/repair.rs10
-rw-r--r--src/model/block.rs58
-rw-r--r--src/rpc/membership.rs5
-rw-r--r--src/table/gc.rs3
-rw-r--r--src/table/merkle.rs3
-rw-r--r--src/table/sync.rs10
-rw-r--r--src/util/background.rs9
7 files changed, 50 insertions, 48 deletions
diff --git a/src/garage/repair.rs b/src/garage/repair.rs
index 599c1965..4ee66452 100644
--- a/src/garage/repair.rs
+++ b/src/garage/repair.rs
@@ -20,6 +20,16 @@ impl Repair {
&self,
opt: RepairOpt,
must_exit: watch::Receiver<bool>,
+ ) {
+ if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
+ warn!("Repair worker failed with error: {}", e);
+ }
+ }
+
+ async fn repair_worker_aux(
+ &self,
+ opt: RepairOpt,
+ must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true);
diff --git a/src/model/block.rs b/src/model/block.rs
index 9fe6c76b..023ed3ab 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -258,46 +258,48 @@ impl BlockManager {
async fn resync_loop(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
- ) -> Result<(), Error> {
- let mut n_failures = 0usize;
+ ) {
while !*must_exit.borrow() {
- if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
- let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
- let now = now_msec();
- if now >= time_msec {
- let hash = Hash::try_from(&hash_bytes[..]).unwrap();
-
- if let Err(e) = self.resync_iter(&hash).await {
- warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
- self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
- n_failures += 1;
- if n_failures >= 10 {
- warn!("Too many resync failures, throttling.");
- tokio::time::delay_for(Duration::from_secs(1)).await;
- }
- } else {
- n_failures = 0;
- }
- } else {
- self.resync_queue.insert(time_bytes, hash_bytes)?;
- let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
- select! {
- _ = delay.fuse() => (),
- _ = self.resync_notify.notified().fuse() => (),
- _ = must_exit.recv().fuse() => (),
- }
+ if let Err(e) = self.resync_iter(&mut must_exit).await {
+ warn!("Error in block resync loop: {}", e);
+ tokio::time::delay_for(Duration::from_secs(10)).await;
+ }
+ }
+ }
+
+
+ async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
+ if let Some(first_item) = self.resync_queue.iter().next() {
+ let (time_bytes, hash_bytes) = first_item?;
+ let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
+ let now = now_msec();
+ if now >= time_msec {
+ let hash = Hash::try_from(&hash_bytes[..]).unwrap();
+ let res = self.resync_block(&hash).await;
+ if let Err(e) = &res {
+ warn!("Error when resyncing {:?}: {}", hash, e);
+ self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
}
+ self.resync_queue.remove(&time_bytes)?;
+ res?; // propagate error to delay main loop
} else {
+ let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
select! {
+ _ = delay.fuse() => (),
_ = self.resync_notify.notified().fuse() => (),
_ = must_exit.recv().fuse() => (),
}
}
+ } else {
+ select! {
+ _ = self.resync_notify.notified().fuse() => (),
+ _ = must_exit.recv().fuse() => (),
+ }
}
Ok(())
}
- async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> {
+ async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {
let lock = self.data_dir_lock.lock().await;
let path = self.block_path(hash);
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 6636e50b..6749478a 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -318,9 +318,7 @@ impl System {
let self2 = self.clone();
self.clone()
.background
- .spawn_worker(format!("ping loop"), |stop_signal| {
- self2.ping_loop(stop_signal).map(Ok)
- });
+ .spawn_worker(format!("ping loop"), |stop_signal| self2.ping_loop(stop_signal));
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
@@ -329,7 +327,6 @@ impl System {
.spawn_worker(format!("Consul loop"), |stop_signal| {
self2
.consul_loop(stop_signal, consul_host, consul_service_name)
- .map(Ok)
});
}
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index c13c8234..fd9a26d1 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -70,7 +70,7 @@ where
gc
}
- async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> {
+ async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
match self.gc_loop_iter().await {
Ok(true) => {
@@ -89,7 +89,6 @@ where
_ = must_exit.recv().fuse() => (),
}
}
- Ok(())
}
async fn gc_loop_iter(&self) -> Result<bool, Error> {
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index aefb5169..5ce9cee3 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -104,7 +104,7 @@ impl MerkleUpdater {
async fn updater_loop(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
- ) -> Result<(), Error> {
+ ) {
while !*must_exit.borrow() {
if let Some(x) = self.todo.iter().next() {
match x {
@@ -131,7 +131,6 @@ impl MerkleUpdater {
}
}
}
- Ok(())
}
fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 6c8792d2..b344eb88 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -136,7 +136,7 @@ where
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
mut busy_rx: mpsc::UnboundedReceiver<bool>,
- ) -> Result<(), Error> {
+ ) {
let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
let mut nothing_to_do_since = Some(Instant::now());
@@ -183,7 +183,6 @@ where
}
}
}
- Ok(())
}
pub fn add_full_sync(&self) {
@@ -197,11 +196,11 @@ where
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
busy_tx: mpsc::UnboundedSender<bool>,
- ) -> Result<(), Error> {
+ ) {
while !*must_exit.borrow() {
let task = self.todo.lock().unwrap().pop_task();
if let Some(partition) = task {
- busy_tx.send(true)?;
+ busy_tx.send(true).unwrap();
let res = self
.clone()
.sync_partition(&partition, &mut must_exit)
@@ -213,11 +212,10 @@ where
);
}
} else {
- busy_tx.send(false)?;
+ busy_tx.send(false).unwrap();
tokio::time::delay_for(Duration::from_secs(1)).await;
}
}
- Ok(())
}
async fn sync_partition(
diff --git a/src/util/background.rs b/src/util/background.rs
index 8081f157..3e600fdf 100644
--- a/src/util/background.rs
+++ b/src/util/background.rs
@@ -76,16 +76,13 @@ impl BackgroundRunner {
pub fn spawn_worker<F, T>(&self, name: String, worker: F)
where
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
- T: Future<Output = JobOutput> + Send + 'static,
+ T: Future<Output = ()> + Send + 'static,
{
let mut workers = self.workers.lock().unwrap();
let stop_signal = self.stop_signal.clone();
workers.push(tokio::spawn(async move {
- if let Err(e) = worker(stop_signal).await {
- error!("Worker stopped with error: {}, error: {}", name, e);
- } else {
- info!("Worker exited successfully: {}", name);
- }
+ worker(stop_signal).await;
+ info!("Worker exited: {}", name);
}));
}