aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/put.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-02-26 18:34:52 +0100
committerAlex Auvolat <alex@adnab.me>2024-02-26 18:35:11 +0100
commitb76c0c102ee07758ecf8ae4dfeef0a9a095c4136 (patch)
tree0926357ab8255924cbb4c857dd0687c5eadbd223 /src/api/s3/put.rs
parentbabccd2ad39c0a626d82521b2d4128ec6f194814 (diff)
downloadgarage-b76c0c102ee07758ecf8ae4dfeef0a9a095c4136.tar.gz
garage-b76c0c102ee07758ecf8ae4dfeef0a9a095c4136.zip
[refactor-put] add ordering tag to blocks being sent to storage nodes
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r--src/api/s3/put.rs8
1 files changed, 7 insertions, 1 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 10a018e4..489f1136 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -20,6 +20,7 @@ use opentelemetry::{
};
use garage_net::bytes_buf::BytesBuf;
+use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::async_hash::*;
use garage_util::data::*;
@@ -380,6 +381,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
let put_blocks = async {
// Structure for handling several concurrent writes to storage nodes
+ let order_stream = OrderTag::stream();
let mut write_futs = FuturesOrdered::new();
let mut written_bytes = 0u64;
loop {
@@ -421,6 +423,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
offset,
hash,
block,
+ order_stream.order(written_bytes),
));
}
while let Some(res) = write_futs.next().await {
@@ -450,6 +453,7 @@ async fn put_block_and_meta(
offset: u64,
hash: Hash,
block: Bytes,
+ order_tag: OrderTag,
) -> Result<(), GarageError> {
let mut version = version.clone();
version.blocks.put(
@@ -470,7 +474,9 @@ async fn put_block_and_meta(
};
futures::try_join!(
- garage.block_manager.rpc_put_block(hash, block),
+ garage
+ .block_manager
+ .rpc_put_block(hash, block, Some(order_tag)),
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
)?;