aboutsummaryrefslogtreecommitdiff
path: root/src/block/resync.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2022-09-13 15:26:08 +0200
committerAlex <alex@adnab.me>2022-09-13 15:26:08 +0200
commit11bdc971e2aaa1ef90358b7d9c1bd6a8e9743bbf (patch)
treeca460324ea6848c61c6c698a51456ff27485ea7b /src/block/resync.rs
parent309d7aef3f05657e2b969ab72442b2f2c350da03 (diff)
parentff30891999b5be5421b80b89da1037e943179d2d (diff)
downloadgarage-11bdc971e2aaa1ef90358b7d9c1bd6a8e9743bbf.tar.gz
garage-11bdc971e2aaa1ef90358b7d9c1bd6a8e9743bbf.zip
Merge pull request 'use netapp streaming body' (#343) from netapp-stream-body into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/343
Diffstat (limited to 'src/block/resync.rs')
-rw-r--r--src/block/resync.rs35
1 files changed, 22 insertions, 13 deletions
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 39e4d50f..bde3e98c 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -7,7 +7,6 @@ use arc_swap::ArcSwap;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use futures::future::*;
use tokio::select;
use tokio::sync::{watch, Notify};
@@ -36,7 +35,11 @@ use crate::manager::*;
// Timeout for RPCs that ask other nodes whether they need a copy
// of a given block before we delete it locally
-pub(crate) const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
+// The timeout here is relatively low because we don't want to block
+// the entire resync loop when some nodes are not responding.
+// Nothing will be deleted if the nodes don't answer the queries,
+// we will just retry later.
+const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(15);
// The delay between the time where a resync operation fails
// and the time when it is retried, with exponential backoff
@@ -336,24 +339,24 @@ impl BlockResyncManager {
}
who.retain(|id| *id != manager.system.id);
- let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash));
- let who_needs_fut = who.iter().map(|to| {
- manager.system.rpc.call_arc(
+ let who_needs_resps = manager
+ .system
+ .rpc
+ .call_many(
&manager.endpoint,
- *to,
- msg.clone(),
+ &who,
+ BlockRpc::NeedBlockQuery(*hash),
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
)
- });
- let who_needs_resps = join_all(who_needs_fut).await;
+ .await?;
let mut need_nodes = vec![];
- for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) {
+ for (node, needed) in who_needs_resps {
match needed.err_context("NeedBlockQuery RPC")? {
BlockRpc::NeedBlockReply(needed) => {
if needed {
- need_nodes.push(*node);
+ need_nodes.push(node);
}
}
m => {
@@ -376,7 +379,13 @@ impl BlockResyncManager {
.add(1, &[KeyValue::new("to", format!("{:?}", node))]);
}
- let put_block_message = manager.read_block(hash).await?;
+ let block = manager.read_block(hash).await?;
+ let (header, bytes) = block.into_parts();
+ let put_block_message = Req::new(BlockRpc::PutBlock {
+ hash: *hash,
+ header,
+ })?
+ .with_stream_from_buffer(bytes);
manager
.system
.rpc
@@ -409,7 +418,7 @@ impl BlockResyncManager {
hash
);
- let block_data = manager.rpc_get_raw_block(hash).await?;
+ let block_data = manager.rpc_get_raw_block(hash, None).await?;
manager.metrics.resync_recv_counter.add(1);