diff options
author | Alex Auvolat <alex@adnab.me> | 2024-02-26 18:34:52 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-02-26 18:35:11 +0100 |
commit | b76c0c102ee07758ecf8ae4dfeef0a9a095c4136 (patch) | |
tree | 0926357ab8255924cbb4c857dd0687c5eadbd223 /src/api/s3/put.rs | |
parent | babccd2ad39c0a626d82521b2d4128ec6f194814 (diff) | |
download | garage-b76c0c102ee07758ecf8ae4dfeef0a9a095c4136.tar.gz garage-b76c0c102ee07758ecf8ae4dfeef0a9a095c4136.zip |
[refactor-put] add ordering tag to blocks being sent to storage nodes
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r-- | src/api/s3/put.rs | 8 |
1 files changed, 7 insertions, 1 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 10a018e4..489f1136 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -20,6 +20,7 @@ use opentelemetry::{ }; use garage_net::bytes_buf::BytesBuf; +use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::async_hash::*; use garage_util::data::*; @@ -380,6 +381,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + let put_blocks = async { // Structure for handling several concurrent writes to storage nodes + let order_stream = OrderTag::stream(); let mut write_futs = FuturesOrdered::new(); let mut written_bytes = 0u64; loop { @@ -421,6 +423,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + offset, hash, block, + order_stream.order(written_bytes), )); } while let Some(res) = write_futs.next().await { @@ -450,6 +453,7 @@ async fn put_block_and_meta( offset: u64, hash: Hash, block: Bytes, + order_tag: OrderTag, ) -> Result<(), GarageError> { let mut version = version.clone(); version.blocks.put( @@ -470,7 +474,9 @@ async fn put_block_and_meta( }; futures::try_join!( - garage.block_manager.rpc_put_block(hash, block), + garage + .block_manager + .rpc_put_block(hash, block, Some(order_tag)), garage.version_table.insert(&version), garage.block_ref_table.insert(&block_ref), )?; |