aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-15 20:09:44 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-15 20:09:44 +0100
commit4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31 (patch)
tree03da6808b6f0391293ab5a252d5efd1328acdefc /src/table
parent667e4e72a8e64a094d57ceeb6442cef08f1ef0e1 (diff)
downloadgarage-4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31.tar.gz
garage-4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31.zip
Refactor block resync loop; make workers infaillible
Diffstat (limited to 'src/table')
-rw-r--r--src/table/gc.rs3
-rw-r--r--src/table/merkle.rs3
-rw-r--r--src/table/sync.rs10
3 files changed, 6 insertions, 10 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs
index c13c8234..fd9a26d1 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -70,7 +70,7 @@ where
gc
}
- async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<(), Error> {
+ async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
match self.gc_loop_iter().await {
Ok(true) => {
@@ -89,7 +89,6 @@ where
_ = must_exit.recv().fuse() => (),
}
}
- Ok(())
}
async fn gc_loop_iter(&self) -> Result<bool, Error> {
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index aefb5169..5ce9cee3 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -104,7 +104,7 @@ impl MerkleUpdater {
async fn updater_loop(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
- ) -> Result<(), Error> {
+ ) {
while !*must_exit.borrow() {
if let Some(x) = self.todo.iter().next() {
match x {
@@ -131,7 +131,6 @@ impl MerkleUpdater {
}
}
}
- Ok(())
}
fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 6c8792d2..b344eb88 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -136,7 +136,7 @@ where
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
mut busy_rx: mpsc::UnboundedReceiver<bool>,
- ) -> Result<(), Error> {
+ ) {
let mut prev_ring: Arc<Ring> = self.aux.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.aux.system.ring.clone();
let mut nothing_to_do_since = Some(Instant::now());
@@ -183,7 +183,6 @@ where
}
}
}
- Ok(())
}
pub fn add_full_sync(&self) {
@@ -197,11 +196,11 @@ where
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
busy_tx: mpsc::UnboundedSender<bool>,
- ) -> Result<(), Error> {
+ ) {
while !*must_exit.borrow() {
let task = self.todo.lock().unwrap().pop_task();
if let Some(partition) = task {
- busy_tx.send(true)?;
+ busy_tx.send(true).unwrap();
let res = self
.clone()
.sync_partition(&partition, &mut must_exit)
@@ -213,11 +212,10 @@ where
);
}
} else {
- busy_tx.send(false)?;
+ busy_tx.send(false).unwrap();
tokio::time::delay_for(Duration::from_secs(1)).await;
}
}
- Ok(())
}
async fn sync_partition(