From 4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 20:09:44 +0100 Subject: Refactor block resync loop; make workers infaillible --- src/table/gc.rs | 3 +-- src/table/merkle.rs | 3 +-- src/table/sync.rs | 10 ++++------ 3 files changed, 6 insertions(+), 10 deletions(-) (limited to 'src/table') diff --git a/src/table/gc.rs b/src/table/gc.rs index c13c8234..fd9a26d1 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -70,7 +70,7 @@ where gc } - async fn gc_loop(self: Arc, mut must_exit: watch::Receiver) -> Result<(), Error> { + async fn gc_loop(self: Arc, mut must_exit: watch::Receiver) { while !*must_exit.borrow() { match self.gc_loop_iter().await { Ok(true) => { @@ -89,7 +89,6 @@ where _ = must_exit.recv().fuse() => (), } } - Ok(()) } async fn gc_loop_iter(&self) -> Result { diff --git a/src/table/merkle.rs b/src/table/merkle.rs index aefb5169..5ce9cee3 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -104,7 +104,7 @@ impl MerkleUpdater { async fn updater_loop( self: Arc, mut must_exit: watch::Receiver, - ) -> Result<(), Error> { + ) { while !*must_exit.borrow() { if let Some(x) = self.todo.iter().next() { match x { @@ -131,7 +131,6 @@ impl MerkleUpdater { } } } - Ok(()) } fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> { diff --git a/src/table/sync.rs b/src/table/sync.rs index 6c8792d2..b344eb88 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -136,7 +136,7 @@ where self: Arc, mut must_exit: watch::Receiver, mut busy_rx: mpsc::UnboundedReceiver, - ) -> Result<(), Error> { + ) { let mut prev_ring: Arc = self.aux.system.ring.borrow().clone(); let mut ring_recv: watch::Receiver> = self.aux.system.ring.clone(); let mut nothing_to_do_since = Some(Instant::now()); @@ -183,7 +183,6 @@ where } } } - Ok(()) } pub fn add_full_sync(&self) { @@ -197,11 +196,11 @@ where self: Arc, mut must_exit: watch::Receiver, busy_tx: mpsc::UnboundedSender, - ) -> Result<(), Error> { + ) { while !*must_exit.borrow() { let task = self.todo.lock().unwrap().pop_task(); if let Some(partition) = task { - busy_tx.send(true)?; + busy_tx.send(true).unwrap(); let res = self .clone() .sync_partition(&partition, &mut must_exit) @@ -213,11 +212,10 @@ where ); } } else { - busy_tx.send(false)?; + busy_tx.send(false).unwrap(); tokio::time::delay_for(Duration::from_secs(1)).await; } } - Ok(()) } async fn sync_partition( -- cgit v1.2.3