From 0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 22:36:41 +0100 Subject: WIP migrate to tokio 1 --- src/model/Cargo.toml | 3 +-- src/model/block.rs | 23 +++++++++++++---------- src/model/object_table.rs | 3 ++- src/model/version_table.rs | 3 ++- 4 files changed, 18 insertions(+), 14 deletions(-) (limited to 'src/model') diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index caeed66c..8f36cf2e 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -33,5 +33,4 @@ serde_bytes = "0.11" async-trait = "0.1.30" futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } - +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/model/block.rs b/src/model/block.rs index 023ed3ab..7185372c 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -5,10 +5,9 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use futures::future::*; use futures::select; -use futures::stream::*; use serde::{Deserialize, Serialize}; use tokio::fs; -use tokio::prelude::*; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; use garage_util::data::*; @@ -134,7 +133,7 @@ impl BlockManager { let bm2 = self.clone(); let background = self.system.background.clone(); tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(10 * (i + 1))).await; + tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await; background.spawn_worker(format!("block resync worker {}", i), move |must_exit| { bm2.resync_loop(must_exit) }); @@ -251,7 +250,7 @@ impl BlockManager { let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); self.resync_queue.insert(key, hash.as_ref())?; - self.resync_notify.notify(); + self.resync_notify.notify_waiters(); Ok(()) } @@ -262,7 +261,7 @@ impl BlockManager { while !*must_exit.borrow() { if let Err(e) = self.resync_iter(&mut must_exit).await { warn!("Error in block resync loop: {}", e); - tokio::time::delay_for(Duration::from_secs(10)).await; + tokio::time::sleep(Duration::from_secs(10)).await; } } } @@ -283,17 +282,17 @@ impl BlockManager { self.resync_queue.remove(&time_bytes)?; res?; // propagate error to delay main loop } else { - let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now)); + let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); select! { _ = delay.fuse() => (), _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = must_exit.changed().fuse() => (), } } } else { select! { _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = must_exit.changed().fuse() => (), } } Ok(()) @@ -467,8 +466,12 @@ impl BlockManager { // so that we can offload them if necessary and then delete them locally. async move { let mut ls_data_dir = fs::read_dir(path).await?; - while let Some(data_dir_ent) = ls_data_dir.next().await { - let data_dir_ent = data_dir_ent?; + loop { + let data_dir_ent = ls_data_dir.next_entry().await?; + let data_dir_ent = match data_dir_ent { + Some(x) => x, + None => break, + }; let name = data_dir_ent.file_name(); let name = match name.into_string() { Ok(x) => x, diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 99fad3ce..d08bba70 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -195,7 +195,8 @@ impl TableSchema for ObjectTable { fn updated(&self, old: Option, new: Option) { let version_table = self.version_table.clone(); - self.background.spawn(async move { + // TODO not cancellable + self.background.spawn_cancellable(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of old versions for v in old_v.versions.iter() { diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 841fbfea..19343890 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -110,7 +110,8 @@ impl TableSchema for VersionTable { fn updated(&self, old: Option, new: Option) { let block_ref_table = self.block_ref_table.clone(); - self.background.spawn(async move { + // TODO not cancellable + self.background.spawn_cancellable(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks if new_v.deleted.get() && !old_v.deleted.get() { -- cgit v1.2.3