diff options
author | Alex <alex@adnab.me> | 2022-05-10 13:16:57 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2022-05-10 13:16:57 +0200 |
commit | 5768bf362262f78376af14517c4921941986192e (patch) | |
tree | b4baf3051eade0f63649443278bb3a3f4c38ec25 /src/api/s3/list.rs | |
parent | def78c5e6f5da37a0d17b5652c525fbeccbc2e86 (diff) | |
download | garage-5768bf362262f78376af14517c4921941986192e.tar.gz garage-5768bf362262f78376af14517c4921941986192e.zip |
First implementation of K2V (#293)
**Specification:**
View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)
- [x] Specify the structure of K2V triples
- [x] Specify the DVVS format used for causality detection
- [x] Specify the K2V index (just a counter of number of values per partition key)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)
**Implementation:**
- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
  - [x] K2V API router
  - [x] ReadItem
  - [x] InsertItem
  - [x] DeleteItem
  - [x] PollItem
  - [x] ReadIndex
  - [x] InsertBatch
  - [x] ReadBatch
  - [x] DeleteBatch
**Testing:**
- [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values)
- [x] Actual tests:
  - [x] Adapt testing framework
  - [x] Simple test with InsertItem + ReadItem
  - [x] Test with several Insert/Read/DeleteItem + ReadIndex
  - [x] Test all combinations of return formats for ReadItem
  - [x] Test with ReadBatch, InsertBatch, DeleteBatch
  - [x] Test with PollItem
  - [x] Test error codes
- [ ] Fix most broken stuff
  - [x] test PollItem broken randomly
  - [x] when invalid causality tokens are given, errors should be 4xx not 5xx
