From f3a097abdfe0db09a5a642a8c0f7534e84cc35ac Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 8 Oct 2021 18:35:38 +0200 Subject: WIP: try to fix #93, and improve S3 ListObjects (v1 and v2) API calls --- src/api/s3_list.rs | 218 +++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 160 insertions(+), 58 deletions(-) diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 384346e0..a4de388d 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -68,26 +68,63 @@ pub async fn handle_list( let mut result_keys = BTreeMap::::new(); let mut result_common_prefixes = BTreeSet::::new(); - let mut next_chunk_start = if query.is_v2 { + // Determine the key from where we want to start fetch objects + // from the database, and whether the object at this key must + // be included or excluded from the response. + // This key can be the prefix in the base case, or intermediate + // points in the dataset if we are continuing a previous listing. + #[allow(clippy::collapsible_else_if)] + let (mut next_chunk_start, mut next_chunk_exclude_start) = if query.is_v2 { if let Some(ct) = &query.continuation_token { - String::from_utf8(base64::decode(ct.as_bytes())?)? + // In V2 mode, the continuation token is defined as an opaque + // string in the spec, so we can do whatever we want with it. + // In our case, it is defined as either [ or ] (for include + // and exclude, respectively), followed by a base64 string + // representing the key to start with. + let exclude = match &ct[..1] { + "[" => false, + "]" => true, + _ => return Err(Error::BadRequest("Invalid continuation token".to_string())), + }; + ( + String::from_utf8(base64::decode(ct[1..].as_bytes())?)?, + exclude, + ) + } else if let Some(sa) = &query.start_after { + // StartAfter has defined semantics in the spec: + // start listing at the first key immediately after. + (sa.clone(), true) } else { - query - .start_after - .clone() - .unwrap_or_else(|| query.prefix.clone()) + // In the case where neither is specified, we start + // listing at the specified prefix. If an object has this + // exact same key, we include it. (TODO is this correct?) + (query.prefix.clone(), false) } } else { - query.marker.clone().unwrap_or_else(|| query.prefix.clone()) + if let Some(mk) = &query.marker { + // In V1 mode, the spec defines the Marker value to mean + // the same thing as the StartAfter value in V2 mode. + (mk.clone(), true) + } else { + // Base case, same as in V2 mode + (query.prefix.clone(), false) + } }; debug!( - "List request: `{:?}` {} `{}`", - query.delimiter, query.max_keys, query.prefix + "List request: `{:?}` {} `{}`, start from {}, exclude first {}", + query.delimiter, query.max_keys, query.prefix, next_chunk_start, next_chunk_exclude_start ); + // `truncated` is a boolean that determines whether there are + // more items to be added. let truncated; + // `last_processed_item` is the key of the last item + // that was included in the listing before truncating. + let mut last_processed_item = None; + 'query_loop: loop { + // Fetch objects let objects = garage .object_table .get_range( @@ -103,64 +140,120 @@ pub async fn handle_list( query.max_keys + 1, objects.len() ); + let current_chunk_start = next_chunk_start.clone(); + // Iterate on returned objects and add them to the response. + // If a delimiter is specified, we take care of grouping objects + // into CommonPrefixes. for object in objects.iter() { + // If we have retrieved an object that doesn't start with + // the prefix, we know we have finished listing our stuff. if !object.key.starts_with(&query.prefix) { - truncated = None; + truncated = false; break 'query_loop; } - if query.is_v2 && query.start_after.as_ref() == Some(&object.key) { + // Exclude the starting key if we have to. + if object.key == next_chunk_start && next_chunk_exclude_start { continue; } - if let Some(version) = object.versions().iter().find(|x| x.is_data()) { - if result_keys.len() + result_common_prefixes.len() >= query.max_keys { - truncated = Some(object.key.to_string()); - break 'query_loop; - } - let common_prefix = if let Some(delimiter) = &query.delimiter { - let relative_key = &object.key[query.prefix.len()..]; - relative_key - .find(delimiter) - .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]) - } else { - None - }; - if let Some(pfx) = common_prefix { + // Find if this object has a currently valid (non-deleted, + // non-still-uploading) version. If not, skip it. + let version = match object.versions().iter().find(|x| x.is_data()) { + Some(v) => v, + None => continue, + }; + + // If we don't have space to add this object to our response, + // we will need to stop here and mark the key of this object + // as the marker from where + // we want to start again in the next list call. + let cannot_add = result_keys.len() + result_common_prefixes.len() >= query.max_keys; + + // Determine whether this object should be grouped inside + // a CommonPrefix because it contains the delimiter, + // or if it should be returned as an object. + let common_prefix = match &query.delimiter { + Some(delimiter) => object.key[query.prefix.len()..] + .find(delimiter) + .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]), + None => None, + }; + if let Some(pfx) = common_prefix { + // In the case where this object must be grouped in a + // common prefix, handle it here. + if !result_common_prefixes.contains(pfx) { + // Determine the first listing key that starts after + // the common prefix, by finding the next possible + // string by alphabetical order. + let mut first_key_after_prefix = pfx.to_string(); + let tail = first_key_after_prefix.pop().unwrap(); + first_key_after_prefix.push(((tail as u8) + 1) as char); + + // If this were the end of the chunk, + // the next chunk should start after this prefix + next_chunk_start = first_key_after_prefix; + next_chunk_exclude_start = false; + + if cannot_add { + truncated = true; + break 'query_loop; + } result_common_prefixes.insert(pfx.to_string()); - } else { - let meta = match &version.state { - ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, - ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => { - meta - } - _ => unreachable!(), - }; - let info = match result_keys.get(&object.key) { - None => ListResultInfo { - last_modified: version.timestamp, - size: meta.size, - etag: meta.etag.to_string(), - }, - Some(_lri) => { - return Err(Error::InternalError(GarageError::Message(format!( - "Duplicate key?? {}", - object.key - )))) - } - }; - result_keys.insert(object.key.clone(), info); - }; + } + last_processed_item = Some(object.key.clone()); + continue; + }; + + // This is not a common prefix, we want to add it to our + // response directly. + next_chunk_start = object.key.clone(); + + if cannot_add { + truncated = true; + next_chunk_exclude_start = false; + break 'query_loop; } + + let meta = match &version.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, + ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta, + _ => unreachable!(), + }; + let info = match result_keys.get(&object.key) { + None => ListResultInfo { + last_modified: version.timestamp, + size: meta.size, + etag: meta.etag.to_string(), + }, + Some(_lri) => { + return Err(Error::InternalError(GarageError::Message(format!( + "Duplicate key?? {} (this is a bug, please report it)", + object.key + )))) + } + }; + result_keys.insert(object.key.clone(), info); + last_processed_item = Some(object.key.clone()); + next_chunk_exclude_start = true; } + + // If our database returned less objects than what we were asking for, + // it means that no more objects are in the bucket. So we stop here. if objects.len() < query.max_keys + 1 { - truncated = None; + truncated = false; break 'query_loop; } - if !objects.is_empty() { - next_chunk_start = objects[objects.len() - 1].key.clone(); + + // Sanity check: we should have added at least an object + // or a prefix to our returned result. + if next_chunk_start == current_chunk_start || last_processed_item.is_none() { + return Err(Error::InternalError(GarageError::Message(format!( + "S3 ListObject: made no progress, still starting at {} (this is a bug, please report it)", next_chunk_start)))); } + + // Loop and fetch more objects } let mut result = s3_xml::ListBucketResult { @@ -181,11 +274,10 @@ pub async fn handle_list( true => Some(s3_xml::Value("url".to_string())), false => None, }, - key_count: Some(s3_xml::IntValue( result_keys.len() as i64 + result_common_prefixes.len() as i64, )), - is_truncated: s3_xml::Value(format!("{}", truncated.is_some())), + is_truncated: s3_xml::Value(format!("{}", truncated)), contents: vec![], common_prefixes: vec![], }; @@ -197,16 +289,27 @@ pub async fn handle_list( if let Some(sa) = &query.start_after { result.start_after = Some(uriencode_maybe(sa, query.urlencode_resp)); } - if let Some(nct) = truncated { - result.next_continuation_token = Some(s3_xml::Value(base64::encode(nct.as_bytes()))); + if truncated { + let b64 = base64::encode(next_chunk_start.as_bytes()); + let nct = if next_chunk_exclude_start { + format!("]{}", b64) + } else { + format!("[{}", b64) + }; + result.next_continuation_token = Some(s3_xml::Value(nct)); } } else { // TODO: are these supposed to be urlencoded when encoding-type is URL?? if let Some(mkr) = &query.marker { result.marker = Some(uriencode_maybe(mkr, query.urlencode_resp)); } - if let Some(next_marker) = truncated { - result.next_marker = Some(uriencode_maybe(&next_marker, query.urlencode_resp)); + if truncated { + if let Some(lpi) = last_processed_item { + result.next_marker = Some(uriencode_maybe(&lpi, query.urlencode_resp)); + } else { + return Err(Error::InternalError(GarageError::Message( + "S3 ListObject: last_processed_item is None but the response was truncated, indicating that many items were processed (this is a bug, please report it)".to_string()))); + } } } @@ -221,7 +324,6 @@ pub async fn handle_list( } for pfx in result_common_prefixes.iter() { - //TODO: in V1, are these urlencoded when urlencode_resp is true ?? (proably) result.common_prefixes.push(s3_xml::CommonPrefix { prefix: uriencode_maybe(pfx, query.urlencode_resp), }); -- cgit v1.2.3