From 45b0453d0f5b08f44dbd010c084daa87c2876945 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 18 Apr 2023 18:03:10 +0200 Subject: Ensure increasing version timestamps in PutObject --- src/api/s3/put.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index c7ac5030..bd032165 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; +use futures::try_join; use hyper::body::{Body, Bytes}; use hyper::header::{HeaderMap, HeaderValue}; use hyper::{Request, Response}; @@ -35,7 +36,7 @@ pub async fn handle_put( garage: Arc, req: Request, bucket: &Bucket, - key: &str, + key: &String, content_sha256: Option, ) -> Result, Error> { // Retrieve interesting headers from request @@ -68,16 +69,27 @@ pub(crate) async fn save_stream> + Unpin>( headers: ObjectVersionHeaders, body: S, bucket: &Bucket, - key: &str, + key: &String, content_md5: Option, content_sha256: Option, ) -> Result<(Uuid, String), Error> { + let mut chunker = StreamChunker::new(body, garage.config.block_size); + let (first_block_opt, existing_object) = try_join!( + chunker.next(), + garage + .object_table + .get(&bucket.id, key) + .map_err(Error::from), + )?; + + let first_block = first_block_opt.unwrap_or_default(); + // Generate identity of new version let version_uuid = gen_uuid(); - let version_timestamp = now_msec(); - - let mut chunker = StreamChunker::new(body, garage.config.block_size); - let first_block = chunker.next().await?.unwrap_or_default(); + let version_timestamp = existing_object + .and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max()) + .map(|t| std::cmp::max(t + 1, now_msec())) + .unwrap_or_else(now_msec); // If body is small enough, store it directly in the object table // as "inline data". We can then return immediately. -- cgit v1.2.3