From b4592a00fee3504b80aab9a8ee46bbacf7612e4a Mon Sep 17 00:00:00 2001 From: Quentin Date: Wed, 12 Jan 2022 19:04:55 +0100 Subject: Implement ListMultipartUploads (#171) Implement ListMultipartUploads, also refactor ListObjects and ListObjectsV2. It took me some times as I wanted to propose the following things: - Using an iterator instead of the loop+goto pattern. I find it easier to read and it should enable some optimizations. For example, when consuming keys of a common prefix, we do many [redundant checks](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/main/src/api/s3_list.rs#L125-L156) while the only thing to do is to [check if the following key is still part of the common prefix](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/feature/s3-multipart-compat/src/api/s3_list.rs#L476). - Try to name things (see ExtractionResult and RangeBegin enums) and to separate concerns (see ListQuery and Accumulator) - An IO closure to make unit tests possibles. - Unit tests, to track regressions and document how to interact with the code - Integration tests with `s3api`. In the future, I would like to move them in Rust with the aws rust SDK. Merging of the logic of ListMultipartUploads and ListObjects was not a goal but a consequence of the previous modifications. Some points that we might want to discuss: - ListObjectsV1, when using pagination and delimiters, has a weird behavior (it lists multiple times the same prefix) with `aws s3api` due to the fact that it can not use our optimization to skip the whole prefix. It is independant from my refactor and can be tested with the commented `s3api` tests in `test-smoke.sh`. It probably has the same weird behavior on the official AWS S3 implementation. - Considering ListMultipartUploads, I had to "abuse" upload id marker to support prefix skipping. I send an `upload-id-marker` with the hardcoded value `include` to emulate your "including" token. - Some ways to test ListMultipartUploads with existing software (my tests are limited to s3api for now). Co-authored-by: Quentin Dufour Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/171 Co-authored-by: Quentin Co-committed-by: Quentin --- src/api/api_server.rs | 55 ++- src/api/s3_bucket.rs | 3 +- src/api/s3_list.rs | 1172 ++++++++++++++++++++++++++++++++++++--------- src/api/s3_put.rs | 2 +- src/api/s3_router.rs | 2 +- src/api/s3_xml.rs | 106 ++++ src/garage/admin.rs | 3 +- src/model/object_table.rs | 14 +- 8 files changed, 1100 insertions(+), 257 deletions(-) (limited to 'src') diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 41aa0046..16156e74 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -1,3 +1,4 @@ +use std::cmp::{max, min}; use std::net::SocketAddr; use std::sync::Arc; @@ -217,16 +218,18 @@ async fn handler_inner(garage: Arc, req: Request) -> Result, req: Request) -> Result, req: Request) -> Result { + handle_list_multipart_upload( + garage, + &ListMultipartUploadsQuery { + common: ListQueryCommon { + bucket_name: bucket, + bucket_id, + delimiter: delimiter.map(|d| d.to_string()), + page_size: max_uploads.map(|p| min(1000, max(1, p))).unwrap_or(1000), + prefix: prefix.unwrap_or_default(), + urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), + }, + key_marker, + upload_id_marker, + }, + ) + .await + } Endpoint::DeleteObjects { .. } => { handle_delete_objects(garage, bucket_id, req, content_sha256).await } diff --git a/src/api/s3_bucket.rs b/src/api/s3_bucket.rs index 425d2998..494224c8 100644 --- a/src/api/s3_bucket.rs +++ b/src/api/s3_bucket.rs @@ -7,6 +7,7 @@ use garage_model::bucket_alias_table::*; use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; use garage_model::key_table::Key; +use garage_model::object_table::ObjectFilter; use garage_model::permission::BucketKeyPerm; use garage_table::util::*; use garage_util::crdt::*; @@ -226,7 +227,7 @@ pub async fn handle_delete_bucket( // Check bucket is empty let objects = garage .object_table - .get_range(&bucket_id, None, Some(DeletedFilter::NotDeleted), 10) + .get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10) .await?; if !objects.is_empty() { return Err(Error::BucketNotEmpty); diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 07efb02d..ddc03375 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -1,4 +1,5 @@ use std::collections::{BTreeMap, BTreeSet}; +use std::iter::{Iterator, Peekable}; use std::sync::Arc; use hyper::{Body, Response}; @@ -10,308 +11,721 @@ use garage_util::time::*; use garage_model::garage::Garage; use garage_model::object_table::*; -use garage_table::DeletedFilter; - use crate::encoding::*; use crate::error::*; +use crate::s3_put; use crate::s3_xml; #[derive(Debug)] -pub struct ListObjectsQuery { - pub is_v2: bool, +pub struct ListQueryCommon { pub bucket_name: String, pub bucket_id: Uuid, pub delimiter: Option, - pub max_keys: usize, + pub page_size: usize, pub prefix: String, + pub urlencode_resp: bool, +} + +#[derive(Debug)] +pub struct ListObjectsQuery { + pub is_v2: bool, pub marker: Option, pub continuation_token: Option, pub start_after: Option, - pub urlencode_resp: bool, + pub common: ListQueryCommon, } #[derive(Debug)] -struct ListResultInfo { - last_modified: u64, - size: u64, - etag: String, +pub struct ListMultipartUploadsQuery { + pub key_marker: Option, + pub upload_id_marker: Option, + pub common: ListQueryCommon, } pub async fn handle_list( garage: Arc, query: &ListObjectsQuery, ) -> Result, Error> { - let mut result_keys = BTreeMap::::new(); - let mut result_common_prefixes = BTreeSet::::new(); - - // Determine the key from where we want to start fetch objects - // from the database, and whether the object at this key must - // be included or excluded from the response. - // This key can be the prefix in the base case, or intermediate - // points in the dataset if we are continuing a previous listing. - #[allow(clippy::collapsible_else_if)] - let (mut next_chunk_start, mut next_chunk_exclude_start) = if query.is_v2 { - if let Some(ct) = &query.continuation_token { - // In V2 mode, the continuation token is defined as an opaque - // string in the spec, so we can do whatever we want with it. - // In our case, it is defined as either [ or ] (for include - // and exclude, respectively), followed by a base64 string - // representing the key to start with. - let exclude = match &ct[..1] { - "[" => false, - "]" => true, - _ => return Err(Error::BadRequest("Invalid continuation token".to_string())), - }; - ( - String::from_utf8(base64::decode(ct[1..].as_bytes())?)?, - exclude, - ) - } else if let Some(sa) = &query.start_after { - // StartAfter has defined semantics in the spec: - // start listing at the first key immediately after. - (sa.clone(), true) - } else { - // In the case where neither is specified, we start - // listing at the specified prefix. If an object has this - // exact same key, we include it. (TODO is this correct?) - (query.prefix.clone(), false) + let io = |bucket, key, count| { + let t = &garage.object_table; + async move { + t.get_range(&bucket, key, Some(ObjectFilter::IsData), count) + .await } - } else { - if let Some(mk) = &query.marker { - // In V1 mode, the spec defines the Marker value to mean - // the same thing as the StartAfter value in V2 mode. - (mk.clone(), true) - } else { - // Base case, same as in V2 mode - (query.prefix.clone(), false) + }; + + let mut acc = query.build_accumulator(); + let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?; + + let result = s3_xml::ListBucketResult { + xmlns: (), + // Sending back request information + name: s3_xml::Value(query.common.bucket_name.to_string()), + prefix: uriencode_maybe(&query.common.prefix, query.common.urlencode_resp), + max_keys: s3_xml::IntValue(query.common.page_size as i64), + delimiter: query + .common + .delimiter + .as_ref() + .map(|x| uriencode_maybe(x, query.common.urlencode_resp)), + encoding_type: match query.common.urlencode_resp { + true => Some(s3_xml::Value("url".to_string())), + false => None, + }, + marker: match (!query.is_v2, &query.marker) { + (true, Some(k)) => Some(uriencode_maybe(k, query.common.urlencode_resp)), + _ => None, + }, + start_after: match (query.is_v2, &query.start_after) { + (true, Some(sa)) => Some(uriencode_maybe(sa, query.common.urlencode_resp)), + _ => None, + }, + continuation_token: match (query.is_v2, &query.continuation_token) { + (true, Some(ct)) => Some(s3_xml::Value(ct.to_string())), + _ => None, + }, + + // Pagination + is_truncated: s3_xml::Value(format!("{}", pagination.is_some())), + key_count: Some(s3_xml::IntValue( + acc.keys.len() as i64 + acc.common_prefixes.len() as i64, + )), + next_marker: match (!query.is_v2, &pagination) { + (true, Some(RangeBegin::AfterKey { key: k })) + | ( + true, + Some(RangeBegin::IncludingKey { + fallback_key: Some(k), + .. + }), + ) => Some(uriencode_maybe(k, query.common.urlencode_resp)), + _ => None, + }, + next_continuation_token: match (query.is_v2, &pagination) { + (true, Some(RangeBegin::AfterKey { key })) => Some(s3_xml::Value(format!( + "]{}", + base64::encode(key.as_bytes()) + ))), + (true, Some(RangeBegin::IncludingKey { key, .. })) => Some(s3_xml::Value(format!( + "[{}", + base64::encode(key.as_bytes()) + ))), + _ => None, + }, + + // Body + contents: acc + .keys + .iter() + .map(|(key, info)| s3_xml::ListBucketItem { + key: uriencode_maybe(key, query.common.urlencode_resp), + last_modified: s3_xml::Value(msec_to_rfc3339(info.last_modified)), + size: s3_xml::IntValue(info.size as i64), + etag: s3_xml::Value(info.etag.to_string()), + storage_class: s3_xml::Value("STANDARD".to_string()), + }) + .collect(), + common_prefixes: acc + .common_prefixes + .iter() + .map(|pfx| s3_xml::CommonPrefix { + prefix: uriencode_maybe(pfx, query.common.urlencode_resp), + }) + .collect(), + }; + + let xml = s3_xml::to_xml_with_header(&result)?; + Ok(Response::builder() + .header("Content-Type", "application/xml") + .body(Body::from(xml.into_bytes()))?) +} + +pub async fn handle_list_multipart_upload( + garage: Arc, + query: &ListMultipartUploadsQuery, +) -> Result, Error> { + let io = |bucket, key, count| { + let t = &garage.object_table; + async move { + t.get_range(&bucket, key, Some(ObjectFilter::IsUploading), count) + .await } }; - debug!( - "List request: `{:?}` {} `{}`, start from {}, exclude first {}", - query.delimiter, query.max_keys, query.prefix, next_chunk_start, next_chunk_exclude_start - ); + let mut acc = query.build_accumulator(); + let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?; + + let result = s3_xml::ListMultipartUploadsResult { + xmlns: (), + + // Sending back some information about the request + bucket: s3_xml::Value(query.common.bucket_name.to_string()), + prefix: uriencode_maybe(&query.common.prefix, query.common.urlencode_resp), + delimiter: query + .common + .delimiter + .as_ref() + .map(|d| uriencode_maybe(d, query.common.urlencode_resp)), + max_uploads: s3_xml::IntValue(query.common.page_size as i64), + key_marker: query + .key_marker + .as_ref() + .map(|m| uriencode_maybe(m, query.common.urlencode_resp)), + upload_id_marker: query + .upload_id_marker + .as_ref() + .map(|m| s3_xml::Value(m.to_string())), + encoding_type: match query.common.urlencode_resp { + true => Some(s3_xml::Value("url".to_string())), + false => None, + }, + + // Handling pagination + is_truncated: s3_xml::Value(format!("{}", pagination.is_some())), + next_key_marker: match &pagination { + None => None, + Some(RangeBegin::AfterKey { key }) + | Some(RangeBegin::AfterUpload { key, .. }) + | Some(RangeBegin::IncludingKey { key, .. }) => { + Some(uriencode_maybe(key, query.common.urlencode_resp)) + } + }, + next_upload_id_marker: match pagination { + Some(RangeBegin::AfterUpload { upload, .. }) => { + Some(s3_xml::Value(hex::encode(upload))) + } + Some(RangeBegin::IncludingKey { .. }) => Some(s3_xml::Value("include".to_string())), + _ => None, + }, + + // Result body + upload: acc + .keys + .iter() + .map(|(uuid, info)| s3_xml::ListMultipartItem { + initiated: s3_xml::Value(msec_to_rfc3339(info.timestamp)), + key: uriencode_maybe(&info.key, query.common.urlencode_resp), + upload_id: s3_xml::Value(hex::encode(uuid)), + storage_class: s3_xml::Value("STANDARD".to_string()), + initiator: s3_xml::Initiator { + display_name: s3_xml::Value("Dummy Key".to_string()), + id: s3_xml::Value("GKDummyKey".to_string()), + }, + owner: s3_xml::Owner { + display_name: s3_xml::Value("Dummy Key".to_string()), + id: s3_xml::Value("GKDummyKey".to_string()), + }, + }) + .collect(), + common_prefixes: acc + .common_prefixes + .iter() + .map(|c| s3_xml::CommonPrefix { + prefix: s3_xml::Value(c.to_string()), + }) + .collect(), + }; + + let xml = s3_xml::to_xml_with_header(&result)?; - // `truncated` is a boolean that determines whether there are - // more items to be added. - let truncated; - // `last_processed_item` is the key of the last item - // that was included in the listing before truncating. - let mut last_processed_item = None; + Ok(Response::builder() + .header("Content-Type", "application/xml") + .body(Body::from(xml.into_bytes()))?) +} + +/* + * Private enums and structs + */ + +#[derive(Debug)] +struct ObjectInfo { + last_modified: u64, + size: u64, + etag: String, +} + +#[derive(Debug, PartialEq)] +struct UploadInfo { + key: String, + timestamp: u64, +} + +enum ExtractionResult { + NoMore, + Filled, + FilledAtUpload { + key: String, + upload: Uuid, + }, + Extracted { + key: String, + }, + // Fallback key is used for legacy APIs that only support + // exlusive pagination (and not inclusive one). + SkipTo { + key: String, + fallback_key: Option, + }, +} + +#[derive(PartialEq, Clone, Debug)] +enum RangeBegin { + // Fallback key is used for legacy APIs that only support + // exlusive pagination (and not inclusive one). + IncludingKey { + key: String, + fallback_key: Option, + }, + AfterKey { + key: String, + }, + AfterUpload { + key: String, + upload: Uuid, + }, +} +type Pagination = Option; + +/* + * Fetch list entries + */ + +async fn fetch_list_entries( + query: &ListQueryCommon, + begin: RangeBegin, + acc: &mut impl ExtractAccumulator, + mut io: F, +) -> Result +where + R: futures::Future, GarageError>>, + F: FnMut(Uuid, Option, usize) -> R, +{ + let mut cursor = begin; + // +1 is needed as we may need to skip the 1st key + // (range is inclusive while most S3 requests are exclusive) + let count = query.page_size + 1; + + loop { + let start_key = match cursor { + RangeBegin::AfterKey { ref key } + | RangeBegin::AfterUpload { ref key, .. } + | RangeBegin::IncludingKey { ref key, .. } => Some(key.clone()), + }; - 'query_loop: loop { // Fetch objects - let objects = garage - .object_table - .get_range( - &query.bucket_id, - Some(next_chunk_start.clone()), - Some(DeletedFilter::NotDeleted), - query.max_keys + 1, - ) - .await?; + let objects = io(query.bucket_id, start_key.clone(), count).await?; + debug!( - "List: get range {} (max {}), results: {}", - next_chunk_start, - query.max_keys + 1, + "List: get range {:?} (max {}), results: {}", + start_key, + count, objects.len() ); - let current_chunk_start = next_chunk_start.clone(); - - // Iterate on returned objects and add them to the response. - // If a delimiter is specified, we take care of grouping objects - // into CommonPrefixes. - for object in objects.iter() { - // If we have retrieved an object that doesn't start with - // the prefix, we know we have finished listing our stuff. - if !object.key.starts_with(&query.prefix) { - truncated = false; - break 'query_loop; - } + let server_more = objects.len() >= count; - // Exclude the starting key if we have to. - if object.key == next_chunk_start && next_chunk_exclude_start { - continue; - } + let prev_req_cursor = cursor.clone(); + let mut iter = objects.iter().peekable(); - // Find if this object has a currently valid (non-deleted, - // non-still-uploading) version. If not, skip it. - let version = match object.versions().iter().find(|x| x.is_data()) { - Some(v) => v, - None => continue, - }; + // Drop the first key if needed + // Only AfterKey requires it according to the S3 spec and our implem. + match (&cursor, iter.peek()) { + (RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => iter.next(), + (_, _) => None, + }; - // If we don't have space to add this object to our response, - // we will need to stop here and mark the key of this object - // as the marker from where - // we want to start again in the next list call. - let cannot_add = result_keys.len() + result_common_prefixes.len() >= query.max_keys; - - // Determine whether this object should be grouped inside - // a CommonPrefix because it contains the delimiter, - // or if it should be returned as an object. - let common_prefix = match &query.delimiter { - Some(delimiter) => object.key[query.prefix.len()..] - .find(delimiter) - .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]), - None => None, - }; - if let Some(pfx) = common_prefix { - // In the case where this object must be grouped in a - // common prefix, handle it here. - if !result_common_prefixes.contains(pfx) { - // Determine the first listing key that starts after - // the common prefix, by finding the next possible - // string by alphabetical order. - let mut first_key_after_prefix = pfx.to_string(); - let tail = first_key_after_prefix.pop().unwrap(); - first_key_after_prefix.push(((tail as u8) + 1) as char); - - // If this were the end of the chunk, - // the next chunk should start after this prefix - next_chunk_start = first_key_after_prefix; - next_chunk_exclude_start = false; - - if cannot_add { - truncated = true; - break 'query_loop; - } - result_common_prefixes.insert(pfx.to_string()); + while let Some(object) = iter.peek() { + if !object.key.starts_with(&query.prefix) { + // If the key is not in the requested prefix, we're done + return Ok(None); + } + + cursor = match acc.extract(query, &cursor, &mut iter) { + ExtractionResult::Extracted { key } => RangeBegin::AfterKey { key }, + ExtractionResult::SkipTo { key, fallback_key } => { + RangeBegin::IncludingKey { key, fallback_key } } - last_processed_item = Some(object.key.clone()); - continue; + ExtractionResult::FilledAtUpload { key, upload } => { + return Ok(Some(RangeBegin::AfterUpload { key, upload })) + } + ExtractionResult::Filled => return Ok(Some(cursor)), + ExtractionResult::NoMore => return Ok(None), }; + } - // This is not a common prefix, we want to add it to our - // response directly. - next_chunk_start = object.key.clone(); + if !server_more { + // We did not fully fill the accumulator despite exhausting all the data we have, + // we're done + return Ok(None); + } - if cannot_add { - truncated = true; - next_chunk_exclude_start = false; - break 'query_loop; - } + if prev_req_cursor == cursor { + unreachable!("No progress has been done in the loop. This is a bug, please report it."); + } + } +} - let meta = match &version.state { - ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, - ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta, - _ => unreachable!(), - }; - let info = match result_keys.get(&object.key) { - None => ListResultInfo { - last_modified: version.timestamp, - size: meta.size, - etag: meta.etag.to_string(), +/* + * ListQuery logic + */ + +/// Determine the key from where we want to start fetch objects from the database +/// +/// We choose whether the object at this key must +/// be included or excluded from the response. +/// This key can be the prefix in the base case, or intermediate +/// points in the dataset if we are continuing a previous listing. +impl ListObjectsQuery { + fn build_accumulator(&self) -> Accumulator { + Accumulator::::new(self.common.page_size) + } + + fn begin(&self) -> Result { + if self.is_v2 { + match (&self.continuation_token, &self.start_after) { + // In V2 mode, the continuation token is defined as an opaque + // string in the spec, so we can do whatever we want with it. + // In our case, it is defined as either [ or ] (for include + // representing the key to start with. + (Some(token), _) => match &token[..1] { + "[" => Ok(RangeBegin::IncludingKey { + key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?, + fallback_key: None, + }), + "]" => Ok(RangeBegin::AfterKey { + key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?, + }), + _ => Err(Error::BadRequest("Invalid continuation token".to_string())), }, - Some(_lri) => { - return Err(Error::InternalError(GarageError::Message(format!( - "Duplicate key?? {} (this is a bug, please report it)", - object.key - )))) + + // StartAfter has defined semantics in the spec: + // start listing at the first key immediately after. + (_, Some(key)) => Ok(RangeBegin::AfterKey { + key: key.to_string(), + }), + + // In the case where neither is specified, we start + // listing at the specified prefix. If an object has this + // exact same key, we include it. (@TODO is this correct?) + _ => Ok(RangeBegin::IncludingKey { + key: self.common.prefix.to_string(), + fallback_key: None, + }), + } + } else { + match &self.marker { + // In V1 mode, the spec defines the Marker value to mean + // the same thing as the StartAfter value in V2 mode. + Some(key) => Ok(RangeBegin::AfterKey { + key: key.to_string(), + }), + _ => Ok(RangeBegin::IncludingKey { + key: self.common.prefix.to_string(), + fallback_key: None, + }), + } + } + } +} + +impl ListMultipartUploadsQuery { + fn build_accumulator(&self) -> Accumulator { + Accumulator::::new(self.common.page_size) + } + + fn begin(&self) -> Result { + match (&self.upload_id_marker, &self.key_marker) { + // If both the upload id marker and the key marker are sets, + // the spec specifies that we must start listing uploads INCLUDING the given key, + // AFTER the specified upload id (sorted in a lexicographic order). + // To enable some optimisations, we emulate "IncludingKey" by extending the upload id + // semantic. We base our reasoning on the hypothesis that S3's upload ids are opaques + // while Garage's ones are 32 bytes hex encoded which enables us to extend this query + // with a specific "include" upload id. + (Some(up_marker), Some(key_marker)) => match &up_marker[..] { + "include" => Ok(RangeBegin::IncludingKey { + key: key_marker.to_string(), + fallback_key: None, + }), + uuid => Ok(RangeBegin::AfterUpload { + key: key_marker.to_string(), + upload: s3_put::decode_upload_id(uuid)?, + }), + }, + + // If only the key marker is specified, the spec says that we must start listing + // uploads AFTER the specified key. + (None, Some(key_marker)) => Ok(RangeBegin::AfterKey { + key: key_marker.to_string(), + }), + _ => Ok(RangeBegin::IncludingKey { + key: self.common.prefix.to_string(), + fallback_key: None, + }), + } + } +} + +/* + * Accumulator logic + */ + +trait ExtractAccumulator { + fn extract<'a>( + &mut self, + query: &ListQueryCommon, + cursor: &RangeBegin, + iter: &mut Peekable>, + ) -> ExtractionResult; +} + +struct Accumulator { + common_prefixes: BTreeSet, + keys: BTreeMap, + max_capacity: usize, +} + +type ObjectAccumulator = Accumulator; +type UploadAccumulator = Accumulator; + +impl Accumulator { + fn new(page_size: usize) -> Accumulator { + Accumulator { + common_prefixes: BTreeSet::::new(), + keys: BTreeMap::::new(), + max_capacity: page_size, + } + } + + /// Observe the Object iterator and try to extract a single common prefix + /// + /// This function can consume an arbitrary number of items as long as they share the same + /// common prefix. + fn extract_common_prefix<'a>( + &mut self, + objects: &mut Peekable>, + query: &ListQueryCommon, + ) -> Option { + // Get the next object from the iterator + let object = objects.peek().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it."); + + // Check if this is a common prefix (requires a passed delimiter and its value in the key) + let pfx = match common_prefix(object, query) { + Some(p) => p, + None => return None, + }; + + // Try to register this prefix + // If not possible, we can return early + if !self.try_insert_common_prefix(pfx.to_string()) { + return Some(ExtractionResult::Filled); + } + + // We consume the whole common prefix from the iterator + let mut last_pfx_key = &object.key; + loop { + last_pfx_key = match objects.peek() { + Some(o) if o.key.starts_with(pfx) => &o.key, + Some(_) => { + return Some(ExtractionResult::Extracted { + key: last_pfx_key.to_owned(), + }) + } + None => { + return match key_after_prefix(pfx) { + Some(next) => Some(ExtractionResult::SkipTo { + key: next, + fallback_key: Some(last_pfx_key.to_owned()), + }), + None => Some(ExtractionResult::NoMore), + } } }; - result_keys.insert(object.key.clone(), info); - last_processed_item = Some(object.key.clone()); - next_chunk_exclude_start = true; + + objects.next(); } + } - // If our database returned less objects than what we were asking for, - // it means that no more objects are in the bucket. So we stop here. - if objects.len() < query.max_keys + 1 { - truncated = false; - break 'query_loop; + fn is_full(&mut self) -> bool { + self.keys.len() + self.common_prefixes.len() >= self.max_capacity + } + + fn try_insert_common_prefix(&mut self, key: String) -> bool { + // If we already have an entry, we can continue + if self.common_prefixes.contains(&key) { + return true; } - // Sanity check: we should have added at least an object - // or a prefix to our returned result. - if next_chunk_start == current_chunk_start || last_processed_item.is_none() { - return Err(Error::InternalError(GarageError::Message(format!( - "S3 ListObject: made no progress, still starting at {} (this is a bug, please report it)", next_chunk_start)))); + // Otherwise, we need to check if we can add it + match self.is_full() { + true => false, + false => { + self.common_prefixes.insert(key); + true + } } + } + + fn try_insert_entry(&mut self, key: K, value: V) -> bool { + // It is impossible to add twice a key, this is an error + assert!(!self.keys.contains_key(&key)); - // Loop and fetch more objects + match self.is_full() { + true => false, + false => { + self.keys.insert(key, value); + true + } + } } +} - let mut result = s3_xml::ListBucketResult { - xmlns: (), - name: s3_xml::Value(query.bucket_name.to_string()), - prefix: uriencode_maybe(&query.prefix, query.urlencode_resp), - marker: None, - next_marker: None, - start_after: None, - continuation_token: None, - next_continuation_token: None, - max_keys: s3_xml::IntValue(query.max_keys as i64), - delimiter: query - .delimiter - .as_ref() - .map(|x| uriencode_maybe(x, query.urlencode_resp)), - encoding_type: match query.urlencode_resp { - true => Some(s3_xml::Value("url".to_string())), - false => None, - }, - key_count: Some(s3_xml::IntValue( - result_keys.len() as i64 + result_common_prefixes.len() as i64, - )), - is_truncated: s3_xml::Value(format!("{}", truncated)), - contents: vec![], - common_prefixes: vec![], - }; +impl ExtractAccumulator for ObjectAccumulator { + fn extract<'a>( + &mut self, + query: &ListQueryCommon, + _cursor: &RangeBegin, + objects: &mut Peekable>, + ) -> ExtractionResult { + if let Some(e) = self.extract_common_prefix(objects, query) { + return e; + } - if query.is_v2 { - if let Some(ct) = &query.continuation_token { - result.continuation_token = Some(s3_xml::Value(ct.to_string())); + let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it."); + + let version = match object.versions().iter().find(|x| x.is_data()) { + Some(v) => v, + None => unreachable!( + "Expect to have objects having data due to earlier filtering. This is a logic bug." + ), + }; + + let meta = match &version.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, + ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta, + _ => unreachable!(), + }; + let info = ObjectInfo { + last_modified: version.timestamp, + size: meta.size, + etag: meta.etag.to_string(), + }; + + match self.try_insert_entry(object.key.clone(), info) { + true => ExtractionResult::Extracted { + key: object.key.clone(), + }, + false => ExtractionResult::Filled, } - if let Some(sa) = &query.start_after { - result.start_after = Some(uriencode_maybe(sa, query.urlencode_resp)); + } +} + +impl ExtractAccumulator for UploadAccumulator { + /// Observe the iterator, process a single key, and try to extract one or more upload entries + /// + /// This function processes a single object from the iterator that can contain an arbitrary + /// number of versions, and thus "uploads". + fn extract<'a>( + &mut self, + query: &ListQueryCommon, + cursor: &RangeBegin, + objects: &mut Peekable>, + ) -> ExtractionResult { + if let Some(e) = self.extract_common_prefix(objects, query) { + return e; } - if truncated { - let b64 = base64::encode(next_chunk_start.as_bytes()); - let nct = if next_chunk_exclude_start { - format!("]{}", b64) - } else { - format!("[{}", b64) + + // Get the next object from the iterator + let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it."); + + let mut uploads_for_key = object + .versions() + .iter() + .filter(|x| x.is_uploading()) + .collect::>(); + + // S3 logic requires lexicographically sorted upload ids. + uploads_for_key.sort_unstable_by_key(|e| e.uuid); + + // Skip results if an upload marker is provided + if let RangeBegin::AfterUpload { upload, .. } = cursor { + // Because our data are sorted, we can use a binary search to find the UUID + // or to find where it should have been added. Once this position is found, + // we use it to discard the first part of the array. + let idx = match uploads_for_key.binary_search_by(|e| e.uuid.cmp(upload)) { + // we start after the found uuid so we need to discard the pointed value. + // In the worst case, the UUID is the last element, which lead us to an empty array + // but we are never out of bound. + Ok(i) => i + 1, + // if the UUID is not found, the upload may have been discarded between the 2 request, + // this function returns where it could have been inserted, + // the pointed value is thus greater than our marker and we need to keep it. + Err(i) => i, }; - result.next_continuation_token = Some(s3_xml::Value(nct)); + uploads_for_key = uploads_for_key[idx..].to_vec(); } - } else { - // TODO: are these supposed to be urlencoded when encoding-type is URL?? - if let Some(mkr) = &query.marker { - result.marker = Some(uriencode_maybe(mkr, query.urlencode_resp)); + + let mut iter = uploads_for_key.iter(); + + // The first entry is a specific case + // as it changes our result enum type + let first_upload = match iter.next() { + Some(u) => u, + None => { + return ExtractionResult::Extracted { + key: object.key.clone(), + } + } + }; + let first_up_info = UploadInfo { + key: object.key.to_string(), + timestamp: first_upload.timestamp, + }; + if !self.try_insert_entry(first_upload.uuid, first_up_info) { + return ExtractionResult::Filled; } - if truncated { - if let Some(lpi) = last_processed_item { - result.next_marker = Some(uriencode_maybe(&lpi, query.urlencode_resp)); - } else { - return Err(Error::InternalError(GarageError::Message( - "S3 ListObject: last_processed_item is None but the response was truncated, indicating that many items were processed (this is a bug, please report it)".to_string()))); + + // We can then collect the remaining uploads in a loop + let mut prev_uuid = first_upload.uuid; + for upload in iter { + let up_info = UploadInfo { + key: object.key.to_string(), + timestamp: upload.timestamp, + }; + + // Insert data in our accumulator + // If it is full, return information to paginate. + if !self.try_insert_entry(upload.uuid, up_info) { + return ExtractionResult::FilledAtUpload { + key: object.key.clone(), + upload: prev_uuid, + }; } + // Update our last added UUID + prev_uuid = upload.uuid; } - } - for (key, info) in result_keys.iter() { - result.contents.push(s3_xml::ListBucketItem { - key: uriencode_maybe(key, query.urlencode_resp), - last_modified: s3_xml::Value(msec_to_rfc3339(info.last_modified)), - size: s3_xml::IntValue(info.size as i64), - etag: s3_xml::Value(info.etag.to_string()), - storage_class: s3_xml::Value("STANDARD".to_string()), - }); + // We successfully collected all the uploads + ExtractionResult::Extracted { + key: object.key.clone(), + } } +} - for pfx in result_common_prefixes.iter() { - result.common_prefixes.push(s3_xml::CommonPrefix { - prefix: uriencode_maybe(pfx, query.urlencode_resp), - }); +/* + * Utility functions + */ + +/// Returns the common prefix of the object given the query prefix and delimiter +fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> { + match &query.delimiter { + Some(delimiter) => object.key[query.prefix.len()..] + .find(delimiter) + .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]), + None => None, } - - let xml = s3_xml::to_xml_with_header(&result)?; - - Ok(Response::builder() - .header("Content-Type", "application/xml") - .body(Body::from(xml.into_bytes()))?) } +/// URIencode a value if needed fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value { if yes { s3_xml::Value(uri_encode(s, true)) @@ -319,3 +733,285 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value { s3_xml::Value(s.to_string()) } } + +const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}'; + +/// Compute the key after the prefix +fn key_after_prefix(pfx: &str) -> Option { + let mut next = pfx.to_string(); + while !next.is_empty() { + let tail = next.pop().unwrap(); + if tail >= char::MAX { + continue; + } + + // Circumvent a limitation of RangeFrom that overflow earlier than needed + // See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html + let new_tail = if tail == UTF8_BEFORE_LAST_CHAR { + char::MAX + } else { + (tail..).nth(1).unwrap() + }; + + next.push(new_tail); + return Some(next); + } + + None +} + +/* + * Unit tests of this module + */ +#[cfg(test)] +mod tests { + use super::*; + use std::iter::FromIterator; + + const TS: u64 = 1641394898314; + + fn bucket() -> Uuid { + Uuid::from([0x42; 32]) + } + + fn query() -> ListMultipartUploadsQuery { + ListMultipartUploadsQuery { + common: ListQueryCommon { + prefix: "".to_string(), + delimiter: Some("/".to_string()), + page_size: 1000, + urlencode_resp: false, + bucket_name: "a".to_string(), + bucket_id: Uuid::from([0x00; 32]), + }, + key_marker: None, + upload_id_marker: None, + } + } + + fn objs() -> Vec { + vec![ + Object::new( + bucket(), + "a/b/c".to_string(), + vec![objup_version([0x01; 32])], + ), + Object::new(bucket(), "d".to_string(), vec![objup_version([0x01; 32])]), + ] + } + + fn objup_version(uuid: [u8; 32]) -> ObjectVersion { + ObjectVersion { + uuid: Uuid::from(uuid), + timestamp: TS, + state: ObjectVersionState::Uploading(ObjectVersionHeaders { + content_type: "text/plain".to_string(), + other: BTreeMap::::new(), + }), + } + } + + #[test] + fn test_key_after_prefix() { + assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1); + assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0"); + assert_eq!(key_after_prefix("€").unwrap().as_str(), "₭"); + assert_eq!( + key_after_prefix("􏿽").unwrap().as_str(), + String::from(char::from_u32(0x10FFFE).unwrap()) + ); + + // When the last character is the biggest UTF8 char + let a = String::from_iter(['a', char::MAX].iter()); + assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b"); + + // When all characters are the biggest UTF8 char + let b = String::from_iter([char::MAX; 3].iter()); + assert!(key_after_prefix(b.as_str()).is_none()); + + // Check utf8 surrogates + let c = String::from('\u{D7FF}'); + assert_eq!( + key_after_prefix(c.as_str()).unwrap().as_str(), + String::from('\u{E000}') + ); + + // Check the character before the biggest one + let d = String::from('\u{10FFFE}'); + assert_eq!( + key_after_prefix(d.as_str()).unwrap().as_str(), + String::from(char::MAX) + ); + } + + #[test] + fn test_common_prefixes() { + let mut query = query(); + let objs = objs(); + + query.common.prefix = "a/".to_string(); + assert_eq!( + common_prefix(&objs.get(0).unwrap(), &query.common), + Some("a/b/") + ); + + query.common.prefix = "a/b/".to_string(); + assert_eq!(common_prefix(&objs.get(0).unwrap(), &query.common), None); + } + + #[test] + fn test_extract_common_prefix() { + let mut query = query(); + query.common.prefix = "a/".to_string(); + let objs = objs(); + let mut acc = UploadAccumulator::new(query.common.page_size); + + let mut iter = objs.iter().peekable(); + match acc.extract_common_prefix(&mut iter, &query.common) { + Some(ExtractionResult::Extracted { key }) => assert_eq!(key, "a/b/c".to_string()), + _ => panic!("wrong result"), + } + assert_eq!(acc.common_prefixes.len(), 1); + assert_eq!(acc.common_prefixes.iter().next().unwrap(), "a/b/"); + } + + #[test] + fn test_extract_upload() { + let objs = vec![ + Object::new( + bucket(), + "b".to_string(), + vec![ + objup_version([0x01; 32]), + objup_version([0x80; 32]), + objup_version([0x8f; 32]), + objup_version([0xdd; 32]), + ], + ), + Object::new(bucket(), "c".to_string(), vec![]), + ]; + + let mut acc = UploadAccumulator::new(2); + let mut start = RangeBegin::AfterUpload { + key: "b".to_string(), + upload: Uuid::from([0x01; 32]), + }; + + let mut iter = objs.iter().peekable(); + + // Check the case where we skip some uploads + match acc.extract(&(query().common), &start, &mut iter) { + ExtractionResult::FilledAtUpload { key, upload } => { + assert_eq!(key, "b"); + assert_eq!(upload, Uuid::from([0x8f; 32])); + } + _ => panic!("wrong result"), + }; + + assert_eq!(acc.keys.len(), 2); + assert_eq!( + acc.keys.get(&Uuid::from([0x80; 32])).unwrap(), + &UploadInfo { + timestamp: TS, + key: "b".to_string() + } + ); + assert_eq!( + acc.keys.get(&Uuid::from([0x8f; 32])).unwrap(), + &UploadInfo { + timestamp: TS, + key: "b".to_string() + } + ); + + acc = UploadAccumulator::new(2); + start = RangeBegin::AfterUpload { + key: "b".to_string(), + upload: Uuid::from([0xff; 32]), + }; + iter = objs.iter().peekable(); + + // Check the case where we skip all the uploads + match acc.extract(&(query().common), &start, &mut iter) { + ExtractionResult::Extracted { key } if key.as_str() == "b" => (), + _ => panic!("wrong result"), + }; + } + + #[tokio::test] + async fn test_fetch_uploads_no_result() -> Result<(), Error> { + let query = query(); + let mut acc = query.build_accumulator(); + let page = fetch_list_entries( + &query.common, + query.begin()?, + &mut acc, + |_, _, _| async move { Ok(vec![]) }, + ) + .await?; + assert_eq!(page, None); + assert_eq!(acc.common_prefixes.len(), 0); + assert_eq!(acc.keys.len(), 0); + + Ok(()) + } + + #[tokio::test] + async fn test_fetch_uploads_basic() -> Result<(), Error> { + let query = query(); + let mut acc = query.build_accumulator(); + let mut fake_io = |_, _, _| async move { Ok(objs()) }; + let page = + fetch_list_entries(&query.common, query.begin()?, &mut acc, &mut fake_io).await?; + assert_eq!(page, None); + assert_eq!(acc.common_prefixes.len(), 1); + assert_eq!(acc.keys.len(), 1); + assert!(acc.common_prefixes.contains("a/")); + + Ok(()) + } + + #[tokio::test] + async fn test_fetch_uploads_advanced() -> Result<(), Error> { + let mut query = query(); + query.common.page_size = 2; + + let mut fake_io = |_, k: Option, _| async move { + Ok(match k.as_deref() { + Some("") => vec![ + Object::new(bucket(), "b/a".to_string(), vec![objup_version([0x01; 32])]), + Object::new(bucket(), "b/b".to_string(), vec![objup_version([0x01; 32])]), + Object::new(bucket(), "b/c".to_string(), vec![objup_version([0x01; 32])]), + ], + Some("b0") => vec![ + Object::new(bucket(), "c/a".to_string(), vec![objup_version([0x01; 32])]), + Object::new(bucket(), "c/b".to_string(), vec![objup_version([0x01; 32])]), + Object::new(bucket(), "c/c".to_string(), vec![objup_version([0x02; 32])]), + ], + Some("c0") => vec![Object::new( + bucket(), + "d".to_string(), + vec![objup_version([0x01; 32])], + )], + _ => panic!("wrong value {:?}", k), + }) + }; + + let mut acc = query.build_accumulator(); + let page = + fetch_list_entries(&query.common, query.begin()?, &mut acc, &mut fake_io).await?; + assert_eq!( + page, + Some(RangeBegin::IncludingKey { + key: "c0".to_string(), + fallback_key: Some("c/c".to_string()) + }) + ); + assert_eq!(acc.common_prefixes.len(), 2); + assert_eq!(acc.keys.len(), 0); + assert!(acc.common_prefixes.contains("b/")); + assert!(acc.common_prefixes.contains("c/")); + + Ok(()) + } +} diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index bb92c252..d7ee5893 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -610,7 +610,7 @@ pub(crate) fn get_headers(req: &Request) -> Result Result { +pub fn decode_upload_id(id: &str) -> Result { let id_bin = hex::decode(id).map_err(|_| Error::NoSuchUpload)?; if id_bin.len() != 32 { return Err(Error::NoSuchUpload); diff --git a/src/api/s3_router.rs b/src/api/s3_router.rs index 234f77f0..a8ac0086 100644 --- a/src/api/s3_router.rs +++ b/src/api/s3_router.rs @@ -350,7 +350,7 @@ pub enum Endpoint { delimiter: Option, encoding_type: Option, key_marker: Option, - max_uploads: Option, + max_uploads: Option, prefix: Option, upload_id_marker: Option, }, diff --git a/src/api/s3_xml.rs b/src/api/s3_xml.rs index 9b5a0202..98c63d57 100644 --- a/src/api/s3_xml.rs +++ b/src/api/s3_xml.rs @@ -141,6 +141,60 @@ pub struct CompleteMultipartUploadResult { pub etag: Value, } +#[derive(Debug, Serialize, PartialEq)] +pub struct Initiator { + #[serde(rename = "DisplayName")] + pub display_name: Value, + #[serde(rename = "ID")] + pub id: Value, +} + +#[derive(Debug, Serialize, PartialEq)] +pub struct ListMultipartItem { + #[serde(rename = "Initiated")] + pub initiated: Value, + #[serde(rename = "Initiator")] + pub initiator: Initiator, + #[serde(rename = "Key")] + pub key: Value, + #[serde(rename = "UploadId")] + pub upload_id: Value, + #[serde(rename = "Owner")] + pub owner: Owner, + #[serde(rename = "StorageClass")] + pub storage_class: Value, +} + +#[derive(Debug, Serialize, PartialEq)] +pub struct ListMultipartUploadsResult { + #[serde(serialize_with = "xmlns_tag")] + pub xmlns: (), + #[serde(rename = "Bucket")] + pub bucket: Value, + #[serde(rename = "KeyMarker")] + pub key_marker: Option, + #[serde(rename = "UploadIdMarker")] + pub upload_id_marker: Option, + #[serde(rename = "NextKeyMarker")] + pub next_key_marker: Option, + #[serde(rename = "NextUploadIdMarker")] + pub next_upload_id_marker: Option, + #[serde(rename = "Prefix")] + pub prefix: Value, + #[serde(rename = "Delimiter")] + pub delimiter: Option, + #[serde(rename = "MaxUploads")] + pub max_uploads: IntValue, + #[serde(rename = "IsTruncated")] + pub is_truncated: Value, + #[serde(rename = "Upload")] + pub upload: Vec, + #[serde(rename = "CommonPrefixes")] + pub common_prefixes: Vec, + #[serde(rename = "EncodingType")] + pub encoding_type: Option, +} + #[derive(Debug, Serialize, PartialEq)] pub struct ListBucketItem { #[serde(rename = "Key")] @@ -432,6 +486,58 @@ mod tests { Ok(()) } + #[test] + fn list_multipart_uploads_result() -> Result<(), ApiError> { + let result = ListMultipartUploadsResult { + xmlns: (), + bucket: Value("example-bucket".to_string()), + key_marker: None, + next_key_marker: None, + upload_id_marker: None, + encoding_type: None, + next_upload_id_marker: None, + upload: vec![], + delimiter: Some(Value("/".to_string())), + prefix: Value("photos/2006/".to_string()), + max_uploads: IntValue(1000), + is_truncated: Value("false".to_string()), + common_prefixes: vec![ + CommonPrefix { + prefix: Value("photos/2006/February/".to_string()), + }, + CommonPrefix { + prefix: Value("photos/2006/January/".to_string()), + }, + CommonPrefix { + prefix: Value("photos/2006/March/".to_string()), + }, + ], + }; + + assert_eq!( + to_xml_with_header(&result)?, + "\ +\ + example-bucket\ + photos/2006/\ + /\ + 1000\ + false\ + \ + photos/2006/February/\ + \ + \ + photos/2006/January/\ + \ + \ + photos/2006/March/\ + \ +" + ); + + Ok(()) + } + #[test] fn list_objects_v1_1() -> Result<(), ApiError> { let result = ListBucketResult { diff --git a/src/garage/admin.rs b/src/garage/admin.rs index f315c4dc..4a53792d 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -21,6 +21,7 @@ use garage_model::garage::Garage; use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::key_table::*; use garage_model::migrate::Migrate; +use garage_model::object_table::ObjectFilter; use garage_model::permission::*; use crate::cli::*; @@ -209,7 +210,7 @@ impl AdminRpcHandler { let objects = self .garage .object_table - .get_range(&bucket_id, None, Some(DeletedFilter::NotDeleted), 10) + .get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10) .await?; if !objects.is_empty() { return Err(Error::BadRequest(format!( diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 0c6c3a6d..da53878e 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -218,13 +218,19 @@ pub struct ObjectTable { pub version_table: Arc>, } +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub enum ObjectFilter { + IsData, + IsUploading, +} + impl TableSchema for ObjectTable { const TABLE_NAME: &'static str = "object"; type P = Uuid; type S = String; type E = Object; - type Filter = DeletedFilter; + type Filter = ObjectFilter; fn updated(&self, old: Option, new: Option) { let version_table = self.version_table.clone(); @@ -254,8 +260,10 @@ impl TableSchema for ObjectTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - let deleted = !entry.versions.iter().any(|v| v.is_data()); - filter.apply(deleted) + match filter { + ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()), + ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()), + } } fn try_migrate(bytes: &[u8]) -> Option { -- cgit v1.2.3