aboutsummaryrefslogtreecommitdiff
path: root/src/block/resync.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/resync.rs')
-rw-r--r--src/block/resync.rs35
1 files changed, 22 insertions, 13 deletions
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 39e4d50f..bde3e98c 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -7,7 +7,6 @@ use arc_swap::ArcSwap;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use futures::future::*;
use tokio::select;
use tokio::sync::{watch, Notify};
@@ -36,7 +35,11 @@ use crate::manager::*;
// Timeout for RPCs that ask other nodes whether they need a copy
// of a given block before we delete it locally
-pub(crate) const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
+// The timeout here is relatively low because we don't want to block
+// the entire resync loop when some nodes are not responding.
+// Nothing will be deleted if the nodes don't answer the queries,
+// we will just retry later.
+const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(15);
// The delay between the time where a resync operation fails
// and the time when it is retried, with exponential backoff
@@ -336,24 +339,24 @@ impl BlockResyncManager {
}
who.retain(|id| *id != manager.system.id);
- let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash));
- let who_needs_fut = who.iter().map(|to| {
- manager.system.rpc.call_arc(
+ let who_needs_resps = manager
+ .system
+ .rpc
+ .call_many(
&manager.endpoint,
- *to,
- msg.clone(),
+ &who,
+ BlockRpc::NeedBlockQuery(*hash),
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
)
- });
- let who_needs_resps = join_all(who_needs_fut).await;
+ .await?;
let mut need_nodes = vec![];
- for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) {
+ for (node, needed) in who_needs_resps {
match needed.err_context("NeedBlockQuery RPC")? {
BlockRpc::NeedBlockReply(needed) => {
if needed {
- need_nodes.push(*node);
+ need_nodes.push(node);
}
}
m => {
@@ -376,7 +379,13 @@ impl BlockResyncManager {
.add(1, &[KeyValue::new("to", format!("{:?}", node))]);
}
- let put_block_message = manager.read_block(hash).await?;
+ let block = manager.read_block(hash).await?;
+ let (header, bytes) = block.into_parts();
+ let put_block_message = Req::new(BlockRpc::PutBlock {
+ hash: *hash,
+ header,
+ })?
+ .with_stream_from_buffer(bytes);
manager
.system
.rpc
@@ -409,7 +418,7 @@ impl BlockResyncManager {
hash
);
- let block_data = manager.rpc_get_raw_block(hash).await?;
+ let block_data = manager.rpc_get_raw_block(hash, None).await?;
manager.metrics.resync_recv_counter.add(1);