aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/api/api_server.rs19
-rw-r--r--src/api/s3_list.rs374
2 files changed, 389 insertions, 4 deletions
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index b064ac24..dfb8dfdb 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -294,6 +294,25 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
)
.await
}
+ Endpoint::ListParts {
+ key,
+ max_parts,
+ part_number_marker,
+ upload_id,
+ } => {
+ handle_list_parts(
+ garage,
+ &ListPartsQuery {
+ bucket_name,
+ bucket_id,
+ key,
+ upload_id,
+ part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)),
+ max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000),
+ },
+ )
+ .await
+ }
Endpoint::DeleteObjects {} => {
handle_delete_objects(garage, bucket_id, req, content_sha256).await
}
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index c3bc6938..92998159 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -1,3 +1,4 @@
+use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::iter::{Iterator, Peekable};
use std::sync::Arc;
@@ -10,12 +11,18 @@ use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
+use garage_model::version_table::Version;
+
+use garage_table::EmptyKey;
use crate::encoding::*;
use crate::error::*;
use crate::s3_put;
use crate::s3_xml;
+const DUMMY_NAME: &str = "Dummy Key";
+const DUMMY_KEY: &str = "GKDummyKey";
+
#[derive(Debug)]
pub struct ListQueryCommon {
pub bucket_name: String,
@@ -42,6 +49,16 @@ pub struct ListMultipartUploadsQuery {
pub common: ListQueryCommon,
}
+#[derive(Debug)]
+pub struct ListPartsQuery {
+ pub bucket_name: String,
+ pub bucket_id: Uuid,
+ pub key: String,
+ pub upload_id: String,
+ pub part_number_marker: Option<u64>,
+ pub max_parts: u64,
+}
+
pub async fn handle_list(
garage: Arc<Garage>,
query: &ListObjectsQuery,
@@ -54,6 +71,7 @@ pub async fn handle_list(
}
};
+ debug!("ListObjects {:?}", query);
let mut acc = query.build_accumulator();
let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;
@@ -152,6 +170,7 @@ pub async fn handle_list_multipart_upload(
}
};
+ debug!("ListMultipartUploads {:?}", query);
let mut acc = query.build_accumulator();
let pagination = fetch_list_entries(&query.common, query.begin()?, &mut acc, &io).await?;
@@ -208,12 +227,12 @@ pub async fn handle_list_multipart_upload(
upload_id: s3_xml::Value(hex::encode(uuid)),
storage_class: s3_xml::Value("STANDARD".to_string()),
initiator: s3_xml::Initiator {
- display_name: s3_xml::Value("Dummy Key".to_string()),
- id: s3_xml::Value("GKDummyKey".to_string()),
+ display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+ id: s3_xml::Value(DUMMY_KEY.to_string()),
},
owner: s3_xml::Owner {
- display_name: s3_xml::Value("Dummy Key".to_string()),
- id: s3_xml::Value("GKDummyKey".to_string()),
+ display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+ id: s3_xml::Value(DUMMY_KEY.to_string()),
},
})
.collect(),
@@ -233,6 +252,57 @@ pub async fn handle_list_multipart_upload(
.body(Body::from(xml.into_bytes()))?)
}
+pub async fn handle_list_parts(
+ garage: Arc<Garage>,
+ query: &ListPartsQuery,
+) -> Result<Response<Body>, Error> {
+ debug!("ListParts {:?}", query);
+
+ let upload_id = s3_put::decode_upload_id(&query.upload_id)?;
+
+ let (object, version) = futures::try_join!(
+ garage.object_table.get(&query.bucket_id, &query.key),
+ garage.version_table.get(&upload_id, &EmptyKey),
+ )?;
+
+ let (info, next) = fetch_part_info(query, object, version, upload_id)?;
+
+ let result = s3_xml::ListPartsResult {
+ xmlns: (),
+ bucket: s3_xml::Value(query.bucket_name.to_string()),
+ key: s3_xml::Value(query.key.to_string()),
+ upload_id: s3_xml::Value(query.upload_id.to_string()),
+ part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)),
+ next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)),
+ max_parts: s3_xml::IntValue(query.max_parts as i64),
+ is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()),
+ parts: info
+ .iter()
+ .map(|part| s3_xml::PartItem {
+ etag: s3_xml::Value(format!("\"{}\"", part.etag)),
+ last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)),
+ part_number: s3_xml::IntValue(part.part_number as i64),
+ size: s3_xml::IntValue(part.size as i64),
+ })
+ .collect(),
+ initiator: s3_xml::Initiator {
+ display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+ id: s3_xml::Value(DUMMY_KEY.to_string()),
+ },
+ owner: s3_xml::Owner {
+ display_name: s3_xml::Value(DUMMY_NAME.to_string()),
+ id: s3_xml::Value(DUMMY_KEY.to_string()),
+ },
+ storage_class: s3_xml::Value("STANDARD".to_string()),
+ };
+
+ let xml = s3_xml::to_xml_with_header(&result)?;
+
+ Ok(Response::builder()
+ .header("Content-Type", "application/xml")
+ .body(Body::from(xml.into_bytes()))?)
+}
+
/*
* Private enums and structs
*/
@@ -250,6 +320,14 @@ struct UploadInfo {
timestamp: u64,
}
+#[derive(Debug, PartialEq)]
+struct PartInfo {
+ etag: String,
+ timestamp: u64,
+ part_number: u64,
+ size: u64,
+}
+
enum ExtractionResult {
NoMore,
Filled,
@@ -364,6 +442,109 @@ where
}
}
+fn fetch_part_info(
+ query: &ListPartsQuery,
+ object: Option<Object>,
+ version: Option<Version>,
+ upload_id: Uuid,
+) -> Result<(Vec<PartInfo>, Option<u64>), Error> {
+ // Check results
+ let object = object.ok_or(Error::NoSuchKey)?;
+
+ let obj_version = object
+ .versions()
+ .iter()
+ .find(|v| v.uuid == upload_id && v.is_uploading())
+ .ok_or(Error::NoSuchUpload)?;
+
+ let version = version.ok_or(Error::NoSuchKey)?;
+
+ // Cut the beginning of our 2 vectors if required
+ let (etags, blocks) = match &query.part_number_marker {
+ Some(marker) => {
+ let next = marker + 1;
+
+ let part_idx = into_ok_or_err(
+ version
+ .parts_etags
+ .items()
+ .binary_search_by(|(part_num, _)| part_num.cmp(&next)),
+ );
+ let parts = &version.parts_etags.items()[part_idx..];
+
+ let block_idx = into_ok_or_err(
+ version
+ .blocks
+ .items()
+ .binary_search_by(|(vkey, _)| vkey.part_number.cmp(&next)),
+ );
+ let blocks = &version.blocks.items()[block_idx..];
+
+ (parts, blocks)
+ }
+ None => (version.parts_etags.items(), version.blocks.items()),
+ };
+
+ // Use the block vector to compute a (part_number, size) vector
+ let mut size = Vec::<(u64, u64)>::new();
+ blocks.iter().for_each(|(key, val)| {
+ let mut new_size = val.size;
+ match size.pop() {
+ Some((part_number, size)) if part_number == key.part_number => new_size += size,
+ Some(v) => size.push(v),
+ None => (),
+ }
+ size.push((key.part_number, new_size))
+ });
+
+ // Merge the etag vector and size vector to build a PartInfo vector
+ let max_parts = query.max_parts as usize;
+ let (mut etag_iter, mut size_iter) = (etags.iter().peekable(), size.iter().peekable());
+
+ let mut info = Vec::<PartInfo>::with_capacity(max_parts);
+
+ while info.len() < max_parts {
+ match (etag_iter.peek(), size_iter.peek()) {
+ (Some((ep, etag)), Some((sp, size))) => match ep.cmp(sp) {
+ Ordering::Less => {
+ debug!("ETag information ignored due to missing corresponding block information. Query: {:?}", query);
+ etag_iter.next();
+ }
+ Ordering::Equal => {
+ info.push(PartInfo {
+ etag: etag.to_string(),
+ timestamp: obj_version.timestamp,
+ part_number: *ep,
+ size: *size,
+ });
+ etag_iter.next();
+ size_iter.next();
+ }
+ Ordering::Greater => {
+ debug!("Block information ignored due to missing corresponding ETag information. Query: {:?}", query);
+ size_iter.next();
+ }
+ },
+ (None, None) => return Ok((info, None)),
+ _ => {
+ debug!(
+ "Additional block or ETag information ignored. Query: {:?}",
+ query
+ );
+ return Ok((info, None));
+ }
+ }
+ }
+
+ match info.last() {
+ Some(part_info) => {
+ let pagination = Some(part_info.part_number);
+ Ok((info, pagination))
+ }
+ None => Ok((info, None)),
+ }
+}
+
/*
* ListQuery logic
*/
@@ -715,6 +896,14 @@ impl ExtractAccumulator for UploadAccumulator {
* Utility functions
*/
+/// This is a stub for Result::into_ok_or_err that is not yet in Rust stable
+fn into_ok_or_err<T>(r: Result<T, T>) -> T {
+ match r {
+ Ok(r) => r,
+ Err(r) => r,
+ }
+}
+
/// Returns the common prefix of the object given the query prefix and delimiter
fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> {
match &query.delimiter {
@@ -766,6 +955,8 @@ fn key_after_prefix(pfx: &str) -> Option<String> {
#[cfg(test)]
mod tests {
use super::*;
+ use garage_model::version_table::*;
+ use garage_util::*;
use std::iter::FromIterator;
const TS: u64 = 1641394898314;
@@ -1014,4 +1205,179 @@ mod tests {
Ok(())
}
+
+ fn version() -> Version {
+ let uuid = Uuid::from([0x08; 32]);
+
+ let blocks = vec![
+ (
+ VersionBlockKey {
+ part_number: 1,
+ offset: 1,
+ },
+ VersionBlock {
+ hash: uuid,
+ size: 3,
+ },
+ ),
+ (
+ VersionBlockKey {
+ part_number: 1,
+ offset: 2,
+ },
+ VersionBlock {
+ hash: uuid,
+ size: 2,
+ },
+ ),
+ (
+ VersionBlockKey {
+ part_number: 2,
+ offset: 1,
+ },
+ VersionBlock {
+ hash: uuid,
+ size: 8,
+ },
+ ),
+ (
+ VersionBlockKey {
+ part_number: 5,
+ offset: 1,
+ },
+ VersionBlock {
+ hash: uuid,
+ size: 7,
+ },
+ ),
+ (
+ VersionBlockKey {
+ part_number: 8,
+ offset: 1,
+ },
+ VersionBlock {
+ hash: uuid,
+ size: 5,
+ },
+ ),
+ ];
+ let etags = vec![
+ (1, "etag1".to_string()),
+ (3, "etag2".to_string()),
+ (5, "etag3".to_string()),
+ (8, "etag4".to_string()),
+ (9, "etag5".to_string()),
+ ];
+
+ Version {
+ bucket_id: uuid,
+ key: "a".to_string(),
+ uuid: uuid,
+ deleted: false.into(),
+ blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks),
+ parts_etags: crdt::Map::<u64, String>::from_iter(etags),
+ }
+ }
+
+ fn obj() -> Object {
+ Object::new(bucket(), "d".to_string(), vec![objup_version([0x08; 32])])
+ }
+
+ #[test]
+ fn test_fetch_part_info() -> Result<(), Error> {
+ let uuid = Uuid::from([0x08; 32]);
+ let mut query = ListPartsQuery {
+ bucket_name: "a".to_string(),
+ bucket_id: uuid,
+ key: "a".to_string(),
+ upload_id: "xx".to_string(),
+ part_number_marker: None,
+ max_parts: 2,
+ };
+
+ assert!(
+ fetch_part_info(&query, None, None, uuid).is_err(),
+ "No object and version should fail"
+ );
+ assert!(
+ fetch_part_info(&query, Some(obj()), None, uuid).is_err(),
+ "No version should faild"
+ );
+ assert!(
+ fetch_part_info(&query, None, Some(version()), uuid).is_err(),
+ "No object should fail"
+ );
+
+ // Start from the beginning but with limited size to trigger pagination
+ let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+ assert_eq!(pagination.unwrap(), 5);
+ assert_eq!(
+ info,
+ vec![
+ PartInfo {
+ etag: "etag1".to_string(),
+ timestamp: TS,
+ part_number: 1,
+ size: 5
+ },
+ PartInfo {
+ etag: "etag3".to_string(),
+ timestamp: TS,
+ part_number: 5,
+ size: 7
+ },
+ ]
+ );
+
+ // Use previous pagination to make a new request
+ query.part_number_marker = Some(pagination.unwrap());
+ let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+ assert!(pagination.is_none());
+ assert_eq!(
+ info,
+ vec![PartInfo {
+ etag: "etag4".to_string(),
+ timestamp: TS,
+ part_number: 8,
+ size: 5
+ },]
+ );
+
+ // Trying to access a part that is way larger than registered ones
+ query.part_number_marker = Some(9999);
+ let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+ assert!(pagination.is_none());
+ assert_eq!(info, vec![]);
+
+ // Try without any limitation
+ query.max_parts = 1000;
+ query.part_number_marker = None;
+ let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?;
+ assert!(pagination.is_none());
+ assert_eq!(
+ info,
+ vec![
+ PartInfo {
+ etag: "etag1".to_string(),
+ timestamp: TS,
+ part_number: 1,
+ size: 5
+ },
+ PartInfo {
+ etag: "etag3".to_string(),
+ timestamp: TS,
+ part_number: 5,
+ size: 7
+ },
+ PartInfo {
+ etag: "etag4".to_string(),
+ timestamp: TS,
+ part_number: 8,
+ size: 5
+ },
+ ]
+ );
+
+ Ok(())
+ }
}