From 440374524bcd21cade7b68410eb2d12ba23cfaaf Mon Sep 17 00:00:00 2001
From: Quentin Dufour
Date: Wed, 19 Jan 2022 17:16:00 +0100
Subject: Implement ListParts

---
 src/api/api_server.rs |  19 +++
 src/api/s3_list.rs    | 374 +++++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 389 insertions(+), 4 deletions(-)

diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index b064ac24..dfb8dfdb 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -294,6 +294,25 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respo
+		Endpoint::ListParts {
+			key,
+			upload_id,
+			part_number_marker,
+			max_parts,
+		} => {
+			handle_list_parts(
+				garage,
+				&ListPartsQuery {
+					bucket_name,
+					bucket_id,
+					key,
+					upload_id,
+					part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)),
+					max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+				},
+			)
+			.await
+		}
 		Endpoint::DeleteObjects {} => {
 			handle_delete_objects(garage, bucket_id, req, content_sha256).await
 		}
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index c3bc6938..92998159 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -1,3 +1,4 @@
+use std::cmp::Ordering;
 use std::collections::{BTreeMap, BTreeSet};
 use std::iter::{Iterator, Peekable};
 use std::sync::Arc;
@@ -10,12 +11,18 @@ use garage_util::time::*;
 
 use garage_model::garage::Garage;
 use garage_model::object_table::*;
+use garage_model::version_table::Version;
+
+use garage_table::EmptyKey;
 
 use crate::encoding::*;
 use crate::error::*;
 use crate::s3_put;
 use crate::s3_xml;
 
+const DUMMY_NAME: &str = "Dummy Key";
+const DUMMY_KEY: &str = "GKDummyKey";
+
 #[derive(Debug)]
 pub struct ListQueryCommon {
 	pub bucket_name: String,
@@ -42,6 +49,16 @@ pub struct ListMultipartUploadsQuery {
 	pub common: ListQueryCommon,
 }
 
+#[derive(Debug)]
+pub struct ListPartsQuery {
+	pub bucket_name: String,
+	pub bucket_id: Uuid,
+	pub key: String,
+	pub upload_id: String,
+	pub part_number_marker: Option<u64>,
+	pub max_parts: u64,
+}
+
 pub async fn handle_list(
 	garage: Arc<Garage>,
 	query: &ListObjectsQuery,
@@ -54,6 +71,7 @@ pub async fn handle_list(
 		}
 	};
 
+	debug!("ListObjects {:?}", query);
 	let mut acc = query.build_accumulator();
 	let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;
 
@@ -152,6 +170,7 @@ pub async fn handle_list_multipart_upload(
 		}
 	};
 
+	debug!("ListMultipartUploads {:?}", query);
 	let mut acc = query.build_accumulator();
 	let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;
 
@@ -208,12 +227,12 @@ pub async fn handle_list_multipart_upload(
 				upload_id: s3_xml::Value(hex::encode(uuid)),
 				storage_class: s3_xml::Value("STANDARD".to_string()),
 				initiator: s3_xml::Initiator {
-					display_name: s3_xml::Value("Dummy Key".to_string()),
-					id: s3_xml::Value("GKDummyKey".to_string()),
+					display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+					id: s3_xml::Value(DUMMY_KEY.to_string()),
 				},
 				owner: s3_xml::Owner {
-					display_name: s3_xml::Value("Dummy Key".to_string()),
-					id: s3_xml::Value("GKDummyKey".to_string()),
+					display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+					id: s3_xml::Value(DUMMY_KEY.to_string()),
 				},
 			})
 			.collect(),
@@ -233,6 +252,57 @@ pub async fn handle_list_multipart_upload(
 		.body(Body::from(xml.into_bytes()))?)
 }
 
+pub async fn handle_list_parts(
+	garage: Arc<Garage>,
+	query: &ListPartsQuery,
+) -> Result<Response<Body>, Error> {
+	debug!("ListParts {:?}", query);
+
+	let upload_id = s3_put::decode_upload_id(&query.upload_id)?;
+
+	let (object, version) = futures::try_join!(
+		garage.object_table.get(&query.bucket_id, &query.key),
+		garage.version_table.get(&upload_id, &EmptyKey),
+	)?;
+
+	let (info, next) = fetch_part_info(query, object, version, upload_id)?;
+
+	let result = s3_xml::ListPartsResult {
+		xmlns: (),
+		bucket: s3_xml::Value(query.bucket_name.to_string()),
+		key: s3_xml::Value(query.key.to_string()),
+		upload_id: s3_xml::Value(query.upload_id.to_string()),
+		part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)),
+		next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)),
+		max_parts: s3_xml::IntValue(query.max_parts as i64),
+		is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()),
+		parts: info
+			.iter()
+			.map(|part| s3_xml::PartItem {
+				etag: s3_xml::Value(format!("\"{}\"", part.etag)),
+				last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)),
+				part_number: s3_xml::IntValue(part.part_number as i64),
+				size: s3_xml::IntValue(part.size as i64),
+			})
+			.collect(),
+		initiator: s3_xml::Initiator {
+			display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+			id: s3_xml::Value(DUMMY_KEY.to_string()),
+		},
+		owner: s3_xml::Owner {
+			display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+			id: s3_xml::Value(DUMMY_KEY.to_string()),
+		},
+		storage_class: s3_xml::Value("STANDARD".to_string()),
+	};
+
+	let xml = s3_xml::to_xml_with_header(&result)?;
+
+	Ok(Response::builder()
+		.header("Content-Type", "application/xml")
+		.body(Body::from(xml.into_bytes()))?)
+}
+
 /*
  * Private enums and structs
  */
