path: root/src/api/s3_list.rs
author    Alex <alex@adnab.me>    2022-05-10 13:16:57 +0200
committer Alex <alex@adnab.me>    2022-05-10 13:16:57 +0200
commit    5768bf362262f78376af14517c4921941986192e (patch)
tree      b4baf3051eade0f63649443278bb3a3f4c38ec25 /src/api/s3_list.rs
parent    def78c5e6f5da37a0d17b5652c525fbeccbc2e86 (diff)
download  garage-5768bf362262f78376af14517c4921941986192e.tar.gz
          garage-5768bf362262f78376af14517c4921941986192e.zip
First implementation of K2V (#293)
**Specification:** View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md)

- [x] Specify the structure of K2V triples
- [x] Specify the DVVS format used for causality detection
- [x] Specify the K2V index (just a counter of number of values per partition key)
- [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem
- [x] Specify index endpoint: ReadIndex
- [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch
- [x] Move to JSON objects instead of tuples
- [x] Specify endpoints for polling for updates on single values (PollItem)

**Implementation:**

- [x] Table for K2V items, causal contexts
- [x] Indexing mechanism and table for K2V index
- [x] Make API handlers a bit more generic
- [x] K2V API endpoint
- [x] K2V API router
- [x] ReadItem
- [x] InsertItem
- [x] DeleteItem
- [x] PollItem
- [x] ReadIndex
- [x] InsertBatch
- [x] ReadBatch
- [x] DeleteBatch

**Testing:**

- [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values)
- [x] Actual tests:
  - [x] Adapt testing framework
  - [x] Simple test with InsertItem + ReadItem
  - [x] Test with several Insert/Read/DeleteItem + ReadIndex
  - [x] Test all combinations of return formats for ReadItem
  - [x] Test with ReadBatch, InsertBatch, DeleteBatch
  - [x] Test with PollItem
  - [x] Test error codes
- [ ] Fix most broken stuff
  - [x] test PollItem broken randomly
  - [x] when invalid causality tokens are given, errors should be 4xx not 5xx

**Improvements:**

- [x] Descending range queries
  - [x] Specify
  - [x] Implement
  - [x] Add test
- [x] Batch updates to index counter
- [x] Put K2V behind `k2v` feature flag

Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
Diffstat (limited to 'src/api/s3_list.rs')
-rw-r--r--  src/api/s3_list.rs  1383
1 file changed, 0 insertions, 1383 deletions
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
deleted file mode 100644
index 5852fc1b..00000000
--- a/src/api/s3_list.rs
+++ /dev/null
@@ -1,1383 +0,0 @@
-use std::cmp::Ordering;
-use std::collections::{BTreeMap, BTreeSet};
-use std::iter::{Iterator, Peekable};
-use std::sync::Arc;
-
-use hyper::{Body, Response};
-
-use garage_util::data::*;
-use garage_util::error::Error as GarageError;
-use garage_util::time::*;
-
-use garage_model::garage::Garage;
-use garage_model::object_table::*;
-use garage_model::version_table::Version;
-
-use garage_table::EmptyKey;
-
-use crate::encoding::*;
-use crate::error::*;
-use crate::s3_put;
-use crate::s3_xml;
-
-const DUMMY_NAME: &str = "Dummy Key";
-const DUMMY_KEY: &str = "GKDummyKey";
-
-#[derive(Debug)]
-pub struct ListQueryCommon {
- pub bucket_name: String,
- pub bucket_id: Uuid,
- pub delimiter: Option<String>,
- pub page_size: usize,
- pub prefix: String,
- pub urlencode_resp: bool,
-}
-
-#[derive(Debug)]
-pub struct ListObjectsQuery {
- pub is_v2: bool,
- pub marker: Option<String>,
- pub continuation_token: Option<String>,
- pub start_after: Option<String>,
- pub common: ListQueryCommon,
-}
-
-#[derive(Debug)]
-pub struct ListMultipartUploadsQuery {
- pub key_marker: Option<String>,
- pub upload_id_marker: Option<String>,
- pub common: ListQueryCommon,
-}
-
-#[derive(Debug)]
-pub struct ListPartsQuery {
- pub bucket_name: String,
- pub bucket_id: Uuid,
- pub key: String,
- pub upload_id: String,
- pub part_number_marker: Option<u64>,
- pub max_parts: u64,
-}
-
-pub async fn handle_list(
- garage: Arc<Garage>,
- query: &ListObjectsQuery,
-) -> Result<Response<Body>, Error> {
- let io = |bucket, key, count| {
- let t = &garage.object_table;
- async move {
- t.get_range(&bucket, key, Some(ObjectFilter::IsData), count)
- .await
- }
- };
-
- debug!("ListObjects {:?}", query);
- let mut acc = query.build_accumulator();
- let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;
-
- let result = s3_xml::ListBucketResult {
- xmlns: (),
- // Sending back request information
- name: s3_xml::Value(query.common.bucket_name.to_string()),
- prefix: uriencode_maybe(&query.common.prefix, query.common.urlencode_resp),
- max_keys: s3_xml::IntValue(query.common.page_size as i64),
- delimiter: query
- .common
- .delimiter
- .as_ref()
- .map(|x| uriencode_maybe(x, query.common.urlencode_resp)),
- encoding_type: match query.common.urlencode_resp {
- true => Some(s3_xml::Value("url".to_string())),
- false => None,
- },
- marker: match (!query.is_v2, &query.marker) {
- (true, Some(k)) => Some(uriencode_maybe(k, query.common.urlencode_resp)),
- _ => None,
- },
- start_after: match (query.is_v2, &query.start_after) {
- (true, Some(sa)) => Some(uriencode_maybe(sa, query.common.urlencode_resp)),
- _ => None,
- },
- continuation_token: match (query.is_v2, &query.continuation_token) {
- (true, Some(ct)) => Some(s3_xml::Value(ct.to_string())),
- _ => None,
- },
-
- // Pagination
- is_truncated: s3_xml::Value(format!("{}", pagination.is_some())),
- key_count: Some(s3_xml::IntValue(
- acc.keys.len() as i64 + acc.common_prefixes.len() as i64,
- )),
- next_marker: match (!query.is_v2, &pagination) {
- (true, Some(RangeBegin::AfterKey { key: k }))
- | (
- true,
- Some(RangeBegin::IncludingKey {
- fallback_key: Some(k),
- ..
- }),
- ) => Some(uriencode_maybe(k, query.common.urlencode_resp)),
- _ => None,
- },
- next_continuation_token: match (query.is_v2, &pagination) {
- (true, Some(RangeBegin::AfterKey { key })) => Some(s3_xml::Value(format!(
- "]{}",
- base64::encode(key.as_bytes())
- ))),
- (true, Some(RangeBegin::IncludingKey { key, .. })) => Some(s3_xml::Value(format!(
- "[{}",
- base64::encode(key.as_bytes())
- ))),
- _ => None,
- },
-
- // Body
- contents: acc
- .keys
- .iter()
- .map(|(key, info)| s3_xml::ListBucketItem {
- key: uriencode_maybe(key, query.common.urlencode_resp),
- last_modified: s3_xml::Value(msec_to_rfc3339(info.last_modified)),
- size: s3_xml::IntValue(info.size as i64),
- etag: s3_xml::Value(format!("\"{}\"", info.etag)),
- storage_class: s3_xml::Value("STANDARD".to_string()),
- })
- .collect(),
- common_prefixes: acc
- .common_prefixes
- .iter()
- .map(|pfx| s3_xml::CommonPrefix {
- prefix: uriencode_maybe(pfx, query.common.urlencode_resp),
- })
- .collect(),
- };
-
- let xml = s3_xml::to_xml_with_header(&result)?;
- Ok(Response::builder()
- .header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
-}
-
-pub async fn handle_list_multipart_upload(
- garage: Arc<Garage>,
- query: &ListMultipartUploadsQuery,
-) -> Result<Response<Body>, Error> {
- let io = |bucket, key, count| {
- let t = &garage.object_table;
- async move {
- t.get_range(&bucket, key, Some(ObjectFilter::IsUploading), count)
- .await
- }
- };
-
- debug!("ListMultipartUploads {:?}", query);
- let mut acc = query.build_accumulator();
- let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;
-
- let result = s3_xml::ListMultipartUploadsResult {
- xmlns: (),
-
- // Sending back some information about the request
- bucket: s3_xml::Value(query.common.bucket_name.to_string()),
- prefix: uriencode_maybe(&query.common.prefix, query.common.urlencode_resp),
- delimiter: query
- .common
- .delimiter
- .as_ref()
- .map(|d| uriencode_maybe(d, query.common.urlencode_resp)),
- max_uploads: s3_xml::IntValue(query.common.page_size as i64),
- key_marker: query
- .key_marker
- .as_ref()
- .map(|m| uriencode_maybe(m, query.common.urlencode_resp)),
- upload_id_marker: query
- .upload_id_marker
- .as_ref()
- .map(|m| s3_xml::Value(m.to_string())),
- encoding_type: match query.common.urlencode_resp {
- true => Some(s3_xml::Value("url".to_string())),
- false => None,
- },
-
- // Handling pagination
- is_truncated: s3_xml::Value(format!("{}", pagination.is_some())),
- next_key_marker: match &pagination {
- None => None,
- Some(RangeBegin::AfterKey { key })
- | Some(RangeBegin::AfterUpload { key, .. })
- | Some(RangeBegin::IncludingKey { key, .. }) => {
- Some(uriencode_maybe(key, query.common.urlencode_resp))
- }
- },
- next_upload_id_marker: match pagination {
- Some(RangeBegin::AfterUpload { upload, .. }) => {
- Some(s3_xml::Value(hex::encode(upload)))
- }
- Some(RangeBegin::IncludingKey { .. }) => Some(s3_xml::Value("include".to_string())),
- _ => None,
- },
-
- // Result body
- upload: acc
- .keys
- .iter()
- .map(|(uuid, info)| s3_xml::ListMultipartItem {
- initiated: s3_xml::Value(msec_to_rfc3339(info.timestamp)),
- key: uriencode_maybe(&info.key, query.common.urlencode_resp),
- upload_id: s3_xml::Value(hex::encode(uuid)),
- storage_class: s3_xml::Value("STANDARD".to_string()),
- initiator: s3_xml::Initiator {
- display_name: s3_xml::Value(DUMMY_NAME.to_string()),
- id: s3_xml::Value(DUMMY_KEY.to_string()),
- },
- owner: s3_xml::Owner {
- display_name: s3_xml::Value(DUMMY_NAME.to_string()),
- id: s3_xml::Value(DUMMY_KEY.to_string()),
- },
- })
- .collect(),
- common_prefixes: acc
- .common_prefixes
- .iter()
- .map(|c| s3_xml::CommonPrefix {
- prefix: s3_xml::Value(c.to_string()),
- })
- .collect(),
- };
-
- let xml = s3_xml::to_xml_with_header(&result)?;
-
- Ok(Response::builder()
- .header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
-}
-
-pub async fn handle_list_parts(
- garage: Arc<Garage>,
- query: &ListPartsQuery,
-) -> Result<Response<Body>, Error> {
- debug!("ListParts {:?}", query);
-
- let upload_id = s3_put::decode_upload_id(&query.upload_id)?;
-
- let (object, version) = futures::try_join!(
- garage.object_table.get(&query.bucket_id, &query.key),
- garage.version_table.get(&upload_id, &EmptyKey),
- )?;
-
- let (info, next) = fetch_part_info(query, object, version, upload_id)?;
-
- let result = s3_xml::ListPartsResult {
- xmlns: (),
- bucket: s3_xml::Value(query.bucket_name.to_string()),
- key: s3_xml::Value(query.key.to_string()),
- upload_id: s3_xml::Value(query.upload_id.to_string()),
- part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)),
- next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)),
- max_parts: s3_xml::IntValue(query.max_parts as i64),
- is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()),
- parts: info
- .iter()
- .map(|part| s3_xml::PartItem {
- etag: s3_xml::Value(format!("\"{}\"", part.etag)),
- last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)),
- part_number: s3_xml::IntValue(part.part_number as i64),
- size: s3_xml::IntValue(part.size as i64),
- })
- .collect(),
- initiator: s3_xml::Initiator {
- display_name: s3_xml::Value(DUMMY_NAME.to_string()),
- id: s3_xml::Value(DUMMY_KEY.to_string()),
- },
- owner: s3_xml::Owner {
- display_name: s3_xml::Value(DUMMY_NAME.to_string()),
- id: s3_xml::Value(DUMMY_KEY.to_string()),
- },
- storage_class: s3_xml::Value("STANDARD".to_string()),
- };
-
- let xml = s3_xml::to_xml_with_header(&result)?;
-
- Ok(Response::builder()
- .header("Content-Type", "application/xml")
- .body(Body::from(xml.into_bytes()))?)
-}
-
-/*
- * Private enums and structs
- */
-
-#[derive(Debug)]
-struct ObjectInfo {
- last_modified: u64,
- size: u64,
- etag: String,
-}
-
-#[derive(Debug, PartialEq)]
-struct UploadInfo {
- key: String,
- timestamp: u64,
-}
-
-#[derive(Debug, PartialEq)]
-struct PartInfo {
- etag: String,
- timestamp: u64,
- part_number: u64,
- size: u64,
-}
-
-enum ExtractionResult {
- NoMore,
- Filled,
- FilledAtUpload {
- key: String,
- upload: Uuid,
- },
- Extracted {
- key: String,
- },
- // Fallback key is used for legacy APIs that only support
- // exclusive pagination (and not inclusive pagination).
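- // For illustration: when a common prefix "c/" fills the page, the listing
- // should resume inclusively at key_after_prefix("c/") == "c0", but a
- // ListObjects V1 client only supports an exclusive Marker; fallback_key then
- // holds the last real key seen (e.g. "c/c") to be returned as NextMarker.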
- SkipTo {
- key: String,
- fallback_key: Option<String>,
- },
-}
-
-#[derive(PartialEq, Clone, Debug)]
-enum RangeBegin {
- // Fallback key is used for legacy APIs that only support
- // exclusive pagination (and not inclusive pagination).
- IncludingKey {
- key: String,
- fallback_key: Option<String>,
- },
- AfterKey {
- key: String,
- },
- AfterUpload {
- key: String,
- upload: Uuid,
- },
-}
-type Pagination = Option<RangeBegin>;
-
-/*
- * Fetch list entries
- */
-
-async fn fetch_list_entries<R, F>(
- query: &ListQueryCommon,
- begin: RangeBegin,
- acc: &mut impl ExtractAccumulator,
- mut io: F,
-) -> Result<Pagination, Error>
-where
- R: futures::Future<Output = Result<Vec<Object>, GarageError>>,
- F: FnMut(Uuid, Option<String>, usize) -> R,
-{
- let mut cursor = begin;
- // +1 is needed as we may need to skip the 1st key
- // (range is inclusive while most S3 requests are exclusive)
- let count = query.page_size + 1;
-
- loop {
- let start_key = match cursor {
- RangeBegin::AfterKey { ref key }
- | RangeBegin::AfterUpload { ref key, .. }
- | RangeBegin::IncludingKey { ref key, .. } => Some(key.clone()),
- };
-
- // Fetch objects
- let objects = io(query.bucket_id, start_key.clone(), count).await?;
-
- debug!(
- "List: get range {:?} (max {}), results: {}",
- start_key,
- count,
- objects.len()
- );
- let server_more = objects.len() >= count;
-
- let prev_req_cursor = cursor.clone();
- let mut iter = objects.iter().peekable();
-
- // Drop the first key if needed
- // Only AfterKey requires it according to the S3 spec and our implementation.
- match (&cursor, iter.peek()) {
- (RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => iter.next(),
- (_, _) => None,
- };
-
- while let Some(object) = iter.peek() {
- if !object.key.starts_with(&query.prefix) {
- // If the key is not in the requested prefix, we're done
- return Ok(None);
- }
-
- cursor = match acc.extract(query, &cursor, &mut iter) {
- ExtractionResult::Extracted { key } => RangeBegin::AfterKey { key },
- ExtractionResult::SkipTo { key, fallback_key } => {
- RangeBegin::IncludingKey { key, fallback_key }
- }
- ExtractionResult::FilledAtUpload { key, upload } => {
- return Ok(Some(RangeBegin::AfterUpload { key, upload }))
- }
- ExtractionResult::Filled => return Ok(Some(cursor)),
- ExtractionResult::NoMore => return Ok(None),
- };
- }
-
- if !server_more {
- // We did not fully fill the accumulator despite exhausting all the data we have,
- // so we're done
- return Ok(None);
- }
-
- if prev_req_cursor == cursor {
- unreachable!("No progress has been made in the loop. This is a bug, please report it.");
- }
- }
-}
-
-fn fetch_part_info(
- query: &ListPartsQuery,
- object: Option<Object>,
- version: Option<Version>,
- upload_id: Uuid,
-) -> Result<(Vec<PartInfo>, Option<u64>), Error> {
- // Check results
- let object = object.ok_or(Error::NoSuchKey)?;
-
- let obj_version = object
- .versions()
- .iter()
- .find(|v| v.uuid == upload_id && v.is_uploading())
- .ok_or(Error::NoSuchUpload)?;
-
- let version = version.ok_or(Error::NoSuchKey)?;
-
- // Cut the beginning of our 2 vectors if required
- let (etags, blocks) = match &query.part_number_marker {
- Some(marker) => {
- let next = marker + 1;
-
- let part_idx = into_ok_or_err(
- version
- .parts_etags
- .items()
- .binary_search_by(|(part_num, _)| part_num.cmp(&next)),
- );
- let parts = &version.parts_etags.items()[part_idx..];
-
- let block_idx = into_ok_or_err(
- version
- .blocks
- .items()
- .binary_search_by(|(vkey, _)| vkey.part_number.cmp(&next)),
- );
- let blocks = &version.blocks.items()[block_idx..];
-
- (parts, blocks)
- }
- None => (version.parts_etags.items(), version.blocks.items()),
- };
-
- // Use the block vector to compute a (part_number, size) vector
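- // For illustration: blocks [(part 1, size 3), (part 1, size 2), (part 2, size 8)]
- // collapse into [(1, 5), (2, 8)], since consecutive blocks belonging to the
- // same part number are summed.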
- let mut size = Vec::<(u64, u64)>::new();
- blocks.iter().for_each(|(key, val)| {
- let mut new_size = val.size;
- match size.pop() {
- Some((part_number, size)) if part_number == key.part_number => new_size += size,
- Some(v) => size.push(v),
- None => (),
- }
- size.push((key.part_number, new_size))
- });
-
- // Merge the etag vector and size vector to build a PartInfo vector
- let max_parts = query.max_parts as usize;
- let (mut etag_iter, mut size_iter) = (etags.iter().peekable(), size.iter().peekable());
-
- let mut info = Vec::<PartInfo>::with_capacity(max_parts);
-
- while info.len() < max_parts {
- match (etag_iter.peek(), size_iter.peek()) {
- (Some((ep, etag)), Some((sp, size))) => match ep.cmp(sp) {
- Ordering::Less => {
- debug!("ETag information ignored due to missing corresponding block information. Query: {:?}", query);
- etag_iter.next();
- }
- Ordering::Equal => {
- info.push(PartInfo {
- etag: etag.to_string(),
- timestamp: obj_version.timestamp,
- part_number: *ep,
- size: *size,
- });
- etag_iter.next();
- size_iter.next();
- }
- Ordering::Greater => {
- debug!("Block information ignored due to missing corresponding ETag information. Query: {:?}", query);
- size_iter.next();
- }
- },
- (None, None) => return Ok((info, None)),
- _ => {
- debug!(
- "Additional block or ETag information ignored. Query: {:?}",
- query
- );
- return Ok((info, None));
- }
- }
- }
-
- match info.last() {
- Some(part_info) => {
- let pagination = Some(part_info.part_number);
- Ok((info, pagination))
- }
- None => Ok((info, None)),
- }
-}
-
-/*
- * ListQuery logic
- */
-
-/// Determine the key from which we want to start fetching objects from the database
-///
-/// We choose whether the object at this key must
-/// be included or excluded from the response.
-/// This key can be the prefix in the base case, or intermediate
-/// points in the dataset if we are continuing a previous listing.
-impl ListObjectsQuery {
- fn build_accumulator(&self) -> Accumulator<String, ObjectInfo> {
- Accumulator::<String, ObjectInfo>::new(self.common.page_size)
- }
-
- fn begin(&self) -> Result<RangeBegin, Error> {
- if self.is_v2 {
- match (&self.continuation_token, &self.start_after) {
- // In V2 mode, the continuation token is defined as an opaque
- // string in the spec, so we can do whatever we want with it.
- // In our case, it is defined as either [ or ] (for inclusive
- // or exclusive), followed by the base64-encoded key to start with.
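- // For illustration: "]" + base64("d") (i.e. "]ZA==") resumes the listing
- // strictly after key "d", while "[ZA==" resumes at "d" inclusively.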
- (Some(token), _) => match &token[..1] {
- "[" => Ok(RangeBegin::IncludingKey {
- key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?,
- fallback_key: None,
- }),
- "]" => Ok(RangeBegin::AfterKey {
- key: String::from_utf8(base64::decode(token[1..].as_bytes())?)?,
- }),
- _ => Err(Error::BadRequest("Invalid continuation token".to_string())),
- },
-
- // StartAfter has defined semantics in the spec:
- // start listing at the first key immediately after.
- (_, Some(key)) => Ok(RangeBegin::AfterKey {
- key: key.to_string(),
- }),
-
- // In the case where neither is specified, we start
- // listing at the specified prefix. If an object has this
- // exact same key, we include it. (@TODO is this correct?)
- _ => Ok(RangeBegin::IncludingKey {
- key: self.common.prefix.to_string(),
- fallback_key: None,
- }),
- }
- } else {
- match &self.marker {
- // In V1 mode, the spec defines the Marker value to mean
- // the same thing as the StartAfter value in V2 mode.
- Some(key) => Ok(RangeBegin::AfterKey {
- key: key.to_string(),
- }),
- _ => Ok(RangeBegin::IncludingKey {
- key: self.common.prefix.to_string(),
- fallback_key: None,
- }),
- }
- }
- }
-}
-
-impl ListMultipartUploadsQuery {
- fn build_accumulator(&self) -> Accumulator<Uuid, UploadInfo> {
- Accumulator::<Uuid, UploadInfo>::new(self.common.page_size)
- }
-
- fn begin(&self) -> Result<RangeBegin, Error> {
- match (&self.upload_id_marker, &self.key_marker) {
- // If both the upload id marker and the key marker are set,
- // the spec specifies that we must start listing uploads INCLUDING the given key,
- // AFTER the specified upload id (sorted in a lexicographic order).
- // To enable some optimisations, we emulate "IncludingKey" by extending the upload id
- // semantics. We base our reasoning on the hypothesis that S3's upload ids are opaque,
- // while Garage's are 32-byte hex-encoded values, which enables us to extend this query
- // with a specific "include" upload id.
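- // For illustration: a response carrying NextKeyMarker="c0" and
- // NextUploadIdMarker="include" tells the client to send these values back
- // as KeyMarker/UploadIdMarker, and the next page then starts at key "c0"
- // inclusively.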
- (Some(up_marker), Some(key_marker)) => match &up_marker[..] {
- "include" => Ok(RangeBegin::IncludingKey {
- key: key_marker.to_string(),
- fallback_key: None,
- }),
- uuid => Ok(RangeBegin::AfterUpload {
- key: key_marker.to_string(),
- upload: s3_put::decode_upload_id(uuid)?,
- }),
- },
-
- // If only the key marker is specified, the spec says that we must start listing
- // uploads AFTER the specified key.
- (None, Some(key_marker)) => Ok(RangeBegin::AfterKey {
- key: key_marker.to_string(),
- }),
- _ => Ok(RangeBegin::IncludingKey {
- key: self.common.prefix.to_string(),
- fallback_key: None,
- }),
- }
- }
-}
-
-/*
- * Accumulator logic
- */
-
-trait ExtractAccumulator {
- fn extract<'a>(
- &mut self,
- query: &ListQueryCommon,
- cursor: &RangeBegin,
- iter: &mut Peekable<impl Iterator<Item = &'a Object>>,
- ) -> ExtractionResult;
-}
-
-struct Accumulator<K, V> {
- common_prefixes: BTreeSet<String>,
- keys: BTreeMap<K, V>,
- max_capacity: usize,
-}
-
-type ObjectAccumulator = Accumulator<String, ObjectInfo>;
-type UploadAccumulator = Accumulator<Uuid, UploadInfo>;
-
-impl<K: std::cmp::Ord, V> Accumulator<K, V> {
- fn new(page_size: usize) -> Accumulator<K, V> {
- Accumulator {
- common_prefixes: BTreeSet::<String>::new(),
- keys: BTreeMap::<K, V>::new(),
- max_capacity: page_size,
- }
- }
-
- /// Observe the Object iterator and try to extract a single common prefix
- ///
- /// This function can consume an arbitrary number of items as long as they share the same
- /// common prefix.
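- /// For illustration: with prefix "a/" and delimiter "/", the keys "a/b/c" and
- /// "a/b/d" are both consumed and registered as the single common prefix "a/b/".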
- fn extract_common_prefix<'a>(
- &mut self,
- objects: &mut Peekable<impl Iterator<Item = &'a Object>>,
- query: &ListQueryCommon,
- ) -> Option<ExtractionResult> {
- // Get the next object from the iterator
- let object = objects.peek().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");
-
- // Check if this is a common prefix (requires a delimiter to be set and present in the key)
- let pfx = match common_prefix(object, query) {
- Some(p) => p,
- None => return None,
- };
-
- // Try to register this prefix
- // If not possible, we can return early
- if !self.try_insert_common_prefix(pfx.to_string()) {
- return Some(ExtractionResult::Filled);
- }
-
- // We consume the whole common prefix from the iterator
- let mut last_pfx_key = &object.key;
- loop {
- last_pfx_key = match objects.peek() {
- Some(o) if o.key.starts_with(pfx) => &o.key,
- Some(_) => {
- return Some(ExtractionResult::Extracted {
- key: last_pfx_key.to_owned(),
- })
- }
- None => {
- return match key_after_prefix(pfx) {
- Some(next) => Some(ExtractionResult::SkipTo {
- key: next,
- fallback_key: Some(last_pfx_key.to_owned()),
- }),
- None => Some(ExtractionResult::NoMore),
- }
- }
- };
-
- objects.next();
- }
- }
-
- fn is_full(&mut self) -> bool {
- self.keys.len() + self.common_prefixes.len() >= self.max_capacity
- }
-
- fn try_insert_common_prefix(&mut self, key: String) -> bool {
- // If we already have an entry, we can continue
- if self.common_prefixes.contains(&key) {
- return true;
- }
-
- // Otherwise, we need to check if we can add it
- match self.is_full() {
- true => false,
- false => {
- self.common_prefixes.insert(key);
- true
- }
- }
- }
-
- fn try_insert_entry(&mut self, key: K, value: V) -> bool {
- // It is impossible to add a key twice; doing so is an error
- assert!(!self.keys.contains_key(&key));
-
- match self.is_full() {
- true => false,
- false => {
- self.keys.insert(key, value);
- true
- }
- }
- }
-}
-
-impl ExtractAccumulator for ObjectAccumulator {
- fn extract<'a>(
- &mut self,
- query: &ListQueryCommon,
- _cursor: &RangeBegin,
- objects: &mut Peekable<impl Iterator<Item = &'a Object>>,
- ) -> ExtractionResult {
- if let Some(e) = self.extract_common_prefix(objects, query) {
- return e;
- }
-
- let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");
-
- let version = match object.versions().iter().find(|x| x.is_data()) {
- Some(v) => v,
- None => unreachable!(
- "Expect to have objects having data due to earlier filtering. This is a logic bug."
- ),
- };
-
- let meta = match &version.state {
- ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
- ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
- _ => unreachable!(),
- };
- let info = ObjectInfo {
- last_modified: version.timestamp,
- size: meta.size,
- etag: meta.etag.to_string(),
- };
-
- match self.try_insert_entry(object.key.clone(), info) {
- true => ExtractionResult::Extracted {
- key: object.key.clone(),
- },
- false => ExtractionResult::Filled,
- }
- }
-}
-
-impl ExtractAccumulator for UploadAccumulator {
- /// Observe the iterator, process a single key, and try to extract one or more upload entries
- ///
- /// This function processes a single object from the iterator that can contain an arbitrary
- /// number of versions, and thus "uploads".
- fn extract<'a>(
- &mut self,
- query: &ListQueryCommon,
- cursor: &RangeBegin,
- objects: &mut Peekable<impl Iterator<Item = &'a Object>>,
- ) -> ExtractionResult {
- if let Some(e) = self.extract_common_prefix(objects, query) {
- return e;
- }
-
- // Get the next object from the iterator
- let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");
-
- let mut uploads_for_key = object
- .versions()
- .iter()
- .filter(|x| x.is_uploading())
- .collect::<Vec<&ObjectVersion>>();
-
- // S3 logic requires lexicographically sorted upload ids.
- uploads_for_key.sort_unstable_by_key(|e| e.uuid);
-
- // Skip results if an upload marker is provided
- if let RangeBegin::AfterUpload { upload, .. } = cursor {
- // Because our data are sorted, we can use a binary search to find the UUID
- // or to find where it should have been added. Once this position is found,
- // we use it to discard the first part of the array.
- let idx = match uploads_for_key.binary_search_by(|e| e.uuid.cmp(upload)) {
- // we start after the found uuid so we need to discard the pointed value.
- // In the worst case, the UUID is the last element, which leads us to an empty array,
- // but we are never out of bounds.
- Ok(i) => i + 1,
- // if the UUID is not found, the upload may have been discarded between the two requests;
- // in that case the binary search returns where it would have been inserted,
- // so the pointed value is greater than our marker and we need to keep it.
- Err(i) => i,
- };
- uploads_for_key = uploads_for_key[idx..].to_vec();
- }
-
- let mut iter = uploads_for_key.iter();
-
- // The first entry is a special case
- // as it changes our result enum type
- let first_upload = match iter.next() {
- Some(u) => u,
- None => {
- return ExtractionResult::Extracted {
- key: object.key.clone(),
- }
- }
- };
- let first_up_info = UploadInfo {
- key: object.key.to_string(),
- timestamp: first_upload.timestamp,
- };
- if !self.try_insert_entry(first_upload.uuid, first_up_info) {
- return ExtractionResult::Filled;
- }
-
- // We can then collect the remaining uploads in a loop
- let mut prev_uuid = first_upload.uuid;
- for upload in iter {
- let up_info = UploadInfo {
- key: object.key.to_string(),
- timestamp: upload.timestamp,
- };
-
- // Insert data in our accumulator
- // If it is full, return information to paginate.
- if !self.try_insert_entry(upload.uuid, up_info) {
- return ExtractionResult::FilledAtUpload {
- key: object.key.clone(),
- upload: prev_uuid,
- };
- }
- // Update our last added UUID
- prev_uuid = upload.uuid;
- }
-
- // We successfully collected all the uploads
- ExtractionResult::Extracted {
- key: object.key.clone(),
- }
- }
-}
-
-/*
- * Utility functions
- */
-
-/// This is a stub for Result::into_ok_or_err that is not yet in Rust stable
-fn into_ok_or_err<T>(r: Result<T, T>) -> T {
- match r {
- Ok(r) => r,
- Err(r) => r,
- }
-}
-
-/// Returns the common prefix of the object given the query prefix and delimiter
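-/// For illustration: with prefix "a/" and delimiter "/", the key "a/b/c" yields
-/// Some("a/b/"); with prefix "a/b/" (no further delimiter in the key) it yields None.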
-fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> {
- match &query.delimiter {
- Some(delimiter) => object.key[query.prefix.len()..]
- .find(delimiter)
- .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]),
- None => None,
- }
-}
-
-/// URIencode a value if needed
-fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
- if yes {
- s3_xml::Value(uri_encode(s, true))
- } else {
- s3_xml::Value(s.to_string())
- }
-}
-
-const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}';
-
-/// Compute the key after the prefix
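-/// For illustration: key_after_prefix("a/b/") == Some("a/b0"), and the result is
-/// None when every character of the prefix is already char::MAX.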
-fn key_after_prefix(pfx: &str) -> Option<String> {
- let mut next = pfx.to_string();
- while !next.is_empty() {
- let tail = next.pop().unwrap();
- if tail >= char::MAX {
- continue;
- }
-
- // Circumvent a limitation of RangeFrom that overflows earlier than needed
- // See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html
- let new_tail = if tail == UTF8_BEFORE_LAST_CHAR {
- char::MAX
- } else {
- (tail..).nth(1).unwrap()
- };
-
- next.push(new_tail);
- return Some(next);
- }
-
- None
-}
-
-/*
- * Unit tests of this module
- */
-#[cfg(test)]
-mod tests {
- use super::*;
- use garage_model::version_table::*;
- use garage_util::*;
- use std::iter::FromIterator;
-
- const TS: u64 = 1641394898314;
-
- fn bucket() -> Uuid {
- Uuid::from([0x42; 32])
- }
-
- fn query() -> ListMultipartUploadsQuery {
- ListMultipartUploadsQuery {
- common: ListQueryCommon {
- prefix: "".to_string(),
- delimiter: Some("/".to_string()),
- page_size: 1000,
- urlencode_resp: false,
- bucket_name: "a".to_string(),
- bucket_id: Uuid::from([0x00; 32]),
- },
- key_marker: None,
- upload_id_marker: None,
- }
- }
-
- fn objs() -> Vec<Object> {
- vec![
- Object::new(
- bucket(),
- "a/b/c".to_string(),
- vec![objup_version([0x01; 32])],
- ),
- Object::new(bucket(), "d".to_string(), vec![objup_version([0x01; 32])]),
- ]
- }
-
- fn objup_version(uuid: [u8; 32]) -> ObjectVersion {
- ObjectVersion {
- uuid: Uuid::from(uuid),
- timestamp: TS,
- state: ObjectVersionState::Uploading(ObjectVersionHeaders {
- content_type: "text/plain".to_string(),
- other: BTreeMap::<String, String>::new(),
- }),
- }
- }
-
- #[test]
- fn test_key_after_prefix() {
- assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1);
- assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0");
- assert_eq!(key_after_prefix("€").unwrap().as_str(), "₭");
- assert_eq!(
- key_after_prefix("􏿽").unwrap().as_str(),
- String::from(char::from_u32(0x10FFFE).unwrap())
- );
-
- // When the last character is the biggest UTF8 char
- let a = String::from_iter(['a', char::MAX].iter());
- assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b");
-
- // When all characters are the biggest UTF8 char
- let b = String::from_iter([char::MAX; 3].iter());
- assert!(key_after_prefix(b.as_str()).is_none());
-
- // Check that the UTF-16 surrogate range (U+D800..=U+DFFF) is skipped
- let c = String::from('\u{D7FF}');
- assert_eq!(
- key_after_prefix(c.as_str()).unwrap().as_str(),
- String::from('\u{E000}')
- );
-
- // Check the character before the biggest one
- let d = String::from('\u{10FFFE}');
- assert_eq!(
- key_after_prefix(d.as_str()).unwrap().as_str(),
- String::from(char::MAX)
- );
- }
-
- #[test]
- fn test_common_prefixes() {
- let mut query = query();
- let objs = objs();
-
- query.common.prefix = "a/".to_string();
- assert_eq!(
- common_prefix(objs.get(0).unwrap(), &query.common),
- Some("a/b/")
- );
-
- query.common.prefix = "a/b/".to_string();
- assert_eq!(common_prefix(objs.get(0).unwrap(), &query.common), None);
- }
-
- #[test]
- fn test_extract_common_prefix() {
- let mut query = query();
- query.common.prefix = "a/".to_string();
- let objs = objs();
- let mut acc = UploadAccumulator::new(query.common.page_size);
-
- let mut iter = objs.iter().peekable();
- match acc.extract_common_prefix(&mut iter, &query.common) {
- Some(ExtractionResult::Extracted { key }) => assert_eq!(key, "a/b/c".to_string()),
- _ => panic!("wrong result"),
- }
- assert_eq!(acc.common_prefixes.len(), 1);
- assert_eq!(acc.common_prefixes.iter().next().unwrap(), "a/b/");
- }
-
- #[test]
- fn test_extract_upload() {
- let objs = vec![
- Object::new(
- bucket(),
- "b".to_string(),
- vec![
- objup_version([0x01; 32]),
- objup_version([0x80; 32]),
- objup_version([0x8f; 32]),
- objup_version([0xdd; 32]),
- ],
- ),
- Object::new(bucket(), "c".to_string(), vec![]),
- ];
-
- let mut acc = UploadAccumulator::new(2);
- let mut start = RangeBegin::AfterUpload {
- key: "b".to_string(),
- upload: Uuid::from([0x01; 32]),
- };
-
- let mut iter = objs.iter().peekable();
-
- // Check the case where we skip some uploads
- match acc.extract(&(query().common), &start, &mut iter) {
- ExtractionResult::FilledAtUpload { key, upload } => {
- assert_eq!(key, "b");
- assert_eq!(upload, Uuid::from([0x8f; 32]));
- }
- _ => panic!("wrong result"),
- };
-
- assert_eq!(acc.keys.len(), 2);
- assert_eq!(
- acc.keys.get(&Uuid::from([0x80; 32])).unwrap(),
- &UploadInfo {
- timestamp: TS,
- key: "b".to_string()
- }
- );
- assert_eq!(
- acc.keys.get(&Uuid::from([0x8f; 32])).unwrap(),
- &UploadInfo {
- timestamp: TS,
- key: "b".to_string()
- }
- );
-
- acc = UploadAccumulator::new(2);
- start = RangeBegin::AfterUpload {
- key: "b".to_string(),
- upload: Uuid::from([0xff; 32]),
- };
- iter = objs.iter().peekable();
-
- // Check the case where we skip all the uploads
- match acc.extract(&(query().common), &start, &mut iter) {
- ExtractionResult::Extracted { key } if key.as_str() == "b" => (),
- _ => panic!("wrong result"),
- };
- }
-
- #[tokio::test]
- async fn test_fetch_uploads_no_result() -> Result<(), Error> {
- let query = query();
- let mut acc = query.build_accumulator();
- let page = fetch_list_entries(
- &query.common,
- query.begin()?,
- &mut acc,
- |_, _, _| async move { Ok(vec![]) },
- )
- .await?;
- assert_eq!(page, None);
- assert_eq!(acc.common_prefixes.len(), 0);
- assert_eq!(acc.keys.len(), 0);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_fetch_uploads_basic() -> Result<(), Error> {
- let query = query();
- let mut acc = query.build_accumulator();
- let mut fake_io = |_, _, _| async move { Ok(objs()) };
- let page =
- fetch_list_entries(&query.common, query.begin()?, &mut acc, &mut fake_io).await?;
- assert_eq!(page, None);
- assert_eq!(acc.common_prefixes.len(), 1);
- assert_eq!(acc.keys.len(), 1);
- assert!(acc.common_prefixes.contains("a/"));
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_fetch_uploads_advanced() -> Result<(), Error> {
- let mut query = query();
- query.common.page_size = 2;
-
- let mut fake_io = |_, k: Option<String>, _| async move {
- Ok(match k.as_deref() {
- Some("") => vec![
- Object::new(bucket(), "b/a".to_string(), vec![objup_version([0x01; 32])]),
- Object::new(bucket(), "b/b".to_string(), vec![objup_version([0x01; 32])]),
- Object::new(bucket(), "b/c".to_string(), vec![objup_version([0x01; 32])]),
- ],
- Some("b0") => vec![
- Object::new(bucket(), "c/a".to_string(), vec![objup_version([0x01; 32])]),
- Object::new(bucket(), "c/b".to_string(), vec![objup_version([0x01; 32])]),
- Object::new(bucket(), "c/c".to_string(), vec![objup_version([0x02; 32])]),
- ],
- Some("c0") => vec![Object::new(
- bucket(),
- "d".to_string(),
- vec![objup_version([0x01; 32])],
- )],
- _ => panic!("wrong value {:?}", k),
- })
- };
-
- let mut acc = query.build_accumulator();
- let page =
- fetch_list_entries(&query.common, query.begin()?, &mut acc, &mut fake_io).await?;
- assert_eq!(
- page,
- Some(RangeBegin::IncludingKey {
- key: "c0".to_string(),
- fallback_key: Some("c/c".to_string())
- })
- );
- assert_eq!(acc.common_prefixes.len(), 2);
- assert_eq!(acc.keys.len(), 0);
- assert!(acc.common_prefixes.contains("b/"));
- assert!(acc.common_prefixes.contains("c/"));
-
- Ok(())
- }
-
- fn version() -> Version {
- let uuid = Uuid::from([0x08; 32]);
-
- let blocks = vec![
- (
- VersionBlockKey {
- part_number: 1,
- offset: 1,
- },
- VersionBlock {
- hash: uuid,
- size: 3,
- },
- ),
- (
- VersionBlockKey {
- part_number: 1,
- offset: 2,
- },
- VersionBlock {
- hash: uuid,
- size: 2,
- },
- ),
- (
- VersionBlockKey {
- part_number: 2,
- offset: 1,
- },
- VersionBlock {
- hash: uuid,
- size: 8,
- },
- ),
- (
- VersionBlockKey {
- part_number: 5,
- offset: 1,
- },
- VersionBlock {
- hash: uuid,
- size: 7,
- },
- ),
- (
- VersionBlockKey {
- part_number: 8,
- offset: 1,
- },
- VersionBlock {
- hash: uuid,
- size: 5,
- },
- ),
- ];
- let etags = vec![
- (1, "etag1".to_string()),
- (3, "etag2".to_string()),
- (5, "etag3".to_string()),
- (8, "etag4".to_string()),
- (9, "etag5".to_string()),
- ];
-
- Version {
- bucket_id: uuid,
- key: "a".to_string(),
- uuid,
- deleted: false.into(),
- blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks),
- parts_etags: crdt::Map::<u64, String>::from_iter(etags),
- }
- }
-
- fn obj() -> Object {
- Object::new(bucket(), "d".to_string(), vec![objup_version([0x08; 32])])
- }
-
- #[test]
- fn test_fetch_part_info() -> Result<(), Error> {
- let uuid = Uuid::from([0x08; 32]);
- let mut query = ListPartsQuery {
- bucket_name: "a".to_string(),
- bucket_id: uuid,
- key: "a".to_string(),
- upload_id: "xx".to_string(),
- part_number_marker: None,
- max_parts: 2,
- };
-
- assert!(
- fetch_part_info(&query, None, None, uuid).is_err(),
- "No object and version should fail"
- );
- assert!(
- fetch_part_info(&query, Some(obj()), None, uuid).is_err(),
- "No version should faild"
- );
- assert!(
- fetch_part_info(&query, None, Some(version()), uuid).is_err(),
- "No object should fail"
- );
-
- // Start from the beginning but with limited size to trigger pagination
- let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
- assert_eq!(pagination.unwrap(), 5);
- assert_eq!(
- info,
- vec![
- PartInfo {
- etag: "etag1".to_string(),
- timestamp: TS,
- part_number: 1,
- size: 5
- },
- PartInfo {
- etag: "etag3".to_string(),
- timestamp: TS,
- part_number: 5,
- size: 7
- },
- ]
- );
-
- // Use previous pagination to make a new request
- query.part_number_marker = Some(pagination.unwrap());
- let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
- assert!(pagination.is_none());
- assert_eq!(
- info,
- vec![PartInfo {
- etag: "etag4".to_string(),
- timestamp: TS,
- part_number: 8,
- size: 5
- },]
- );
-
- // Trying to access a part that is way larger than registered ones
- query.part_number_marker = Some(9999);
- let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
- assert!(pagination.is_none());
- assert_eq!(info, vec![]);
-
- // Try without any limitation
- query.max_parts = 1000;
- query.part_number_marker = None;
- let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
- assert!(pagination.is_none());
- assert_eq!(
- info,
- vec![
- PartInfo {
- etag: "etag1".to_string(),
- timestamp: TS,
- part_number: 1,
- size: 5
- },
- PartInfo {
- etag: "etag3".to_string(),
- timestamp: TS,
- part_number: 5,
- size: 7
- },
- PartInfo {
- etag: "etag4".to_string(),
- timestamp: TS,
- part_number: 8,
- size: 5
- },
- ]
- );
-
- Ok(())
- }
-}