diff options
author | Alex Auvolat <alex@adnab.me> | 2021-03-15 20:09:44 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-03-15 20:09:44 +0100 |
commit | 4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31 (patch) | |
tree | 03da6808b6f0391293ab5a252d5efd1328acdefc /src/table | |
parent | 667e4e72a8e64a094d57ceeb6442cef08f1ef0e1 (diff) | |
download | garage-4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31.tar.gz garage-4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31.zip |
Refactor block resync loop; make workers infaillible
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/gc.rs | 3 | ||||
-rw-r--r-- | src/table/merkle.rs | 3 | ||||
-rw-r--r-- | src/table/sync.rs | 10 |
3 files changed, 6 insertions, 10 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs index c13c8234..fd9a26d1 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -70,7 +70,7 @@ where gc } - async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> { + async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) { while !*must_exit.borrow() { match self.gc_loop_iter().await { Ok(true) => { @@ -89,7 +89,6 @@ where _ = must_exit.recv().fuse() => (), } } - Ok(()) } async fn gc_loop_iter(&self) -> Result<bool, Error> { diff --git a/src/table/merkle.rs b/src/table/merkle.rs index aefb5169..5ce9cee3 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -104,7 +104,7 @@ impl MerkleUpdater { async fn updater_loop( self: Arc<Self>, mut must_exit: watch::Receiver<bool>, - ) -> Result<(), Error> { + ) { while !*must_exit.borrow() { if let Some(x) = self.todo.iter().next() { match x { @@ -131,7 +131,6 @@ impl MerkleUpdater { } } } - Ok(()) } fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { diff --git a/src/table/sync.rs b/src/table/sync.rs index 6c8792d2..b344eb88 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -136,7 +136,7 @@ where self: Arc<Self>, mut must_exit: watch::Receiver<bool>, mut busy_rx: mpsc::UnboundedReceiver<bool>, - ) -> Result<(), Error> { + ) { let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone(); let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone(); let mut nothing_to_do_since = Some(Instant::now()); @@ -183,7 +183,6 @@ where } } } - Ok(()) } pub fn add_full_sync(&self) { @@ -197,11 +196,11 @@ where self: Arc<Self>, mut must_exit: watch::Receiver<bool>, busy_tx: mpsc::UnboundedSender<bool>, - ) -> Result<(), Error> { + ) { while !*must_exit.borrow() { let task = self.todo.lock().unwrap().pop_task(); if let Some(partition) = task { - busy_tx.send(true)?; + busy_tx.send(true).unwrap(); let res = self .clone() .sync_partition(&partition, &mut must_exit) @@ -213,11 +212,10 @@ where ); } } else { - busy_tx.send(false)?; + busy_tx.send(false).unwrap(); tokio::time::delay_for(Duration::from_secs(1)).await; } } - Ok(()) } async fn sync_partition( |