aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs34
1 files changed, 14 insertions, 20 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 890ea8b7..c7e4df17 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -261,8 +261,10 @@ impl BlockManager {
F: Fn(DataBlockStream) -> Fut,
Fut: futures::Future<Output = Result<T, Error>>,
{
- let who = self.replication.read_nodes(hash);
- let who = self.system.rpc.request_order(&who);
+ let who = self
+ .system
+ .rpc_helper()
+ .block_read_nodes_of(hash, self.system.rpc_helper());
for node in who.iter() {
let node_id = NodeID::from(*node);
@@ -302,7 +304,7 @@ impl BlockManager {
// if the first one doesn't succeed rapidly
// TODO: keep first request running when initiating a new one and take the
// one that finishes earlier
- _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
+ _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => {
debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
}
};
@@ -335,26 +337,18 @@ impl BlockManager {
}
}
- /// Ask nodes that might have a block for it, return it as one big Bytes
- pub async fn rpc_get_block(
- &self,
- hash: &Hash,
- order_tag: Option<OrderTag>,
- ) -> Result<Bytes, Error> {
- let stream = self.rpc_get_block_streaming(hash, order_tag).await?;
- Ok(read_stream_to_end(stream).await?.into_bytes())
- }
-
/// Send block to nodes that should have it
pub async fn rpc_put_block(
&self,
hash: Hash,
data: Bytes,
+ prevent_compression: bool,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
- let who = self.replication.write_nodes(&hash);
+ let who = self.replication.write_sets(&hash);
- let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
+ let compression_level = self.compression_level.filter(|_| !prevent_compression);
+ let (header, bytes) = DataBlock::from_buffer(data, compression_level)
.await
.into_parts();
let put_block_rpc =
@@ -366,10 +360,10 @@ impl BlockManager {
};
self.system
- .rpc
- .try_call_many(
+ .rpc_helper()
+ .try_write_many_sets(
&self.endpoint,
- &who[..],
+ who.as_ref(),
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_quorum(self.replication.write_quorum()),
@@ -442,7 +436,7 @@ impl BlockManager {
tokio::spawn(async move {
if let Err(e) = this
.resync
- .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout())
+ .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout())
{
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
}
@@ -536,7 +530,7 @@ impl BlockManager {
None => {
// Not found but maybe we should have had it ??
self.resync
- .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
+ .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?;
return Err(Error::Message(format!(
"block {:?} not found on node",
hash