aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-15 22:36:41 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-15 22:36:41 +0100
commit0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd (patch)
treea3f57c18da5377a618c38f3e4bba002c9eed1358 /src/table
parent4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31 (diff)
downloadgarage-0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd.tar.gz
garage-0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd.zip
WIP migrate to tokio 1
Diffstat (limited to 'src/table')
-rw-r--r--src/table/Cargo.toml2
-rw-r--r--src/table/data.rs6
-rw-r--r--src/table/gc.rs4
-rw-r--r--src/table/merkle.rs4
-rw-r--r--src/table/sync.rs35
5 files changed, 20 insertions, 31 deletions
diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml
index 6b3aaceb..8f73470e 100644
--- a/src/table/Cargo.toml
+++ b/src/table/Cargo.toml
@@ -31,5 +31,5 @@ serde_bytes = "0.11"
async-trait = "0.1.30"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
diff --git a/src/table/data.rs b/src/table/data.rs
index 0a7b2cec..0029b936 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -159,7 +159,7 @@ where
if let Some((old_entry, new_entry, new_bytes_hash)) = changed {
let is_tombstone = new_entry.is_tombstone();
self.instance.updated(old_entry, Some(new_entry));
- self.merkle_updater.todo_notify.notify();
+ self.merkle_updater.todo_notify.notify_one();
if is_tombstone {
self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?;
}
@@ -184,7 +184,7 @@ where
if removed {
let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None);
- self.merkle_updater.todo_notify.notify();
+ self.merkle_updater.todo_notify.notify_one();
}
Ok(removed)
}
@@ -209,7 +209,7 @@ where
if let Some(old_v) = removed {
let old_entry = self.decode_entry(&old_v[..])?;
self.instance.updated(Some(old_entry), None);
- self.merkle_updater.todo_notify.notify();
+ self.merkle_updater.todo_notify.notify_one();
Ok(true)
} else {
Ok(false)
diff --git a/src/table/gc.rs b/src/table/gc.rs
index fd9a26d1..d37fdf35 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -85,8 +85,8 @@ where
}
}
select! {
- _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (),
- _ = must_exit.recv().fuse() => (),
+ _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (),
+ _ = must_exit.changed().fuse() => (),
}
}
}
diff --git a/src/table/merkle.rs b/src/table/merkle.rs
index 5ce9cee3..86289bf1 100644
--- a/src/table/merkle.rs
+++ b/src/table/merkle.rs
@@ -121,13 +121,13 @@ impl MerkleUpdater {
"({}) Error while iterating on Merkle todo tree: {}",
self.table_name, e
);
- tokio::time::delay_for(Duration::from_secs(10)).await;
+ tokio::time::sleep(Duration::from_secs(10)).await;
}
}
} else {
select! {
_ = self.todo_notify.notified().fuse() => (),
- _ = must_exit.recv().fuse() => (),
+ _ = must_exit.changed().fuse() => (),
}
}
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index b344eb88..65231cd5 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -3,7 +3,7 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
-use futures::{pin_mut, select};
+use futures::{select};
use futures_util::future::*;
use futures_util::stream::*;
use rand::Rng;
@@ -110,7 +110,7 @@ where
let s3 = syncer.clone();
tokio::spawn(async move {
- tokio::time::delay_for(Duration::from_secs(20)).await;
+ tokio::time::sleep(Duration::from_secs(20)).await;
s3.add_full_sync();
});
@@ -142,23 +142,16 @@ where
let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
- let s_ring_recv = ring_recv.recv().fuse();
- let s_busy = busy_rx.recv().fuse();
- let s_must_exit = must_exit.recv().fuse();
- let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse();
- pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout);
-
select! {
- new_ring_r = s_ring_recv => {
- if let Some(new_ring) = new_ring_r {
- if !Arc::ptr_eq(&new_ring, &prev_ring) {
- debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name);
- self.add_full_sync();
- prev_ring = new_ring;
- }
+ _ = ring_recv.changed().fuse() => {
+ let new_ring = ring_recv.borrow();
+ if !Arc::ptr_eq(&new_ring, &prev_ring) {
+ debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name);
+ self.add_full_sync();
+ prev_ring = new_ring.clone();
}
}
- busy_opt = s_busy => {
+ busy_opt = busy_rx.recv().fuse() => {
if let Some(busy) = busy_opt {
if busy {
nothing_to_do_since = None;
@@ -169,12 +162,8 @@ where
}
}
}
- must_exit_v = s_must_exit => {
- if must_exit_v.unwrap_or(false) {
- break;
- }
- }
- _ = s_timeout => {
+ _ = must_exit.changed().fuse() => (),
+ _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name);
@@ -213,7 +202,7 @@ where
}
} else {
busy_tx.send(false).unwrap();
- tokio::time::delay_for(Duration::from_secs(1)).await;
+ tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}