**Improvements:**
- [x] Descending range queries
  - [x] Specify
  - [x] Implement
  - [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind `k2v` feature flag
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/api/s3/list.rs')
-rw-r--r-- | src/api/s3/list.rs | 1337 |
1 files changed, 1337 insertions, 0 deletions
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs new file mode 100644 index 00000000..e2848c57 --- /dev/null +++ b/src/api/s3/list.rs @@ -0,0 +1,1337 @@ +use std::cmp::Ordering; +use std::collections::{BTreeMap, BTreeSet}; +use std::iter::{Iterator, Peekable}; +use std::sync::Arc; + +use hyper::{Body, Response}; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; +use garage_util::time::*; + +use garage_model::garage::Garage; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::Version; + +use garage_table::{EmptyKey, EnumerationOrder}; + +use crate::encoding::*; +use crate::error::*; +use crate::helpers::key_after_prefix; +use crate::s3::put as s3_put; +use crate::s3::xml as s3_xml; + +const DUMMY_NAME: &str = "Dummy Key"; +const DUMMY_KEY: &str = "GKDummyKey"; + +#[derive(Debug)] +pub struct ListQueryCommon { + pub bucket_name: String, + pub bucket_id: Uuid, + pub delimiter: Option<String>, + pub page_size: usize, + pub prefix: String, + pub urlencode_resp: bool, +} + +#[derive(Debug)] +pub struct ListObjectsQuery { + pub is_v2: bool, + pub marker: Option<String>, + pub continuation_token: Option<String>, + pub start_after: Option<String>, + pub common: ListQueryCommon, +} + +#[derive(Debug)] +pub struct ListMultipartUploadsQuery { + pub key_marker: Option<String>, + pub upload_id_marker: Option<String>, + pub common: ListQueryCommon, +} + +#[derive(Debug)] +pub struct ListPartsQuery { + pub bucket_name: String, + pub bucket_id: Uuid, + pub key: String, + pub upload_id: String, + pub part_number_marker: Option<u64>, + pub max_parts: u64, +} + +pub async fn handle_list( + garage: Arc<Garage>, + query: &ListObjectsQuery, +) -> Result<Response<Body>, Error> { + let io = |bucket, key, count| { + let t = &garage.object_table; + async move { + t.get_range( + &bucket, + key, + Some(ObjectFilter::IsData), + count, + EnumerationOrder::Forward, + ) + .await + } + }; + + debug!("ListObjects {:?}", query); + let mut acc = 
query.build_accumulator(); + let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?; + + let result = s3_xml::ListBucketResult { + xmlns: (), + // Sending back request information + name: s3_xml::Value(query.common.bucket_name.to_string()), + prefix: uriencode_maybe(&query.common.prefix, query.common.urlencode_resp), + max_keys: s3_xml::IntValue(query.common.page_size as i64), + delimiter: query + .common + .delimiter + .as_ref() + .map(|x| uriencode_maybe(x, query.common.urlencode_resp)), + encoding_type: match query.common.urlencode_resp { + true => Some(s3_xml::Value("url".to_string())), + false => None, + }, + marker: match (!query.is_v2, &query.marker) { + (true, Some(k)) => Some(uriencode_maybe(k, query.common.urlencode_resp)), + _ => None, + }, + start_after: match (query.is_v2, &query.start_after) { + (true, Some(sa)) => Some(uriencode_maybe(sa, query.common.urlencode_resp)), + _ => None, + }, + continuation_token: match (query.is_v2, &query.continuation_token) { + (true, Some(ct)) => Some(s3_xml::Value(ct.to_string())), + _ => None, + }, + + // Pagination + is_truncated: s3_xml::Value(format!("{}", pagination.is_some())), + key_count: Some(s3_xml::IntValue( + acc.keys.len() as i64 + acc.common_prefixes.len() as i64, + )), + next_marker: match (!query.is_v2, &pagination) { + (true, Some(RangeBegin::AfterKey { key: k })) + | ( + true, + Some(RangeBegin::IncludingKey { + fallback_key: Some(k), + .. + }), + ) => Some(uriencode_maybe(k, query.common.urlencode_resp)), + _ => None, + }, + next_continuation_token: match (query.is_v2, &pagination) { + (true, Some(RangeBegin::AfterKey { key })) => Some(s3_xml::Value(format!( + "]{}", + base64::encode(key.as_bytes()) + ))), + (true, Some(RangeBegin::IncludingKey { key, .. 
})) => Some(s3_xml::Value(format!( + "[{}", + base64::encode(key.as_bytes()) + ))), + _ => None, + }, + + // Body + contents: acc + .keys + .iter() + .map(|(key, info)| s3_xml::ListBucketItem { + key: uriencode_maybe(key, query.common.urlencode_resp), + last_modified: s3_xml::Value(msec_to_rfc3339(info.last_modified)), + size: s3_xml::IntValue(info.size as i64), + etag: s3_xml::Value(format!("\"{}\"", info.etag)), + storage_class: s3_xml::Value("STANDARD".to_string()), + }) + .collect(), + common_prefixes: acc + .common_prefixes + .iter() + .map(|pfx| s3_xml::CommonPrefix { + prefix: uriencode_maybe(pfx, query.common.urlencode_resp), + }) + .collect(), + }; + + let xml = s3_xml::to_xml_with_header(&result)?; + Ok(Response::builder() + .header("Content-Type", "application/xml") + .body(Body::from(xml.into_bytes()))?) +} + +pub async fn handle_list_multipart_upload( + garage: Arc<Garage>, + query: &ListMultipartUploadsQuery, +) -> Result<Response<Body>, Error> { + let io = |bucket, key, count| { + let t = &garage.object_table; + async move { + t.get_range( + &bucket, + key, + Some(ObjectFilter::IsUploading), + count, + EnumerationOrder::Forward, + ) + .await + } + }; + + debug!("ListMultipartUploads {:?}", query); + let mut acc = query.build_accumulator(); + let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?; + + let result = s3_xml::ListMultipartUploadsResult { + xmlns: (), + + // Sending back some information about the request + bucket: s3_xml::Value(query.common.bucket_name.to_string()), + prefix: uriencode_maybe(&query.common.prefix, query.common.urlencode_resp), + delimiter: query + .common + .delimiter + .as_ref() + .map(|d| uriencode_maybe(d, query.common.urlencode_resp)), + max_uploads: s3_xml::IntValue(query.common.page_size as i64), + key_marker: query + .key_marker + .as_ref() + .map(|m| uriencode_maybe(m, query.common.urlencode_resp)), + upload_id_marker: query + .upload_id_marker + .as_ref() + .map(|m| 
s3_xml::Value(m.to_string())), + encoding_type: match query.common.urlencode_resp { + true => Some(s3_xml::Value("url".to_string())), + false => None, + }, + + // Handling pagination + is_truncated: s3_xml::Value(format!("{}", pagination.is_some())), + next_key_marker: match &pagination { + None => None, + Some(RangeBegin::AfterKey { key }) + | Some(RangeBegin::AfterUpload { key, .. }) + | Some(RangeBegin::IncludingKey { key, .. }) => { + Some(uriencode_maybe(key, query.common.urlencode_resp)) + } + }, + next_upload_id_marker: match pagination { + Some(RangeBegin::AfterUpload { upload, .. }) => { + Some(s3_xml::Value(hex::encode(upload))) + } + Some(RangeBegin::IncludingKey { .. }) => Some(s3_xml::Value("include".to_string())), + _ => None, + }, + + // Result body + upload: acc + .keys + .iter() + .map(|(uuid, info)| s3_xml::ListMultipartItem { + initiated: s3_xml::Value(msec_to_rfc3339(info.timestamp)), + key: uriencode_maybe(&info.key, query.common.urlencode_resp), + upload_id: s3_xml::Value(hex::encode(uuid)), + storage_class: s3_xml::Value("STANDARD".to_string()), + initiator: s3_xml::Initiator { + display_name: s3_xml::Value(DUMMY_NAME.to_string()), + id: s3_xml::Value(DUMMY_KEY.to_string()), + }, + owner: s3_xml::Owner { + display_name: s3_xml::Value(DUMMY_NAME.to_string()), + id: s3_xml::Value(DUMMY_KEY.to_string()), + }, + }) + .collect(), + common_prefixes: acc + .common_prefixes + .iter() + .map(|c| s3_xml::CommonPrefix { + prefix: s3_xml::Value(c.to_string()), + }) + .collect(), + }; + + let xml = s3_xml::to_xml_with_header(&result)?; + + Ok(Response::builder() + .header("Content-Type", "application/xml") + .body(Body::from(xml.into_bytes()))?) 
+} + +pub async fn handle_list_parts( + garage: Arc<Garage>, + query: &ListPartsQuery, +) -> Result<Response<Body>, Error> { + debug!("ListParts {:?}", query); + + let upload_id = s3_put::decode_upload_id(&query.upload_id)?; + + let (object, version) = futures::try_join!( + garage.object_table.get(&query.bucket_id, &query.key), + garage.version_table.get(&upload_id, &EmptyKey), + )?; + + let (info, next) = fetch_part_info(query, object, version, upload_id)?; + + let result = s3_xml::ListPartsResult { + xmlns: (), + bucket: s3_xml::Value(query.bucket_name.to_string()), + key: s3_xml::Value(query.key.to_string()), + upload_id: s3_xml::Value(query.upload_id.to_string()), + part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)), + next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)), + max_parts: s3_xml::IntValue(query.max_parts as i64), + is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()), + parts: info + .iter() + .map(|part| s3_xml::PartItem { + etag: s3_xml::Value(format!("\"{}\"", part.etag)), + last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)), + part_number: s3_xml::IntValue(part.part_number as i64), + size: s3_xml::IntValue(part.size as i64), + }) + .collect(), + initiator: s3_xml::Initiator { + display_name: s3_xml::Value(DUMMY_NAME.to_string()), + id: s3_xml::Value(DUMMY_KEY.to_string()), + }, + owner: s3_xml::Owner { + display_name: s3_xml::Value(DUMMY_NAME.to_string()), + id: s3_xml::Value(DUMMY_KEY.to_string()), + }, + storage_class: s3_xml::Value("STANDARD".to_string()), + }; + + let xml = s3_xml::to_xml_with_header(&result)?; + + Ok(Response::builder() + .header("Content-Type", "application/xml") + .body(Body::from(xml.into_bytes()))?) 
+} + +/* + * Private enums and structs + */ + +#[derive(Debug)] +struct ObjectInfo { + last_modified: u64, + size: u64, + etag: String, +} + +#[derive(Debug, PartialEq)] +struct UploadInfo { + key: String, + timestamp: u64, +} + +#[derive(Debug, PartialEq)] +struct PartInfo { + etag: String, + timestamp: u64, + part_number: u64, + size: u64, +} + +enum ExtractionResult { + NoMore, + Filled, + FilledAtUpload { + key: String, + upload: Uuid, + }, + Extracted { + key: String, + }, + // Fallback key is used for legacy APIs that only support + // exlusive pagination (and not inclusive one). + SkipTo { + key: String, + fallback_key: Option<String>, + }, +} + +#[derive(PartialEq, Clone, Debug)] +enum RangeBegin { + // Fallback key is used for legacy APIs that only support + // exlusive pagination (and not inclusive one). + IncludingKey { + key: String, + fallback_key: Option<String>, + }, + AfterKey { + key: String, + }, + AfterUpload { + key: String, + upload: Uuid, + }, +} +type Pagination = Option<RangeBegin>; + +/* + * Fetch list entries + */ + +async fn fetch_list_entries<R, F>( + query: &ListQueryCommon, + begin: RangeBegin, + acc: &mut impl ExtractAccumulator, + mut io: F, +) -> Result<Pagination, Error> +where + R: futures::Future<Output = Result<Vec<Object>, GarageError>>, + F: FnMut(Uuid, Option<String>, usize) -> R, +{ + let mut cursor = begin; + // +1 is needed as we may need to skip the 1st key + // (range is inclusive while most S3 requests are exclusive) + let count = query.page_size + 1; + + loop { + let start_key = match cursor { + RangeBegin::AfterKey { ref key } + | RangeBegin::AfterUpload { ref key, .. } + | RangeBegin::IncludingKey { ref key, .. 
} => Some(key.clone()), + }; + + // Fetch objects + let objects = io(query.bucket_id, start_key.clone(), count).await?; + + debug!( + "List: get range {:?} (max {}), results: {}", + start_key, + count, + objects.len() + ); + let server_more = objects.len() >= count; + + let prev_req_cursor = cursor.clone(); + let mut iter = objects.iter().peekable(); + + // Drop the first key if needed + // Only AfterKey requires it according to the S3 spec and our implem. + match (&cursor, iter.peek()) { + (RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => iter.next(), + (_, _) => None, + }; + + while let Some(object) = iter.peek() { + if !object.key.starts_with(&query.prefix) { + // If the key is not in the requested prefix, we're done + return Ok(None); + } + + cursor = match acc.extract(query, &cursor, &mut iter) { + ExtractionResult::Extracted { key } => RangeBegin::AfterKey { key }, + ExtractionResult::SkipTo { key, fallback_key } => { + RangeBegin::IncludingKey { key, fallback_key } + } + ExtractionResult::FilledAtUpload { key, upload } => { + return Ok(Some(RangeBegin::AfterUpload { key, upload })) + } + ExtractionResult::Filled => return Ok(Some(cursor)), + ExtractionResult::NoMore => return Ok(None), + }; + } + + if !server_more { + // We did not fully fill the accumulator despite exhausting all the data we have, + // we're done + return Ok(None); + } + + if prev_req_cursor == cursor { + unreachable!("No progress has been done in the loop. 
This is a bug, please report it."); + } + } +} + +fn fetch_part_info( + query: &ListPartsQuery, + object: Option<Object>, + version: Option<Version>, + upload_id: Uuid, +) -> Result<(Vec<PartInfo>, Option<u64>), Error> { + // Check results + let object = object.ok_or(Error::NoSuchKey)?; + + let obj_version = object + .versions() + .iter() + .find(|v| v.uuid == upload_id && v.is_uploading()) + .ok_or(Error::NoSuchUpload)?; + + let version = version.ok_or(Error::NoSuchKey)?; + + // Cut the beginning of our 2 vectors if required + let (etags, blocks) = match &query.part_number_marker { + Some(marker) => { + let next = marker + 1; + + let part_idx = into_ok_or_err( + version + .parts_etags + .items() + .binary_search_by(|(part_num, _)| part_num.cmp(&next)), + ); + let parts = &version.parts_etags.items()[part_idx..]; + + let block_idx = into_ok_or_err( + version + .blocks + .items() + .binary_search_by(|(vkey, _)| vkey.part_number.cmp(&next)), + ); + let blocks = &version.blocks.items()[block_idx..]; + + (parts, blocks) + } + None => (version.parts_etags.items(), version.blocks.items()), + }; + + // Use the block vector to compute a (part_number, size) vector + let mut size = Vec::<(u64, u64)>::new(); + blocks.iter().for_each(|(key, val)| { + let mut new_size = val.size; + match size.pop() { + Some((part_number, size)) if part_number == key.part_number => new_size += size, + Some(v) => size.push(v), + None => (), + } + size.push((key.part_number, new_size)) + }); + + // Merge the etag vector and size vector to build a PartInfo vector + let max_parts = query.max_parts as usize; + let (mut etag_iter, mut size_iter) = (etags.iter().peekable(), size.iter().peekable()); + + let mut info = Vec::<PartInfo>::with_capacity(max_parts); + + while info.len() < max_parts { + match (etag_iter.peek(), size_iter.peek()) { + (Some((ep, etag)), Some((sp, size))) => match ep.cmp(sp) { + Ordering::Less => { + debug!("ETag information ignored due to missing corresponding block information. 
Query: {:?}", query); + etag_iter.next(); + } + Ordering::Equal => { + info.push(PartInfo { + etag: etag.to_string(), + timestamp: obj_version.timestamp, + part_number: *ep, + size: *size, + }); + etag_iter.next(); + size_iter.next(); + } + Ordering::Greater => { + debug!("Block information ignored due to missing corresponding ETag information. Query: {:?}", query); + size_iter.next(); + } + }, + (None, None) => return Ok((info, None)), + _ => { + debug!( + "Additional block or ETag information ignored. Query: {:?}", + query + ); + return Ok((info, None)); + } + } + } + + match info.last() { + Some(part_info) => { + let pagination = Some(part_info.part_number); + Ok((info, pagination)) + } + None => Ok((info, None)), + } +} + +/* + * ListQuery logic + */ + +/// Determine the key from where we want to start fetch objects from the database +/// +/// We choose whether the object at this key must +/// be included or excluded from the response. +/// This key can be the prefix in the base case, or intermediate +/// points in the dataset if we are continuing a previous listing. +impl ListObjectsQuery { + fn build_accumulator(&self) -> Accumulator<String, ObjectInfo> { + Accumulator::<String, ObjectInfo>::new(self.common.page_size) + } + + fn begin(&self) -> Result<RangeBegin, Error> { + if self.is_v2 { + match (&self.continuation_token, &self.start_after) { + // In V2 mode, the continuation token is defined as an opaque + // string in the spec, so we can do whatever we want with it. + // In our case, it is defined as either [ or ] (for include + // representing the key to start with. 
+ (Some(token), _) => match &token[..1] { + "[" => Ok(RangeBegin::IncludingKey { + key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?, + fallback_key: None, + }), + "]" => Ok(RangeBegin::AfterKey { + key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?, + }), + _ => Err(Error::BadRequest("Invalid continuation token".to_string())), + }, + + // StartAfter has defined semantics in the spec: + // start listing at the first key immediately after. + (_, Some(key)) => Ok(RangeBegin::AfterKey { + key: key.to_string(), + }), + + // In the case where neither is specified, we start + // listing at the specified prefix. If an object has this + // exact same key, we include it. (@TODO is this correct?) + _ => Ok(RangeBegin::IncludingKey { + key: self.common.prefix.to_string(), + fallback_key: None, + }), + } + } else { + match &self.marker { + // In V1 mode, the spec defines the Marker value to mean + // the same thing as the StartAfter value in V2 mode. + Some(key) => Ok(RangeBegin::AfterKey { + key: key.to_string(), + }), + _ => Ok(RangeBegin::IncludingKey { + key: self.common.prefix.to_string(), + fallback_key: None, + }), + } + } + } +} + +impl ListMultipartUploadsQuery { + fn build_accumulator(&self) -> Accumulator<Uuid, UploadInfo> { + Accumulator::<Uuid, UploadInfo>::new(self.common.page_size) + } + + fn begin(&self) -> Result<RangeBegin, Error> { + match (&self.upload_id_marker, &self.key_marker) { + // If both the upload id marker and the key marker are sets, + // the spec specifies that we must start listing uploads INCLUDING the given key, + // AFTER the specified upload id (sorted in a lexicographic order). + // To enable some optimisations, we emulate "IncludingKey" by extending the upload id + // semantic. We base our reasoning on the hypothesis that S3's upload ids are opaques + // while Garage's ones are 32 bytes hex encoded which enables us to extend this query + // with a specific "include" upload id. 
+ (Some(up_marker), Some(key_marker)) => match &up_marker[..] { + "include" => Ok(RangeBegin::IncludingKey { + key: key_marker.to_string(), + fallback_key: None, + }), + uuid => Ok(RangeBegin::AfterUpload { + key: key_marker.to_string(), + upload: s3_put::decode_upload_id(uuid)?, + }), + }, + + // If only the key marker is specified, the spec says that we must start listing + // uploads AFTER the specified key. + (None, Some(key_marker)) => Ok(RangeBegin::AfterKey { + key: key_marker.to_string(), + }), + _ => Ok(RangeBegin::IncludingKey { + key: self.common.prefix.to_string(), + fallback_key: None, + }), + } + } +} + +/* + * Accumulator logic + */ + +trait ExtractAccumulator { + fn extract<'a>( + &mut self, + query: &ListQueryCommon, + cursor: &RangeBegin, + iter: &mut Peekable<impl Iterator<Item = &'a Object>>, + ) -> ExtractionResult; +} + +struct Accumulator<K, V> { + common_prefixes: BTreeSet<String>, + keys: BTreeMap<K, V>, + max_capacity: usize, +} + +type ObjectAccumulator = Accumulator<String, ObjectInfo>; +type UploadAccumulator = Accumulator<Uuid, UploadInfo>; + +impl<K: std::cmp::Ord, V> Accumulator<K, V> { + fn new(page_size: usize) -> Accumulator<K, V> { + Accumulator { + common_prefixes: BTreeSet::<String>::new(), + keys: BTreeMap::<K, V>::new(), + max_capacity: page_size, + } + } + + /// Observe the Object iterator and try to extract a single common prefix + /// + /// This function can consume an arbitrary number of items as long as they share the same + /// common prefix. + fn extract_common_prefix<'a>( + &mut self, + objects: &mut Peekable<impl Iterator<Item = &'a Object>>, + query: &ListQueryCommon, + ) -> Option<ExtractionResult> { + // Get the next object from the iterator + let object = objects.peek().expect("This iterator can not be empty as it is checked earlier in the code. 
This is a logic bug, please report it."); + + // Check if this is a common prefix (requires a passed delimiter and its value in the key) + let pfx = match common_prefix(object, query) { + Some(p) => p, + None => return None, + }; + + // Try to register this prefix + // If not possible, we can return early + if !self.try_insert_common_prefix(pfx.to_string()) { + return Some(ExtractionResult::Filled); + } + + // We consume the whole common prefix from the iterator + let mut last_pfx_key = &object.key; + loop { + last_pfx_key = match objects.peek() { + Some(o) if o.key.starts_with(pfx) => &o.key, + Some(_) => { + return Some(ExtractionResult::Extracted { + key: last_pfx_key.to_owned(), + }) + } + None => { + return match key_after_prefix(pfx) { + Some(next) => Some(ExtractionResult::SkipTo { + key: next, + fallback_key: Some(last_pfx_key.to_owned()), + }), + None => Some(ExtractionResult::NoMore), + } + } + }; + + objects.next(); + } + } + + fn is_full(&mut self) -> bool { + self.keys.len() + self.common_prefixes.len() >= self.max_capacity + } + + fn try_insert_common_prefix(&mut self, key: String) -> bool { + // If we already have an entry, we can continue + if self.common_prefixes.contains(&key) { + return true; + } + + // Otherwise, we need to check if we can add it + match self.is_full() { + true => false, + false => { + self.common_prefixes.insert(key); + true + } + } + } + + fn try_insert_entry(&mut self, key: K, value: V) -> bool { + // It is impossible to add twice a key, this is an error + assert!(!self.keys.contains_key(&key)); + + match self.is_full() { + true => false, + false => { + self.keys.insert(key, value); + true + } + } + } +} + +impl ExtractAccumulator for ObjectAccumulator { + fn extract<'a>( + &mut self, + query: &ListQueryCommon, + _cursor: &RangeBegin, + objects: &mut Peekable<impl Iterator<Item = &'a Object>>, + ) -> ExtractionResult { + if let Some(e) = self.extract_common_prefix(objects, query) { + return e; + } + + let object = 
objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it."); + + let version = match object.versions().iter().find(|x| x.is_data()) { + Some(v) => v, + None => unreachable!( + "Expect to have objects having data due to earlier filtering. This is a logic bug." + ), + }; + + let meta = match &version.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta, + ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta, + _ => unreachable!(), + }; + let info = ObjectInfo { + last_modified: version.timestamp, + size: meta.size, + etag: meta.etag.to_string(), + }; + + match self.try_insert_entry(object.key.clone(), info) { + true => ExtractionResult::Extracted { + key: object.key.clone(), + }, + false => ExtractionResult::Filled, + } + } +} + +impl ExtractAccumulator for UploadAccumulator { + /// Observe the iterator, process a single key, and try to extract one or more upload entries + /// + /// This function processes a single object from the iterator that can contain an arbitrary + /// number of versions, and thus "uploads". + fn extract<'a>( + &mut self, + query: &ListQueryCommon, + cursor: &RangeBegin, + objects: &mut Peekable<impl Iterator<Item = &'a Object>>, + ) -> ExtractionResult { + if let Some(e) = self.extract_common_prefix(objects, query) { + return e; + } + + // Get the next object from the iterator + let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it."); + + let mut uploads_for_key = object + .versions() + .iter() + .filter(|x| x.is_uploading()) + .collect::<Vec<&ObjectVersion>>(); + + // S3 logic requires lexicographically sorted upload ids. + uploads_for_key.sort_unstable_by_key(|e| e.uuid); + + // Skip results if an upload marker is provided + if let RangeBegin::AfterUpload { upload, .. 
} = cursor { + // Because our data are sorted, we can use a binary search to find the UUID + // or to find where it should have been added. Once this position is found, + // we use it to discard the first part of the array. + let idx = match uploads_for_key.binary_search_by(|e| e.uuid.cmp(upload)) { + // we start after the found uuid so we need to discard the pointed value. + // In the worst case, the UUID is the last element, which lead us to an empty array + // but we are never out of bound. + Ok(i) => i + 1, + // if the UUID is not found, the upload may have been discarded between the 2 request, + // this function returns where it could have been inserted, + // the pointed value is thus greater than our marker and we need to keep it. + Err(i) => i, + }; + uploads_for_key = uploads_for_key[idx..].to_vec(); + } + + let mut iter = uploads_for_key.iter(); + + // The first entry is a specific case + // as it changes our result enum type + let first_upload = match iter.next() { + Some(u) => u, + None => { + return ExtractionResult::Extracted { + key: object.key.clone(), + } + } + }; + let first_up_info = UploadInfo { + key: object.key.to_string(), + timestamp: first_upload.timestamp, + }; + if !self.try_insert_entry(first_upload.uuid, first_up_info) { + return ExtractionResult::Filled; + } + + // We can then collect the remaining uploads in a loop + let mut prev_uuid = first_upload.uuid; + for upload in iter { + let up_info = UploadInfo { + key: object.key.to_string(), + timestamp: upload.timestamp, + }; + + // Insert data in our accumulator + // If it is full, return information to paginate. 
+ if !self.try_insert_entry(upload.uuid, up_info) { + return ExtractionResult::FilledAtUpload { + key: object.key.clone(), + upload: prev_uuid, + }; + } + // Update our last added UUID + prev_uuid = upload.uuid; + } + + // We successfully collected all the uploads + ExtractionResult::Extracted { + key: object.key.clone(), + } + } +} + +/* + * Utility functions + */ + +/// This is a stub for Result::into_ok_or_err that is not yet in Rust stable +fn into_ok_or_err<T>(r: Result<T, T>) -> T { + match r { + Ok(r) => r, + Err(r) => r, + } +} + +/// Returns the common prefix of the object given the query prefix and delimiter +fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> { + match &query.delimiter { + Some(delimiter) => object.key[query.prefix.len()..] + .find(delimiter) + .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]), + None => None, + } +} + +/// URIencode a value if needed +fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value { + if yes { + s3_xml::Value(uri_encode(s, true)) + } else { + s3_xml::Value(s.to_string()) + } +} + +/* + * Unit tests of this module + */ +#[cfg(test)] +mod tests { + use super::*; + use garage_model::s3::version_table::*; + use garage_util::*; + use std::iter::FromIterator; + + const TS: u64 = 1641394898314; + + fn bucket() -> Uuid { + Uuid::from([0x42; 32]) + } + + fn query() -> ListMultipartUploadsQuery { + ListMultipartUploadsQuery { + common: ListQueryCommon { + prefix: "".to_string(), + delimiter: Some("/".to_string()), + page_size: 1000, + urlencode_resp: false, + bucket_name: "a".to_string(), + bucket_id: Uuid::from([0x00; 32]), + }, + key_marker: None, + upload_id_marker: None, + } + } + + fn objs() -> Vec<Object> { + vec![ + Object::new( + bucket(), + "a/b/c".to_string(), + vec![objup_version([0x01; 32])], + ), + Object::new(bucket(), "d".to_string(), vec![objup_version([0x01; 32])]), + ] + } + + fn objup_version(uuid: [u8; 32]) -> ObjectVersion { + ObjectVersion { + uuid: 
Uuid::from(uuid), + timestamp: TS, + state: ObjectVersionState::Uploading(ObjectVersionHeaders { + content_type: "text/plain".to_string(), + other: BTreeMap::<String, String>::new(), + }), + } + } + + #[test] + fn test_common_prefixes() { + let mut query = query(); + let objs = objs(); + + query.common.prefix = "a/".to_string(); + assert_eq!( + common_prefix(objs.get(0).unwrap(), &query.common), + Some("a/b/") + ); + + query.common.prefix = "a/b/".to_string(); + assert_eq!(common_prefix(objs.get(0).unwrap(), &query.common), None); + } + + #[test] + fn test_extract_common_prefix() { + let mut query = query(); + query.common.prefix = "a/".to_string(); + let objs = objs(); + let mut acc = UploadAccumulator::new(query.common.page_size); + + let mut iter = objs.iter().peekable(); + match acc.extract_common_prefix(&mut iter, &query.common) { + Some(ExtractionResult::Extracted { key }) => assert_eq!(key, "a/b/c".to_string()), + _ => panic!("wrong result"), + } + assert_eq!(acc.common_prefixes.len(), 1); + assert_eq!(acc.common_prefixes.iter().next().unwrap(), "a/b/"); + } + + #[test] + fn test_extract_upload() { + let objs = vec![ + Object::new( + bucket(), + "b".to_string(), + vec![ + objup_version([0x01; 32]), + objup_version([0x80; 32]), + objup_version([0x8f; 32]), + objup_version([0xdd; 32]), + ], + ), + Object::new(bucket(), "c".to_string(), vec![]), + ]; + + let mut acc = UploadAccumulator::new(2); + let mut start = RangeBegin::AfterUpload { + key: "b".to_string(), + upload: Uuid::from([0x01; 32]), + }; + + let mut iter = objs.iter().peekable(); + + // Check the case where we skip some uploads + match acc.extract(&(query().common), &start, &mut iter) { + ExtractionResult::FilledAtUpload { key, upload } => { + assert_eq!(key, "b"); + assert_eq!(upload, Uuid::from([0x8f; 32])); + } + _ => panic!("wrong result"), + }; + + assert_eq!(acc.keys.len(), 2); + assert_eq!( + acc.keys.get(&Uuid::from([0x80; 32])).unwrap(), + &UploadInfo { + timestamp: TS, + key: 
"b".to_string() + } + ); + assert_eq!( + acc.keys.get(&Uuid::from([0x8f; 32])).unwrap(), + &UploadInfo { + timestamp: TS, + key: "b".to_string() + } + ); + + acc = UploadAccumulator::new(2); + start = RangeBegin::AfterUpload { + key: "b".to_string(), + upload: Uuid::from([0xff; 32]), + }; + iter = objs.iter().peekable(); + + // Check the case where we skip all the uploads + match acc.extract(&(query().common), &start, &mut iter) { + ExtractionResult::Extracted { key } if key.as_str() == "b" => (), + _ => panic!("wrong result"), + }; + } + + #[tokio::test] + async fn test_fetch_uploads_no_result() -> Result<(), Error> { + let query = query(); + let mut acc = query.build_accumulator(); + let page = fetch_list_entries( + &query.common, + query.begin()?, + &mut acc, + |_, _, _| async move { Ok(vec![]) }, + ) + .await?; + assert_eq!(page, None); + assert_eq!(acc.common_prefixes.len(), 0); + assert_eq!(acc.keys.len(), 0); + + Ok(()) + } + + #[tokio::test] + async fn test_fetch_uploads_basic() -> Result<(), Error> { + let query = query(); + let mut acc = query.build_accumulator(); + let mut fake_io = |_, _, _| async move { Ok(objs()) }; + let page = + fetch_list_entries(&query.common, query.begin()?, &mut acc, &mut fake_io).await?; + assert_eq!(page, None); + assert_eq!(acc.common_prefixes.len(), 1); + assert_eq!(acc.keys.len(), 1); + assert!(acc.common_prefixes.contains("a/")); + + Ok(()) + } + + #[tokio::test] + async fn test_fetch_uploads_advanced() -> Result<(), Error> { + let mut query = query(); + query.common.page_size = 2; + + let mut fake_io = |_, k: Option<String>, _| async move { + Ok(match k.as_deref() { + Some("") => vec![ + Object::new(bucket(), "b/a".to_string(), vec![objup_version([0x01; 32])]), + Object::new(bucket(), "b/b".to_string(), vec![objup_version([0x01; 32])]), + Object::new(bucket(), "b/c".to_string(), vec![objup_version([0x01; 32])]), + ], + Some("b0") => vec![ + Object::new(bucket(), "c/a".to_string(), vec![objup_version([0x01; 32])]), + 
Object::new(bucket(), "c/b".to_string(), vec![objup_version([0x01; 32])]), + Object::new(bucket(), "c/c".to_string(), vec![objup_version([0x02; 32])]), + ], + Some("c0") => vec![Object::new( + bucket(), + "d".to_string(), + vec![objup_version([0x01; 32])], + )], + _ => panic!("wrong value {:?}", k), + }) + }; + + let mut acc = query.build_accumulator(); + let page = + fetch_list_entries(&query.common, query.begin()?, &mut acc, &mut fake_io).await?; + assert_eq!( + page, + Some(RangeBegin::IncludingKey { + key: "c0".to_string(), + fallback_key: Some("c/c".to_string()) + }) + ); + assert_eq!(acc.common_prefixes.len(), 2); + assert_eq!(acc.keys.len(), 0); + assert!(acc.common_prefixes.contains("b/")); + assert!(acc.common_prefixes.contains("c/")); + + Ok(()) + } + + fn version() -> Version { + let uuid = Uuid::from([0x08; 32]); + + let blocks = vec![ + ( + VersionBlockKey { + part_number: 1, + offset: 1, + }, + VersionBlock { + hash: uuid, + size: 3, + }, + ), + ( + VersionBlockKey { + part_number: 1, + offset: 2, + }, + VersionBlock { + hash: uuid, + size: 2, + }, + ), + ( + VersionBlockKey { + part_number: 2, + offset: 1, + }, + VersionBlock { + hash: uuid, + size: 8, + }, + ), + ( + VersionBlockKey { + part_number: 5, + offset: 1, + }, + VersionBlock { + hash: uuid, + size: 7, + }, + ), + ( + VersionBlockKey { + part_number: 8, + offset: 1, + }, + VersionBlock { + hash: uuid, + size: 5, + }, + ), + ]; + let etags = vec![ + (1, "etag1".to_string()), + (3, "etag2".to_string()), + (5, "etag3".to_string()), + (8, "etag4".to_string()), + (9, "etag5".to_string()), + ]; + + Version { + bucket_id: uuid, + key: "a".to_string(), + uuid, + deleted: false.into(), + blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks), + parts_etags: crdt::Map::<u64, String>::from_iter(etags), + } + } + + fn obj() -> Object { + Object::new(bucket(), "d".to_string(), vec![objup_version([0x08; 32])]) + } + + #[test] + fn test_fetch_part_info() -> Result<(), Error> { + let uuid = 
Uuid::from([0x08; 32]); + let mut query = ListPartsQuery { + bucket_name: "a".to_string(), + bucket_id: uuid, + key: "a".to_string(), + upload_id: "xx".to_string(), + part_number_marker: None, + max_parts: 2, + }; + + assert!( + fetch_part_info(&query, None, None, uuid).is_err(), + "No object and version should fail" + ); + assert!( + fetch_part_info(&query, Some(obj()), None, uuid).is_err(), + "No version should faild" + ); + assert!( + fetch_part_info(&query, None, Some(version()), uuid).is_err(), + "No object should fail" + ); + + // Start from the beginning but with limited size to trigger pagination + let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + assert_eq!(pagination.unwrap(), 5); + assert_eq!( + info, + vec![ + PartInfo { + etag: "etag1".to_string(), + timestamp: TS, + part_number: 1, + size: 5 + }, + PartInfo { + etag: "etag3".to_string(), + timestamp: TS, + part_number: 5, + size: 7 + }, + ] + ); + + // Use previous pagination to make a new request + query.part_number_marker = Some(pagination.unwrap()); + let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + assert!(pagination.is_none()); + assert_eq!( + info, + vec![PartInfo { + etag: "etag4".to_string(), + timestamp: TS, + part_number: 8, + size: 5 + },] + ); + + // Trying to access a part that is way larger than registered ones + query.part_number_marker = Some(9999); + let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + assert!(pagination.is_none()); + assert_eq!(info, vec![]); + + // Try without any limitation + query.max_parts = 1000; + query.part_number_marker = None; + let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + assert!(pagination.is_none()); + assert_eq!( + info, + vec![ + PartInfo { + etag: "etag1".to_string(), + timestamp: TS, + part_number: 1, + size: 5 + }, + PartInfo { + etag: "etag3".to_string(), + timestamp: TS, + part_number: 5, + 
size: 7 + }, + PartInfo { + etag: "etag4".to_string(), + timestamp: TS, + part_number: 8, + size: 5 + }, + ] + ); + + Ok(()) + } +} |