aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorQuentin Dufour <quentin@deuxfleurs.fr>2021-11-24 10:00:07 +0100
committerQuentin Dufour <quentin@deuxfleurs.fr>2022-01-12 17:07:40 +0100
commitce357b9a123369f953be3fbcb22e87d3304e2e4b (patch)
tree29357956ccc86becf0c62ee07fbbb1908efa34d2 /src
parent9cb2e9e57ce1aab23d8c3b2aaa7581c8e8b78253 (diff)
downloadgarage-ce357b9a123369f953be3fbcb22e87d3304e2e4b.tar.gz
garage-ce357b9a123369f953be3fbcb22e87d3304e2e4b.zip
Add ListMultipartUploads + Refactor ListObjects
Diffstat (limited to 'src')
-rw-r--r--src/api/api_server.rs55
-rw-r--r--src/api/s3_bucket.rs3
-rw-r--r--src/api/s3_list.rs1115
-rw-r--r--src/api/s3_put.rs2
-rw-r--r--src/api/s3_router.rs2
-rw-r--r--src/api/s3_xml.rs106
-rw-r--r--src/garage/admin.rs3
-rw-r--r--src/model/object_table.rs14
8 files changed, 1043 insertions, 257 deletions
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index 41aa0046..16156e74 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -1,3 +1,4 @@
+use std::cmp::{max, min};
use std::net::SocketAddr;
use std::sync::Arc;
@@ -217,16 +218,18 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
handle_list(
garage,
&ListObjectsQuery {
+ common: ListQueryCommon {
+ bucket_name: bucket,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_keys.map(|p| min(1000, max(1, p))).unwrap_or(1000),
+ prefix: prefix.unwrap_or_default(),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ },
is_v2: false,
- bucket_name: bucket,
- bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
- max_keys: max_keys.unwrap_or(1000),
- prefix: prefix.unwrap_or_default(),
marker,
continuation_token: None,
start_after: None,
- urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
)
.await
@@ -246,16 +249,18 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
handle_list(
garage,
&ListObjectsQuery {
+ common: ListQueryCommon {
+ bucket_name: bucket,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_keys.map(|p| min(1000, max(1, p))).unwrap_or(1000),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ prefix: prefix.unwrap_or_default(),
+ },
is_v2: true,
- bucket_name: bucket,
- bucket_id,
- delimiter: delimiter.map(|d| d.to_string()),
- max_keys: max_keys.unwrap_or(1000),
- prefix: prefix.unwrap_or_default(),
marker: None,
continuation_token,
start_after,
- urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
)
.await
@@ -266,6 +271,32 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
)))
}
}
+ Endpoint::ListMultipartUploads {
+ bucket,
+ delimiter,
+ encoding_type,
+ key_marker,
+ max_uploads,
+ prefix,
+ upload_id_marker,
+ } => {
+ handle_list_multipart_upload(
+ garage,
+ &ListMultipartUploadsQuery {
+ common: ListQueryCommon {
+ bucket_name: bucket,
+ bucket_id,
+ delimiter: delimiter.map(|d| d.to_string()),
+ page_size: max_uploads.map(|p| min(1000, max(1, p))).unwrap_or(1000),
+ prefix: prefix.unwrap_or_default(),
+ urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
+ },
+ key_marker,
+ upload_id_marker,
+ },
+ )
+ .await
+ }
Endpoint::DeleteObjects { .. } => {
handle_delete_objects(garage, bucket_id, req, content_sha256).await
}
diff --git a/src/api/s3_bucket.rs b/src/api/s3_bucket.rs
index 425d2998..494224c8 100644
--- a/src/api/s3_bucket.rs
+++ b/src/api/s3_bucket.rs
@@ -7,6 +7,7 @@ use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
+use garage_model::object_table::ObjectFilter;
use garage_model::permission::BucketKeyPerm;
use garage_table::util::*;
use garage_util::crdt::*;
@@ -226,7 +227,7 @@ pub async fn handle_delete_bucket(
// Check bucket is empty
let objects = garage
.object_table
- .get_range(&bucket_id, None, Some(DeletedFilter::NotDeleted), 10)
+ .get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10)
.await?;
if !objects.is_empty() {
return Err(Error::BucketNotEmpty);
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index 07efb02d..0b08069a 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -1,4 +1,5 @@
use std::collections::{BTreeMap, BTreeSet};
+use std::iter::{Iterator, Peekable};
use std::sync::Arc;
use hyper::{Body, Response};
@@ -10,308 +11,716 @@ use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
-use garage_table::DeletedFilter;
-
use crate::encoding::*;
use crate::error::*;
+use crate::s3_put;
use crate::s3_xml;
#[derive(Debug)]
-pub struct ListObjectsQuery {
- pub is_v2: bool,
+pub struct ListQueryCommon {
pub bucket_name: String,
pub bucket_id: Uuid,
pub delimiter: Option<String>,
- pub max_keys: usize,
+ pub page_size: usize,
pub prefix: String,
+ pub urlencode_resp: bool,
+}
+
+#[derive(Debug)]
+pub struct ListObjectsQuery {
+ pub is_v2: bool,
pub marker: Option<String>,
pub continuation_token: Option<String>,
pub start_after: Option<String>,
- pub urlencode_resp: bool,
+ pub common: ListQueryCommon,
}
#[derive(Debug)]
-struct ListResultInfo {
- last_modified: u64,
- size: u64,
- etag: String,
+pub struct ListMultipartUploadsQuery {
+ pub key_marker: Option<String>,
+ pub upload_id_marker: Option<String>,
+ pub common: ListQueryCommon,
}
pub async fn handle_list(
garage: Arc<Garage>,
query: &ListObjectsQuery,
) -> Result<Response<Body>, Error> {
- let mut result_keys = BTreeMap::<String, ListResultInfo>::new();
- let mut result_common_prefixes = BTreeSet::<String>::new();
-
- // Determine the key from where we want to start fetch objects
- // from the database, and whether the object at this key must
- // be included or excluded from the response.
- // This key can be the prefix in the base case, or intermediate
- // points in the dataset if we are continuing a previous listing.
- #[allow(clippy::collapsible_else_if)]
- let (mut next_chunk_start, mut next_chunk_exclude_start) = if query.is_v2 {
- if let Some(ct) = &query.continuation_token {
- // In V2 mode, the continuation token is defined as an opaque
- // string in the spec, so we can do whatever we want with it.
- // In our case, it is defined as either [ or ] (for include
- // and exclude, respectively), followed by a base64 string
- // representing the key to start with.
- let exclude = match &ct[..1] {
- "[" => false,
- "]" => true,
- _ => return Err(Error::BadRequest("Invalid continuation token".to_string())),
- };
- (
- String::from_utf8(base64::decode(ct[1..].as_bytes())?)?,
- exclude,
- )
- } else if let Some(sa) = &query.start_after {
- // StartAfter has defined semantics in the spec:
- // start listing at the first key immediately after.
- (sa.clone(), true)
- } else {
- // In the case where neither is specified, we start
- // listing at the specified prefix. If an object has this
- // exact same key, we include it. (TODO is this correct?)
- (query.prefix.clone(), false)
+ let io = |bucket, key, count| {
+ let t = &garage.object_table;
+ async move {
+ t.get_range(&bucket, key, Some(ObjectFilter::IsData), count)
+ .await
}
- } else {
- if let Some(mk) = &query.marker {
- // In V1 mode, the spec defines the Marker value to mean
- // the same thing as the StartAfter value in V2 mode.
- (mk.clone(), true)
- } else {
- // Base case, same as in V2 mode
- (query.prefix.clone(), false)
+ };
+
+ let mut acc = query.build_accumulator();
+ let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;
+
+ let result = s3_xml::ListBucketResult {
+ xmlns: (),
+ // Sending back request information
+ name: s3_xml::Value(query.common.bucket_name.to_string()),
+ prefix: uriencode_maybe(&query.common.prefix, query.common.urlencode_resp),
+ max_keys: s3_xml::IntValue(query.common.page_size as i64),
+ delimiter: query
+ .common
+ .delimiter
+ .as_ref()
+ .map(|x| uriencode_maybe(x, query.common.urlencode_resp)),
+ encoding_type: match query.common.urlencode_resp {
+ true => Some(s3_xml::Value("url".to_string())),
+ false => None,
+ },
+ marker: match (!query.is_v2, &query.marker) {
+ (true, Some(k)) => Some(uriencode_maybe(k, query.common.urlencode_resp)),
+ _ => None,
+ },
+ start_after: match (query.is_v2, &query.start_after) {
+ (true, Some(sa)) => Some(uriencode_maybe(sa, query.common.urlencode_resp)),
+ _ => None,
+ },
+ continuation_token: match (query.is_v2, &query.continuation_token) {
+ (true, Some(ct)) => Some(s3_xml::Value(ct.to_string())),
+ _ => None,
+ },
+
+ // Pagination
+ is_truncated: s3_xml::Value(format!("{}", pagination.is_some())),
+ key_count: Some(s3_xml::IntValue(
+ acc.keys.len() as i64 + acc.common_prefixes.len() as i64,
+ )),
+ next_marker: match (!query.is_v2, &pagination) {
+ (true, Some(RangeBegin::AfterKey { key: k }))
+ | (
+ true,
+ Some(RangeBegin::IncludingKey {
+ fallback_key: Some(k),
+ ..
+ }),
+ ) => Some(uriencode_maybe(k, query.common.urlencode_resp)),
+ _ => None,
+ },
+ next_continuation_token: match (query.is_v2, &pagination) {
+ (true, Some(RangeBegin::AfterKey { key })) => Some(s3_xml::Value(format!(
+ "]{}",
+ base64::encode(key.as_bytes())
+ ))),
+ (true, Some(RangeBegin::IncludingKey { key, .. })) => Some(s3_xml::Value(format!(
+ "[{}",
+ base64::encode(key.as_bytes())
+ ))),
+ _ => None,
+ },
+
+ // Body
+ contents: acc
+ .keys
+ .iter()
+ .map(|(key, info)| s3_xml::ListBucketItem {
+ key: uriencode_maybe(key, query.common.urlencode_resp),
+ last_modified: s3_xml::Value(msec_to_rfc3339(info.last_modified)),
+ size: s3_xml::IntValue(info.size as i64),
+ etag: s3_xml::Value(info.etag.to_string()),
+ storage_class: s3_xml::Value("STANDARD".to_string()),
+ })
+ .collect(),
+ common_prefixes: acc
+ .common_prefixes
+ .iter()
+ .map(|pfx| s3_xml::CommonPrefix {
+ prefix: uriencode_maybe(pfx, query.common.urlencode_resp),
+ })
+ .collect(),
+ };
+
+ let xml = s3_xml::to_xml_with_header(&result)?;
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml.into_bytes()))?)
+}
+
+pub async fn handle_list_multipart_upload(
+ garage: Arc<Garage>,
+ query: &ListMultipartUploadsQuery,
+) -> Result<Response<Body>, Error> {
+ let io = |bucket, key, count| {
+ let t = &garage.object_table;
+ async move {
+ t.get_range(&bucket, key, Some(ObjectFilter::IsUploading), count)
+ .await
}
};
- debug!(
- "List request: `{:?}` {} `{}`, start from {}, exclude first {}",
- query.delimiter, query.max_keys, query.prefix, next_chunk_start, next_chunk_exclude_start
- );
+ let mut acc = query.build_accumulator();
+ let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;
- // `truncated` is a boolean that determines whether there are
- // more items to be added.
- let truncated;
- // `last_processed_item` is the key of the last item
- // that was included in the listing before truncating.
- let mut last_processed_item = None;
+ let result = s3_xml::ListMultipartUploadsResult {
+ xmlns: (),
+
+ // Sending back some information about the request
+ bucket: s3_xml::Value(query.common.bucket_name.to_string()),
+ prefix: uriencode_maybe(&query.common.prefix, query.common.urlencode_resp),
+ delimiter: query
+ .common
+ .delimiter
+ .as_ref()
+ .map(|d| uriencode_maybe(d, query.common.urlencode_resp)),
+ max_uploads: s3_xml::IntValue(query.common.page_size as i64),
+ key_marker: query
+ .key_marker
+ .as_ref()
+ .map(|m| uriencode_maybe(m, query.common.urlencode_resp)),
+ upload_id_marker: query
+ .upload_id_marker
+ .as_ref()
+ .map(|m| s3_xml::Value(m.to_string())),
+ encoding_type: match query.common.urlencode_resp {
+ true => Some(s3_xml::Value("url".to_string())),
+ false => None,
+ },
+
+ // Handling pagination
+ is_truncated: s3_xml::Value(format!("{}", pagination.is_some())),
+ next_key_marker: match &pagination {
+ None => None,
+ Some(RangeBegin::AfterKey { key })
+ | Some(RangeBegin::AfterUpload { key, .. })
+ | Some(RangeBegin::IncludingKey { key, .. }) => {
+ Some(uriencode_maybe(key, query.common.urlencode_resp))
+ }
+ },
+ next_upload_id_marker: match pagination {
+ Some(RangeBegin::AfterUpload { upload, .. }) => {
+ Some(s3_xml::Value(hex::encode(upload)))
+ }
+ Some(RangeBegin::IncludingKey { .. }) => Some(s3_xml::Value("include".to_string())),
+ _ => None,
+ },
+
+ // Result body
+ upload: acc
+ .keys
+ .iter()
+ .map(|(uuid, info)| s3_xml::ListMultipartItem {
+ initiated: s3_xml::Value(msec_to_rfc3339(info.timestamp)),
+ key: uriencode_maybe(&info.key, query.common.urlencode_resp),
+ upload_id: s3_xml::Value(hex::encode(uuid)),
+ storage_class: s3_xml::Value("STANDARD".to_string()),
+ initiator: s3_xml::Initiator {
+ display_name: s3_xml::Value("Dummy Key".to_string()),
+ id: s3_xml::Value("GKDummyKey".to_string()),
+ },
+ owner: s3_xml::Owner {
+ display_name: s3_xml::Value("Dummy Key".to_string()),
+ id: s3_xml::Value("GKDummyKey".to_string()),
+ },
+ })
+ .collect(),
+ common_prefixes: acc
+ .common_prefixes
+ .iter()
+ .map(|c| s3_xml::CommonPrefix {
+ prefix: s3_xml::Value(c.to_string()),
+ })
+ .collect(),
+ };
+
+ let xml = s3_xml::to_xml_with_header(&result)?;
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml.into_bytes()))?)
+}
+
+/*
+ * Private enums and structs
+ */
+
+#[derive(Debug)]
+struct ObjectInfo {
+ last_modified: u64,
+ size: u64,
+ etag: String,
+}
+
+#[derive(Debug, PartialEq)]
+struct UploadInfo {
+ key: String,
+ timestamp: u64,
+}
+
+enum ExtractionResult {
+ Stopped,
+ StoppedAtUpload {
+ key: String,
+ upload: Uuid,
+ },
+ Extracted {
+ key: String,
+ },
+ // Fallback key is used for legacy APIs that only support
+ // exlusive pagination (and not inclusive one).
+ SkipTo {
+ key: String,
+ fallback_key: Option<String>,
+ },
+}
+
+#[derive(PartialEq, Clone, Debug)]
+enum RangeBegin {
+ // Fallback key is used for legacy APIs that only support
+ // exlusive pagination (and not inclusive one).
+ IncludingKey {
+ key: String,
+ fallback_key: Option<String>,
+ },
+ AfterKey {
+ key: String,
+ },
+ AfterUpload {
+ key: String,
+ upload: Uuid,
+ },
+}
+type Pagination = Option<RangeBegin>;
+
+/*
+ * Fetch list entries
+ */
+
+async fn fetch_list_entries<R, F>(
+ query: &ListQueryCommon,
+ begin: RangeBegin,
+ acc: &mut impl ExtractAccumulator,
+ mut io: F,
+) -> Result<Pagination, Error>
+where
+ R: futures::Future<Output = Result<Vec<Object>, GarageError>>,
+ F: FnMut(Uuid, Option<String>, usize) -> R,
+{
+ let mut cursor = begin;
+ // +1 is needed as we may need to skip the 1st key
+ // (range is inclusive while most S3 requests are exclusive)
+ let count = query.page_size + 1;
+
+ loop {
+ let start_key = match cursor {
+ RangeBegin::AfterKey { ref key }
+ | RangeBegin::AfterUpload { ref key, .. }
+ | RangeBegin::IncludingKey { ref key, .. } => Some(key.clone()),
+ };
- 'query_loop: loop {
// Fetch objects
- let objects = garage
- .object_table
- .get_range(
- &query.bucket_id,
- Some(next_chunk_start.clone()),
- Some(DeletedFilter::NotDeleted),
- query.max_keys + 1,
- )
- .await?;
+ let objects = io(query.bucket_id, start_key.clone(), count).await?;
+
debug!(
- "List: get range {} (max {}), results: {}",
- next_chunk_start,
- query.max_keys + 1,
+ "List: get range {:?} (max {}), results: {}",
+ start_key,
+ count,
objects.len()
);
- let current_chunk_start = next_chunk_start.clone();
-
- // Iterate on returned objects and add them to the response.
- // If a delimiter is specified, we take care of grouping objects
- // into CommonPrefixes.
- for object in objects.iter() {
- // If we have retrieved an object that doesn't start with
- // the prefix, we know we have finished listing our stuff.
- if !object.key.starts_with(&query.prefix) {
- truncated = false;
- break 'query_loop;
- }
+ let server_more = objects.len() >= count;
- // Exclude the starting key if we have to.
- if object.key == next_chunk_start && next_chunk_exclude_start {
- continue;
- }
+ let prev_req_cursor = cursor.clone();
+ let mut iter = objects.iter().peekable();
- // Find if this object has a currently valid (non-deleted,
- // non-still-uploading) version. If not, skip it.
- let version = match object.versions().iter().find(|x| x.is_data()) {
- Some(v) => v,
- None => continue,
- };
+ // Drop the first key if needed
+ // Only AfterKey requires it according to the S3 spec and our implem.
+ match (&cursor, iter.peek()) {
+ (RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => iter.next(),
+ (_, _) => None,
+ };
- // If we don't have space to add this object to our response,
- // we will need to stop here and mark the key of this object
- // as the marker from where
- // we want to start again in the next list call.
- let cannot_add = result_keys.len() + result_common_prefixes.len() >= query.max_keys;
-
- // Determine whether this object should be grouped inside
- // a CommonPrefix because it contains the delimiter,
- // or if it should be returned as an object.
- let common_prefix = match &query.delimiter {
- Some(delimiter) => object.key[query.prefix.len()..]
- .find(delimiter)
- .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]),
- None => None,
- };
- if let Some(pfx) = common_prefix {
- // In the case where this object must be grouped in a
- // common prefix, handle it here.
- if !result_common_prefixes.contains(pfx) {
- // Determine the first listing key that starts after
- // the common prefix, by finding the next possible
- // string by alphabetical order.
- let mut first_key_after_prefix = pfx.to_string();
- let tail = first_key_after_prefix.pop().unwrap();
- first_key_after_prefix.push(((tail as u8) + 1) as char);
-
- // If this were the end of the chunk,
- // the next chunk should start after this prefix
- next_chunk_start = first_key_after_prefix;
- next_chunk_exclude_start = false;
-
- if cannot_add {
- truncated = true;
- break 'query_loop;
- }
- result_common_prefixes.insert(pfx.to_string());
+ while let Some(object) = iter.peek() {
+ if !object.key.starts_with(&query.prefix) {
+ // If the key is not in the requested prefix, we're done
+ return Ok(None);
+ }
+
+ cursor = match acc.extract(query, &cursor, &mut iter) {
+ ExtractionResult::Extracted { key } => RangeBegin::AfterKey { key },
+ ExtractionResult::SkipTo { key, fallback_key } => {
+ RangeBegin::IncludingKey { key, fallback_key }
}
- last_processed_item = Some(object.key.clone());
- continue;
+ ExtractionResult::StoppedAtUpload { key, upload } => {
+ return Ok(Some(RangeBegin::AfterUpload { key, upload }))
+ }
+ ExtractionResult::Stopped => return Ok(Some(cursor)),
};
+ }
- // This is not a common prefix, we want to add it to our
- // response directly.
- next_chunk_start = object.key.clone();
+ if !server_more {
+ // We did not fully fill the accumulator despite exhausting all the data we have,
+ // we're done
+ return Ok(None);
+ }
- if cannot_add {
- truncated = true;
- next_chunk_exclude_start = false;
- break 'query_loop;
- }
+ if prev_req_cursor == cursor {
+ unreachable!("No progress has been done in the loop. This is a bug, please report it.");
+ }
+ }
+}
- let meta = match &version.state {
- ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
- ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
- _ => unreachable!(),
- };
- let info = match result_keys.get(&object.key) {
- None => ListResultInfo {
- last_modified: version.timestamp,
- size: meta.size,
- etag: meta.etag.to_string(),
+/*
+ * ListQuery logic
+ */
+
+/// Determine the key from where we want to start fetch objects from the database
+///
+/// We choose whether the object at this key must
+/// be included or excluded from the response.
+/// This key can be the prefix in the base case, or intermediate
+/// points in the dataset if we are continuing a previous listing.
+impl ListObjectsQuery {
+ fn build_accumulator(&self) -> Accumulator<String, ObjectInfo> {
+ Accumulator::<String, ObjectInfo>::new(self.common.page_size)
+ }
+
+ fn begin(&self) -> Result<RangeBegin, Error> {
+ if self.is_v2 {
+ match (&self.continuation_token, &self.start_after) {
+ // In V2 mode, the continuation token is defined as an opaque
+ // string in the spec, so we can do whatever we want with it.
+ // In our case, it is defined as either [ or ] (for include
+ // representing the key to start with.
+ (Some(token), _) => match &token[..1] {
+ "[" => Ok(RangeBegin::IncludingKey {
+ key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?,
+ fallback_key: None,
+ }),
+ "]" => Ok(RangeBegin::AfterKey {
+ key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?,
+ }),
+ _ => Err(Error::BadRequest("Invalid continuation token".to_string())),
},
- Some(_lri) => {
- return Err(Error::InternalError(GarageError::Message(format!(
- "Duplicate key?? {} (this is a bug, please report it)",
- object.key
- ))))
+
+ // StartAfter has defined semantics in the spec:
+ // start listing at the first key immediately after.
+ (_, Some(key)) => Ok(RangeBegin::AfterKey {
+ key: key.to_string(),
+ }),
+
+ // In the case where neither is specified, we start
+ // listing at the specified prefix. If an object has this
+ // exact same key, we include it. (@TODO is this correct?)
+ _ => Ok(RangeBegin::IncludingKey {
+ key: self.common.prefix.to_string(),
+ fallback_key: None,
+ }),
+ }
+ } else {
+ match &self.marker {
+ // In V1 mode, the spec defines the Marker value to mean
+ // the same thing as the StartAfter value in V2 mode.
+ Some(key) => Ok(RangeBegin::AfterKey {
+ key: key.to_string(),
+ }),
+ _ => Ok(RangeBegin::IncludingKey {
+ key: self.common.prefix.to_string(),
+ fallback_key: None,
+ }),
+ }
+ }
+ }
+}
+
+impl ListMultipartUploadsQuery {
+ fn build_accumulator(&self) -> Accumulator<Uuid, UploadInfo> {
+ Accumulator::<Uuid, UploadInfo>::new(self.common.page_size)
+ }
+
+ fn begin(&self) -> Result<RangeBegin, Error> {
+ match (&self.upload_id_marker, &self.key_marker) {
+ // If both the upload id marker and the key marker are sets,
+ // the spec specifies that we must start listing uploads INCLUDING the given key,
+ // AFTER the specified upload id (sorted in a lexicographic order).
+ // To enable some optimisations, we emulate "IncludingKey" by extending the upload id
+ // semantic. We base our reasoning on the hypothesis that S3's upload ids are opaques
+ // while Garage's ones are 32 bytes hex encoded which enables us to extend this query
+ // with a specific "include" upload id.
+ (Some(up_marker), Some(key_marker)) => match &up_marker[..] {
+ "include" => Ok(RangeBegin::IncludingKey {
+ key: key_marker.to_string(),
+ fallback_key: None,
+ }),
+ uuid => Ok(RangeBegin::AfterUpload {
+ key: key_marker.to_string(),
+ upload: s3_put::decode_upload_id(uuid)?,
+ }),
+ },
+
+ // If only the key marker is specified, the spec says that we must start listing
+ // uploads AFTER the specified key.
+ (None, Some(key_marker)) => Ok(RangeBegin::AfterKey {
+ key: key_marker.to_string(),
+ }),
+ _ => Ok(RangeBegin::IncludingKey {
+ key: self.common.prefix.to_string(),
+ fallback_key: None,
+ }),
+ }
+ }
+}
+
+/*
+ * Accumulator logic
+ */
+
+trait ExtractAccumulator {
+ fn extract<'a>(
+ &mut self,
+ query: &ListQueryCommon,
+ cursor: &RangeBegin,
+ iter: &mut Peekable<impl Iterator<Item = &'a Object>>,
+ ) -> ExtractionResult;
+}
+
+struct Accumulator<K, V> {
+ common_prefixes: BTreeSet<String>,
+ keys: BTreeMap<K, V>,
+ max_capacity: usize,
+}
+
+type ObjectAccumulator = Accumulator<String, ObjectInfo>;
+type UploadAccumulator = Accumulator<Uuid, UploadInfo>;
+
+impl<K: std::cmp::Ord, V> Accumulator<K, V> {
+ fn new(page_size: usize) -> Accumulator<K, V> {
+ Accumulator {
+ common_prefixes: BTreeSet::<String>::new(),
+ keys: BTreeMap::<K, V>::new(),
+ max_capacity: page_size,
+ }
+ }
+
+ /// Observe the Object iterator and try to extract a single common prefix
+ ///
+ /// This function can consume an arbitrary number of items as long as they share the same
+ /// common prefix.
+ fn extract_common_prefix<'a>(
+ &mut self,
+ objects: &mut Peekable<impl Iterator<Item = &'a Object>>,
+ query: &ListQueryCommon,
+ ) -> Option<ExtractionResult> {
+ // Get the next object from the iterator
+ let object = objects.peek().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");
+
+ // Check if this is a common prefix (requires a passed delimiter and its value in the key)
+ let pfx = match common_prefix(object, query) {
+ Some(p) => p,
+ None => return None,
+ };
+
+ // Try to register this prefix
+ // If not possible, we can return early
+ if !self.try_insert_common_prefix(pfx.to_string()) {
+ return Some(ExtractionResult::Stopped);
+ }
+
+ // We consume the whole common prefix from the iterator
+ let mut last_pfx_key = &object.key;
+ loop {
+ last_pfx_key = match objects.peek() {
+ Some(o) if o.key.starts_with(pfx) => &o.key,
+ Some(_) => {
+ return Some(ExtractionResult::Extracted {
+ key: last_pfx_key.to_owned(),
+ })
+ }
+ None => {
+ return Some(ExtractionResult::SkipTo {
+ key: key_after_prefix(pfx),
+ fallback_key: Some(last_pfx_key.to_owned()),
+ })
}
};
- result_keys.insert(object.key.clone(), info);
- last_processed_item = Some(object.key.clone());
- next_chunk_exclude_start = true;
+
+ objects.next();
}
+ }
- // If our database returned less objects than what we were asking for,
- // it means that no more objects are in the bucket. So we stop here.
- if objects.len() < query.max_keys + 1 {
- truncated = false;
- break 'query_loop;
+ fn is_full(&mut self) -> bool {
+ self.keys.len() + self.common_prefixes.len() >= self.max_capacity
+ }
+
+ fn try_insert_common_prefix(&mut self, key: String) -> bool {
+ // If we already have an entry, we can continue
+ if self.common_prefixes.contains(&key) {
+ return true;
}
- // Sanity check: we should have added at least an object
- // or a prefix to our returned result.
- if next_chunk_start == current_chunk_start || last_processed_item.is_none() {
- return Err(Error::InternalError(GarageError::Message(format!(
- "S3 ListObject: made no progress, still starting at {} (this is a bug, please report it)", next_chunk_start))));
+ // Otherwise, we need to check if we can add it
+ match self.is_full() {
+ true => false,
+ false => {
+ self.common_prefixes.insert(key);
+ true
+ }
}
+ }
+
+ fn try_insert_entry(&mut self, key: K, value: V) -> bool {
+ // It is impossible to add twice a key, this is an error
+ assert!(!self.keys.contains_key(&key));
- // Loop and fetch more objects
+ match self.is_full() {
+ true => false,
+ false => {
+ self.keys.insert(key, value);
+ true
+ }
+ }
}
+}
- let mut result = s3_xml::ListBucketResult {
- xmlns: (),
- name: s3_xml::Value(query.bucket_name.to_string()),
- prefix: uriencode_maybe(&query.prefix, query.urlencode_resp),
- marker: None,
- next_marker: None,
- start_after: None,
- continuation_token: None,
- next_continuation_token: None,
- max_keys: s3_xml::IntValue(query.max_keys as i64),
- delimiter: query
- .delimiter
- .as_ref()
- .map(|x| uriencode_maybe(x, query.urlencode_resp)),
- encoding_type: match query.urlencode_resp {
- true => Some(s3_xml::Value("url".to_string())),
- false => None,
- },
- key_count: Some(s3_xml::IntValue(
- result_keys.len() as i64 + result_common_prefixes.len() as i64,
- )),
- is_truncated: s3_xml::Value(format!("{}", truncated)),
- contents: vec![],
- common_prefixes: vec![],
- };
+impl ExtractAccumulator for ObjectAccumulator {
+ fn extract<'a>(
+ &mut self,
+ query: &ListQueryCommon,
+ _cursor: &RangeBegin,
+ objects: &mut Peekable<impl Iterator<Item = &'a Object>>,
+ ) -> ExtractionResult {
+ if let Some(e) = self.extract_common_prefix(objects, query) {
+ return e;
+ }
- if query.is_v2 {
- if let Some(ct) = &query.continuation_token {
- result.continuation_token = Some(s3_xml::Value(ct.to_string()));
+ let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");
+
+ let version = match object.versions().iter().find(|x| x.is_data()) {
+ Some(v) => v,
+ None => unreachable!(
+ "Expect to have objects having data due to earlier filtering. This is a logic bug."
+ ),
+ };
+
+ let meta = match &version.state {
+ ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
+ ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
+ _ => unreachable!(),
+ };
+ let info = ObjectInfo {
+ last_modified: version.timestamp,
+ size: meta.size,
+ etag: meta.etag.to_string(),
+ };
+
+ match self.try_insert_entry(object.key.clone(), info) {
+ true => ExtractionResult::Extracted {
+ key: object.key.clone(),
+ },
+ false => ExtractionResult::Stopped,
}
- if let Some(sa) = &query.start_after {
- result.start_after = Some(uriencode_maybe(sa, query.urlencode_resp));
+ }
+}
+
+impl ExtractAccumulator for UploadAccumulator {
+ /// Observe the iterator, process a single key, and try to extract one or more upload entries
+ ///
+ /// This function processes a single object from the iterator that can contain an arbitrary
+ /// number of versions, and thus "uploads".
+ fn extract<'a>(
+ &mut self,
+ query: &ListQueryCommon,
+ cursor: &RangeBegin,
+ objects: &mut Peekable<impl Iterator<Item = &'a Object>>,
+ ) -> ExtractionResult {
+ if let Some(e) = self.extract_common_prefix(objects, query) {
+ return e;
}
- if truncated {
- let b64 = base64::encode(next_chunk_start.as_bytes());
- let nct = if next_chunk_exclude_start {
- format!("]{}", b64)
- } else {
- format!("[{}", b64)
+
+ // Get the next object from the iterator
+ let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");
+
+ let mut uploads_for_key = object
+ .versions()
+ .iter()
+ .filter(|x| x.is_uploading())
+ .collect::<Vec<&ObjectVersion>>();
+
+ // S3 logic requires lexicographically sorted upload ids.
+ uploads_for_key.sort_unstable_by_key(|e| e.uuid);
+
+ // Skip results if an upload marker is provided
+ if let RangeBegin::AfterUpload { upload, .. } = cursor {
+ // Because our data are sorted, we can use a binary search to find the UUID
+ // or to find where it should have been added. Once this position is found,
+ // we use it to discard the first part of the array.
+ let idx = match uploads_for_key.binary_search_by(|e| e.uuid.cmp(upload)) {
+ // we start after the found uuid so we need to discard the pointed value.
+ // In the worst case, the UUID is the last element, which lead us to an empty array
+ // but we are never out of bound.
+ Ok(i) => i + 1,
+ // if the UUID is not found, the upload may have been discarded between the 2 request,
+ // this function returns where it could have been inserted,
+ // the pointed value is thus greater than our marker and we need to keep it.
+ Err(i) => i,
};
- result.next_continuation_token = Some(s3_xml::Value(nct));
+ uploads_for_key = uploads_for_key[idx..].to_vec();
}
- } else {
- // TODO: are these supposed to be urlencoded when encoding-type is URL??
- if let Some(mkr) = &query.marker {
- result.marker = Some(uriencode_maybe(mkr, query.urlencode_resp));
+
+ let mut iter = uploads_for_key.iter();
+
+ // The first entry is a specific case
+ // as it changes our result enum type
+ let first_upload = match iter.next() {
+ Some(u) => u,
+ None => {
+ return ExtractionResult::Extracted {
+ key: object.key.clone(),
+ }
+ }
+ };
+ let first_up_info = UploadInfo {
+ key: object.key.to_string(),
+ timestamp: first_upload.timestamp,
+ };
+ if !self.try_insert_entry(first_upload.uuid, first_up_info) {
+ return ExtractionResult::Stopped;
}
- if truncated {
- if let Some(lpi) = last_processed_item {
- result.next_marker = Some(uriencode_maybe(&lpi, query.urlencode_resp));
- } else {
- return Err(Error::InternalError(GarageError::Message(
- "S3 ListObject: last_processed_item is None but the response was truncated, indicating that many items were processed (this is a bug, please report it)".to_string())));
+
+ // We can then collect the remaining uploads in a loop
+ let mut prev_uuid = first_upload.uuid;
+ for upload in iter {
+ let up_info = UploadInfo {
+ key: object.key.to_string(),
+ timestamp: upload.timestamp,
+ };
+
+ // Insert data in our accumulator
+ // If it is full, return information to paginate.
+ if !self.try_insert_entry(upload.uuid, up_info) {
+ return ExtractionResult::StoppedAtUpload {
+ key: object.key.clone(),
+ upload: prev_uuid,
+ };
}
+ // Update our last added UUID
+ prev_uuid = upload.uuid;
}
- }
- for (key, info) in result_keys.iter() {
- result.contents.push(s3_xml::ListBucketItem {
- key: uriencode_maybe(key, query.urlencode_resp),
- last_modified: s3_xml::Value(msec_to_rfc3339(info.last_modified)),
- size: s3_xml::IntValue(info.size as i64),
- etag: s3_xml::Value(info.etag.to_string()),
- storage_class: s3_xml::Value("STANDARD".to_string()),
- });
+ // We successfully collected all the uploads
+ ExtractionResult::Extracted {
+ key: object.key.clone(),
+ }
}
+}
- for pfx in result_common_prefixes.iter() {
- result.common_prefixes.push(s3_xml::CommonPrefix {
- prefix: uriencode_maybe(pfx, query.urlencode_resp),
- });
+/*
+ * Utility functions
+ */
+
+/// Returns the common prefix of the object given the query prefix and delimiter
+fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> {
+ match &query.delimiter {
+ Some(delimiter) => object.key[query.prefix.len()..]
+ .find(delimiter)
+ .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]),
+ None => None,
}
-
- let xml = s3_xml::to_xml_with_header(&result)?;
-
- Ok(Response::builder()
- .header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
}
+/// URIencode a value if needed
fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
if yes {
s3_xml::Value(uri_encode(s, true))
@@ -319,3 +728,233 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
s3_xml::Value(s.to_string())
}
}
+
+/// Compute the key after the prefix
+fn key_after_prefix(pfx: &str) -> String {
+ let mut first_key_after_prefix = pfx.to_string();
+ let tail = first_key_after_prefix.pop().unwrap();
+ first_key_after_prefix.push(((tail as u8) + 1) as char);
+ first_key_after_prefix
+}
+
+/*
+ * Unit tests of this module
+ */
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ const TS: u64 = 1641394898314;
+
+ fn bucket() -> Uuid {
+ Uuid::from([0x42; 32])
+ }
+
+ fn query() -> ListMultipartUploadsQuery {
+ ListMultipartUploadsQuery {
+ common: ListQueryCommon {
+ prefix: "".to_string(),
+ delimiter: Some("/".to_string()),
+ page_size: 1000,
+ urlencode_resp: false,
+ bucket_name: "a".to_string(),
+ bucket_id: Uuid::from([0x00; 32]),
+ },
+ key_marker: None,
+ upload_id_marker: None,
+ }
+ }
+
+ fn objs() -> Vec<Object> {
+ vec![
+ Object::new(
+ bucket(),
+ "a/b/c".to_string(),
+ vec![objup_version([0x01; 32])],
+ ),
+ Object::new(bucket(), "d".to_string(), vec![objup_version([0x01; 32])]),
+ ]
+ }
+
+ fn objup_version(uuid: [u8; 32]) -> ObjectVersion {
+ ObjectVersion {
+ uuid: Uuid::from(uuid),
+ timestamp: TS,
+ state: ObjectVersionState::Uploading(ObjectVersionHeaders {
+ content_type: "text/plain".to_string(),
+ other: BTreeMap::<String, String>::new(),
+ }),
+ }
+ }
+
+ #[test]
+ fn test_common_prefixes() {
+ let mut query = query();
+ let objs = objs();
+
+ query.common.prefix = "a/".to_string();
+ assert_eq!(
+ common_prefix(&objs.get(0).unwrap(), &query.common),
+ Some("a/b/")
+ );
+
+ query.common.prefix = "a/b/".to_string();
+ assert_eq!(common_prefix(&objs.get(0).unwrap(), &query.common), None);
+ }
+
+ #[test]
+ fn test_extract_common_prefix() {
+ let mut query = query();
+ query.common.prefix = "a/".to_string();
+ let objs = objs();
+ let mut acc = UploadAccumulator::new(query.common.page_size);
+
+ let mut iter = objs.iter().peekable();
+ match acc.extract_common_prefix(&mut iter, &query.common) {
+ Some(ExtractionResult::Extracted { key }) => assert_eq!(key, "a/b/c".to_string()),
+ _ => panic!("wrong result"),
+ }
+ assert_eq!(acc.common_prefixes.len(), 1);
+ assert_eq!(acc.common_prefixes.iter().next().unwrap(), "a/b/");
+ }
+
+ #[test]
+ fn test_extract_upload() {
+ let objs = vec![
+ Object::new(
+ bucket(),
+ "b".to_string(),
+ vec![
+ objup_version([0x01; 32]),
+ objup_version([0x80; 32]),
+ objup_version([0x8f; 32]),
+ objup_version([0xdd; 32]),
+ ],
+ ),
+ Object::new(bucket(), "c".to_string(), vec![]),
+ ];
+
+ let mut acc = UploadAccumulator::new(2);
+ let mut start = RangeBegin::AfterUpload {
+ key: "b".to_string(),
+ upload: Uuid::from([0x01; 32]),
+ };
+
+ let mut iter = objs.iter().peekable();
+
+ // Check the case where we skip some uploads
+ match acc.extract(&(query().common), &start, &mut iter) {
+ ExtractionResult::StoppedAtUpload { key, upload } => {
+ assert_eq!(key, "b");
+ assert_eq!(upload, Uuid::from([0x8f; 32]));
+ }
+ _ => panic!("wrong result"),
+ };
+
+ assert_eq!(acc.keys.len(), 2);
+ assert_eq!(
+ acc.keys.get(&Uuid::from([0x80; 32])).unwrap(),
+ &UploadInfo {
+ timestamp: TS,
+ key: "b".to_string()
+ }
+ );
+ assert_eq!(
+ acc.keys.get(&Uuid::from([0x8f; 32])).unwrap(),
+ &UploadInfo {
+ timestamp: TS,
+ key: "b".to_string()
+ }
+ );
+
+ acc = UploadAccumulator::new(2);
+ start = RangeBegin::AfterUpload {
+ key: "b".to_string(),
+ upload: Uuid::from([0xff; 32]),
+ };
+ iter = objs.iter().peekable();
+
+ // Check the case where we skip all the uploads
+ match acc.extract(&(query().common), &start, &mut iter) {
+ ExtractionResult::Extracted { key } if key.as_str() == "b" => (),
+ _ => panic!("wrong result"),
+ };
+ }
+
+ #[tokio::test]
+ async fn test_fetch_uploads_no_result() -> Result<(), Error> {
+ let query = query();
+ let mut acc = query.build_accumulator();
+ let page = fetch_list_entries(
+ &query.common,
+ query.begin()?,
+ &mut acc,
+ |_, _, _| async move { Ok(vec![]) },
+ )
+ .await?;
+ assert_eq!(page, None);
+ assert_eq!(acc.common_prefixes.len(), 0);
+ assert_eq!(acc.keys.len(), 0);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_fetch_uploads_basic() -> Result<(), Error> {
+ let query = query();
+ let mut acc = query.build_accumulator();
+ let mut fake_io = |_, _, _| async move { Ok(objs()) };
+ let page =
+ fetch_list_entries(&query.common, query.begin()?, &mut acc, &mut fake_io).await?;
+ assert_eq!(page, None);
+ assert_eq!(acc.common_prefixes.len(), 1);
+ assert_eq!(acc.keys.len(), 1);
+ assert!(acc.common_prefixes.contains("a/"));
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_fetch_uploads_advanced() -> Result<(), Error> {
+ let mut query = query();
+ query.common.page_size = 2;
+
+ let mut fake_io = |_, k: Option<String>, _| async move {
+ Ok(match k.as_deref() {
+ Some("") => vec![
+ Object::new(bucket(), "b/a".to_string(), vec![objup_version([0x01; 32])]),
+ Object::new(bucket(), "b/b".to_string(), vec![objup_version([0x01; 32])]),
+ Object::new(bucket(), "b/c".to_string(), vec![objup_version([0x01; 32])]),
+ ],
+ Some("b0") => vec![
+ Object::new(bucket(), "c/a".to_string(), vec![objup_version([0x01; 32])]),
+ Object::new(bucket(), "c/b".to_string(), vec![objup_version([0x01; 32])]),
+ Object::new(bucket(), "c/c".to_string(), vec![objup_version([0x02; 32])]),
+ ],
+ Some("c0") => vec![Object::new(
+ bucket(),
+ "d".to_string(),
+ vec![objup_version([0x01; 32])],
+ )],
+ _ => panic!("wrong value {:?}", k),
+ })
+ };
+
+ let mut acc = query.build_accumulator();
+ let page =
+ fetch_list_entries(&query.common, query.begin()?, &mut acc, &mut fake_io).await?;
+ assert_eq!(
+ page,
+ Some(RangeBegin::IncludingKey {
+ key: "c0".to_string(),
+ fallback_key: Some("c/c".to_string())
+ })
+ );
+ assert_eq!(acc.common_prefixes.len(), 2);
+ assert_eq!(acc.keys.len(), 0);
+ assert!(acc.common_prefixes.contains("b/"));
+ assert!(acc.common_prefixes.contains("c/"));
+
+ Ok(())
+ }
+}
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index bb92c252..d7ee5893 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -610,7 +610,7 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E
})
}
-fn decode_upload_id(id: &str) -> Result<Uuid, Error> {
+pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> {
let id_bin = hex::decode(id).map_err(|_| Error::NoSuchUpload)?;
if id_bin.len() != 32 {
return Err(Error::NoSuchUpload);
diff --git a/src/api/s3_router.rs b/src/api/s3_router.rs
index 234f77f0..a8ac0086 100644
--- a/src/api/s3_router.rs
+++ b/src/api/s3_router.rs
@@ -350,7 +350,7 @@ pub enum Endpoint {
delimiter: Option<char>,
encoding_type: Option<String>,
key_marker: Option<String>,
- max_uploads: Option<u64>,
+ max_uploads: Option<usize>,
prefix: Option<String>,
upload_id_marker: Option<String>,
},
diff --git a/src/api/s3_xml.rs b/src/api/s3_xml.rs
index 9b5a0202..98c63d57 100644
--- a/src/api/s3_xml.rs
+++ b/src/api/s3_xml.rs
@@ -142,6 +142,60 @@ pub struct CompleteMultipartUploadResult {
}
#[derive(Debug, Serialize, PartialEq)]
+pub struct Initiator {
+ #[serde(rename = "DisplayName")]
+ pub display_name: Value,
+ #[serde(rename = "ID")]
+ pub id: Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct ListMultipartItem {
+ #[serde(rename = "Initiated")]
+ pub initiated: Value,
+ #[serde(rename = "Initiator")]
+ pub initiator: Initiator,
+ #[serde(rename = "Key")]
+ pub key: Value,
+ #[serde(rename = "UploadId")]
+ pub upload_id: Value,
+ #[serde(rename = "Owner")]
+ pub owner: Owner,
+ #[serde(rename = "StorageClass")]
+ pub storage_class: Value,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
+pub struct ListMultipartUploadsResult {
+ #[serde(serialize_with = "xmlns_tag")]
+ pub xmlns: (),
+ #[serde(rename = "Bucket")]
+ pub bucket: Value,
+ #[serde(rename = "KeyMarker")]
+ pub key_marker: Option<Value>,
+ #[serde(rename = "UploadIdMarker")]
+ pub upload_id_marker: Option<Value>,
+ #[serde(rename = "NextKeyMarker")]
+ pub next_key_marker: Option<Value>,
+ #[serde(rename = "NextUploadIdMarker")]
+ pub next_upload_id_marker: Option<Value>,
+ #[serde(rename = "Prefix")]
+ pub prefix: Value,
+ #[serde(rename = "Delimiter")]
+ pub delimiter: Option<Value>,
+ #[serde(rename = "MaxUploads")]
+ pub max_uploads: IntValue,
+ #[serde(rename = "IsTruncated")]
+ pub is_truncated: Value,
+ #[serde(rename = "Upload")]
+ pub upload: Vec<ListMultipartItem>,
+ #[serde(rename = "CommonPrefixes")]
+ pub common_prefixes: Vec<CommonPrefix>,
+ #[serde(rename = "EncodingType")]
+ pub encoding_type: Option<Value>,
+}
+
+#[derive(Debug, Serialize, PartialEq)]
pub struct ListBucketItem {
#[serde(rename = "Key")]
pub key: Value,
@@ -433,6 +487,58 @@ mod tests {
}
#[test]
+ fn list_multipart_uploads_result() -> Result<(), ApiError> {
+ let result = ListMultipartUploadsResult {
+ xmlns: (),
+ bucket: Value("example-bucket".to_string()),
+ key_marker: None,
+ next_key_marker: None,
+ upload_id_marker: None,
+ encoding_type: None,
+ next_upload_id_marker: None,
+ upload: vec![],
+ delimiter: Some(Value("/".to_string())),
+ prefix: Value("photos/2006/".to_string()),
+ max_uploads: IntValue(1000),
+ is_truncated: Value("false".to_string()),
+ common_prefixes: vec![
+ CommonPrefix {
+ prefix: Value("photos/2006/February/".to_string()),
+ },
+ CommonPrefix {
+ prefix: Value("photos/2006/January/".to_string()),
+ },
+ CommonPrefix {
+ prefix: Value("photos/2006/March/".to_string()),
+ },
+ ],
+ };
+
+ assert_eq!(
+ to_xml_with_header(&result)?,
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
+<ListMultipartUploadsResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\
+ <Bucket>example-bucket</Bucket>\
+ <Prefix>photos/2006/</Prefix>\
+ <Delimiter>/</Delimiter>\
+ <MaxUploads>1000</MaxUploads>\
+ <IsTruncated>false</IsTruncated>\
+ <CommonPrefixes>\
+ <Prefix>photos/2006/February/</Prefix>\
+ </CommonPrefixes>\
+ <CommonPrefixes>\
+ <Prefix>photos/2006/January/</Prefix>\
+ </CommonPrefixes>\
+ <CommonPrefixes>\
+ <Prefix>photos/2006/March/</Prefix>\
+ </CommonPrefixes>\
+</ListMultipartUploadsResult>"
+ );
+
+ Ok(())
+ }
+
+ #[test]
fn list_objects_v1_1() -> Result<(), ApiError> {
let result = ListBucketResult {
xmlns: (),
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index f315c4dc..4a53792d 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -21,6 +21,7 @@ use garage_model::garage::Garage;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
+use garage_model::object_table::ObjectFilter;
use garage_model::permission::*;
use crate::cli::*;
@@ -209,7 +210,7 @@ impl AdminRpcHandler {
let objects = self
.garage
.object_table
- .get_range(&bucket_id, None, Some(DeletedFilter::NotDeleted), 10)
+ .get_range(&bucket_id, None, Some(ObjectFilter::IsData), 10)
.await?;
if !objects.is_empty() {
return Err(Error::BadRequest(format!(
diff --git a/src/model/object_table.rs b/src/model/object_table.rs
index 0c6c3a6d..da53878e 100644
--- a/src/model/object_table.rs
+++ b/src/model/object_table.rs
@@ -218,13 +218,19 @@ pub struct ObjectTable {
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
}
+#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
+pub enum ObjectFilter {
+ IsData,
+ IsUploading,
+}
+
impl TableSchema for ObjectTable {
const TABLE_NAME: &'static str = "object";
type P = Uuid;
type S = String;
type E = Object;
- type Filter = DeletedFilter;
+ type Filter = ObjectFilter;
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let version_table = self.version_table.clone();
@@ -254,8 +260,10 @@ impl TableSchema for ObjectTable {
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
- let deleted = !entry.versions.iter().any(|v| v.is_data());
- filter.apply(deleted)
+ match filter {
+ ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()),
+ ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()),
+ }
}
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {