diff options
Diffstat (limited to 'src/model/block.rs')
-rw-r--r-- | src/model/block.rs | 36 |
1 files changed, 27 insertions, 9 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 056d9098..56c85c6a 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -334,17 +334,28 @@ impl BlockManager { } if need_nodes.len() > 0 { - trace!("Block {:?} needed by {} nodes, sending", hash, need_nodes.len()); + trace!( + "Block {:?} needed by {} nodes, sending", + hash, + need_nodes.len() + ); let put_block_message = Arc::new(self.read_block(hash).await?); let put_resps = join_all(need_nodes.iter().map(|to| { - self.rpc_client.call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT) - })).await; + self.rpc_client + .call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT) + })) + .await; for resp in put_resps { resp?; } } - trace!("Deleting block {:?}, offload finished ({} / {})", hash, need_nodes.len(), who.len()); + trace!( + "Deleting block {:?}, offload finished ({} / {})", + hash, + need_nodes.len(), + who.len() + ); fs::remove_file(path).await?; self.resync_queue.remove(&hash)?; @@ -391,7 +402,7 @@ impl BlockManager { .try_call_many( &who[..], Message::PutBlock(PutBlockMessage { hash, data }), - RequestStrategy::with_quorum(self.replication.write_quorum()) + RequestStrategy::with_quorum(self.replication.write_quorum(&self.system)) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; @@ -420,12 +431,17 @@ impl BlockManager { } // 2. Repair blocks actually on disk - self.repair_aux_read_dir_rec(&self.data_dir, must_exit).await?; + self.repair_aux_read_dir_rec(&self.data_dir, must_exit) + .await?; Ok(()) } - fn repair_aux_read_dir_rec<'a>(&'a self, path: &'a PathBuf, must_exit: &'a watch::Receiver<bool>) -> BoxFuture<'a, Result<(), Error>> { + fn repair_aux_read_dir_rec<'a>( + &'a self, + path: &'a PathBuf, + must_exit: &'a watch::Receiver<bool>, + ) -> BoxFuture<'a, Result<(), Error>> { // Lists all blocks on disk and adds them to the resync queue. // This allows us to find blocks we are storing but don't actually need, // so that we can offload them if necessary and then delete them locally. @@ -441,7 +457,8 @@ impl BlockManager { let ent_type = data_dir_ent.file_type().await?; if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { - self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit).await?; + self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit) + .await?; } else if name.len() == 64 { let hash_bytes = match hex::decode(&name) { Ok(h) => h, @@ -457,7 +474,8 @@ impl BlockManager { } } Ok(()) - }.boxed() + } + .boxed() } } |