diff options
Diffstat (limited to 'src/block/resync.rs')
-rw-r--r-- | src/block/resync.rs | 35 |
1 files changed, 22 insertions, 13 deletions
diff --git a/src/block/resync.rs b/src/block/resync.rs index 39e4d50f..bde3e98c 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -7,7 +7,6 @@ use arc_swap::ArcSwap; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use futures::future::*; use tokio::select; use tokio::sync::{watch, Notify}; @@ -36,7 +35,11 @@ use crate::manager::*; // Timeout for RPCs that ask other nodes whether they need a copy // of a given block before we delete it locally -pub(crate) const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); +// The timeout here is relatively low because we don't want to block +// the entire resync loop when some nodes are not responding. +// Nothing will be deleted if the nodes don't answer the queries, +// we will just retry later. +const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(15); // The delay between the time where a resync operation fails // and the time when it is retried, with exponential backoff @@ -336,24 +339,24 @@ impl BlockResyncManager { } who.retain(|id| *id != manager.system.id); - let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash)); - let who_needs_fut = who.iter().map(|to| { - manager.system.rpc.call_arc( + let who_needs_resps = manager + .system + .rpc + .call_many( &manager.endpoint, - *to, - msg.clone(), + &who, + BlockRpc::NeedBlockQuery(*hash), RequestStrategy::with_priority(PRIO_BACKGROUND) .with_timeout(NEED_BLOCK_QUERY_TIMEOUT), ) - }); - let who_needs_resps = join_all(who_needs_fut).await; + .await?; let mut need_nodes = vec![]; - for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) { + for (node, needed) in who_needs_resps { match needed.err_context("NeedBlockQuery RPC")? { BlockRpc::NeedBlockReply(needed) => { if needed { - need_nodes.push(*node); + need_nodes.push(node); } } m => { @@ -376,7 +379,13 @@ impl BlockResyncManager { .add(1, &[KeyValue::new("to", format!("{:?}", node))]); } - let put_block_message = manager.read_block(hash).await?; + let block = manager.read_block(hash).await?; + let (header, bytes) = block.into_parts(); + let put_block_message = Req::new(BlockRpc::PutBlock { + hash: *hash, + header, + })? + .with_stream_from_buffer(bytes); manager .system .rpc @@ -409,7 +418,7 @@ impl BlockResyncManager { hash ); - let block_data = manager.rpc_get_raw_block(hash).await?; + let block_data = manager.rpc_get_raw_block(hash, None).await?; manager.metrics.resync_recv_counter.add(1); |