aboutsummaryrefslogtreecommitdiff
path: root/src/model/block.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/model/block.rs')
-rw-r--r--src/model/block.rs23
1 files changed, 13 insertions, 10 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 023ed3ab..7185372c 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -5,10 +5,9 @@ use std::time::Duration;
use arc_swap::ArcSwapOption;
use futures::future::*;
use futures::select;
-use futures::stream::*;
use serde::{Deserialize, Serialize};
use tokio::fs;
-use tokio::prelude::*;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
use garage_util::data::*;
@@ -134,7 +133,7 @@ impl BlockManager {
let bm2 = self.clone();
let background = self.system.background.clone();
tokio::spawn(async move {
- tokio::time::delay_for(Duration::from_secs(10 * (i + 1))).await;
+ tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await;
background.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
bm2.resync_loop(must_exit)
});
@@ -251,7 +250,7 @@ impl BlockManager {
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
self.resync_queue.insert(key, hash.as_ref())?;
- self.resync_notify.notify();
+ self.resync_notify.notify_waiters();
Ok(())
}
@@ -262,7 +261,7 @@ impl BlockManager {
while !*must_exit.borrow() {
if let Err(e) = self.resync_iter(&mut must_exit).await {
warn!("Error in block resync loop: {}", e);
- tokio::time::delay_for(Duration::from_secs(10)).await;
+ tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
@@ -283,17 +282,17 @@ impl BlockManager {
self.resync_queue.remove(&time_bytes)?;
res?; // propagate error to delay main loop
} else {
- let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
+ let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
select! {
_ = delay.fuse() => (),
_ = self.resync_notify.notified().fuse() => (),
- _ = must_exit.recv().fuse() => (),
+ _ = must_exit.changed().fuse() => (),
}
}
} else {
select! {
_ = self.resync_notify.notified().fuse() => (),
- _ = must_exit.recv().fuse() => (),
+ _ = must_exit.changed().fuse() => (),
}
}
Ok(())
@@ -467,8 +466,12 @@ impl BlockManager {
// so that we can offload them if necessary and then delete them locally.
async move {
let mut ls_data_dir = fs::read_dir(path).await?;
- while let Some(data_dir_ent) = ls_data_dir.next().await {
- let data_dir_ent = data_dir_ent?;
+ loop {
+ let data_dir_ent = ls_data_dir.next_entry().await?;
+ let data_dir_ent = match data_dir_ent {
+ Some(x) => x,
+ None => break,
+ };
let name = data_dir_ent.file_name();
let name = match name.into_string() {
Ok(x) => x,