diff options
author | Alex Auvolat <alex@adnab.me> | 2023-05-03 12:02:59 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-06-09 16:23:37 +0200 |
commit | 82e75c0e296c74c374f3d40feeb1aadcb58398f0 (patch) | |
tree | 5449f6cd219e9b6628cb10d70f86a952d5b35382 /src/api/s3/list.rs | |
parent | 38d6ac429506f9f488ac522581b12fa530442a59 (diff) | |
download | garage-82e75c0e296c74c374f3d40feeb1aadcb58398f0.tar.gz garage-82e75c0e296c74c374f3d40feeb1aadcb58398f0.zip |
Adapt S3 API code to use new multipart upload models
- Create and PutPart
- completemultipartupload
- upload part copy
- list_parts
Diffstat (limited to 'src/api/s3/list.rs')
-rw-r--r-- | src/api/s3/list.rs | 177 |
1 files changed, 65 insertions, 112 deletions
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 5cb0d65a..5a9eb133 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -1,4 +1,3 @@ -use std::cmp::Ordering; use std::collections::{BTreeMap, BTreeSet}; use std::iter::{Iterator, Peekable}; use std::sync::Arc; @@ -11,15 +10,15 @@ use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_model::garage::Garage; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; -use garage_model::s3::version_table::Version; -use garage_table::{EmptyKey, EnumerationOrder}; +use garage_table::EnumerationOrder; use crate::encoding::*; use crate::helpers::key_after_prefix; use crate::s3::error::*; -use crate::s3::put as s3_put; +use crate::s3::multipart as s3_multipart; use crate::s3::xml as s3_xml; const DUMMY_NAME: &str = "Dummy Key"; @@ -176,7 +175,9 @@ pub async fn handle_list_multipart_upload( t.get_range( &bucket, key, - Some(ObjectFilter::IsUploading), + Some(ObjectFilter::IsUploading { + check_multipart: Some(true), + }), count, EnumerationOrder::Forward, ) @@ -272,23 +273,23 @@ pub async fn handle_list_parts( ) -> Result<Response<Body>, Error> { debug!("ListParts {:?}", query); - let upload_id = s3_put::decode_upload_id(&query.upload_id)?; + let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?; - let (object, version) = futures::try_join!( - garage.object_table.get(&query.bucket_id, &query.key), - garage.version_table.get(&upload_id, &EmptyKey), - )?; + let (_, _, mpu) = + s3_multipart::get_upload(&garage, &query.bucket_id, &query.key, &upload_id).await?; - let (info, next) = fetch_part_info(query, object, version, upload_id)?; + let (info, next) = fetch_part_info(query, &mpu)?; let result = s3_xml::ListPartsResult { xmlns: (), + // Query parameters bucket: s3_xml::Value(query.bucket_name.to_string()), key: s3_xml::Value(query.key.to_string()), upload_id: s3_xml::Value(query.upload_id.to_string()), part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)), - next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)), max_parts: s3_xml::IntValue(query.max_parts as i64), + // Result values + next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)), is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()), parts: info .iter() @@ -299,6 +300,7 @@ pub async fn handle_list_parts( size: s3_xml::IntValue(part.size as i64), }) .collect(), + // Dummy result values (unsupported features) initiator: s3_xml::Initiator { display_name: s3_xml::Value(DUMMY_NAME.to_string()), id: s3_xml::Value(DUMMY_KEY.to_string()), @@ -335,8 +337,8 @@ struct UploadInfo { } #[derive(Debug, PartialEq)] -struct PartInfo { - etag: String, +struct PartInfo<'a> { + etag: &'a str, timestamp: u64, part_number: u64, size: u64, @@ -456,106 +458,52 @@ where } } -fn fetch_part_info( +fn fetch_part_info<'a>( query: &ListPartsQuery, - object: Option<Object>, - version: Option<Version>, - upload_id: Uuid, -) -> Result<(Vec<PartInfo>, Option<u64>), Error> { - // Check results - let object = object.ok_or(Error::NoSuchKey)?; - - let obj_version = object - .versions() - .iter() - .find(|v| v.uuid == upload_id && v.is_uploading()) - .ok_or(Error::NoSuchUpload)?; - - let version = version.ok_or(Error::NoSuchKey)?; - - // Cut the beginning of our 2 vectors if required - let (etags, blocks) = match &query.part_number_marker { - Some(marker) => { - let next = marker + 1; - - let part_idx = into_ok_or_err( - version - .parts_etags - .items() - .binary_search_by(|(part_num, _)| part_num.cmp(&next)), - ); - let parts = &version.parts_etags.items()[part_idx..]; - - let block_idx = into_ok_or_err( - version - .blocks - .items() - .binary_search_by(|(vkey, _)| vkey.part_number.cmp(&next)), - ); - let blocks = &version.blocks.items()[block_idx..]; - - (parts, blocks) - } - None => (version.parts_etags.items(), version.blocks.items()), - }; - - // Use the block vector to compute a (part_number, size) vector - let mut size = Vec::<(u64, u64)>::new(); - blocks.iter().for_each(|(key, val)| { - let mut new_size = val.size; - match size.pop() { - Some((part_number, size)) if part_number == key.part_number => new_size += size, - Some(v) => size.push(v), - None => (), - } - size.push((key.part_number, new_size)) - }); - - // Merge the etag vector and size vector to build a PartInfo vector - let max_parts = query.max_parts as usize; - let (mut etag_iter, mut size_iter) = (etags.iter().peekable(), size.iter().peekable()); - - let mut info = Vec::<PartInfo>::with_capacity(max_parts); - - while info.len() < max_parts { - match (etag_iter.peek(), size_iter.peek()) { - (Some((ep, etag)), Some((sp, size))) => match ep.cmp(sp) { - Ordering::Less => { - debug!("ETag information ignored due to missing corresponding block information. Query: {:?}", query); - etag_iter.next(); - } - Ordering::Equal => { - info.push(PartInfo { - etag: etag.to_string(), - timestamp: obj_version.timestamp, - part_number: *ep, - size: *size, - }); - etag_iter.next(); - size_iter.next(); + mpu: &'a MultipartUpload, +) -> Result<(Vec<PartInfo<'a>>, Option<u64>), Error> { + // Parse multipart upload part list, removing parts not yet finished + // and failed part uploads that were overwritten + let mut parts: Vec<PartInfo<'a>> = Vec::with_capacity(mpu.parts.items().len()); + for (pk, p) in mpu.parts.items().iter() { + if let (Some(etag), Some(size)) = (&p.etag, p.size) { + let part_info = PartInfo { + part_number: pk.part_number, + timestamp: pk.timestamp, + etag, + size, + }; + match parts.last_mut() { + Some(lastpart) if lastpart.part_number == pk.part_number => { + *lastpart = part_info; } - Ordering::Greater => { - debug!("Block information ignored due to missing corresponding ETag information. Query: {:?}", query); - size_iter.next(); + _ => { + parts.push(part_info); } - }, - (None, None) => return Ok((info, None)), - _ => { - debug!( - "Additional block or ETag information ignored. Query: {:?}", - query - ); - return Ok((info, None)); } } } - match info.last() { + // Cut the beginning and end + match &query.part_number_marker { + Some(marker) => { + let next = marker + 1; + let part_idx = + into_ok_or_err(parts.binary_search_by(|part| part.part_number.cmp(&next))); + parts.truncate(part_idx + query.max_parts as usize); + parts = parts.split_off(part_idx); + } + None => { + parts.truncate(query.max_parts as usize); + } + }; + + match parts.last() { Some(part_info) => { let pagination = Some(part_info.part_number); - Ok((info, pagination)) + Ok((parts, pagination)) } - None => Ok((info, None)), + None => Ok((parts, None)), } } @@ -651,7 +599,7 @@ impl ListMultipartUploadsQuery { }), uuid => Ok(RangeBegin::AfterUpload { key: key_marker.to_string(), - upload: s3_put::decode_upload_id(uuid)?, + upload: s3_multipart::decode_upload_id(uuid)?, }), }, @@ -843,7 +791,7 @@ impl ExtractAccumulator for UploadAccumulator { let mut uploads_for_key = object .versions() .iter() - .filter(|x| x.is_uploading()) + .filter(|x| x.is_uploading(Some(true))) .collect::<Vec<&ObjectVersion>>(); // S3 logic requires lexicographically sorted upload ids. @@ -991,10 +939,13 @@ mod tests { ObjectVersion { uuid: Uuid::from(uuid), timestamp: TS, - state: ObjectVersionState::Uploading(ObjectVersionHeaders { - content_type: "text/plain".to_string(), - other: BTreeMap::<String, String>::new(), - }), + state: ObjectVersionState::Uploading { + multipart: true, + headers: ObjectVersionHeaders { + content_type: "text/plain".to_string(), + other: BTreeMap::<String, String>::new(), + }, + }, } } @@ -1233,11 +1184,13 @@ mod tests { ]; Version { - bucket_id: uuid, - key: "a".to_string(), uuid, deleted: false.into(), blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks), + backlink: VersionBacklink::Object { + bucket_id: uuid, + key: "a".to_string(), + }, parts_etags: crdt::Map::<u64, String>::from_iter(etags), } } |