diff options
-rw-r--r-- | src/block.rs | 31 | ||||
-rw-r--r-- | src/table.rs | 10 |
2 files changed, 20 insertions, 21 deletions
diff --git a/src/block.rs b/src/block.rs index ff38d65c..200b4201 100644 --- a/src/block.rs +++ b/src/block.rs @@ -18,6 +18,7 @@ use crate::rpc_client::*; use crate::server::Garage; const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); +const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10); pub struct BlockManager { pub data_dir: PathBuf, @@ -50,11 +51,14 @@ impl BlockManager { } pub async fn spawn_background_worker(self: Arc<Self>) { - let bm2 = self.clone(); - self.system - .background - .spawn_worker(move |must_exit| bm2.resync_loop(must_exit)) - .await; + // Launch 2 simultaneous workers for background resync loop preprocessing + for _i in 0..2usize { + let bm2 = self.clone(); + self.system + .background + .spawn_worker(move |must_exit| bm2.resync_loop(must_exit)) + .await; + } } pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { @@ -158,7 +162,7 @@ impl BlockManager { async fn resync_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> { while !*must_exit.borrow() { - if let Some((time_bytes, hash_bytes)) = self.resync_queue.get_gt(&[])? { + if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { let time_msec = u64_from_bytes(&time_bytes[0..8]); eprintln!( "First in resync queue: {} (now = {})", @@ -170,18 +174,13 @@ impl BlockManager { hash.copy_from_slice(hash_bytes.as_ref()); let hash = Hash::from(hash); - match self.resync_iter(&hash).await { - Ok(_) => { - self.resync_queue.remove(&time_bytes)?; - } - Err(e) => { - eprintln!( - "Failed to resync hash {:?}, leaving it in queue: {}", - hash, e - ); - } + if let Err(e) = self.resync_iter(&hash).await { + eprintln!("Failed to resync block {:?}, retrying later: {}", hash, e); + self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?; } continue; + } else { + self.resync_queue.insert(time_bytes, hash_bytes)?; } } tokio::time::delay_for(Duration::from_secs(1)).await; diff --git a/src/table.rs b/src/table.rs index 9b67ad94..d2c7de94 100644 --- a/src/table.rs +++ b/src/table.rs @@ -326,7 +326,7 @@ impl<F: TableSchema + 'static> Table<F> { rpc: &TableRPC<F>, quorum: usize, ) -> Result<Vec<TableRPC<F>>, Error> { - eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?); + //eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?); let rpc_bytes = rmp_to_vec_all_named(rpc)?; let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); @@ -353,10 +353,10 @@ impl<F: TableSchema + 'static> Table<F> { resp ))); } - eprintln!( - "Table RPC responses: {}", - serde_json::to_string(&resps_vals)? - ); + //eprintln!( + // "Table RPC responses: {}", + // serde_json::to_string(&resps_vals)? + //); Ok(resps_vals) } |