aboutsummaryrefslogtreecommitdiff
path: root/src/model/block.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-03-05 15:09:18 +0100
committerAlex Auvolat <alex@adnab.me>2021-03-05 15:09:18 +0100
commit3882d5ba36f48751fdf6e5b82eae0dd990238655 (patch)
treedbf412ac4643c27617cd613eea04682da25d6a66 /src/model/block.rs
parentd7e148d3027b7092d7de7e561665d2667199e9bc (diff)
downloadgarage-3882d5ba36f48751fdf6e5b82eae0dd990238655.tar.gz
garage-3882d5ba36f48751fdf6e5b82eae0dd990238655.zip
Remove epidemic propagation for fully replicated stuff: write directly to all nodes
Diffstat (limited to 'src/model/block.rs')
-rw-r--r--src/model/block.rs36
1 files changed, 27 insertions, 9 deletions
diff --git a/src/model/block.rs b/src/model/block.rs
index 056d9098..56c85c6a 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -334,17 +334,28 @@ impl BlockManager {
}
if need_nodes.len() > 0 {
- trace!("Block {:?} needed by {} nodes, sending", hash, need_nodes.len());
+ trace!(
+ "Block {:?} needed by {} nodes, sending",
+ hash,
+ need_nodes.len()
+ );
let put_block_message = Arc::new(self.read_block(hash).await?);
let put_resps = join_all(need_nodes.iter().map(|to| {
- self.rpc_client.call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT)
- })).await;
+ self.rpc_client
+ .call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT)
+ }))
+ .await;
for resp in put_resps {
resp?;
}
}
- trace!("Deleting block {:?}, offload finished ({} / {})", hash, need_nodes.len(), who.len());
+ trace!(
+ "Deleting block {:?}, offload finished ({} / {})",
+ hash,
+ need_nodes.len(),
+ who.len()
+ );
fs::remove_file(path).await?;
self.resync_queue.remove(&hash)?;
@@ -391,7 +402,7 @@ impl BlockManager {
.try_call_many(
&who[..],
Message::PutBlock(PutBlockMessage { hash, data }),
- RequestStrategy::with_quorum(self.replication.write_quorum())
+ RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
.with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
@@ -420,12 +431,17 @@ impl BlockManager {
}
// 2. Repair blocks actually on disk
- self.repair_aux_read_dir_rec(&self.data_dir, must_exit).await?;
+ self.repair_aux_read_dir_rec(&self.data_dir, must_exit)
+ .await?;
Ok(())
}
- fn repair_aux_read_dir_rec<'a>(&'a self, path: &'a PathBuf, must_exit: &'a watch::Receiver<bool>) -> BoxFuture<'a, Result<(), Error>> {
+ fn repair_aux_read_dir_rec<'a>(
+ &'a self,
+ path: &'a PathBuf,
+ must_exit: &'a watch::Receiver<bool>,
+ ) -> BoxFuture<'a, Result<(), Error>> {
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
@@ -441,7 +457,8 @@ impl BlockManager {
let ent_type = data_dir_ent.file_type().await?;
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
- self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit).await?;
+ self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit)
+ .await?;
} else if name.len() == 64 {
let hash_bytes = match hex::decode(&name) {
Ok(h) => h,
@@ -457,7 +474,8 @@ impl BlockManager {
}
}
Ok(())
- }.boxed()
+ }
+ .boxed()
}
}