author     Alex Auvolat <alex@adnab.me>    2021-10-08 18:35:38 +0200
committer  Alex Auvolat <alex@adnab.me>    2021-10-11 11:15:47 +0200
commit     f3a097abdfe0db09a5a642a8c0f7534e84cc35ac (patch)
tree       1cdb29a43850afbaffb0597823f77b2f485bdb70
parent     1aed317818a423439292055279dbe1a7023c049a (diff)
WIP: try to fix #93, and improve S3 ListObjects (v1 and v2) API calls
-rw-r--r--  src/api/s3_list.rs | 218
1 file changed, 160 insertions(+), 58 deletions(-)
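
The comments introduced by this patch describe the V2 continuation token as an opaque string: a leading "[" or "]" marker (include or exclude the starting key, respectively) followed by the base64 encoding of that key. As a rough standalone sketch of that round trip (an illustration only, not code from this commit; it assumes the base64 crate's encode/decode functions, which the patch itself uses):

fn encode_continuation_token(start_key: &str, exclude_start: bool) -> String {
    // "]" means the starting key must be excluded from the next listing,
    // "[" means it must be included.
    let marker = if exclude_start { ']' } else { '[' };
    format!("{}{}", marker, base64::encode(start_key.as_bytes()))
}

fn decode_continuation_token(ct: &str) -> Result<(String, bool), String> {
    // Read back the include/exclude marker, then the base64-encoded key.
    let exclude = match ct.get(..1) {
        Some("[") => false,
        Some("]") => true,
        _ => return Err("Invalid continuation token".to_string()),
    };
    let key = String::from_utf8(
        base64::decode(ct[1..].as_bytes()).map_err(|e| e.to_string())?,
    )
    .map_err(|e| e.to_string())?;
    Ok((key, exclude))
}

For example, decode_continuation_token(&encode_continuation_token("photos/2021/cat.jpg", true)) yields ("photos/2021/cat.jpg", true), i.e. the next listing resumes strictly after that key.
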
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index 384346e0..a4de388d 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -68,26 +68,63 @@ pub async fn handle_list(
let mut result_keys = BTreeMap::<String, ListResultInfo>::new();
let mut result_common_prefixes = BTreeSet::<String>::new();
- let mut next_chunk_start = if query.is_v2 {
+ // Determine the key from which we want to start fetching objects
+ // from the database, and whether the object at this key must
+ // be included or excluded from the response.
+ // This key can be the prefix in the base case, or intermediate
+ // points in the dataset if we are continuing a previous listing.
+ #[allow(clippy::collapsible_else_if)]
+ let (mut next_chunk_start, mut next_chunk_exclude_start) = if query.is_v2 {
if let Some(ct) = &query.continuation_token {
- String::from_utf8(base64::decode(ct.as_bytes())?)?
+ // In V2 mode, the continuation token is defined as an opaque
+ // string in the spec, so we can do whatever we want with it.
+ // In our case, it is defined as either [ or ] (for include
+ // and exclude, respectively), followed by a base64 string
+ // representing the key to start with.
+ let exclude = match &ct[..1] {
+ "[" => false,
+ "]" => true,
+ _ => return Err(Error::BadRequest("Invalid continuation token".to_string())),
+ };
+ (
+ String::from_utf8(base64::decode(ct[1..].as_bytes())?)?,
+ exclude,
+ )
+ } else if let Some(sa) = &query.start_after {
+ // StartAfter has defined semantics in the spec:
+ // start listing at the first key immediately after.
+ (sa.clone(), true)
} else {
- query
- .start_after
- .clone()
- .unwrap_or_else(|| query.prefix.clone())
+ // In the case where neither is specified, we start
+ // listing at the specified prefix. If an object has this
+ // exact same key, we include it. (TODO is this correct?)
+ (query.prefix.clone(), false)
}
} else {
- query.marker.clone().unwrap_or_else(|| query.prefix.clone())
+ if let Some(mk) = &query.marker {
+ // In V1 mode, the spec defines the Marker value to mean
+ // the same thing as the StartAfter value in V2 mode.
+ (mk.clone(), true)
+ } else {
+ // Base case, same as in V2 mode
+ (query.prefix.clone(), false)
+ }
};
debug!(
- "List request: `{:?}` {} `{}`",
- query.delimiter, query.max_keys, query.prefix
+ "List request: `{:?}` {} `{}`, start from {}, exclude first {}",
+ query.delimiter, query.max_keys, query.prefix, next_chunk_start, next_chunk_exclude_start
);
+ // `truncated` is a boolean that indicates whether there are
+ // more items remaining to be listed.
let truncated;
+ // `last_processed_item` is the key of the last item
+ // that was included in the listing before truncating.
+ let mut last_processed_item = None;
+
'query_loop: loop {
+ // Fetch objects
let objects = garage
.object_table
.get_range(
@@ -103,64 +140,120 @@ pub async fn handle_list(
query.max_keys + 1,
objects.len()
);
+ let current_chunk_start = next_chunk_start.clone();
+ // Iterate over the returned objects and add them to the response.
+ // If a delimiter is specified, we take care of grouping objects
+ // into CommonPrefixes.
for object in objects.iter() {
+ // If we have retrieved an object that doesn't start with
+ // the prefix, we know we have finished listing.
if !object.key.starts_with(&query.prefix) {
- truncated = None;
+ truncated = false;
break 'query_loop;
}
- if query.is_v2 && query.start_after.as_ref() == Some(&object.key) {
+ // Exclude the starting key if we have to.
+ if object.key == next_chunk_start && next_chunk_exclude_start {
continue;
}
- if let Some(version) = object.versions().iter().find(|x| x.is_data()) {
- if result_keys.len() + result_common_prefixes.len() >= query.max_keys {
- truncated = Some(object.key.to_string());
- break 'query_loop;
- }
- let common_prefix = if let Some(delimiter) = &query.delimiter {
- let relative_key = &object.key[query.prefix.len()..];
- relative_key
- .find(delimiter)
- .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()])
- } else {
- None
- };
- if let Some(pfx) = common_prefix {
+ // Check whether this object has a currently valid (non-deleted,
+ // non-still-uploading) version. If not, skip it.
+ let version = match object.versions().iter().find(|x| x.is_data()) {
+ Some(v) => v,
+ None => continue,
+ };
+
+ // If we don't have space to add this object to our response,
+ // we will need to stop here and mark this object's key as
+ // the marker from which we want to start again
+ // in the next list call.
+ let cannot_add = result_keys.len() + result_common_prefixes.len() >= query.max_keys;
+
+ // Determine whether this object should be grouped inside
+ // a CommonPrefix because it contains the delimiter,
+ // or if it should be returned as an object.
+ let common_prefix = match &query.delimiter {
+ Some(delimiter) => object.key[query.prefix.len()..]
+ .find(delimiter)
+ .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]),
+ None => None,
+ };
+ if let Some(pfx) = common_prefix {
+ // In the case where this object must be grouped in a
+ // common prefix, handle it here.
+ if !result_common_prefixes.contains(pfx) {
+ // Determine the first listing key that sorts after
+ // every key sharing the common prefix, by computing
+ // the next possible string in lexicographic order.
+ let mut first_key_after_prefix = pfx.to_string();
+ let tail = first_key_after_prefix.pop().unwrap();
+ first_key_after_prefix.push(((tail as u8) + 1) as char);
+
+ // If this were the end of the chunk,
+ // the next chunk should start after this prefix
+ next_chunk_start = first_key_after_prefix;
+ next_chunk_exclude_start = false;
+
+ if cannot_add {
+ truncated = true;
+ break 'query_loop;
+ }
result_common_prefixes.insert(pfx.to_string());
- } else {
- let meta = match &version.state {
- ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
- ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => {
- meta
- }
- _ => unreachable!(),
- };
- let info = match result_keys.get(&object.key) {
- None => ListResultInfo {
- last_modified: version.timestamp,
- size: meta.size,
- etag: meta.etag.to_string(),
- },
- Some(_lri) => {
- return Err(Error::InternalError(GarageError::Message(format!(
- "Duplicate key?? {}",
- object.key
- ))))
- }
- };
- result_keys.insert(object.key.clone(), info);
- };
+ }
+ last_processed_item = Some(object.key.clone());
+ continue;
+ };
+
+ // This is not a common prefix; we want to add it to our
+ // response directly.
+ next_chunk_start = object.key.clone();
+
+ if cannot_add {
+ truncated = true;
+ next_chunk_exclude_start = false;
+ break 'query_loop;
}
+
+ let meta = match &version.state {
+ ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
+ ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
+ _ => unreachable!(),
+ };
+ let info = match result_keys.get(&object.key) {
+ None => ListResultInfo {
+ last_modified: version.timestamp,
+ size: meta.size,
+ etag: meta.etag.to_string(),
+ },
+ Some(_lri) => {
+ return Err(Error::InternalError(GarageError::Message(format!(
+ "Duplicate key?? {} (this is a bug, please report it)",
+ object.key
+ ))))
+ }
+ };
+ result_keys.insert(object.key.clone(), info);
+ last_processed_item = Some(object.key.clone());
+ next_chunk_exclude_start = true;
}
+
+ // If our database returned fewer objects than we asked for,
+ // it means that there are no further objects in the bucket. So we stop here.
if objects.len() < query.max_keys + 1 {
- truncated = None;
+ truncated = false;
break 'query_loop;
}
- if !objects.is_empty() {
- next_chunk_start = objects[objects.len() - 1].key.clone();
+
+ // Sanity check: we should have added at least one object
+ // or one prefix to our returned result.
+ if next_chunk_start == current_chunk_start || last_processed_item.is_none() {
+ return Err(Error::InternalError(GarageError::Message(format!(
+ "S3 ListObject: made no progress, still starting at {} (this is a bug, please report it)", next_chunk_start))));
}
+
+ // Loop and fetch more objects
}
let mut result = s3_xml::ListBucketResult {
@@ -181,11 +274,10 @@ pub async fn handle_list(
true => Some(s3_xml::Value("url".to_string())),
false => None,
},
-
key_count: Some(s3_xml::IntValue(
result_keys.len() as i64 + result_common_prefixes.len() as i64,
)),
- is_truncated: s3_xml::Value(format!("{}", truncated.is_some())),
+ is_truncated: s3_xml::Value(format!("{}", truncated)),
contents: vec![],
common_prefixes: vec![],
};
@@ -197,16 +289,27 @@ pub async fn handle_list(
if let Some(sa) = &query.start_after {
result.start_after = Some(uriencode_maybe(sa, query.urlencode_resp));
}
- if let Some(nct) = truncated {
- result.next_continuation_token = Some(s3_xml::Value(base64::encode(nct.as_bytes())));
+ if truncated {
+ let b64 = base64::encode(next_chunk_start.as_bytes());
+ let nct = if next_chunk_exclude_start {
+ format!("]{}", b64)
+ } else {
+ format!("[{}", b64)
+ };
+ result.next_continuation_token = Some(s3_xml::Value(nct));
}
} else {
// TODO: are these supposed to be urlencoded when encoding-type is URL??
if let Some(mkr) = &query.marker {
result.marker = Some(uriencode_maybe(mkr, query.urlencode_resp));
}
- if let Some(next_marker) = truncated {
- result.next_marker = Some(uriencode_maybe(&next_marker, query.urlencode_resp));
+ if truncated {
+ if let Some(lpi) = last_processed_item {
+ result.next_marker = Some(uriencode_maybe(&lpi, query.urlencode_resp));
+ } else {
+ return Err(Error::InternalError(GarageError::Message(
+ "S3 ListObject: last_processed_item is None but the response was truncated, indicating that many items were processed (this is a bug, please report it)".to_string())));
+ }
}
}
@@ -221,7 +324,6 @@ pub async fn handle_list(
}
for pfx in result_common_prefixes.iter() {
- //TODO: in V1, are these urlencoded when urlencode_resp is true ?? (proably)
result.common_prefixes.push(s3_xml::CommonPrefix {
prefix: uriencode_maybe(pfx, query.urlencode_resp),
});
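
For the CommonPrefixes grouping above, the patch skips directly to the first key that sorts after a given common prefix by incrementing the prefix's last byte. A minimal sketch of that idea, assuming (as the patch's own code does) that the last character of the prefix is a single-byte ASCII character below 0x7F:

// Smallest string that sorts strictly after every key starting with `prefix`.
// Assumes the last character is ASCII; a fully general version would also
// need to handle multi-byte characters and a trailing 0xFF byte.
fn first_key_after_prefix(prefix: &str) -> String {
    let mut next = prefix.to_string();
    let tail = next.pop().expect("prefix must not be empty");
    next.push(((tail as u8) + 1) as char);
    next
}

With delimiter "/", the common prefix "photos/" gives "photos0" ('0' is the character right after '/' in ASCII), so the next database fetch can start there and skip every remaining key under that prefix.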