diff options
Diffstat (limited to 'src/table')
-rw-r--r-- | src/table/Cargo.toml | 2 | ||||
-rw-r--r-- | src/table/data.rs | 6 | ||||
-rw-r--r-- | src/table/gc.rs | 4 | ||||
-rw-r--r-- | src/table/merkle.rs | 4 | ||||
-rw-r--r-- | src/table/sync.rs | 35 |
5 files changed, 20 insertions, 31 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 6b3aaceb..8f73470e 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -31,5 +31,5 @@ serde_bytes = "0.11" async-trait = "0.1.30" futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/table/data.rs b/src/table/data.rs index 0a7b2cec..0029b936 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -159,7 +159,7 @@ where if let Some((old_entry, new_entry, new_bytes_hash)) = changed { let is_tombstone = new_entry.is_tombstone(); self.instance.updated(old_entry, Some(new_entry)); - self.merkle_updater.todo_notify.notify(); + self.merkle_updater.todo_notify.notify_one(); if is_tombstone { self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; } @@ -184,7 +184,7 @@ where if removed { let old_entry = self.decode_entry(v)?; self.instance.updated(Some(old_entry), None); - self.merkle_updater.todo_notify.notify(); + self.merkle_updater.todo_notify.notify_one(); } Ok(removed) } @@ -209,7 +209,7 @@ where if let Some(old_v) = removed { let old_entry = self.decode_entry(&old_v[..])?; self.instance.updated(Some(old_entry), None); - self.merkle_updater.todo_notify.notify(); + self.merkle_updater.todo_notify.notify_one(); Ok(true) } else { Ok(false) diff --git a/src/table/gc.rs b/src/table/gc.rs index fd9a26d1..d37fdf35 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -85,8 +85,8 @@ where } } select! { - _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (), + _ = must_exit.changed().fuse() => (), } } } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 5ce9cee3..86289bf1 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -121,13 +121,13 @@ impl MerkleUpdater { "({}) Error while iterating on Merkle todo tree: {}", self.table_name, e ); - tokio::time::delay_for(Duration::from_secs(10)).await; + tokio::time::sleep(Duration::from_secs(10)).await; } } } else { select! { _ = self.todo_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = must_exit.changed().fuse() => (), } } } diff --git a/src/table/sync.rs b/src/table/sync.rs index b344eb88..65231cd5 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -3,7 +3,7 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use futures::{pin_mut, select}; +use futures::{select}; use futures_util::future::*; use futures_util::stream::*; use rand::Rng; @@ -110,7 +110,7 @@ where let s3 = syncer.clone(); tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(20)).await; + tokio::time::sleep(Duration::from_secs(20)).await; s3.add_full_sync(); }); @@ -142,23 +142,16 @@ where let mut nothing_to_do_since = Some(Instant::now()); while !*must_exit.borrow() { - let s_ring_recv = ring_recv.recv().fuse(); - let s_busy = busy_rx.recv().fuse(); - let s_must_exit = must_exit.recv().fuse(); - let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse(); - pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout); - select! { - new_ring_r = s_ring_recv => { - if let Some(new_ring) = new_ring_r { - if !Arc::ptr_eq(&new_ring, &prev_ring) { - debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name); - self.add_full_sync(); - prev_ring = new_ring; - } + _ = ring_recv.changed().fuse() => { + let new_ring = ring_recv.borrow(); + if !Arc::ptr_eq(&new_ring, &prev_ring) { + debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name); + self.add_full_sync(); + prev_ring = new_ring.clone(); } } - busy_opt = s_busy => { + busy_opt = busy_rx.recv().fuse() => { if let Some(busy) = busy_opt { if busy { nothing_to_do_since = None; @@ -169,12 +162,8 @@ where } } } - must_exit_v = s_must_exit => { - if must_exit_v.unwrap_or(false) { - break; - } - } - _ = s_timeout => { + _ = must_exit.changed().fuse() => (), + _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name); @@ -213,7 +202,7 @@ where } } else { busy_tx.send(false).unwrap(); - tokio::time::delay_for(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; } } } |