aboutsummaryrefslogtreecommitdiff
path: root/src/block.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-17 20:58:10 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-17 20:58:10 +0200
commit40c48e6a590c887586664957da2dca0f368cdcb2 (patch)
tree6b99550c514e99bdd4b045a32863a9e8804cced8 /src/block.rs
parent29a1e94f233af943243e4334447db6209e3e80f1 (diff)
downloadgarage-40c48e6a590c887586664957da2dca0f368cdcb2.tar.gz
garage-40c48e6a590c887586664957da2dca0f368cdcb2.zip
Several resync workers; add delay on retry resync
Diffstat (limited to 'src/block.rs')
-rw-r--r--src/block.rs31
1 files changed, 15 insertions, 16 deletions
diff --git a/src/block.rs b/src/block.rs
index ff38d65c..200b4201 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -18,6 +18,7 @@ use crate::rpc_client::*;
use crate::server::Garage;
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
+const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
pub struct BlockManager {
pub data_dir: PathBuf,
@@ -50,11 +51,14 @@ impl BlockManager {
}
pub async fn spawn_background_worker(self: Arc<Self>) {
- let bm2 = self.clone();
- self.system
- .background
- .spawn_worker(move |must_exit| bm2.resync_loop(must_exit))
- .await;
+ // Launch 2 simultaneous workers for background resync loop preprocessing
+ for _i in 0..2usize {
+ let bm2 = self.clone();
+ self.system
+ .background
+ .spawn_worker(move |must_exit| bm2.resync_loop(must_exit))
+ .await;
+ }
}
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
@@ -158,7 +162,7 @@ impl BlockManager {
async fn resync_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> {
while !*must_exit.borrow() {
- if let Some((time_bytes, hash_bytes)) = self.resync_queue.get_gt(&[])? {
+ if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
let time_msec = u64_from_bytes(&time_bytes[0..8]);
eprintln!(
"First in resync queue: {} (now = {})",
@@ -170,18 +174,13 @@ impl BlockManager {
hash.copy_from_slice(hash_bytes.as_ref());
let hash = Hash::from(hash);
- match self.resync_iter(&hash).await {
- Ok(_) => {
- self.resync_queue.remove(&time_bytes)?;
- }
- Err(e) => {
- eprintln!(
- "Failed to resync hash {:?}, leaving it in queue: {}",
- hash, e
- );
- }
+ if let Err(e) = self.resync_iter(&hash).await {
+ eprintln!("Failed to resync block {:?}, retrying later: {}", hash, e);
+ self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?;
}
continue;
+ } else {
+ self.resync_queue.insert(time_bytes, hash_bytes)?;
}
}
tokio::time::delay_for(Duration::from_secs(1)).await;