aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/list.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2023-06-09 15:34:09 +0000
committerAlex <alex@adnab.me>2023-06-09 15:34:09 +0000
commit0a06fda0da35f4018c39bf6aec90e55bdf42d241 (patch)
treed2b758400f336b63e236c431a965d191954322a8 /src/api/s3/list.rs
parente7e164a280dfc1c4adf9d6da6f3b2a9674eca4bd (diff)
parent3d477906d4ff418de10973db7bd3e940f2e47b2d (diff)
downloadgarage-0a06fda0da35f4018c39bf6aec90e55bdf42d241.tar.gz
garage-0a06fda0da35f4018c39bf6aec90e55bdf42d241.zip
Merge pull request 'Fix #204 (full Multipart Uploads semantics)' (#553) from nlnet-task1 into next
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/553
Diffstat (limited to 'src/api/s3/list.rs')
-rw-r--r--src/api/s3/list.rs341
1 files changed, 140 insertions, 201 deletions
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index 5cb0d65a..7408d4d3 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -1,4 +1,3 @@
-use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::iter::{Iterator, Peekable};
use std::sync::Arc;
@@ -11,15 +10,15 @@ use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_model::garage::Garage;
+use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
-use garage_model::s3::version_table::Version;
-use garage_table::{EmptyKey, EnumerationOrder};
+use garage_table::EnumerationOrder;
use crate::encoding::*;
use crate::helpers::key_after_prefix;
use crate::s3::error::*;
-use crate::s3::put as s3_put;
+use crate::s3::multipart as s3_multipart;
use crate::s3::xml as s3_xml;
const DUMMY_NAME: &str = "Dummy Key";
@@ -176,7 +175,9 @@ pub async fn handle_list_multipart_upload(
t.get_range(
&bucket,
key,
- Some(ObjectFilter::IsUploading),
+ Some(ObjectFilter::IsUploading {
+ check_multipart: Some(true),
+ }),
count,
EnumerationOrder::Forward,
)
@@ -272,24 +273,26 @@ pub async fn handle_list_parts(
) -> Result<Response<Body>, Error> {
debug!("ListParts {:?}", query);
- let upload_id = s3_put::decode_upload_id(&query.upload_id)?;
+ let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;
- let (object, version) = futures::try_join!(
- garage.object_table.get(&query.bucket_id, &query.key),
- garage.version_table.get(&upload_id, &EmptyKey),
- )?;
+ let (_, _, mpu) =
+ s3_multipart::get_upload(&garage, &query.bucket_id, &query.key, &upload_id).await?;
- let (info, next) = fetch_part_info(query, object, version, upload_id)?;
+ let (info, next) = fetch_part_info(query, &mpu)?;
let result = s3_xml::ListPartsResult {
xmlns: (),
+
+ // Query parameters
bucket: s3_xml::Value(query.bucket_name.to_string()),
key: s3_xml::Value(query.key.to_string()),
upload_id: s3_xml::Value(query.upload_id.to_string()),
part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)),
- next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)),
max_parts: s3_xml::IntValue(query.max_parts as i64),
- is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()),
+
+ // Result values
+ next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)),
+ is_truncated: s3_xml::Value(format!("{}", next.is_some())),
parts: info
.iter()
.map(|part| s3_xml::PartItem {
@@ -299,6 +302,8 @@ pub async fn handle_list_parts(
size: s3_xml::IntValue(part.size as i64),
})
.collect(),
+
+ // Dummy result values (unsupported features)
initiator: s3_xml::Initiator {
display_name: s3_xml::Value(DUMMY_NAME.to_string()),
id: s3_xml::Value(DUMMY_KEY.to_string()),
@@ -335,8 +340,8 @@ struct UploadInfo {
}
#[derive(Debug, PartialEq)]
-struct PartInfo {
- etag: String,
+struct PartInfo<'a> {
+ etag: &'a str,
timestamp: u64,
part_number: u64,
size: u64,
@@ -456,107 +461,51 @@ where
}
}
-fn fetch_part_info(
+fn fetch_part_info<'a>(
query: &ListPartsQuery,
- object: Option<Object>,
- version: Option<Version>,
- upload_id: Uuid,
-) -> Result<(Vec<PartInfo>, Option<u64>), Error> {
- // Check results
- let object = object.ok_or(Error::NoSuchKey)?;
-
- let obj_version = object
- .versions()
- .iter()
- .find(|v| v.uuid == upload_id && v.is_uploading())
- .ok_or(Error::NoSuchUpload)?;
-
- let version = version.ok_or(Error::NoSuchKey)?;
-
- // Cut the beginning of our 2 vectors if required
- let (etags, blocks) = match &query.part_number_marker {
- Some(marker) => {
- let next = marker + 1;
-
- let part_idx = into_ok_or_err(
- version
- .parts_etags
- .items()
- .binary_search_by(|(part_num, _)| part_num.cmp(&next)),
- );
- let parts = &version.parts_etags.items()[part_idx..];
-
- let block_idx = into_ok_or_err(
- version
- .blocks
- .items()
- .binary_search_by(|(vkey, _)| vkey.part_number.cmp(&next)),
- );
- let blocks = &version.blocks.items()[block_idx..];
-
- (parts, blocks)
- }
- None => (version.parts_etags.items(), version.blocks.items()),
- };
-
- // Use the block vector to compute a (part_number, size) vector
- let mut size = Vec::<(u64, u64)>::new();
- blocks.iter().for_each(|(key, val)| {
- let mut new_size = val.size;
- match size.pop() {
- Some((part_number, size)) if part_number == key.part_number => new_size += size,
- Some(v) => size.push(v),
- None => (),
- }
- size.push((key.part_number, new_size))
- });
-
- // Merge the etag vector and size vector to build a PartInfo vector
- let max_parts = query.max_parts as usize;
- let (mut etag_iter, mut size_iter) = (etags.iter().peekable(), size.iter().peekable());
-
- let mut info = Vec::<PartInfo>::with_capacity(max_parts);
-
- while info.len() < max_parts {
- match (etag_iter.peek(), size_iter.peek()) {
- (Some((ep, etag)), Some((sp, size))) => match ep.cmp(sp) {
- Ordering::Less => {
- debug!("ETag information ignored due to missing corresponding block information. Query: {:?}", query);
- etag_iter.next();
- }
- Ordering::Equal => {
- info.push(PartInfo {
- etag: etag.to_string(),
- timestamp: obj_version.timestamp,
- part_number: *ep,
- size: *size,
- });
- etag_iter.next();
- size_iter.next();
+ mpu: &'a MultipartUpload,
+) -> Result<(Vec<PartInfo<'a>>, Option<u64>), Error> {
+ assert!((1..=1000).contains(&query.max_parts)); // see s3/api_server.rs
+
+ // Parse multipart upload part list, removing parts not yet finished
+ // and failed part uploads that were overwritten
+ let mut parts: Vec<PartInfo<'a>> = Vec::with_capacity(mpu.parts.items().len());
+ for (pk, p) in mpu.parts.items().iter() {
+ if let (Some(etag), Some(size)) = (&p.etag, p.size) {
+ let part_info = PartInfo {
+ part_number: pk.part_number,
+ timestamp: pk.timestamp,
+ etag,
+ size,
+ };
+ match parts.last_mut() {
+ Some(lastpart) if lastpart.part_number == pk.part_number => {
+ *lastpart = part_info;
}
- Ordering::Greater => {
- debug!("Block information ignored due to missing corresponding ETag information. Query: {:?}", query);
- size_iter.next();
+ _ => {
+ parts.push(part_info);
}
- },
- (None, None) => return Ok((info, None)),
- _ => {
- debug!(
- "Additional block or ETag information ignored. Query: {:?}",
- query
- );
- return Ok((info, None));
}
}
}
- match info.last() {
- Some(part_info) => {
- let pagination = Some(part_info.part_number);
- Ok((info, pagination))
- }
- None => Ok((info, None)),
+ // Cut the beginning if we have a marker
+ if let Some(marker) = &query.part_number_marker {
+ let next = marker + 1;
+ let part_idx = parts
+ .binary_search_by(|part| part.part_number.cmp(&next))
+ .unwrap_or_else(|x| x);
+ parts = parts.split_off(part_idx);
+ }
+
+ // Cut the end if we have too many parts
+ if parts.len() > query.max_parts as usize {
+ parts.truncate(query.max_parts as usize);
+ let pagination = Some(parts.last().unwrap().part_number);
+ return Ok((parts, pagination));
}
+
+ Ok((parts, None))
}
/*
@@ -651,7 +600,7 @@ impl ListMultipartUploadsQuery {
}),
uuid => Ok(RangeBegin::AfterUpload {
key: key_marker.to_string(),
- upload: s3_put::decode_upload_id(uuid)?,
+ upload: s3_multipart::decode_upload_id(uuid)?,
}),
},
@@ -843,7 +792,7 @@ impl ExtractAccumulator for UploadAccumulator {
let mut uploads_for_key = object
.versions()
.iter()
- .filter(|x| x.is_uploading())
+ .filter(|x| x.is_uploading(Some(true)))
.collect::<Vec<&ObjectVersion>>();
// S3 logic requires lexicographically sorted upload ids.
@@ -918,14 +867,6 @@ impl ExtractAccumulator for UploadAccumulator {
* Utility functions
*/
-/// This is a stub for Result::into_ok_or_err that is not yet in Rust stable
-fn into_ok_or_err<T>(r: Result<T, T>) -> T {
- match r {
- Ok(r) => r,
- Err(r) => r,
- }
-}
-
/// Returns the common prefix of the object given the query prefix and delimiter
fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> {
match &query.delimiter {
@@ -951,7 +892,6 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
#[cfg(test)]
mod tests {
use super::*;
- use garage_model::s3::version_table::*;
use garage_util::*;
use std::iter::FromIterator;
@@ -991,10 +931,13 @@ mod tests {
ObjectVersion {
uuid: Uuid::from(uuid),
timestamp: TS,
- state: ObjectVersionState::Uploading(ObjectVersionHeaders {
- content_type: "text/plain".to_string(),
- other: BTreeMap::<String, String>::new(),
- }),
+ state: ObjectVersionState::Uploading {
+ multipart: true,
+ headers: ObjectVersionHeaders {
+ content_type: "text/plain".to_string(),
+ other: BTreeMap::<String, String>::new(),
+ },
+ },
}
}
@@ -1169,83 +1112,76 @@ mod tests {
Ok(())
}
- fn version() -> Version {
+ fn mpu() -> MultipartUpload {
let uuid = Uuid::from([0x08; 32]);
- let blocks = vec![
+ let parts = vec![
(
- VersionBlockKey {
+ MpuPartKey {
part_number: 1,
- offset: 1,
+ timestamp: TS,
},
- VersionBlock {
- hash: uuid,
- size: 3,
+ MpuPart {
+ version: uuid,
+ size: Some(3),
+ etag: Some("etag1".into()),
},
),
(
- VersionBlockKey {
- part_number: 1,
- offset: 2,
+ MpuPartKey {
+ part_number: 2,
+ timestamp: TS,
},
- VersionBlock {
- hash: uuid,
- size: 2,
+ MpuPart {
+ version: uuid,
+ size: None,
+ etag: None,
},
),
(
- VersionBlockKey {
- part_number: 2,
- offset: 1,
+ MpuPartKey {
+ part_number: 3,
+ timestamp: TS,
},
- VersionBlock {
- hash: uuid,
- size: 8,
+ MpuPart {
+ version: uuid,
+ size: Some(10),
+ etag: Some("etag2".into()),
},
),
(
- VersionBlockKey {
+ MpuPartKey {
part_number: 5,
- offset: 1,
+ timestamp: TS,
},
- VersionBlock {
- hash: uuid,
- size: 7,
+ MpuPart {
+ version: uuid,
+ size: Some(7),
+ etag: Some("etag3".into()),
},
),
(
- VersionBlockKey {
+ MpuPartKey {
part_number: 8,
- offset: 1,
+ timestamp: TS,
},
- VersionBlock {
- hash: uuid,
- size: 5,
+ MpuPart {
+ version: uuid,
+ size: Some(5),
+ etag: Some("etag4".into()),
},
),
];
- let etags = vec![
- (1, "etag1".to_string()),
- (3, "etag2".to_string()),
- (5, "etag3".to_string()),
- (8, "etag4".to_string()),
- (9, "etag5".to_string()),
- ];
- Version {
- bucket_id: uuid,
- key: "a".to_string(),
- uuid,
+ MultipartUpload {
+ upload_id: uuid,
deleted: false.into(),
- blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks),
- parts_etags: crdt::Map::<u64, String>::from_iter(etags),
+ parts: crdt::Map::<MpuPartKey, MpuPart>::from_iter(parts),
+ bucket_id: uuid,
+ key: "a".into(),
}
}
- fn obj() -> Object {
- Object::new(bucket(), "d".to_string(), vec![objup_version([0x08; 32])])
- }
-
#[test]
fn test_fetch_part_info() -> Result<(), Error> {
let uuid = Uuid::from([0x08; 32]);
@@ -1258,82 +1194,85 @@ mod tests {
max_parts: 2,
};
- assert!(
- fetch_part_info(&query, None, None, uuid).is_err(),
- "No object and version should fail"
- );
- assert!(
- fetch_part_info(&query, Some(obj()), None, uuid).is_err(),
- "No version should faild"
- );
- assert!(
- fetch_part_info(&query, None, Some(version()), uuid).is_err(),
- "No object should fail"
- );
+ let mpu = mpu();
// Start from the beginning but with limited size to trigger pagination
- let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
- assert_eq!(pagination.unwrap(), 5);
+ let (info, pagination) = fetch_part_info(&query, &mpu)?;
+ assert_eq!(pagination.unwrap(), 3);
assert_eq!(
info,
vec![
PartInfo {
- etag: "etag1".to_string(),
+ etag: "etag1",
timestamp: TS,
part_number: 1,
- size: 5
+ size: 3
},
PartInfo {
- etag: "etag3".to_string(),
+ etag: "etag2",
timestamp: TS,
- part_number: 5,
- size: 7
+ part_number: 3,
+ size: 10
},
]
);
// Use previous pagination to make a new request
query.part_number_marker = Some(pagination.unwrap());
- let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+ let (info, pagination) = fetch_part_info(&query, &mpu)?;
assert!(pagination.is_none());
assert_eq!(
info,
- vec![PartInfo {
- etag: "etag4".to_string(),
- timestamp: TS,
- part_number: 8,
- size: 5
- },]
+ vec![
+ PartInfo {
+ etag: "etag3",
+ timestamp: TS,
+ part_number: 5,
+ size: 7
+ },
+ PartInfo {
+ etag: "etag4",
+ timestamp: TS,
+ part_number: 8,
+ size: 5
+ },
+ ]
);
// Trying to access a part that is way larger than registered ones
query.part_number_marker = Some(9999);
- let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+ let (info, pagination) = fetch_part_info(&query, &mpu)?;
assert!(pagination.is_none());
assert_eq!(info, vec![]);
// Try without any limitation
query.max_parts = 1000;
query.part_number_marker = None;
- let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+ let (info, pagination) = fetch_part_info(&query, &mpu)?;
assert!(pagination.is_none());
assert_eq!(
info,
vec![
PartInfo {
- etag: "etag1".to_string(),
+ etag: "etag1",
timestamp: TS,
part_number: 1,
- size: 5
+ size: 3
+ },
+ PartInfo {
+ etag: "etag2",
+ timestamp: TS,
+ part_number: 3,
+ size: 10
},
PartInfo {
- etag: "etag3".to_string(),
+ etag: "etag3",
timestamp: TS,
part_number: 5,
size: 7
},
PartInfo {
- etag: "etag4".to_string(),
+ etag: "etag4",
timestamp: TS,
part_number: 8,
size: 5