From b76c0c102ee07758ecf8ae4dfeef0a9a095c4136 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 26 Feb 2024 18:34:52 +0100 Subject: [refactor-put] add ordering tag to blocks being sent to storage nodes --- src/api/s3/copy.rs | 5 ++++- src/api/s3/put.rs | 8 +++++++- src/block/manager.rs | 12 +++++++++++- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 7eb1fe60..880ce5f4 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -387,7 +387,10 @@ pub async fn handle_upload_part_copy( // we need to insert that data as a new block. async move { if must_upload { - garage2.block_manager.rpc_put_block(final_hash, data).await + garage2 + .block_manager + .rpc_put_block(final_hash, data, None) + .await } else { Ok(()) } diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 10a018e4..489f1136 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -20,6 +20,7 @@ use opentelemetry::{ }; use garage_net::bytes_buf::BytesBuf; +use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::async_hash::*; use garage_util::data::*; @@ -380,6 +381,7 @@ pub(crate) async fn read_and_put_blocks> + let put_blocks = async { // Structure for handling several concurrent writes to storage nodes + let order_stream = OrderTag::stream(); let mut write_futs = FuturesOrdered::new(); let mut written_bytes = 0u64; loop { @@ -421,6 +423,7 @@ pub(crate) async fn read_and_put_blocks> + offset, hash, block, + order_stream.order(written_bytes), )); } while let Some(res) = write_futs.next().await { @@ -450,6 +453,7 @@ async fn put_block_and_meta( offset: u64, hash: Hash, block: Bytes, + order_tag: OrderTag, ) -> Result<(), GarageError> { let mut version = version.clone(); version.blocks.put( @@ -470,7 +474,9 @@ async fn put_block_and_meta( }; futures::try_join!( - garage.block_manager.rpc_put_block(hash, block), + garage + .block_manager + .rpc_put_block(hash, block, Some(order_tag)), garage.version_table.insert(&version), garage.block_ref_table.insert(&block_ref), )?; diff --git a/src/block/manager.rs b/src/block/manager.rs index 817866f6..890ea8b7 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -346,7 +346,12 @@ impl BlockManager { } /// Send block to nodes that should have it - pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { + pub async fn rpc_put_block( + &self, + hash: Hash, + data: Bytes, + order_tag: Option, + ) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) @@ -354,6 +359,11 @@ impl BlockManager { .into_parts(); let put_block_rpc = Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); + let put_block_rpc = if let Some(tag) = order_tag { + put_block_rpc.with_order_tag(tag) + } else { + put_block_rpc + }; self.system .rpc -- cgit v1.2.3