@@ -250,6 +320,14 @@ struct UploadInfo {
 	timestamp: u64,
 }
 
+#[derive(Debug, PartialEq)]
+struct PartInfo {
+	etag: String,
+	timestamp: u64,
+	part_number: u64,
+	size: u64,
+}
+
 enum ExtractionResult {
 	NoMore,
 	Filled,
@@ -364,6 +442,109 @@ where
 	}
 }
 
+fn fetch_part_info(
+	query: &ListPartsQuery,
+	object: Option<Object>,
+	version: Option<Version>,
+	upload_id: Uuid,
+) -> Result<(Vec<PartInfo>, Option<u64>), Error> {
+	// Check results
+	let object = object.ok_or(Error::NoSuchKey)?;
+
+	let obj_version = object
+		.versions()
+		.iter()
+		.find(|v| v.uuid == upload_id && v.is_uploading())
+		.ok_or(Error::NoSuchUpload)?;
+
+	let version = version.ok_or(Error::NoSuchKey)?;
+
+	// Cut the beginning of our 2 vectors if required
+	let (etags, blocks) = match &query.part_number_marker {
+		Some(marker) => {
+			let next = marker + 1;
+
+			let part_idx = into_ok_or_err(
+				version
+					.parts_etags
+					.items()
+					.binary_search_by(|(part_num, _)| part_num.cmp(&next)),
+			);
+			let parts = &version.parts_etags.items()[part_idx..];
+
+			let block_idx = into_ok_or_err(
+				version
+					.blocks
+					.items()
+					.binary_search_by(|(vkey, _)| vkey.part_number.cmp(&next)),
+			);
+			let blocks = &version.blocks.items()[block_idx..];
+
+			(parts, blocks)
+		}
+		None => (version.parts_etags.items(), version.blocks.items()),
+	};
+
+	// Use the block vector to compute a (part_number, size) vector
+	let mut size = Vec::<(u64, u64)>::new();
+	blocks.iter().for_each(|(key, val)| {
+		let mut new_size = val.size;
+		match size.pop() {
+			Some((part_number, size)) if part_number == key.part_number => new_size += size,
+			Some(v) => size.push(v),
+			None => (),
+		}
+		size.push((key.part_number, new_size))
+	});
+
+	// Merge the etag vector and size vector to build a PartInfo vector
+	let max_parts = query.max_parts as usize;
+	let (mut etag_iter, mut size_iter) = (etags.iter().peekable(), size.iter().peekable());
+
+	let mut info = Vec::<PartInfo>::with_capacity(max_parts);
+
+	while info.len() < max_parts {
+		match (etag_iter.peek(), size_iter.peek()) {
+			(Some((ep, etag)), Some((sp, size))) => match ep.cmp(sp) {
+				Ordering::Less => {
+					debug!("ETag information ignored due to missing corresponding block information. Query: {:?}", query);
+					etag_iter.next();
+				}
+				Ordering::Equal => {
+					info.push(PartInfo {
+						etag: etag.to_string(),
+						timestamp: obj_version.timestamp,
+						part_number: *ep,
+						size: *size,
+					});
+					etag_iter.next();
+					size_iter.next();
+				}
+				Ordering::Greater => {
+					debug!("Block information ignored due to missing corresponding ETag information. Query: {:?}", query);
+					size_iter.next();
+				}
+			},
+			(None, None) => return Ok((info, None)),
+			_ => {
+				debug!(
+					"Additional block or ETag information ignored. Query: {:?}",
+					query
+				);
+				return Ok((info, None));
+			}
+		}
+	}
+
+	match info.last() {
+		Some(part_info) => {
+			let pagination = Some(part_info.part_number);
+			Ok((info, pagination))
+		}
+		None => Ok((info, None)),
+	}
+}
+
 /*
  * ListQuery logic
  */
