aboutsummaryrefslogtreecommitdiff
path: root/src/block.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-22 20:32:58 +0000
committerAlex Auvolat <alex@adnab.me>2020-04-22 20:32:58 +0000
commit897fafa8db61ee94c40d233332940e3e470d1d03 (patch)
tree36bbc47179a56a17d14e699526804dc78b3d99fd /src/block.rs
parent2556a1e3835cc8ba71eca27182bca4a3fa4b4083 (diff)
downloadgarage-897fafa8db61ee94c40d233332940e3e470d1d03.tar.gz
garage-897fafa8db61ee94c40d233332940e3e470d1d03.zip
Improvements to block resync queue & worker
Diffstat (limited to 'src/block.rs')
-rw-r--r--src/block.rs68
1 files changed, 47 insertions, 21 deletions
diff --git a/src/block.rs b/src/block.rs
index bc3e9292..ea4f7c10 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -4,11 +4,12 @@ use std::time::Duration;
use arc_swap::ArcSwapOption;
use futures::future::*;
+use futures::select;
use futures::stream::*;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::prelude::*;
-use tokio::sync::{watch, Mutex};
+use tokio::sync::{watch, Mutex, Notify};
use crate::data;
use crate::data::*;
@@ -47,9 +48,13 @@ impl RpcMessage for Message {}
pub struct BlockManager {
pub data_dir: PathBuf,
+ pub data_dir_lock: Mutex<()>,
+
pub rc: sled::Tree,
+
pub resync_queue: sled::Tree,
- pub lock: Mutex<()>,
+ pub resync_notify: Notify,
+
pub system: Arc<System>,
rpc_client: Arc<RpcClient<Message>>,
pub garage: ArcSwapOption<Garage>,
@@ -75,10 +80,11 @@ impl BlockManager {
let rpc_client = system.rpc_client::<Message>(rpc_path);
let block_manager = Arc::new(Self {
+ data_dir,
+ data_dir_lock: Mutex::new(()),
rc,
resync_queue,
- data_dir,
- lock: Mutex::new(()),
+ resync_notify: Notify::new(),
system,
rpc_client,
garage: ArcSwapOption::from(None),
@@ -109,17 +115,20 @@ impl BlockManager {
// Launch 2 simultaneous workers for background resync loop preprocessing
for i in 0..2usize {
let bm2 = self.clone();
- self.system
- .background
- .spawn_worker(format!("block resync worker {}", i), move |must_exit| {
- bm2.resync_loop(must_exit)
- })
- .await;
+ let background = self.system.background.clone();
+ tokio::spawn(async move {
+ tokio::time::delay_for(Duration::from_secs(10)).await;
+ background
+ .spawn_worker(format!("block resync worker {}", i), move |must_exit| {
+ bm2.resync_loop(must_exit)
+ })
+ .await;
+ });
}
}
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
- let _lock = self.lock.lock().await;
+ let _lock = self.data_dir_lock.lock().await;
let mut path = self.block_dir(hash);
fs::create_dir_all(&path).await?;
@@ -152,7 +161,7 @@ impl BlockManager {
drop(f);
if data::hash(&data[..]) != *hash {
- let _lock = self.lock.lock().await;
+ let _lock = self.data_dir_lock.lock().await;
warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
fs::remove_file(path).await?;
self.put_to_resync(&hash, 0)?;
@@ -212,19 +221,20 @@ impl BlockManager {
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
self.resync_queue.insert(key, hash.as_ref())?;
+ self.resync_notify.notify();
Ok(())
}
- async fn resync_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> {
+ async fn resync_loop(
+ self: Arc<Self>,
+ mut must_exit: watch::Receiver<bool>,
+ ) -> Result<(), Error> {
+ let mut n_failures = 0usize;
while !*must_exit.borrow() {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
let time_msec = u64_from_bytes(&time_bytes[0..8]);
- trace!(
- "First in resync queue: {} (now = {})",
- time_msec,
- now_msec()
- );
- if now_msec() >= time_msec {
+ let now = now_msec();
+ if now >= time_msec {
let mut hash = [0u8; 32];
hash.copy_from_slice(hash_bytes.as_ref());
let hash = Hash::from(hash);
@@ -232,13 +242,29 @@ impl BlockManager {
if let Err(e) = self.resync_iter(&hash).await {
warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?;
+ n_failures += 1;
+ if n_failures >= 10 {
+ warn!("Too many resync failures, throttling.");
+ tokio::time::delay_for(Duration::from_secs(1)).await;
+ }
+ } else {
+ n_failures = 0;
}
- continue;
} else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
+ let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
+ select! {
+ _ = delay.fuse() => (),
+ _ = self.resync_notify.notified().fuse() => (),
+ _ = must_exit.recv().fuse() => (),
+ }
+ }
+ } else {
+ select! {
+ _ = self.resync_notify.notified().fuse() => (),
+ _ = must_exit.recv().fuse() => (),
}
}
- tokio::time::delay_for(Duration::from_secs(1)).await;
}
Ok(())
}