aboutsummaryrefslogtreecommitdiff
path: root/src/table/sync.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-15 22:36:41 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-15 22:36:41 +0100
commit0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd (patch)
treea3f57c18da5377a618c38f3e4bba002c9eed1358 /src/table/sync.rs
parent4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31 (diff)
downloadgarage-0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd.tar.gz
garage-0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd.zip
WIP migrate to tokio 1
Diffstat (limited to 'src/table/sync.rs')
-rw-r--r--src/table/sync.rs35
1 files changed, 12 insertions, 23 deletions
diff --git a/src/table/sync.rs b/src/table/sync.rs
index b344eb88..65231cd5 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -3,7 +3,7 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
-use futures::{pin_mut, select};
+use futures::{select};
use futures_util::future::*;
use futures_util::stream::*;
use rand::Rng;
@@ -110,7 +110,7 @@ where
let s3 = syncer.clone();
tokio::spawn(async move {
- tokio::time::delay_for(Duration::from_secs(20)).await;
+ tokio::time::sleep(Duration::from_secs(20)).await;
s3.add_full_sync();
});
@@ -142,23 +142,16 @@ where
let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
- let s_ring_recv = ring_recv.recv().fuse();
- let s_busy = busy_rx.recv().fuse();
- let s_must_exit = must_exit.recv().fuse();
- let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
- pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
-
select! {
- new_ring_r = s_ring_recv => {
- if let Some(new_ring) = new_ring_r {
- if !Arc::ptr_eq(&new_ring, &prev_ring) {
- debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name);
- self.add_full_sync();
- prev_ring = new_ring;
- }
+ _ = ring_recv.changed().fuse() => {
+ let new_ring = ring_recv.borrow();
+ if !Arc::ptr_eq(&new_ring, &prev_ring) {
+ debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name);
+ self.add_full_sync();
+ prev_ring = new_ring.clone();
}
}
- busy_opt = s_busy => {
+ busy_opt = busy_rx.recv().fuse() => {
if let Some(busy) = busy_opt {
if busy {
nothing_to_do_since = None;
@@ -169,12 +162,8 @@ where
}
}
}
- must_exit_v = s_must_exit => {
- if must_exit_v.unwrap_or(false) {
- break;
- }
- }
- _ = s_timeout => {
+ _ = must_exit.changed().fuse() => (),
+ _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name);
@@ -213,7 +202,7 @@ where
}
} else {
busy_tx.send(false).unwrap();
- tokio::time::delay_for(Duration::from_secs(1)).await;
+ tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}