@@ -715,6 +896,14 @@ impl ExtractAccumulator for UploadAccumulator {
  * Utility functions
  */
 
+/// This is a stub for Result::into_ok_or_err that is not yet in Rust stable
+fn into_ok_or_err<T>(r: Result<T, T>) -> T {
+	match r {
+		Ok(r) => r,
+		Err(r) => r,
+	}
+}
+
 /// Returns the common prefix of the object given the query prefix and delimiter
 fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> {
 	match &query.delimiter {
@@ -766,6 +955,8 @@ fn key_after_prefix(pfx: &str) -> Option<String> {
 #[cfg(test)]
 mod tests {
 	use super::*;
+	use garage_model::version_table::*;
+	use garage_util::*;
 	use std::iter::FromIterator;
 
 	const TS: u64 = 1641394898314;
@@ -1014,4 +1205,179 @@ mod tests {
 
 		Ok(())
 	}
+
+	fn version() -> Version {
+		let uuid = Uuid::from([0x08; 32]);
+
+		let blocks = vec![
+			(
+				VersionBlockKey {
+					part_number: 1,
+					offset: 1,
+				},
+				VersionBlock {
+					hash: uuid,
+					size: 3,
+				},
+			),
+			(
+				VersionBlockKey {
+					part_number: 1,
+					offset: 2,
+				},
+				VersionBlock {
+					hash: uuid,
+					size: 2,
+				},
+			),
+			(
+				VersionBlockKey {
+					part_number: 2,
+					offset: 1,
+				},
+				VersionBlock {
+					hash: uuid,
+					size: 8,
+				},
+			),
+			(
+				VersionBlockKey {
+					part_number: 5,
+					offset: 1,
+				},
+				VersionBlock {
+					hash: uuid,
+					size: 7,
+				},
+			),
+			(
+				VersionBlockKey {
+					part_number: 8,
+					offset: 1,
+				},
+				VersionBlock {
+					hash: uuid,
+					size: 5,
+				},
+			),
+		];
+		let etags = vec![
+			(1, "etag1".to_string()),
+			(3, "etag2".to_string()),
+			(5, "etag3".to_string()),
+			(8, "etag4".to_string()),
+			(9, "etag5".to_string()),
+		];
+
+		Version {
+			bucket_id: uuid,
+			key: "a".to_string(),
+			uuid: uuid,
+			deleted: false.into(),
+			blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks),
+			parts_etags: crdt::Map::<u64, String>::from_iter(etags),
+		}
+	}
+
+	fn obj() -> Object {
+		Object::new(bucket(), "d".to_string(), vec![objup_version([0x08; 32])])
+	}
+
+	#[test]
+	fn test_fetch_part_info() -> Result<(), Error> {
+		let uuid = Uuid::from([0x08; 32]);
+		let mut query = ListPartsQuery {
+			bucket_name: "a".to_string(),
+			bucket_id: uuid,
+			key: "a".to_string(),
+			upload_id: "xx".to_string(),
+			part_number_marker: None,
+			max_parts: 2,
+		};
+
+		assert!(
+			fetch_part_info(&query, None, None, uuid).is_err(),
+			"No object and version should fail"
+		);
+		assert!(
+			fetch_part_info(&query, Some(obj()), None, uuid).is_err(),
+			"No version should fail"
+		);
+		assert!(
+			fetch_part_info(&query, None, Some(version()), uuid).is_err(),
"No object should fail" + ); + + // Start from the beginning but with limited size to trigger pagination + let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + assert_eq!(pagination.unwrap(), 5); + assert_eq!( + info, + vec![ + PartInfo { + etag: "etag1".to_string(), + timestamp: TS, + part_number: 1, + size: 5 + }, + PartInfo { + etag: "etag3".to_string(), + timestamp: TS, + part_number: 5, + size: 7 + }, + ] + ); + + // Use previous pagination to make a new request + query.part_number_marker = Some(pagination.unwrap()); + let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + assert!(pagination.is_none()); + assert_eq!( + info, + vec![PartInfo { + etag: "etag4".to_string(), + timestamp: TS, + part_number: 8, + size: 5 + },] + ); + + // Trying to access a part that is way larger than registered ones + query.part_number_marker = Some(9999); + let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + assert!(pagination.is_none()); + assert_eq!(info, vec![]); + + // Try without any limitation + query.max_parts = 1000; + query.part_number_marker = None; + let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + assert!(pagination.is_none()); + assert_eq!( + info, + vec![ + PartInfo { + etag: "etag1".to_string(), + timestamp: TS, + part_number: 1, + size: 5 + }, + PartInfo { + etag: "etag3".to_string(), + timestamp: TS, + part_number: 5, + size: 7 + }, + PartInfo { + etag: "etag4".to_string(), + timestamp: TS, + part_number: 8, + size: 5 + }, + ] + ); + + Ok(()) + } } -- cgit v1.2.3