- **author**: Alex <alex@adnab.me>, 2022-05-10 13:16:57 +0200
- **committer**: Alex <alex@adnab.me>, 2022-05-10 13:16:57 +0200
- **commit**: 5768bf362262f78376af14517c4921941986192e (patch)
- **tree**: b4baf3051eade0f63649443278bb3a3f4c38ec25 /src/api/s3_list.rs
- **parent**: def78c5e6f5da37a0d17b5652c525fbeccbc2e86 (diff)
First implementation of K2V (#293)
**Specification:**
View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)
- [x] Specify the structure of K2V triples (see the sketch after this list)
- [x] Specify the DVVS format used for causality detection
- [x] Specify the K2V index (just a counter of the number of values per partition key)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)
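
The spec items above fix the data model: an entry is addressed by a partition key and a sort key, carries its value(s) together with a DVVS causality context, and the index is nothing more than a per-partition-key value count. A rough sketch of those shapes, with made-up field names rather than the spec's own definitions:

```rust
// Illustrative only: these names are assumptions, not the types defined in
// the spec or in this PR.
#![allow(dead_code)]

/// One K2V entry ("triple"): (partition key, sort key) -> current value(s).
struct K2vItem {
    partition_key: String,
    sort_key: String,
    /// Opaque token encoding the DVVS causality context; clients send it
    /// back with writes so concurrent updates can be detected.
    causality_token: String,
    /// Concurrent values that have not been superseded are all kept;
    /// a deletion is represented by a tombstone.
    values: Vec<K2vValue>,
}

enum K2vValue {
    Value(Vec<u8>),
    Tombstone,
}

/// The K2V index, as returned by ReadIndex: just a counter of the number of
/// values stored under each partition key.
struct IndexEntry {
    partition_key: String,
    values: u64,
}

fn main() {
    let idx = IndexEntry {
        partition_key: "photos".to_string(),
        values: 3,
    };
    println!("partition {} holds {} values", idx.partition_key, idx.values);
}
```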
**Implementation:**
- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
- [x] K2V API router (dispatch sketched after this list)
- [x] ReadItem
- [x] InsertItem
- [x] DeleteItem
- [x] PollItem
- [x] ReadIndex
- [x] InsertBatch
- [x] ReadBatch
- [x] DeleteBatch
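
The router and handler items above boil down to dispatching the eight endpoint names onto their operations. A minimal sketch of such a dispatch, with an illustrative enum rather than the router actually added by this PR:

```rust
// Illustrative sketch only: the enum and the dispatch are made up; the PR's
// actual router and handler signatures are not reproduced here.
#![allow(dead_code)]

enum K2vEndpoint {
    ReadItem,
    InsertItem,
    DeleteItem,
    PollItem,
    ReadIndex,
    InsertBatch,
    ReadBatch,
    DeleteBatch,
}

/// What each endpoint in the checklist above is responsible for.
fn describe(endpoint: &K2vEndpoint) -> &'static str {
    match endpoint {
        K2vEndpoint::ReadItem => "read one item and its causality token",
        K2vEndpoint::InsertItem => "write a value under (partition key, sort key)",
        K2vEndpoint::DeleteItem => "delete an item, given its causality token",
        K2vEndpoint::PollItem => "wait until an item changes, then return it",
        K2vEndpoint::ReadIndex => "list partition keys with their value counts",
        K2vEndpoint::InsertBatch => "insert several items in one request",
        K2vEndpoint::ReadBatch => "read several items or ranges in one request",
        K2vEndpoint::DeleteBatch => "delete several items or ranges in one request",
    }
}

fn main() {
    println!("ReadIndex: {}", describe(&K2vEndpoint::ReadIndex));
}
```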
**Testing:**
- [x] A simple Python script that makes a few requests so the results can be checked visually (it does not parse responses or assert on returned values)
- [x] Actual tests:
  - [x] Adapt testing framework
  - [x] Simple test with InsertItem + ReadItem
  - [x] Test with several Insert/Read/DeleteItem + ReadIndex
  - [x] Test all combinations of return formats for ReadItem
  - [x] Test with ReadBatch, InsertBatch, DeleteBatch
  - [x] Test with PollItem
  - [x] Test error codes
- [ ] Fix most broken stuff
  - [x] PollItem test failing randomly
  - [x] when an invalid causality token is given, return a 4xx error rather than a 5xx (sketched just below)
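
The last fix in the list above is about error classification: the causality token comes from the client, so a token that fails to decode should produce a 4xx response rather than a 5xx. A small sketch of that pattern, using a hypothetical error type rather than Garage's own:

```rust
// Illustrative sketch only: `ApiError` and `decode_causality_token` are
// hypothetical names, not Garage's actual types. Assumes a `base64`
// dependency (e.g. base64 = "0.13") in Cargo.toml.
#![allow(dead_code)]

#[derive(Debug)]
enum ApiError {
    /// Maps to HTTP 400: the client sent something invalid.
    BadRequest(String),
    /// Maps to HTTP 500: a genuine server-side failure.
    Internal(String),
}

fn decode_causality_token(token: &str) -> Result<Vec<u8>, ApiError> {
    // A malformed token is the caller's fault, so classify the failure as a
    // 4xx BadRequest instead of letting it surface as a 5xx internal error.
    base64::decode(token)
        .map_err(|e| ApiError::BadRequest(format!("Invalid causality token: {}", e)))
}

fn main() {
    assert!(matches!(
        decode_causality_token("not base64 !!"),
        Err(ApiError::BadRequest(_))
    ));
    println!("invalid causality tokens map to a 4xx error");
}
```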
**Improvements:**
- [x] Descending range queries
  - [x] Specify
  - [x] Implement
  - [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind a `k2v` feature flag (gating sketched below)
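
Gating K2V behind a Cargo feature means the whole K2V surface compiles out when the feature is off. A minimal sketch of that kind of gating, with illustrative names (the PR's actual wiring may differ):

```rust
// Minimal sketch of Cargo-feature gating; assumes a `k2v` feature declared
// in Cargo.toml ([features] k2v = []). Names are illustrative.

#[cfg(feature = "k2v")]
fn handle_k2v(endpoint: &str) -> String {
    // With the feature enabled, the real code would route to the K2V API
    // handlers listed above.
    format!("K2V endpoint {} handled", endpoint)
}

#[cfg(not(feature = "k2v"))]
fn handle_k2v(_endpoint: &str) -> String {
    "K2V support not compiled in (build with --features k2v)".to_string()
}

fn main() {
    println!("{}", handle_k2v("ReadIndex"));
}
```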
Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/api/s3_list.rs')
-rw-r--r-- | src/api/s3_list.rs | 1383 |
1 file changed, 0 insertions, 1383 deletions
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
deleted file mode 100644
index 5852fc1b..00000000
--- a/src/api/s3_list.rs
+++ /dev/null
@@ -1,1383 +0,0 @@
[The entire 1,383-line file is removed by this commit. The deleted module held Garage's S3 listing implementation: the ListQueryCommon, ListObjectsQuery, ListMultipartUploadsQuery and ListPartsQuery request types; the handle_list, handle_list_multipart_upload and handle_list_parts handlers; the pagination machinery (RangeBegin, ExtractionResult, fetch_list_entries, fetch_part_info and the Accumulator/ExtractAccumulator types); helper functions common_prefix, uriencode_maybe, key_after_prefix and into_ok_or_err; and the module's unit tests.]
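
One detail of the removed listing code worth noting: because the S3 ListObjectsV2 continuation token is opaque to clients, s3_list.rs encoded its own cursor in it, prefixing the base64-encoded key with `[` to resume including that key or `]` to resume strictly after it. A simplified restatement of that scheme (not the removed functions verbatim):

```rust
// Simplified restatement of the continuation-token scheme; the removed code
// wraps this logic in its RangeBegin enum and ListObjectsQuery::begin().
// Assumes a `base64` dependency (e.g. base64 = "0.13") in Cargo.toml.

enum Cursor {
    /// Resume the listing at this key, including the key itself.
    IncludingKey(String),
    /// Resume the listing strictly after this key.
    AfterKey(String),
}

fn encode_token(cursor: &Cursor) -> String {
    match cursor {
        Cursor::IncludingKey(key) => format!("[{}", base64::encode(key.as_bytes())),
        Cursor::AfterKey(key) => format!("]{}", base64::encode(key.as_bytes())),
    }
}

fn decode_token(token: &str) -> Option<Cursor> {
    if token.len() < 2 {
        return None;
    }
    let key = String::from_utf8(base64::decode(&token[1..]).ok()?).ok()?;
    match &token[..1] {
        "[" => Some(Cursor::IncludingKey(key)),
        "]" => Some(Cursor::AfterKey(key)),
        _ => None,
    }
}

fn main() {
    let token = encode_token(&Cursor::AfterKey("photos/2022/".to_string()));
    assert!(matches!(decode_token(&token), Some(Cursor::AfterKey(k)) if k == "photos/2022/"));
    println!("round-tripped continuation token: {}", token);
}
```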