diff options
Diffstat (limited to 'src/api/s3')
-rw-r--r-- | src/api/s3/api_server.rs | 2 | ||||
-rw-r--r-- | src/api/s3/delete.rs | 42 | ||||
-rw-r--r-- | src/api/s3/list.rs | 72 | ||||
-rw-r--r-- | src/api/s3/multipart.rs | 13 | ||||
-rw-r--r-- | src/api/s3/put.rs | 47 |
5 files changed, 101 insertions, 75 deletions
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index d675ab61..887839dd 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -344,7 +344,7 @@ impl ApiHandler for S3ApiServer { bucket_id, key, upload_id, - part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)), + part_number_marker: part_number_marker.map(|p| p.min(10000)), max_parts: max_parts.unwrap_or(1000).clamp(1, 1000), }, ) diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index b337155f..1c491eac 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -3,12 +3,12 @@ use std::sync::Arc; use hyper::{Body, Request, Response, StatusCode}; use garage_util::data::*; -use garage_util::time::*; use garage_model::garage::Garage; use garage_model::s3::object_table::*; use crate::s3::error::*; +use crate::s3::put::next_timestamp; use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; @@ -23,40 +23,36 @@ async fn handle_delete_internal( .await? .ok_or(Error::NoSuchKey)?; // No need to delete - let interesting_versions = object.versions().iter().filter(|v| { - !matches!( - v.state, - ObjectVersionState::Aborted - | ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) - ) - }); - - let mut version_to_delete = None; - let mut timestamp = now_msec(); - for v in interesting_versions { - if v.timestamp + 1 > timestamp || version_to_delete.is_none() { - version_to_delete = Some(v.uuid); + let del_timestamp = next_timestamp(Some(&object)); + let del_uuid = gen_uuid(); + + let deleted_version = object + .versions() + .iter() + .rev() + .find(|v| !matches!(&v.state, ObjectVersionState::Aborted)) + .or_else(|| object.versions().iter().rev().next()); + let deleted_version = match deleted_version { + Some(dv) => dv.uuid, + None => { + warn!("Object has no versions: {:?}", object); + Uuid::from([0u8; 32]) } - timestamp = std::cmp::max(timestamp, v.timestamp + 1); - } - - let deleted_version = version_to_delete.ok_or(Error::NoSuchKey)?; - - let version_uuid = gen_uuid(); + }; let object = Object::new( bucket_id, key.into(), vec![ObjectVersion { - uuid: version_uuid, - timestamp, + uuid: del_uuid, + timestamp: del_timestamp, state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), }], ); garage.object_table.insert(&object).await?; - Ok((deleted_version, version_uuid)) + Ok((deleted_version, del_uuid)) } pub async fn handle_delete( diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 33d62518..1b9e8cd5 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -426,8 +426,10 @@ where // Drop the first key if needed // Only AfterKey requires it according to the S3 spec and our implem. match (&cursor, iter.peek()) { - (RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => iter.next(), - (_, _) => None, + (RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => { + iter.next(); + } + _ => (), }; while let Some(object) = iter.peek() { @@ -436,16 +438,22 @@ where return Ok(None); } - cursor = match acc.extract(query, &cursor, &mut iter) { - ExtractionResult::Extracted { key } => RangeBegin::AfterKey { key }, + match acc.extract(query, &cursor, &mut iter) { + ExtractionResult::Extracted { key } => { + cursor = RangeBegin::AfterKey { key }; + } ExtractionResult::SkipTo { key, fallback_key } => { - RangeBegin::IncludingKey { key, fallback_key } + cursor = RangeBegin::IncludingKey { key, fallback_key }; } ExtractionResult::FilledAtUpload { key, upload } => { - return Ok(Some(RangeBegin::AfterUpload { key, upload })) + return Ok(Some(RangeBegin::AfterUpload { key, upload })); + } + ExtractionResult::Filled => { + return Ok(Some(cursor)); + } + ExtractionResult::NoMore => { + return Ok(None); } - ExtractionResult::Filled => return Ok(Some(cursor)), - ExtractionResult::NoMore => return Ok(None), }; } @@ -519,8 +527,8 @@ fn fetch_part_info<'a>( /// This key can be the prefix in the base case, or intermediate /// points in the dataset if we are continuing a previous listing. impl ListObjectsQuery { - fn build_accumulator(&self) -> Accumulator<String, ObjectInfo> { - Accumulator::<String, ObjectInfo>::new(self.common.page_size) + fn build_accumulator(&self) -> ObjectAccumulator { + ObjectAccumulator::new(self.common.page_size) } fn begin(&self) -> Result<RangeBegin, Error> { @@ -529,9 +537,10 @@ impl ListObjectsQuery { // In V2 mode, the continuation token is defined as an opaque // string in the spec, so we can do whatever we want with it. // In our case, it is defined as either [ or ] (for include + // or exclude), followed by a base64-encoded string // representing the key to start with. - (Some(token), _) => match &token[..1] { - "[" => Ok(RangeBegin::IncludingKey { + (Some(token), _) => match &token.get(..1) { + Some("[") => Ok(RangeBegin::IncludingKey { key: String::from_utf8( BASE64_STANDARD .decode(token[1..].as_bytes()) @@ -539,7 +548,7 @@ impl ListObjectsQuery { )?, fallback_key: None, }), - "]" => Ok(RangeBegin::AfterKey { + Some("]") => Ok(RangeBegin::AfterKey { key: String::from_utf8( BASE64_STANDARD .decode(token[1..].as_bytes()) @@ -580,8 +589,8 @@ impl ListObjectsQuery { } impl ListMultipartUploadsQuery { - fn build_accumulator(&self) -> Accumulator<Uuid, UploadInfo> { - Accumulator::<Uuid, UploadInfo>::new(self.common.page_size) + fn build_accumulator(&self) -> UploadAccumulator { + UploadAccumulator::new(self.common.page_size) } fn begin(&self) -> Result<RangeBegin, Error> { @@ -665,6 +674,7 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> { Some(p) => p, None => return None, }; + assert!(pfx.starts_with(&query.prefix)); // Try to register this prefix // If not possible, we can return early @@ -675,8 +685,11 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> { // We consume the whole common prefix from the iterator let mut last_pfx_key = &object.key; loop { - last_pfx_key = match objects.peek() { - Some(o) if o.key.starts_with(pfx) => &o.key, + match objects.peek() { + Some(o) if o.key.starts_with(pfx) => { + last_pfx_key = &o.key; + objects.next(); + } Some(_) => { return Some(ExtractionResult::Extracted { key: last_pfx_key.to_owned(), @@ -692,8 +705,6 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> { } } }; - - objects.next(); } } @@ -708,12 +719,11 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> { } // Otherwise, we need to check if we can add it - match self.is_full() { - true => false, - false => { - self.common_prefixes.insert(key); - true - } + if self.is_full() { + false + } else { + self.common_prefixes.insert(key); + true } } @@ -721,12 +731,11 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> { // It is impossible to add twice a key, this is an error assert!(!self.keys.contains_key(&key)); - match self.is_full() { - true => false, - false => { - self.keys.insert(key, value); - true - } + if self.is_full() { + false + } else { + self.keys.insert(key, value); + true } } } @@ -743,6 +752,7 @@ impl ExtractAccumulator for ObjectAccumulator { } let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it."); + assert!(object.key.starts_with(&query.prefix)); let version = match object.versions().iter().find(|x| x.is_data()) { Some(v) => v, diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 52ea8e78..6b786318 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -9,7 +9,6 @@ use md5::{Digest as Md5Digest, Md5}; use garage_table::*; use garage_util::async_hash::*; use garage_util::data::*; -use garage_util::time::*; use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; @@ -30,10 +29,13 @@ pub async fn handle_create_multipart_upload( req: &Request<Body>, bucket_name: &str, bucket_id: Uuid, - key: &str, + key: &String, ) -> Result<Response<Body>, Error> { + let existing_object = garage.object_table.get(&bucket_id, &key).await?; + let upload_id = gen_uuid(); - let timestamp = now_msec(); + let timestamp = next_timestamp(existing_object.as_ref()); + let headers = get_headers(req.headers())?; // Create object in object table @@ -233,7 +235,8 @@ pub async fn handle_complete_multipart_upload( // Get object and multipart upload let key = key.to_string(); - let (_, mut object_version, mpu) = get_upload(&garage, &bucket.id, &key, &upload_id).await?; + let (object, mut object_version, mpu) = + get_upload(&garage, &bucket.id, &key, &upload_id).await?; if mpu.parts.is_empty() { return Err(Error::bad_request("No data was uploaded")); @@ -331,7 +334,7 @@ pub async fn handle_complete_multipart_upload( // Calculate total size of final object let total_size = parts.iter().map(|x| x.size.unwrap()).sum(); - if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await { + if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await { object_version.state = ObjectVersionState::Aborted; let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index c7ac5030..606facc4 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use base64::prelude::*; use futures::prelude::*; +use futures::try_join; use hyper::body::{Body, Bytes}; use hyper::header::{HeaderMap, HeaderValue}; use hyper::{Request, Response}; @@ -35,7 +36,7 @@ pub async fn handle_put( garage: Arc<Garage>, req: Request<Body>, bucket: &Bucket, - key: &str, + key: &String, content_sha256: Option<Hash>, ) -> Result<Response<Body>, Error> { // Retrieve interesting headers from request @@ -68,16 +69,24 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( headers: ObjectVersionHeaders, body: S, bucket: &Bucket, - key: &str, + key: &String, content_md5: Option<String>, content_sha256: Option<FixedBytes32>, ) -> Result<(Uuid, String), Error> { + let mut chunker = StreamChunker::new(body, garage.config.block_size); + let (first_block_opt, existing_object) = try_join!( + chunker.next(), + garage + .object_table + .get(&bucket.id, key) + .map_err(Error::from), + )?; + + let first_block = first_block_opt.unwrap_or_default(); + // Generate identity of new version let version_uuid = gen_uuid(); - let version_timestamp = now_msec(); - - let mut chunker = StreamChunker::new(body, garage.config.block_size); - let first_block = chunker.next().await?.unwrap_or_default(); + let version_timestamp = next_timestamp(existing_object.as_ref()); // If body is small enough, store it directly in the object table // as "inline data". We can then return immediately. @@ -97,7 +106,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( content_sha256, )?; - check_quotas(&garage, bucket, key, size).await?; + check_quotas(&garage, bucket, size, existing_object.as_ref()).await?; let object_version = ObjectVersion { uuid: version_uuid, @@ -176,7 +185,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( content_sha256, )?; - check_quotas(&garage, bucket, key, total_size).await?; + check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?; // Save final object state, marked as Complete let md5sum_hex = hex::encode(data_md5sum); @@ -229,19 +238,19 @@ pub(crate) fn ensure_checksum_matches( pub(crate) async fn check_quotas( garage: &Arc<Garage>, bucket: &Bucket, - key: &str, size: u64, + prev_object: Option<&Object>, ) -> Result<(), Error> { let quotas = bucket.state.as_option().unwrap().quotas.get(); if quotas.max_objects.is_none() && quotas.max_size.is_none() { return Ok(()); }; - let key = key.to_string(); - let (prev_object, counters) = futures::try_join!( - garage.object_table.get(&bucket.id, &key), - garage.object_counter_table.table.get(&bucket.id, &EmptyKey), - )?; + let counters = garage + .object_counter_table + .table + .get(&bucket.id, &EmptyKey) + .await?; let counters = counters .map(|x| x.filtered_values(&garage.system.ring.borrow())) @@ -275,7 +284,7 @@ pub(crate) async fn check_quotas( if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 { return Err(Error::forbidden(format!( "Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.", - ms, current_size, size + ms, current_size, cnt_size_diff ))); } } @@ -519,3 +528,11 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVers other, }) } + +pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 { + existing_object + .as_ref() + .and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max()) + .map(|t| std::cmp::max(t + 1, now_msec())) + .unwrap_or_else(now_msec) +} |