From 45b0453d0f5b08f44dbd010c084daa87c2876945 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 18 Apr 2023 18:03:10 +0200 Subject: Ensure increasing version timestamps in PutObject --- src/api/s3/put.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) (limited to 'src/api/s3/put.rs') diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index c7ac5030..bd032165 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; +use futures::try_join; use hyper::body::{Body, Bytes}; use hyper::header::{HeaderMap, HeaderValue}; use hyper::{Request, Response}; @@ -35,7 +36,7 @@ pub async fn handle_put( garage: Arc, req: Request, bucket: &Bucket, - key: &str, + key: &String, content_sha256: Option, ) -> Result, Error> { // Retrieve interesting headers from request @@ -68,16 +69,27 @@ pub(crate) async fn save_stream> + Unpin>( headers: ObjectVersionHeaders, body: S, bucket: &Bucket, - key: &str, + key: &String, content_md5: Option, content_sha256: Option, ) -> Result<(Uuid, String), Error> { + let mut chunker = StreamChunker::new(body, garage.config.block_size); + let (first_block_opt, existing_object) = try_join!( + chunker.next(), + garage + .object_table + .get(&bucket.id, key) + .map_err(Error::from), + )?; + + let first_block = first_block_opt.unwrap_or_default(); + // Generate identity of new version let version_uuid = gen_uuid(); - let version_timestamp = now_msec(); - - let mut chunker = StreamChunker::new(body, garage.config.block_size); - let first_block = chunker.next().await?.unwrap_or_default(); + let version_timestamp = existing_object + .and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max()) + .map(|t| std::cmp::max(t + 1, now_msec())) + .unwrap_or_else(now_msec); // If body is small enough, store it directly in the object table // as "inline data". We can then return immediately. -- cgit v1.2.3 From 3d6ed63824ac2190ba0522d897ef9addb8823140 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 18 Oct 2023 16:36:48 +0200 Subject: check_quotas: avoid re-fetching object from object table --- src/api/s3/put.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) (limited to 'src/api/s3/put.rs') diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index bd032165..a3fe9cca 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -87,6 +87,7 @@ pub(crate) async fn save_stream> + Unpin>( // Generate identity of new version let version_uuid = gen_uuid(); let version_timestamp = existing_object + .as_ref() .and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max()) .map(|t| std::cmp::max(t + 1, now_msec())) .unwrap_or_else(now_msec); @@ -109,7 +110,7 @@ pub(crate) async fn save_stream> + Unpin>( content_sha256, )?; - check_quotas(&garage, bucket, key, size).await?; + check_quotas(&garage, bucket, key, size, existing_object.as_ref()).await?; let object_version = ObjectVersion { uuid: version_uuid, @@ -188,7 +189,7 @@ pub(crate) async fn save_stream> + Unpin>( content_sha256, )?; - check_quotas(&garage, bucket, key, total_size).await?; + check_quotas(&garage, bucket, key, total_size, existing_object.as_ref()).await?; // Save final object state, marked as Complete let md5sum_hex = hex::encode(data_md5sum); @@ -243,17 +244,18 @@ pub(crate) async fn check_quotas( bucket: &Bucket, key: &str, size: u64, + prev_object: Option<&Object>, ) -> Result<(), Error> { let quotas = bucket.state.as_option().unwrap().quotas.get(); if quotas.max_objects.is_none() && quotas.max_size.is_none() { return Ok(()); }; - let key = key.to_string(); - let (prev_object, counters) = futures::try_join!( - garage.object_table.get(&bucket.id, &key), - garage.object_counter_table.table.get(&bucket.id, &EmptyKey), - )?; + let counters = garage + .object_counter_table + .table + .get(&bucket.id, &EmptyKey) + .await?; let counters = counters .map(|x| x.filtered_values(&garage.system.ring.borrow())) @@ -287,7 +289,7 @@ pub(crate) async fn check_quotas( if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 { return Err(Error::forbidden(format!( "Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.", - ms, current_size, size + ms, current_size, cnt_size_diff ))); } } -- cgit v1.2.3 From c6cde1f1437a6cab90b22df6fe0641e5ad34c287 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Oct 2023 13:20:47 +0200 Subject: remove now-unused key parameter in check_quotas --- src/api/s3/put.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'src/api/s3/put.rs') diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index a3fe9cca..62a1f76a 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -110,7 +110,7 @@ pub(crate) async fn save_stream> + Unpin>( content_sha256, )?; - check_quotas(&garage, bucket, key, size, existing_object.as_ref()).await?; + check_quotas(&garage, bucket, size, existing_object.as_ref()).await?; let object_version = ObjectVersion { uuid: version_uuid, @@ -189,7 +189,7 @@ pub(crate) async fn save_stream> + Unpin>( content_sha256, )?; - check_quotas(&garage, bucket, key, total_size, existing_object.as_ref()).await?; + check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?; // Save final object state, marked as Complete let md5sum_hex = hex::encode(data_md5sum); @@ -242,7 +242,6 @@ pub(crate) fn ensure_checksum_matches( pub(crate) async fn check_quotas( garage: &Arc, bucket: &Bucket, - key: &str, size: u64, prev_object: Option<&Object>, ) -> Result<(), Error> { -- cgit v1.2.3 From 8686cfd0b10a49048021102a08d637b0d4fe6a91 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Oct 2023 13:37:37 +0200 Subject: s3 api: also ensure increasing timestamps for create_multipart_upload --- src/api/s3/put.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) (limited to 'src/api/s3/put.rs') diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 62a1f76a..c4df7561 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -86,11 +86,7 @@ pub(crate) async fn save_stream> + Unpin>( // Generate identity of new version let version_uuid = gen_uuid(); - let version_timestamp = existing_object - .as_ref() - .and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max()) - .map(|t| std::cmp::max(t + 1, now_msec())) - .unwrap_or_else(now_msec); + let version_timestamp = next_timestamp(&existing_object); // If body is small enough, store it directly in the object table // as "inline data". We can then return immediately. @@ -532,3 +528,11 @@ pub(crate) fn get_headers(headers: &HeaderMap) -> Result) -> u64 { + existing_object + .as_ref() + .and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max()) + .map(|t| std::cmp::max(t + 1, now_msec())) + .unwrap_or_else(now_msec) +} -- cgit v1.2.3 From c82d91c6bccf307186332b6c5c6fc0b128b1b2b1 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 20 Oct 2023 13:55:34 +0200 Subject: DeleteObject: always insert a deletion marker with a bigger timestamp than everything before --- src/api/s3/put.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/api/s3/put.rs') diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index c4df7561..606facc4 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -86,7 +86,7 @@ pub(crate) async fn save_stream> + Unpin>( // Generate identity of new version let version_uuid = gen_uuid(); - let version_timestamp = next_timestamp(&existing_object); + let version_timestamp = next_timestamp(existing_object.as_ref()); // If body is small enough, store it directly in the object table // as "inline data". We can then return immediately. @@ -529,7 +529,7 @@ pub(crate) fn get_headers(headers: &HeaderMap) -> Result) -> u64 { +pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 { existing_object .as_ref() .and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max()) -- cgit v1.2.3