From 3882d5ba36f48751fdf6e5b82eae0dd990238655 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 5 Mar 2021 15:09:18 +0100 Subject: Remove epidemic propagation for fully replicated stuff: write directly to all nodes --- src/model/block.rs | 36 +++++++++++++++++++++++++++--------- src/model/garage.rs | 5 +---- 2 files changed, 28 insertions(+), 13 deletions(-) (limited to 'src/model') diff --git a/src/model/block.rs b/src/model/block.rs index 056d9098..56c85c6a 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -334,17 +334,28 @@ impl BlockManager { } if need_nodes.len() > 0 { - trace!("Block {:?} needed by {} nodes, sending", hash, need_nodes.len()); + trace!( + "Block {:?} needed by {} nodes, sending", + hash, + need_nodes.len() + ); let put_block_message = Arc::new(self.read_block(hash).await?); let put_resps = join_all(need_nodes.iter().map(|to| { - self.rpc_client.call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT) - })).await; + self.rpc_client + .call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT) + })) + .await; for resp in put_resps { resp?; } } - trace!("Deleting block {:?}, offload finished ({} / {})", hash, need_nodes.len(), who.len()); + trace!( + "Deleting block {:?}, offload finished ({} / {})", + hash, + need_nodes.len(), + who.len() + ); fs::remove_file(path).await?; self.resync_queue.remove(&hash)?; @@ -391,7 +402,7 @@ impl BlockManager { .try_call_many( &who[..], Message::PutBlock(PutBlockMessage { hash, data }), - RequestStrategy::with_quorum(self.replication.write_quorum()) + RequestStrategy::with_quorum(self.replication.write_quorum(&self.system)) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; @@ -420,12 +431,17 @@ impl BlockManager { } // 2. Repair blocks actually on disk - self.repair_aux_read_dir_rec(&self.data_dir, must_exit).await?; + self.repair_aux_read_dir_rec(&self.data_dir, must_exit) + .await?; Ok(()) } - fn repair_aux_read_dir_rec<'a>(&'a self, path: &'a PathBuf, must_exit: &'a watch::Receiver) -> BoxFuture<'a, Result<(), Error>> { + fn repair_aux_read_dir_rec<'a>( + &'a self, + path: &'a PathBuf, + must_exit: &'a watch::Receiver, + ) -> BoxFuture<'a, Result<(), Error>> { // Lists all blocks on disk and adds them to the resync queue. // This allows us to find blocks we are storing but don't actually need, // so that we can offload them if necessary and then delete them locally. @@ -441,7 +457,8 @@ impl BlockManager { let ent_type = data_dir_ent.file_type().await?; if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { - self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit).await?; + self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit) + .await?; } else if name.len() == 64 { let hash_bytes = match hex::decode(&name) { Ok(h) => h, @@ -457,7 +474,8 @@ impl BlockManager { } } Ok(()) - }.boxed() + } + .boxed() } } diff --git a/src/model/garage.rs b/src/model/garage.rs index 46e0d02f..467d0aec 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -65,10 +65,7 @@ impl Garage { read_quorum: (config.meta_replication_factor + 1) / 2, }; - let control_rep_param = TableFullReplication::new( - config.meta_epidemic_fanout, - (config.meta_epidemic_fanout + 1) / 2, - ); + let control_rep_param = TableFullReplication::new(config.control_write_max_faults); info!("Initialize block manager..."); let block_manager = BlockManager::new( -- cgit v1.2.3