From 82e75c0e296c74c374f3d40feeb1aadcb58398f0 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Wed, 3 May 2023 12:02:59 +0200
Subject: Adapt S3 API code to use new multipart upload models

- Create and PutPart
- completemultipartupload
- upload part copy
- list_parts
---
 src/api/s3/list.rs | 177 ++++++++++++++++++++---------------------------------
 1 file changed, 65 insertions(+), 112 deletions(-)

(limited to 'src/api/s3/list.rs')

diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index 5cb0d65a..5a9eb133 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -1,4 +1,3 @@
-use std::cmp::Ordering;
 use std::collections::{BTreeMap, BTreeSet};
 use std::iter::{Iterator, Peekable};
 use std::sync::Arc;
@@ -11,15 +10,15 @@ use garage_util::error::Error as GarageError;
 use garage_util::time::*;
 
 use garage_model::garage::Garage;
+use garage_model::s3::mpu_table::*;
 use garage_model::s3::object_table::*;
-use garage_model::s3::version_table::Version;
 
-use garage_table::{EmptyKey, EnumerationOrder};
+use garage_table::EnumerationOrder;
 
 use crate::encoding::*;
 use crate::helpers::key_after_prefix;
 use crate::s3::error::*;
-use crate::s3::put as s3_put;
+use crate::s3::multipart as s3_multipart;
 use crate::s3::xml as s3_xml;
 
 const DUMMY_NAME: &str = "Dummy Key";
@@ -176,7 +175,9 @@ pub async fn handle_list_multipart_upload(
 		t.get_range(
 			&bucket,
 			key,
-			Some(ObjectFilter::IsUploading),
+			Some(ObjectFilter::IsUploading {
+				check_multipart: Some(true),
+			}),
 			count,
 			EnumerationOrder::Forward,
 		)
@@ -272,23 +273,23 @@ pub async fn handle_list_parts(
 ) -> Result<Response<Body>, Error> {
 	debug!("ListParts {:?}", query);
 
-	let upload_id = s3_put::decode_upload_id(&query.upload_id)?;
+	let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;
 
-	let (object, version) = futures::try_join!(
-		garage.object_table.get(&query.bucket_id, &query.key),
-		garage.version_table.get(&upload_id, &EmptyKey),
-	)?;
+	let (_, _, mpu) =
+		s3_multipart::get_upload(&garage, &query.bucket_id, &query.key, &upload_id).await?;
 
-	let (info, next) = fetch_part_info(query, object, version, upload_id)?;
+	let (info, next) = fetch_part_info(query, &mpu)?;
 
 	let result = s3_xml::ListPartsResult {
 		xmlns: (),
+		// Query parameters
 		bucket: s3_xml::Value(query.bucket_name.to_string()),
 		key: s3_xml::Value(query.key.to_string()),
 		upload_id: s3_xml::Value(query.upload_id.to_string()),
 		part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)),
-		next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)),
 		max_parts: s3_xml::IntValue(query.max_parts as i64),
+		// Result values
+		next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)),
 		is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()),
 		parts: info
 			.iter()
 			.map(|part| s3_xml::PartItem {
 				etag: s3_xml::Value(format!("\"{}\"", part.etag)),
 				last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)),
 				part_number: s3_xml::IntValue(part.part_number as i64),
 				size: s3_xml::IntValue(part.size as i64),
 			})
 			.collect(),
+		// Dummy result values (unsupported features)
 		initiator: s3_xml::Initiator {
 			display_name: s3_xml::Value(DUMMY_NAME.to_string()),
 			id: s3_xml::Value(DUMMY_KEY.to_string()),
 		},
@@ -335,8 +337,8 @@ struct UploadInfo {
 }
 
 #[derive(Debug, PartialEq)]
-struct PartInfo {
-	etag: String,
+struct PartInfo<'a> {
+	etag: &'a str,
 	timestamp: u64,
 	part_number: u64,
 	size: u64,
 }
@@ -456,106 +458,52 @@ where
 	}
 }
 
-fn fetch_part_info(
+fn fetch_part_info<'a>(
 	query: &ListPartsQuery,
-	object: Option<Object>,
-	version: Option<Version>,
-	upload_id: Uuid,
-) -> Result<(Vec<PartInfo>, Option<u64>), Error> {
-	// Check results
-	let object = object.ok_or(Error::NoSuchKey)?;
-
-	let obj_version = object
-		.versions()
-		.iter()
-		.find(|v| v.uuid == upload_id && v.is_uploading())
-		.ok_or(Error::NoSuchUpload)?;
-
-	let version = version.ok_or(Error::NoSuchKey)?;
-
-	// Cut the beginning of our 2 vectors if required
-	let (etags, blocks) = match &query.part_number_marker {
-		Some(marker) => {
-			let next = marker + 1;
-
-			let part_idx = into_ok_or_err(
-				version
-					.parts_etags
-					.items()
-					.binary_search_by(|(part_num, _)| part_num.cmp(&next)),
-			);
-			let parts = &version.parts_etags.items()[part_idx..];
-
-			let block_idx = into_ok_or_err(
-				version
-					.blocks
-					.items()
-					.binary_search_by(|(vkey, _)| vkey.part_number.cmp(&next)),
-			);
-			let blocks = &version.blocks.items()[block_idx..];
-
-			(parts, blocks)
-		}
-		None => (version.parts_etags.items(), version.blocks.items()),
-	};
-
-	// Use the block vector to compute a (part_number, size) vector
-	let mut size = Vec::<(u64, u64)>::new();
-	blocks.iter().for_each(|(key, val)| {
-		let mut new_size = val.size;
-		match size.pop() {
-			Some((part_number, size)) if part_number == key.part_number => new_size += size,
-			Some(v) => size.push(v),
-			None => (),
-		}
-		size.push((key.part_number, new_size))
-	});
-
-	// Merge the etag vector and size vector to build a PartInfo vector
-	let max_parts = query.max_parts as usize;
-	let (mut etag_iter, mut size_iter) = (etags.iter().peekable(), size.iter().peekable());
-
-	let mut info = Vec::<PartInfo>::with_capacity(max_parts);
-
-	while info.len() < max_parts {
-		match (etag_iter.peek(), size_iter.peek()) {
-			(Some((ep, etag)), Some((sp, size))) => match ep.cmp(sp) {
-				Ordering::Less => {
-					debug!("ETag information ignored due to missing corresponding block information. Query: {:?}", query);
-					etag_iter.next();
-				}
-				Ordering::Equal => {
-					info.push(PartInfo {
-						etag: etag.to_string(),
-						timestamp: obj_version.timestamp,
-						part_number: *ep,
-						size: *size,
-					});
-					etag_iter.next();
-					size_iter.next();
+	mpu: &'a MultipartUpload,
+) -> Result<(Vec<PartInfo<'a>>, Option<u64>), Error> {
+	// Parse multipart upload part list, removing parts not yet finished
+	// and failed part uploads that were overwritten
+	let mut parts: Vec<PartInfo<'a>> = Vec::with_capacity(mpu.parts.items().len());
+	for (pk, p) in mpu.parts.items().iter() {
+		if let (Some(etag), Some(size)) = (&p.etag, p.size) {
+			let part_info = PartInfo {
+				part_number: pk.part_number,
+				timestamp: pk.timestamp,
+				etag,
+				size,
+			};
+			match parts.last_mut() {
+				Some(lastpart) if lastpart.part_number == pk.part_number => {
+					*lastpart = part_info;
 				}
-				Ordering::Greater => {
-					debug!("Block information ignored due to missing corresponding ETag information. Query: {:?}", query);
-					size_iter.next();
+				_ => {
+					parts.push(part_info);
 				}
-			},
-			(None, None) => return Ok((info, None)),
-			_ => {
-				debug!(
-					"Additional block or ETag information ignored. Query: {:?}",
Query: {:?}", - query - ); - return Ok((info, None)); } } } - match info.last() { + // Cut the beginning and end + match &query.part_number_marker { + Some(marker) => { + let next = marker + 1; + let part_idx = + into_ok_or_err(parts.binary_search_by(|part| part.part_number.cmp(&next))); + parts.truncate(part_idx + query.max_parts as usize); + parts = parts.split_off(part_idx); + } + None => { + parts.truncate(query.max_parts as usize); + } + }; + + match parts.last() { Some(part_info) => { let pagination = Some(part_info.part_number); - Ok((info, pagination)) + Ok((parts, pagination)) } - None => Ok((info, None)), + None => Ok((parts, None)), } } @@ -651,7 +599,7 @@ impl ListMultipartUploadsQuery { }), uuid => Ok(RangeBegin::AfterUpload { key: key_marker.to_string(), - upload: s3_put::decode_upload_id(uuid)?, + upload: s3_multipart::decode_upload_id(uuid)?, }), }, @@ -843,7 +791,7 @@ impl ExtractAccumulator for UploadAccumulator { let mut uploads_for_key = object .versions() .iter() - .filter(|x| x.is_uploading()) + .filter(|x| x.is_uploading(Some(true))) .collect::>(); // S3 logic requires lexicographically sorted upload ids. @@ -991,10 +939,13 @@ mod tests { ObjectVersion { uuid: Uuid::from(uuid), timestamp: TS, - state: ObjectVersionState::Uploading(ObjectVersionHeaders { - content_type: "text/plain".to_string(), - other: BTreeMap::::new(), - }), + state: ObjectVersionState::Uploading { + multipart: true, + headers: ObjectVersionHeaders { + content_type: "text/plain".to_string(), + other: BTreeMap::::new(), + }, + }, } } @@ -1233,11 +1184,13 @@ mod tests { ]; Version { - bucket_id: uuid, - key: "a".to_string(), uuid, deleted: false.into(), blocks: crdt::Map::::from_iter(blocks), + backlink: VersionBacklink::Object { + bucket_id: uuid, + key: "a".to_string(), + }, parts_etags: crdt::Map::::from_iter(etags), } } -- cgit v1.2.3 From 7ad7dae5d46c76432edd68c152531c65443cad7a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 19:49:36 +0200 Subject: fix s3 list test --- src/api/s3/list.rs | 195 ++++++++++++++++++++++++----------------------------- 1 file changed, 89 insertions(+), 106 deletions(-) (limited to 'src/api/s3/list.rs') diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 5a9eb133..e6f0daac 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -484,27 +484,25 @@ fn fetch_part_info<'a>( } } - // Cut the beginning and end - match &query.part_number_marker { - Some(marker) => { - let next = marker + 1; - let part_idx = - into_ok_or_err(parts.binary_search_by(|part| part.part_number.cmp(&next))); - parts.truncate(part_idx + query.max_parts as usize); - parts = parts.split_off(part_idx); - } - None => { - parts.truncate(query.max_parts as usize); - } - }; + // Cut the beginning if we have a marker + if let Some(marker) = &query.part_number_marker { + let next = marker + 1; + let part_idx = parts + .binary_search_by(|part| part.part_number.cmp(&next)) + .unwrap_or_else(|x| x); + parts = parts.split_off(part_idx); + } - match parts.last() { - Some(part_info) => { + // Cut the end if we have too many parts + if parts.len() > query.max_parts as usize { + parts.truncate(query.max_parts as usize); + if let Some(part_info) = parts.last() { let pagination = Some(part_info.part_number); - Ok((parts, pagination)) + return Ok((parts, pagination)); } - None => Ok((parts, None)), } + + Ok((parts, None)) } /* @@ -866,14 +864,6 @@ impl ExtractAccumulator for UploadAccumulator { * Utility functions */ -/// This is a stub for Result::into_ok_or_err that is not yet in Rust 
-fn into_ok_or_err<T>(r: Result<T, T>) -> T {
-	match r {
-		Ok(r) => r,
-		Err(r) => r,
-	}
-}
-
 /// Returns the common prefix of the object given the query prefix and delimiter
 fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> {
 	match &query.delimiter {
@@ -899,7 +889,6 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
 #[cfg(test)]
 mod tests {
 	use super::*;
-	use garage_model::s3::version_table::*;
 	use garage_util::*;
 
 	use std::iter::FromIterator;
@@ -1120,85 +1109,76 @@ mod tests {
 		Ok(())
 	}
 
-	fn version() -> Version {
+	fn mpu() -> MultipartUpload {
 		let uuid = Uuid::from([0x08; 32]);
 
-		let blocks = vec![
+		let parts = vec![
 			(
-				VersionBlockKey {
+				MpuPartKey {
 					part_number: 1,
-					offset: 1,
+					timestamp: TS,
 				},
-				VersionBlock {
-					hash: uuid,
-					size: 3,
+				MpuPart {
+					version: uuid,
+					size: Some(3),
+					etag: Some("etag1".into()),
 				},
 			),
 			(
-				VersionBlockKey {
-					part_number: 1,
-					offset: 2,
+				MpuPartKey {
+					part_number: 2,
+					timestamp: TS,
 				},
-				VersionBlock {
-					hash: uuid,
-					size: 2,
+				MpuPart {
+					version: uuid,
+					size: None,
+					etag: None,
 				},
 			),
 			(
-				VersionBlockKey {
-					part_number: 2,
-					offset: 1,
+				MpuPartKey {
+					part_number: 3,
+					timestamp: TS,
 				},
-				VersionBlock {
-					hash: uuid,
-					size: 8,
+				MpuPart {
+					version: uuid,
+					size: Some(10),
+					etag: Some("etag2".into()),
 				},
 			),
 			(
-				VersionBlockKey {
+				MpuPartKey {
 					part_number: 5,
-					offset: 1,
+					timestamp: TS,
 				},
-				VersionBlock {
-					hash: uuid,
-					size: 7,
+				MpuPart {
+					version: uuid,
+					size: Some(7),
+					etag: Some("etag3".into()),
 				},
 			),
 			(
-				VersionBlockKey {
+				MpuPartKey {
 					part_number: 8,
-					offset: 1,
+					timestamp: TS,
 				},
-				VersionBlock {
-					hash: uuid,
-					size: 5,
+				MpuPart {
+					version: uuid,
+					size: Some(5),
+					etag: Some("etag4".into()),
 				},
 			),
 		];
 
-		let etags = vec![
-			(1, "etag1".to_string()),
-			(3, "etag2".to_string()),
-			(5, "etag3".to_string()),
-			(8, "etag4".to_string()),
-			(9, "etag5".to_string()),
-		];
-
-		Version {
-			uuid,
+		MultipartUpload {
+			upload_id: uuid,
 			deleted: false.into(),
-			blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks),
-			backlink: VersionBacklink::Object {
-				bucket_id: uuid,
-				key: "a".to_string(),
-			},
-			parts_etags: crdt::Map::<u64, String>::from_iter(etags),
+			parts: crdt::Map::<MpuPartKey, MpuPart>::from_iter(parts),
+			bucket_id: uuid,
+			key: "a".into(),
 		}
 	}
 
-	fn obj() -> Object {
-		Object::new(bucket(), "d".to_string(), vec![objup_version([0x08; 32])])
-	}
-
 	#[test]
 	fn test_fetch_part_info() -> Result<(), Error> {
 		let uuid = Uuid::from([0x08; 32]);
@@ -1211,82 +1191,85 @@ mod tests {
 			max_parts: 2,
 		};
 
-		assert!(
-			fetch_part_info(&query, None, None, uuid).is_err(),
-			"No object and version should fail"
-		);
-		assert!(
-			fetch_part_info(&query, Some(obj()), None, uuid).is_err(),
-			"No version should faild"
-		);
-		assert!(
-			fetch_part_info(&query, None, Some(version()), uuid).is_err(),
-			"No object should fail"
-		);
+		let mpu = mpu();
 
 		// Start from the beginning but with limited size to trigger pagination
-		let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
-		assert_eq!(pagination.unwrap(), 5);
+		let (info, pagination) = fetch_part_info(&query, &mpu)?;
+		assert_eq!(pagination.unwrap(), 3);
 		assert_eq!(
 			info,
 			vec![
 				PartInfo {
-					etag: "etag1".to_string(),
+					etag: "etag1",
 					timestamp: TS,
 					part_number: 1,
-					size: 5
+					size: 3
 				},
 				PartInfo {
-					etag: "etag3".to_string(),
+					etag: "etag2",
 					timestamp: TS,
-					part_number: 5,
-					size: 7
+					part_number: 3,
+					size: 10
 				},
 			]
 		);
 
 		// Use previous pagination to make a new request
 		query.part_number_marker = Some(pagination.unwrap());
fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + let (info, pagination) = fetch_part_info(&query, &mpu)?; assert!(pagination.is_none()); assert_eq!( info, - vec![PartInfo { - etag: "etag4".to_string(), - timestamp: TS, - part_number: 8, - size: 5 - },] + vec![ + PartInfo { + etag: "etag3", + timestamp: TS, + part_number: 5, + size: 7 + }, + PartInfo { + etag: "etag4", + timestamp: TS, + part_number: 8, + size: 5 + }, + ] ); // Trying to access a part that is way larger than registered ones query.part_number_marker = Some(9999); - let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + let (info, pagination) = fetch_part_info(&query, &mpu)?; assert!(pagination.is_none()); assert_eq!(info, vec![]); // Try without any limitation query.max_parts = 1000; query.part_number_marker = None; - let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + let (info, pagination) = fetch_part_info(&query, &mpu)?; assert!(pagination.is_none()); assert_eq!( info, vec![ PartInfo { - etag: "etag1".to_string(), + etag: "etag1", timestamp: TS, part_number: 1, - size: 5 + size: 3 + }, + PartInfo { + etag: "etag2", + timestamp: TS, + part_number: 3, + size: 10 }, PartInfo { - etag: "etag3".to_string(), + etag: "etag3", timestamp: TS, part_number: 5, size: 7 }, PartInfo { - etag: "etag4".to_string(), + etag: "etag4", timestamp: TS, part_number: 8, size: 5 -- cgit v1.2.3 From 8644376ac2dd8015e9212c19f30df63811426e1c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 10:09:52 +0200 Subject: fix test; simplify code --- src/api/s3/list.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'src/api/s3/list.rs') diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index e6f0daac..7408d4d3 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -282,15 +282,17 @@ pub async fn handle_list_parts( let result = s3_xml::ListPartsResult { xmlns: (), + // Query parameters bucket: s3_xml::Value(query.bucket_name.to_string()), key: s3_xml::Value(query.key.to_string()), upload_id: s3_xml::Value(query.upload_id.to_string()), part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)), max_parts: s3_xml::IntValue(query.max_parts as i64), + // Result values next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)), - is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()), + is_truncated: s3_xml::Value(format!("{}", next.is_some())), parts: info .iter() .map(|part| s3_xml::PartItem { @@ -300,6 +302,7 @@ pub async fn handle_list_parts( size: s3_xml::IntValue(part.size as i64), }) .collect(), + // Dummy result values (unsupported features) initiator: s3_xml::Initiator { display_name: s3_xml::Value(DUMMY_NAME.to_string()), @@ -462,6 +465,8 @@ fn fetch_part_info<'a>( query: &ListPartsQuery, mpu: &'a MultipartUpload, ) -> Result<(Vec>, Option), Error> { + assert!((1..=1000).contains(&query.max_parts)); // see s3/api_server.rs + // Parse multipart upload part list, removing parts not yet finished // and failed part uploads that were overwritten let mut parts: Vec> = Vec::with_capacity(mpu.parts.items().len()); @@ -496,10 +501,8 @@ fn fetch_part_info<'a>( // Cut the end if we have too many parts if parts.len() > query.max_parts as usize { parts.truncate(query.max_parts as usize); - if let Some(part_info) = parts.last() { - let pagination = Some(part_info.part_number); - return Ok((parts, pagination)); - } + let pagination = Some(parts.last().unwrap().part_number); 
+		return Ok((parts, pagination));
 	}
 
 	Ok((parts, None))
-- 
cgit v1.2.3
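
The three patches above converge on a two-step implementation of fetch_part_info: first clean up the CRDT part list (drop unfinished part uploads, keep only the latest attempt for each part number), then apply S3 pagination (cut at the part-number marker, truncate to max_parts). The following self-contained Rust sketch is illustrative rather than Garage code: PartKey, Part and list_parts are hypothetical stand-ins that model only the fields the patches actually read, with tuples in place of the PartInfo struct.

use std::collections::BTreeMap;

// Hypothetical stand-ins for the mpu_table entries (MpuPartKey / MpuPart):
// only the fields that fetch_part_info reads are modeled here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct PartKey {
	part_number: u64,
	timestamp: u64,
}

#[derive(Debug, Clone)]
struct Part {
	etag: Option<String>, // None while the part upload is not yet finished
	size: Option<u64>,
}

// Same two steps as the final fetch_part_info: clean up the part list,
// then cut the requested page out of it.
fn list_parts(
	parts: &BTreeMap<PartKey, Part>, // iterated in (part_number, timestamp) order
	part_number_marker: Option<u64>,
	max_parts: usize,
) -> (Vec<(u64, String, u64)>, Option<u64>) {
	// Keep only finished parts; a later retry of the same part number
	// overwrites the earlier entry, like the parts.last_mut() branch above.
	let mut out: Vec<(u64, String, u64)> = Vec::new();
	for (pk, p) in parts.iter() {
		if let (Some(etag), Some(size)) = (&p.etag, p.size) {
			match out.last_mut() {
				Some(last) if last.0 == pk.part_number => {
					*last = (pk.part_number, etag.clone(), size);
				}
				_ => out.push((pk.part_number, etag.clone(), size)),
			}
		}
	}

	// Cut the beginning if we have a marker: keep parts strictly after it.
	// binary_search_by returns Err(idx) with the insertion point when the
	// exact part number is absent, which is also the correct cut position.
	if let Some(marker) = part_number_marker {
		let next = marker + 1;
		let idx = out
			.binary_search_by(|(pn, _, _)| pn.cmp(&next))
			.unwrap_or_else(|x| x);
		out = out.split_off(idx);
	}

	// Cut the end if the page overflows; the last returned part number
	// becomes the NextPartNumberMarker of the response.
	if out.len() > max_parts {
		out.truncate(max_parts);
		let next_marker = out.last().map(|(pn, _, _)| *pn);
		return (out, next_marker);
	}

	(out, None)
}

fn main() {
	let ts = 1000u64;
	let mut m = BTreeMap::new();
	m.insert(PartKey { part_number: 1, timestamp: ts }, Part { etag: Some("etag1".into()), size: Some(3) });
	m.insert(PartKey { part_number: 2, timestamp: ts }, Part { etag: None, size: None }); // unfinished: skipped
	m.insert(PartKey { part_number: 3, timestamp: ts }, Part { etag: Some("etag2".into()), size: Some(10) });
	m.insert(PartKey { part_number: 5, timestamp: ts }, Part { etag: Some("etag3".into()), size: Some(7) });
	m.insert(PartKey { part_number: 8, timestamp: ts }, Part { etag: Some("etag4".into()), size: Some(5) });

	// First page of two parts: part numbers 1 and 3, next marker 3.
	let (page, next) = list_parts(&m, None, 2);
	assert_eq!(page.iter().map(|p| p.0).collect::<Vec<_>>(), vec![1, 3]);
	assert_eq!(next, Some(3));

	// Second page: parts 5 and 8 fit entirely, so no further marker.
	let (page, next) = list_parts(&m, Some(3), 2);
	assert_eq!(page.iter().map(|p| p.0).collect::<Vec<_>>(), vec![5, 8]);
	assert_eq!(next, None);
}

Chaining the returned next-part-number-marker into the following call walks the whole upload without ever repeating a part, even when part numbers are sparse, as with the 1, 3, 5, 8 numbering used in the test fixture.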