aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs55
1 files changed, 39 insertions, 16 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index b8fe4c74..b9f6fc0f 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -33,6 +33,7 @@ use garage_util::metrics::RecordDuration;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
+use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -70,7 +71,7 @@ pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
pub enum BlockRpc {
Ok,
/// Message to ask for a block of data, by hash
- GetBlock(Hash),
+ GetBlock(Hash, Option<OrderTag>),
/// Message to send a block of data, either because requested, of for first delivery of new
/// block
PutBlock {
@@ -183,15 +184,18 @@ impl BlockManager {
async fn rpc_get_raw_block_streaming(
&self,
hash: &Hash,
+ order_tag: Option<OrderTag>,
) -> Result<(DataBlockHeader, ByteStream), Error> {
let who = self.replication.read_nodes(hash);
//let who = self.system.rpc.request_order(&who);
for node in who.iter() {
let node_id = NodeID::from(*node);
- let rpc =
- self.endpoint
- .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL);
+ let rpc = self.endpoint.call_streaming(
+ &node_id,
+ BlockRpc::GetBlock(*hash, order_tag),
+ PRIO_NORMAL,
+ );
tokio::select! {
res = rpc => {
let res = match res {
@@ -224,15 +228,21 @@ impl BlockManager {
/// Ask nodes that might have a (possibly compressed) block for it
/// Return its entire body
- async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
+ async fn rpc_get_raw_block(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ ) -> Result<DataBlock, Error> {
let who = self.replication.read_nodes(hash);
//let who = self.system.rpc.request_order(&who);
for node in who.iter() {
let node_id = NodeID::from(*node);
- let rpc =
- self.endpoint
- .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL);
+ let rpc = self.endpoint.call_streaming(
+ &node_id,
+ BlockRpc::GetBlock(*hash, order_tag),
+ PRIO_NORMAL,
+ );
tokio::select! {
res = rpc => {
let res = match res {
@@ -275,11 +285,12 @@ impl BlockManager {
pub async fn rpc_get_block_streaming(
&self,
hash: &Hash,
+ order_tag: Option<OrderTag>,
) -> Result<
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>,
Error,
> {
- let (header, stream) = self.rpc_get_raw_block_streaming(hash).await?;
+ let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
match header {
DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| {
std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error")
@@ -295,8 +306,14 @@ impl BlockManager {
}
/// Ask nodes that might have a block for it
- pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Bytes, Error> {
- self.rpc_get_raw_block(hash).await?.verify_get(*hash)
+ pub async fn rpc_get_block(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ ) -> Result<Bytes, Error> {
+ self.rpc_get_raw_block(hash, order_tag)
+ .await?
+ .verify_get(*hash)
}
/// Send block to nodes that should have it
@@ -441,7 +458,7 @@ impl BlockManager {
Ok(())
}
- async fn handle_get_block(&self, hash: &Hash) -> Resp<BlockRpc> {
+ async fn handle_get_block(&self, hash: &Hash, order_tag: Option<OrderTag>) -> Resp<BlockRpc> {
let block = match self.read_block(hash).await {
Ok(data) => data,
Err(e) => return Resp::new(Err(e)),
@@ -449,11 +466,17 @@ impl BlockManager {
let (header, data) = block.into_parts();
- Resp::new(Ok(BlockRpc::PutBlock {
+ let resp = Resp::new(Ok(BlockRpc::PutBlock {
hash: *hash,
header,
}))
- .with_stream_from_buffer(data)
+ .with_stream_from_buffer(data);
+
+ if let Some(order_tag) = order_tag {
+ resp.with_order_tag(order_tag)
+ } else {
+ resp
+ }
}
/// Read block from disk, verifying it's integrity
@@ -841,7 +864,7 @@ impl BlockManager {
hash
);
- let block_data = self.rpc_get_raw_block(hash).await?;
+ let block_data = self.rpc_get_raw_block(hash, None).await?;
self.metrics.resync_recv_counter.add(1);
@@ -861,7 +884,7 @@ impl StreamingEndpointHandler<BlockRpc> for BlockManager {
.await
.map(|_| BlockRpc::Ok),
),
- BlockRpc::GetBlock(h) => self.handle_get_block(h).await,
+ BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await,
BlockRpc::NeedBlockQuery(h) => {
Resp::new(self.need_block(h).await.map(BlockRpc::NeedBlockReply))
}