diff options
-rw-r--r-- | src/garage/repair.rs | 10 | ||||
-rw-r--r-- | src/model/block.rs | 58 | ||||
-rw-r--r-- | src/rpc/membership.rs | 5 | ||||
-rw-r--r-- | src/table/gc.rs | 3 | ||||
-rw-r--r-- | src/table/merkle.rs | 3 | ||||
-rw-r--r-- | src/table/sync.rs | 10 | ||||
-rw-r--r-- | src/util/background.rs | 9 |
7 files changed, 50 insertions, 48 deletions
diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 599c1965..4ee66452 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -20,6 +20,16 @@ impl Repair { &self, opt: RepairOpt, must_exit: watch::Receiver<bool>, + ) { + if let Err(e) = self.repair_worker_aux(opt, must_exit).await { + warn!("Repair worker failed with error: {}", e); + } + } + + async fn repair_worker_aux( + &self, + opt: RepairOpt, + must_exit: watch::Receiver<bool>, ) -> Result<(), Error> { let todo = |x| opt.what.as_ref().map(|y| *y == x).unwrap_or(true); diff --git a/src/model/block.rs b/src/model/block.rs index 9fe6c76b..023ed3ab 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -258,46 +258,48 @@ impl BlockManager { async fn resync_loop( self: Arc<Self>, mut must_exit: watch::Receiver<bool>, - ) -> Result<(), Error> { - let mut n_failures = 0usize; + ) { while !*must_exit.borrow() { - if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { - let time_msec = u64_from_be_bytes(&time_bytes[0..8]); - let now = now_msec(); - if now >= time_msec { - let hash = Hash::try_from(&hash_bytes[..]).unwrap(); - - if let Err(e) = self.resync_iter(&hash).await { - warn!("Failed to resync block {:?}, retrying later: {}", hash, e); - self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; - n_failures += 1; - if n_failures >= 10 { - warn!("Too many resync failures, throttling."); - tokio::time::delay_for(Duration::from_secs(1)).await; - } - } else { - n_failures = 0; - } - } else { - self.resync_queue.insert(time_bytes, hash_bytes)?; - let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now)); - select! { - _ = delay.fuse() => (), - _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), - } + if let Err(e) = self.resync_iter(&mut must_exit).await { + warn!("Error in block resync loop: {}", e); + tokio::time::delay_for(Duration::from_secs(10)).await; + } + } + } + + + async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> { + if let Some(first_item) = self.resync_queue.iter().next() { + let (time_bytes, hash_bytes) = first_item?; + let time_msec = u64_from_be_bytes(&time_bytes[0..8]); + let now = now_msec(); + if now >= time_msec { + let hash = Hash::try_from(&hash_bytes[..]).unwrap(); + let res = self.resync_block(&hash).await; + if let Err(e) = &res { + warn!("Error when resyncing {:?}: {}", hash, e); + self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; } + self.resync_queue.remove(&time_bytes)?; + res?; // propagate error to delay main loop } else { + let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now)); select! { + _ = delay.fuse() => (), _ = self.resync_notify.notified().fuse() => (), _ = must_exit.recv().fuse() => (), } } + } else { + select! { + _ = self.resync_notify.notified().fuse() => (), + _ = must_exit.recv().fuse() => (), + } } Ok(()) } - async fn resync_iter(&self, hash: &Hash) -> Result<(), Error> { + async fn resync_block(&self, hash: &Hash) -> Result<(), Error> { let lock = self.data_dir_lock.lock().await; let path = self.block_path(hash); diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 6636e50b..6749478a 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -318,9 +318,7 @@ impl System { let self2 = self.clone(); self.clone() .background - .spawn_worker(format!("ping loop"), |stop_signal| { - self2.ping_loop(stop_signal).map(Ok) - }); + .spawn_worker(format!("ping loop"), |stop_signal| self2.ping_loop(stop_signal)); if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) { let self2 = self.clone(); @@ -329,7 +327,6 @@ impl System { .spawn_worker(format!("Consul loop"), |stop_signal| { self2 .consul_loop(stop_signal, consul_host, consul_service_name) - .map(Ok) }); } } diff --git a/src/table/gc.rs b/src/table/gc.rs index c13c8234..fd9a26d1 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -70,7 +70,7 @@ where gc } - async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> { + async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { while !*must_exit.borrow() { match self.gc_loop_iter().await { Ok(true) => { @@ -89,7 +89,6 @@ where _ = must_exit.recv().fuse() => (), } } - Ok(()) } async fn gc_loop_iter(&self) -> Result<bool, Error> { diff --git a/src/table/merkle.rs b/src/table/merkle.rs index aefb5169..5ce9cee3 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -104,7 +104,7 @@ impl MerkleUpdater { async fn updater_loop( self: Arc<Self>, mut must_exit: watch::Receiver<bool>, - ) -> Result<(), Error> { + ) { while !*must_exit.borrow() { if let Some(x) = self.todo.iter().next() { match x { @@ -131,7 +131,6 @@ impl MerkleUpdater { } } } - Ok(()) } fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { diff --git a/src/table/sync.rs b/src/table/sync.rs index 6c8792d2..b344eb88 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -136,7 +136,7 @@ where self: Arc<Self>, mut must_exit: watch::Receiver<bool>, mut busy_rx: mpsc::UnboundedReceiver<bool>, - ) -> Result<(), Error> { + ) { let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone(); let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone(); let mut nothing_to_do_since = Some(Instant::now()); @@ -183,7 +183,6 @@ where } } } - Ok(()) } pub fn add_full_sync(&self) { @@ -197,11 +196,11 @@ where self: Arc<Self>, mut must_exit: watch::Receiver<bool>, busy_tx: mpsc::UnboundedSender<bool>, - ) -> Result<(), Error> { + ) { while !*must_exit.borrow() { let task = self.todo.lock().unwrap().pop_task(); if let Some(partition) = task { - busy_tx.send(true)?; + busy_tx.send(true).unwrap(); let res = self .clone() .sync_partition(&partition, &mut must_exit) @@ -213,11 +212,10 @@ where ); } } else { - busy_tx.send(false)?; + busy_tx.send(false).unwrap(); tokio::time::delay_for(Duration::from_secs(1)).await; } } - Ok(()) } async fn sync_partition( diff --git a/src/util/background.rs b/src/util/background.rs index 8081f157..3e600fdf 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -76,16 +76,13 @@ impl BackgroundRunner { pub fn spawn_worker<F, T>(&self, name: String, worker: F) where F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static, - T: Future<Output = JobOutput> + Send + 'static, + T: Future<Output = ()> + Send + 'static, { let mut workers = self.workers.lock().unwrap(); let stop_signal = self.stop_signal.clone(); workers.push(tokio::spawn(async move { - if let Err(e) = worker(stop_signal).await { - error!("Worker stopped with error: {}, error: {}", name, e); - } else { - info!("Worker exited successfully: {}", name); - } + worker(stop_signal).await; + info!("Worker exited: {}", name); })); } |