diff options
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 55 |
1 files changed, 39 insertions, 16 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index b8fe4c74..b9f6fc0f 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -33,6 +33,7 @@ use garage_util::metrics::RecordDuration; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; +use garage_rpc::rpc_helper::OrderTag; use garage_rpc::system::System; use garage_rpc::*; @@ -70,7 +71,7 @@ pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); pub enum BlockRpc { Ok, /// Message to ask for a block of data, by hash - GetBlock(Hash), + GetBlock(Hash, Option<OrderTag>), /// Message to send a block of data, either because requested, of for first delivery of new /// block PutBlock { @@ -183,15 +184,18 @@ impl BlockManager { async fn rpc_get_raw_block_streaming( &self, hash: &Hash, + order_tag: Option<OrderTag>, ) -> Result<(DataBlockHeader, ByteStream), Error> { let who = self.replication.read_nodes(hash); //let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); - let rpc = - self.endpoint - .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); + let rpc = self.endpoint.call_streaming( + &node_id, + BlockRpc::GetBlock(*hash, order_tag), + PRIO_NORMAL, + ); tokio::select! { res = rpc => { let res = match res { @@ -224,15 +228,21 @@ impl BlockManager { /// Ask nodes that might have a (possibly compressed) block for it /// Return its entire body - async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> { + async fn rpc_get_raw_block( + &self, + hash: &Hash, + order_tag: Option<OrderTag>, + ) -> Result<DataBlock, Error> { let who = self.replication.read_nodes(hash); //let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); - let rpc = - self.endpoint - .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); + let rpc = self.endpoint.call_streaming( + &node_id, + BlockRpc::GetBlock(*hash, order_tag), + PRIO_NORMAL, + ); tokio::select! { res = rpc => { let res = match res { @@ -275,11 +285,12 @@ impl BlockManager { pub async fn rpc_get_block_streaming( &self, hash: &Hash, + order_tag: Option<OrderTag>, ) -> Result< Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>, Error, > { - let (header, stream) = self.rpc_get_raw_block_streaming(hash).await?; + let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| { std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error") @@ -295,8 +306,14 @@ impl BlockManager { } /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Bytes, Error> { - self.rpc_get_raw_block(hash).await?.verify_get(*hash) + pub async fn rpc_get_block( + &self, + hash: &Hash, + order_tag: Option<OrderTag>, + ) -> Result<Bytes, Error> { + self.rpc_get_raw_block(hash, order_tag) + .await? + .verify_get(*hash) } /// Send block to nodes that should have it @@ -441,7 +458,7 @@ impl BlockManager { Ok(()) } - async fn handle_get_block(&self, hash: &Hash) -> Resp<BlockRpc> { + async fn handle_get_block(&self, hash: &Hash, order_tag: Option<OrderTag>) -> Resp<BlockRpc> { let block = match self.read_block(hash).await { Ok(data) => data, Err(e) => return Resp::new(Err(e)), @@ -449,11 +466,17 @@ impl BlockManager { let (header, data) = block.into_parts(); - Resp::new(Ok(BlockRpc::PutBlock { + let resp = Resp::new(Ok(BlockRpc::PutBlock { hash: *hash, header, })) - .with_stream_from_buffer(data) + .with_stream_from_buffer(data); + + if let Some(order_tag) = order_tag { + resp.with_order_tag(order_tag) + } else { + resp + } } /// Read block from disk, verifying it's integrity @@ -841,7 +864,7 @@ impl BlockManager { hash ); - let block_data = self.rpc_get_raw_block(hash).await?; + let block_data = self.rpc_get_raw_block(hash, None).await?; self.metrics.resync_recv_counter.add(1); @@ -861,7 +884,7 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager { .await .map(|_| BlockRpc::Ok), ), - BlockRpc::GetBlock(h) => self.handle_get_block(h).await, + BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await, BlockRpc::NeedBlockQuery(h) => { Resp::new(self.need_block(h).await.map(BlockRpc::NeedBlockReply)) } |