aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-15 22:36:41 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-15 22:36:41 +0100
commit0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd (patch)
treea3f57c18da5377a618c38f3e4bba002c9eed1358 /src/model
parent4d4117f2b4eb69b63e2329f6e0b8929e6a8b5b31 (diff)
downloadgarage-0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd.tar.gz
garage-0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd.zip
WIP migrate to tokio 1
Diffstat (limited to 'src/model')
-rw-r--r--src/model/Cargo.toml3
-rw-r--r--src/model/block.rs23
-rw-r--r--src/model/object_table.rs3
-rw-r--r--src/model/version_table.rs3
4 files changed, 18 insertions, 14 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index caeed66c..8f36cf2e 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -33,5 +33,4 @@ serde_bytes = "0.11"
async-trait = "0.1.30"
futures = "0.3"
futures-util = "0.3"
-tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] }
-
+tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
diff --git a/src/model/block.rs b/src/model/block.rs
index 023ed3ab..7185372c 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -5,10 +5,9 @@ use std::time::Duration;
use arc_swap::ArcSwapOption;
use futures::future::*;
use futures::select;
-use futures::stream::*;
use serde::{Deserialize, Serialize};
use tokio::fs;
-use tokio::prelude::*;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
use garage_util::data::*;
@@ -134,7 +133,7 @@ impl BlockManager {
let bm2 = self.clone();
let background = self.system.background.clone();
tokio::spawn(async move {
- tokio::time::delay_for(Duration::from_secs(10 * (i + 1))).await;
+ tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await;
background.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
bm2.resync_loop(must_exit)
});
@@ -251,7 +250,7 @@ impl BlockManager {
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
self.resync_queue.insert(key, hash.as_ref())?;
- self.resync_notify.notify();
+ self.resync_notify.notify_waiters();
Ok(())
}
@@ -262,7 +261,7 @@ impl BlockManager {
while !*must_exit.borrow() {
if let Err(e) = self.resync_iter(&mut must_exit).await {
warn!("Error in block resync loop: {}", e);
- tokio::time::delay_for(Duration::from_secs(10)).await;
+ tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
@@ -283,17 +282,17 @@ impl BlockManager {
self.resync_queue.remove(&time_bytes)?;
res?; // propagate error to delay main loop
} else {
- let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
+ let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
select! {
_ = delay.fuse() => (),
_ = self.resync_notify.notified().fuse() => (),
- _ = must_exit.recv().fuse() => (),
+ _ = must_exit.changed().fuse() => (),
}
}
} else {
select! {
_ = self.resync_notify.notified().fuse() => (),
- _ = must_exit.recv().fuse() => (),
+ _ = must_exit.changed().fuse() => (),
}
}
Ok(())
@@ -467,8 +466,12 @@ impl BlockManager {
// so that we can offload them if necessary and then delete them locally.
async move {
let mut ls_data_dir = fs::read_dir(path).await?;
- while let Some(data_dir_ent) = ls_data_dir.next().await {
- let data_dir_ent = data_dir_ent?;
+ loop {
+ let data_dir_ent = ls_data_dir.next_entry().await?;
+ let data_dir_ent = match data_dir_ent {
+ Some(x) => x,
+ None => break,
+ };
let name = data_dir_ent.file_name();
let name = match name.into_string() {
Ok(x) => x,
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index 99fad3ce..d08bba70 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -195,7 +195,8 @@ impl TableSchema for ObjectTable {
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let version_table = self.version_table.clone();
- self.background.spawn(async move {
+ // TODO not cancellable
+ self.background.spawn_cancellable(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of old versions
for v in old_v.versions.iter() {
diff --git a/src/model/version_table.rs b/src/model/version_table.rs
index 841fbfea..19343890 100644
--- a/src/model/version_table.rs
+++ b/src/model/version_table.rs
@@ -110,7 +110,8 @@ impl TableSchema for VersionTable {
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block_ref_table = self.block_ref_table.clone();
- self.background.spawn(async move {
+ // TODO not cancellable
+ self.background.spawn_cancellable(async move {
if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks
if new_v.deleted.get() && !old_v.deleted.get() {