From 82e75c0e296c74c374f3d40feeb1aadcb58398f0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 12:02:59 +0200 Subject: Adapt S3 API code to use new multipart upload models - Create and PutPart - completemultipartupload - upload part copy - list_parts --- src/api/s3/api_server.rs | 1 + src/api/s3/copy.rs | 121 ++++++++------ src/api/s3/get.rs | 6 +- src/api/s3/list.rs | 177 ++++++++------------ src/api/s3/mod.rs | 1 + src/api/s3/multipart.rs | 422 +++++++++++++++++++++++++++++++++++++++++++++++ src/api/s3/put.rs | 402 +++----------------------------------------- 7 files changed, 584 insertions(+), 546 deletions(-) create mode 100644 src/api/s3/multipart.rs (limited to 'src/api/s3') diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 27837297..7c23de19 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -27,6 +27,7 @@ use crate::s3::cors::*; use crate::s3::delete::*; use crate::s3::get::*; use crate::s3::list::*; +use crate::s3::multipart::*; use crate::s3::post_object::handle_post_object; use crate::s3::put::*; use crate::s3::router::Endpoint; diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 7eb6459d..68b4f0c9 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -2,7 +2,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use futures::{stream, stream::Stream, StreamExt, TryFutureExt}; +use futures::{stream, stream::Stream, StreamExt}; use md5::{Digest as Md5Digest, Md5}; use bytes::Bytes; @@ -18,12 +18,14 @@ use garage_util::time::*; use garage_model::garage::Garage; use garage_model::key_table::Key; use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use crate::helpers::parse_bucket_key; use crate::s3::error::*; -use crate::s3::put::{decode_upload_id, get_headers}; +use crate::s3::multipart; +use crate::s3::put::get_headers; use crate::s3::xml::{self as s3_xml, xmlns_tag}; pub async fn handle_copy( @@ -92,7 +94,10 @@ pub async fn handle_copy( let tmp_dest_object_version = ObjectVersion { uuid: new_uuid, timestamp: new_timestamp, - state: ObjectVersionState::Uploading(new_meta.headers.clone()), + state: ObjectVersionState::Uploading { + headers: new_meta.headers.clone(), + multipart: false, + }, }; let tmp_dest_object = Object::new( dest_bucket_id, @@ -105,8 +110,14 @@ pub async fn handle_copy( // this means that the BlockRef entries linked to this version cannot be // marked as deleted (they are marked as deleted only if the Version // doesn't exist or is marked as deleted). 
- let mut dest_version = - Version::new(new_uuid, dest_bucket_id, dest_key.to_string(), false); + let mut dest_version = Version::new( + new_uuid, + VersionBacklink::Object { + bucket_id: dest_bucket_id, + key: dest_key.to_string(), + }, + false, + ); garage.version_table.insert(&dest_version).await?; // Fill in block list for version and insert block refs @@ -179,17 +190,13 @@ pub async fn handle_upload_part_copy( ) -> Result, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; - let dest_version_uuid = decode_upload_id(upload_id)?; + let dest_upload_id = multipart::decode_upload_id(upload_id)?; let dest_key = dest_key.to_string(); - let (source_object, dest_object) = futures::try_join!( + let (source_object, (_, _, mut dest_mpu)) = futures::try_join!( get_copy_source(&garage, api_key, req), - garage - .object_table - .get(&dest_bucket_id, &dest_key) - .map_err(Error::from), + multipart::get_upload(&garage, &dest_bucket_id, &dest_key, &dest_upload_id) )?; - let dest_object = dest_object.ok_or(Error::NoSuchKey)?; let (source_object_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; @@ -217,15 +224,6 @@ pub async fn handle_upload_part_copy( }, }; - // Check destination version is indeed in uploading state - if !dest_object - .versions() - .iter() - .any(|v| v.uuid == dest_version_uuid && v.is_uploading()) - { - return Err(Error::NoSuchUpload); - } - // Check source version is not inlined match source_version_data { ObjectVersionData::DeleteMarker => unreachable!(), @@ -242,23 +240,11 @@ pub async fn handle_upload_part_copy( // Fetch source versin with its block list, // and destination version to check part hasn't yet been uploaded - let (source_version, dest_version) = futures::try_join!( - garage - .version_table - .get(&source_object_version.uuid, &EmptyKey), - garage.version_table.get(&dest_version_uuid, &EmptyKey), - )?; - let source_version = source_version.ok_or(Error::NoSuchKey)?; - - // Check this part number hasn't yet been uploaded - if let Some(dv) = dest_version { - if dv.has_part_number(part_number) { - return Err(Error::bad_request(format!( - "Part number {} has already been uploaded", - part_number - ))); - } - } + let source_version = garage + .version_table + .get(&source_object_version.uuid, &EmptyKey) + .await? + .ok_or(Error::NoSuchKey)?; // We want to reuse blocks from the source version as much as possible. 
// However, we still need to get the data from these blocks @@ -299,6 +285,33 @@ pub async fn handle_upload_part_copy( current_offset = block_end; } + // Calculate the identity of destination part: timestamp, version id + let dest_version_id = gen_uuid(); + let dest_mpu_part_key = MpuPartKey { + part_number, + timestamp: dest_mpu.next_timestamp(part_number), + }; + + // Create the uploaded part + dest_mpu.parts.clear(); + dest_mpu.parts.put( + dest_mpu_part_key, + MpuPart { + version: dest_version_id, + etag: None, + size: None, + }, + ); + garage.mpu_table.insert(&dest_mpu).await?; + + let mut dest_version = Version::new( + dest_version_id, + VersionBacklink::MultipartUpload { + upload_id: dest_upload_id, + }, + false, + ); + // Now, actually copy the blocks let mut md5hasher = Md5::new(); @@ -348,8 +361,8 @@ pub async fn handle_upload_part_copy( let must_upload = existing_block_hash.is_none(); let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..])); - let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); - version.blocks.put( + dest_version.blocks.clear(); + dest_version.blocks.put( VersionBlockKey { part_number, offset: current_offset, @@ -363,7 +376,7 @@ pub async fn handle_upload_part_copy( let block_ref = BlockRef { block: final_hash, - version: dest_version_uuid, + version: dest_version_id, deleted: false.into(), }; @@ -378,23 +391,33 @@ pub async fn handle_upload_part_copy( Ok(()) } }, - // Thing 2: we need to insert the block in the version - garage.version_table.insert(&version), - // Thing 3: we need to add a block reference - garage.block_ref_table.insert(&block_ref), + async { + // Thing 2: we need to insert the block in the version + garage.version_table.insert(&dest_version).await?; + // Thing 3: we need to add a block reference + garage.block_ref_table.insert(&block_ref).await + }, // Thing 4: we need to prefetch the next block defragmenter.next(), )?; - next_block = res.3; + next_block = res.2; } + assert_eq!(current_offset, source_range.length); + let data_md5sum = md5hasher.finalize(); let etag = hex::encode(data_md5sum); // Put the part's ETag in the Versiontable - let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); - version.parts_etags.put(part_number, etag.clone()); - garage.version_table.insert(&version).await?; + dest_mpu.parts.put( + dest_mpu_part_key, + MpuPart { + version: dest_version_id, + etag: Some(etag.clone()), + size: Some(current_offset), + }, + ); + garage.mpu_table.insert(&dest_mpu).await?; // LGTM let resp_xml = s3_xml::to_xml_with_header(&CopyPartResult { diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 2a99551a..aa391745 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -149,7 +149,6 @@ pub async fn handle_head( let (part_offset, part_end) = calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?; - let n_parts = version.parts_etags.items().len(); Ok(object_headers(object_version, version_meta) .header(CONTENT_LENGTH, format!("{}", part_end - part_offset)) @@ -162,7 +161,7 @@ pub async fn handle_head( version_meta.size ), ) - .header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts)) + .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) .status(StatusCode::PARTIAL_CONTENT) .body(Body::empty())?) 
} @@ -376,7 +375,6 @@ async fn handle_get_part( let (begin, end) = calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?; - let n_parts = version.parts_etags.items().len(); let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); @@ -386,7 +384,7 @@ async fn handle_get_part( CONTENT_RANGE, format!("bytes {}-{}/{}", begin, end - 1, version_meta.size), ) - .header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts)) + .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) .body(body)?) } _ => unreachable!(), diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 5cb0d65a..5a9eb133 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -1,4 +1,3 @@ -use std::cmp::Ordering; use std::collections::{BTreeMap, BTreeSet}; use std::iter::{Iterator, Peekable}; use std::sync::Arc; @@ -11,15 +10,15 @@ use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_model::garage::Garage; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; -use garage_model::s3::version_table::Version; -use garage_table::{EmptyKey, EnumerationOrder}; +use garage_table::EnumerationOrder; use crate::encoding::*; use crate::helpers::key_after_prefix; use crate::s3::error::*; -use crate::s3::put as s3_put; +use crate::s3::multipart as s3_multipart; use crate::s3::xml as s3_xml; const DUMMY_NAME: &str = "Dummy Key"; @@ -176,7 +175,9 @@ pub async fn handle_list_multipart_upload( t.get_range( &bucket, key, - Some(ObjectFilter::IsUploading), + Some(ObjectFilter::IsUploading { + check_multipart: Some(true), + }), count, EnumerationOrder::Forward, ) @@ -272,23 +273,23 @@ pub async fn handle_list_parts( ) -> Result, Error> { debug!("ListParts {:?}", query); - let upload_id = s3_put::decode_upload_id(&query.upload_id)?; + let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?; - let (object, version) = futures::try_join!( - garage.object_table.get(&query.bucket_id, &query.key), - garage.version_table.get(&upload_id, &EmptyKey), - )?; + let (_, _, mpu) = + s3_multipart::get_upload(&garage, &query.bucket_id, &query.key, &upload_id).await?; - let (info, next) = fetch_part_info(query, object, version, upload_id)?; + let (info, next) = fetch_part_info(query, &mpu)?; let result = s3_xml::ListPartsResult { xmlns: (), + // Query parameters bucket: s3_xml::Value(query.bucket_name.to_string()), key: s3_xml::Value(query.key.to_string()), upload_id: s3_xml::Value(query.upload_id.to_string()), part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)), - next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)), max_parts: s3_xml::IntValue(query.max_parts as i64), + // Result values + next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)), is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()), parts: info .iter() @@ -299,6 +300,7 @@ pub async fn handle_list_parts( size: s3_xml::IntValue(part.size as i64), }) .collect(), + // Dummy result values (unsupported features) initiator: s3_xml::Initiator { display_name: s3_xml::Value(DUMMY_NAME.to_string()), id: s3_xml::Value(DUMMY_KEY.to_string()), @@ -335,8 +337,8 @@ struct UploadInfo { } #[derive(Debug, PartialEq)] -struct PartInfo { - etag: String, +struct PartInfo<'a> { + etag: &'a str, timestamp: u64, part_number: u64, size: u64, @@ -456,106 +458,52 @@ where } } -fn fetch_part_info( +fn fetch_part_info<'a>( query: &ListPartsQuery, - object: Option, - version: Option, - upload_id: Uuid, -) -> Result<(Vec, Option), Error> { - // 
Check results - let object = object.ok_or(Error::NoSuchKey)?; - - let obj_version = object - .versions() - .iter() - .find(|v| v.uuid == upload_id && v.is_uploading()) - .ok_or(Error::NoSuchUpload)?; - - let version = version.ok_or(Error::NoSuchKey)?; - - // Cut the beginning of our 2 vectors if required - let (etags, blocks) = match &query.part_number_marker { - Some(marker) => { - let next = marker + 1; - - let part_idx = into_ok_or_err( - version - .parts_etags - .items() - .binary_search_by(|(part_num, _)| part_num.cmp(&next)), - ); - let parts = &version.parts_etags.items()[part_idx..]; - - let block_idx = into_ok_or_err( - version - .blocks - .items() - .binary_search_by(|(vkey, _)| vkey.part_number.cmp(&next)), - ); - let blocks = &version.blocks.items()[block_idx..]; - - (parts, blocks) - } - None => (version.parts_etags.items(), version.blocks.items()), - }; - - // Use the block vector to compute a (part_number, size) vector - let mut size = Vec::<(u64, u64)>::new(); - blocks.iter().for_each(|(key, val)| { - let mut new_size = val.size; - match size.pop() { - Some((part_number, size)) if part_number == key.part_number => new_size += size, - Some(v) => size.push(v), - None => (), - } - size.push((key.part_number, new_size)) - }); - - // Merge the etag vector and size vector to build a PartInfo vector - let max_parts = query.max_parts as usize; - let (mut etag_iter, mut size_iter) = (etags.iter().peekable(), size.iter().peekable()); - - let mut info = Vec::::with_capacity(max_parts); - - while info.len() < max_parts { - match (etag_iter.peek(), size_iter.peek()) { - (Some((ep, etag)), Some((sp, size))) => match ep.cmp(sp) { - Ordering::Less => { - debug!("ETag information ignored due to missing corresponding block information. Query: {:?}", query); - etag_iter.next(); - } - Ordering::Equal => { - info.push(PartInfo { - etag: etag.to_string(), - timestamp: obj_version.timestamp, - part_number: *ep, - size: *size, - }); - etag_iter.next(); - size_iter.next(); + mpu: &'a MultipartUpload, +) -> Result<(Vec>, Option), Error> { + // Parse multipart upload part list, removing parts not yet finished + // and failed part uploads that were overwritten + let mut parts: Vec> = Vec::with_capacity(mpu.parts.items().len()); + for (pk, p) in mpu.parts.items().iter() { + if let (Some(etag), Some(size)) = (&p.etag, p.size) { + let part_info = PartInfo { + part_number: pk.part_number, + timestamp: pk.timestamp, + etag, + size, + }; + match parts.last_mut() { + Some(lastpart) if lastpart.part_number == pk.part_number => { + *lastpart = part_info; } - Ordering::Greater => { - debug!("Block information ignored due to missing corresponding ETag information. Query: {:?}", query); - size_iter.next(); + _ => { + parts.push(part_info); } - }, - (None, None) => return Ok((info, None)), - _ => { - debug!( - "Additional block or ETag information ignored. 
Query: {:?}", - query - ); - return Ok((info, None)); } } } - match info.last() { + // Cut the beginning and end + match &query.part_number_marker { + Some(marker) => { + let next = marker + 1; + let part_idx = + into_ok_or_err(parts.binary_search_by(|part| part.part_number.cmp(&next))); + parts.truncate(part_idx + query.max_parts as usize); + parts = parts.split_off(part_idx); + } + None => { + parts.truncate(query.max_parts as usize); + } + }; + + match parts.last() { Some(part_info) => { let pagination = Some(part_info.part_number); - Ok((info, pagination)) + Ok((parts, pagination)) } - None => Ok((info, None)), + None => Ok((parts, None)), } } @@ -651,7 +599,7 @@ impl ListMultipartUploadsQuery { }), uuid => Ok(RangeBegin::AfterUpload { key: key_marker.to_string(), - upload: s3_put::decode_upload_id(uuid)?, + upload: s3_multipart::decode_upload_id(uuid)?, }), }, @@ -843,7 +791,7 @@ impl ExtractAccumulator for UploadAccumulator { let mut uploads_for_key = object .versions() .iter() - .filter(|x| x.is_uploading()) + .filter(|x| x.is_uploading(Some(true))) .collect::>(); // S3 logic requires lexicographically sorted upload ids. @@ -991,10 +939,13 @@ mod tests { ObjectVersion { uuid: Uuid::from(uuid), timestamp: TS, - state: ObjectVersionState::Uploading(ObjectVersionHeaders { - content_type: "text/plain".to_string(), - other: BTreeMap::::new(), - }), + state: ObjectVersionState::Uploading { + multipart: true, + headers: ObjectVersionHeaders { + content_type: "text/plain".to_string(), + other: BTreeMap::::new(), + }, + }, } } @@ -1233,11 +1184,13 @@ mod tests { ]; Version { - bucket_id: uuid, - key: "a".to_string(), uuid, deleted: false.into(), blocks: crdt::Map::::from_iter(blocks), + backlink: VersionBacklink::Object { + bucket_id: uuid, + key: "a".to_string(), + }, parts_etags: crdt::Map::::from_iter(etags), } } diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs index 7b56d4d8..b5237bf7 100644 --- a/src/api/s3/mod.rs +++ b/src/api/s3/mod.rs @@ -7,6 +7,7 @@ pub mod cors; mod delete; pub mod get; mod list; +mod multipart; mod post_object; mod put; mod website; diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs new file mode 100644 index 00000000..ecd7a212 --- /dev/null +++ b/src/api/s3/multipart.rs @@ -0,0 +1,422 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use futures::prelude::*; +use hyper::body::Body; +use hyper::{Request, Response}; +use md5::{Digest as Md5Digest, Md5}; + +use garage_table::*; +use garage_util::async_hash::*; +use garage_util::data::*; +use garage_util::time::*; + +use garage_model::bucket_table::Bucket; +use garage_model::garage::Garage; +use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; + +use crate::s3::error::*; +use crate::s3::put::*; +use crate::s3::xml as s3_xml; +use crate::signature::verify_signed_content; + +// ---- + +pub async fn handle_create_multipart_upload( + garage: Arc, + req: &Request, + bucket_name: &str, + bucket_id: Uuid, + key: &str, +) -> Result, Error> { + let upload_id = gen_uuid(); + let headers = get_headers(req.headers())?; + + // Create object in object table + let object_version = ObjectVersion { + uuid: upload_id, + timestamp: now_msec(), + state: ObjectVersionState::Uploading { + multipart: true, + headers, + }, + }; + let object = Object::new(bucket_id, key.to_string(), vec![object_version]); + garage.object_table.insert(&object).await?; + + // Create multipart upload in mpu table + // This multipart 
upload will hold references to uploaded parts + // (which are entries in the Version table) + let mpu = MultipartUpload::new(upload_id, bucket_id, key.into(), false); + garage.mpu_table.insert(&mpu).await?; + + // Send success response + let result = s3_xml::InitiateMultipartUploadResult { + xmlns: (), + bucket: s3_xml::Value(bucket_name.to_string()), + key: s3_xml::Value(key.to_string()), + upload_id: s3_xml::Value(hex::encode(upload_id)), + }; + let xml = s3_xml::to_xml_with_header(&result)?; + + Ok(Response::new(Body::from(xml.into_bytes()))) +} + +pub async fn handle_put_part( + garage: Arc, + req: Request, + bucket_id: Uuid, + key: &str, + part_number: u64, + upload_id: &str, + content_sha256: Option, +) -> Result, Error> { + let upload_id = decode_upload_id(upload_id)?; + + let content_md5 = match req.headers().get("content-md5") { + Some(x) => Some(x.to_str()?.to_string()), + None => None, + }; + + // Read first chuck, and at the same time try to get object to see if it exists + let key = key.to_string(); + + let body = req.into_body().map_err(Error::from); + let mut chunker = StreamChunker::new(body, garage.config.block_size); + + let ((_, _, mut mpu), first_block) = futures::try_join!( + get_upload(&garage, &bucket_id, &key, &upload_id), + chunker.next(), + )?; + + // Check object is valid and part can be accepted + let first_block = first_block.ok_or_bad_request("Empty body")?; + + // Calculate part identity: timestamp, version id + let version_id = gen_uuid(); + let mpu_part_key = MpuPartKey { + part_number, + timestamp: mpu.next_timestamp(part_number), + }; + + // Create version and link version from MPU + mpu.parts.clear(); + mpu.parts.put( + mpu_part_key, + MpuPart { + version: version_id, + etag: None, + size: None, + }, + ); + garage.mpu_table.insert(&mpu).await?; + + let version = Version::new( + version_id, + VersionBacklink::MultipartUpload { upload_id }, + false, + ); + garage.version_table.insert(&version).await?; + + // Copy data to version + let first_block_hash = async_blake2sum(first_block.clone()).await; + + let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( + &garage, + &version, + part_number, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; + + // Verify that checksums map + ensure_checksum_matches( + data_md5sum.as_slice(), + data_sha256sum, + content_md5.as_deref(), + content_sha256, + )?; + + // Store part etag in version + let data_md5sum_hex = hex::encode(data_md5sum); + mpu.parts.put( + mpu_part_key, + MpuPart { + version: version_id, + etag: Some(data_md5sum_hex.clone()), + size: Some(total_size), + }, + ); + garage.mpu_table.insert(&mpu).await?; + + let response = Response::builder() + .header("ETag", format!("\"{}\"", data_md5sum_hex)) + .body(Body::empty()) + .unwrap(); + Ok(response) +} + +pub async fn handle_complete_multipart_upload( + garage: Arc, + req: Request, + bucket_name: &str, + bucket: &Bucket, + key: &str, + upload_id: &str, + content_sha256: Option, +) -> Result, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } + + let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; + let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml) + .ok_or_bad_request("Invalid CompleteMultipartUpload XML")?; + debug!( + "CompleteMultipartUpload list of parts: {:?}", + body_list_of_parts + ); + + let upload_id = decode_upload_id(upload_id)?; + + // Get object and 
multipart upload + let key = key.to_string(); + let (_, mut object_version, mpu) = get_upload(&garage, &bucket.id, &key, &upload_id).await?; + + if mpu.parts.is_empty() { + return Err(Error::bad_request("No data was uploaded")); + } + + let headers = match object_version.state { + ObjectVersionState::Uploading { headers, .. } => headers, + _ => unreachable!(), + }; + + // Check that part numbers are an increasing sequence. + // (it doesn't need to start at 1 nor to be a continuous sequence, + // see discussion in #192) + if body_list_of_parts.is_empty() { + return Err(Error::EntityTooSmall); + } + if !body_list_of_parts + .iter() + .zip(body_list_of_parts.iter().skip(1)) + .all(|(p1, p2)| p1.part_number < p2.part_number) + { + return Err(Error::InvalidPartOrder); + } + + // Check that the list of parts they gave us corresponds to parts we have here + debug!("Parts stored in multipart upload: {:?}", mpu.parts.items()); + let mut have_parts = HashMap::new(); + for (pk, pv) in mpu.parts.items().iter() { + have_parts.insert(pk.part_number, pv); + } + let mut parts = vec![]; + for req_part in body_list_of_parts.iter() { + match have_parts.get(&req_part.part_number) { + Some(part) if part.etag.as_ref() == Some(&req_part.etag) && part.size.is_some() => { + parts.push(*part) + } + _ => return Err(Error::InvalidPart), + } + } + + let grg = &garage; + let parts_versions = futures::future::try_join_all(parts.iter().map(|p| async move { + grg.version_table + .get(&p.version, &EmptyKey) + .await? + .ok_or_internal_error("Part version missing from version table") + })) + .await?; + + // Create final version and block refs + let mut final_version = Version::new( + upload_id, + VersionBacklink::Object { + bucket_id: bucket.id, + key: key.to_string(), + }, + false, + ); + for (part_number, part_version) in parts_versions.iter().enumerate() { + if part_version.deleted.get() { + return Err(Error::InvalidPart); + } + for (vbk, vb) in part_version.blocks.items().iter() { + final_version.blocks.put( + VersionBlockKey { + part_number: part_number as u64, + offset: vbk.offset, + }, + *vb, + ); + } + } + garage.version_table.insert(&final_version).await?; + + let block_refs = final_version.blocks.items().iter().map(|(_, b)| BlockRef { + block: b.hash, + version: upload_id, + deleted: false.into(), + }); + garage.block_ref_table.insert_many(block_refs).await?; + + // Calculate etag of final object + // To understand how etags are calculated, read more here: + // https://teppen.io/2018/06/23/aws_s3_etags/ + let mut etag_md5_hasher = Md5::new(); + for part in parts.iter() { + etag_md5_hasher.update(part.etag.as_ref().unwrap().as_bytes()); + } + let etag = format!( + "{}-{}", + hex::encode(etag_md5_hasher.finalize()), + parts.len() + ); + + // Calculate total size of final object + let total_size = parts.iter().map(|x| x.size.unwrap()).sum(); + + if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await { + object_version.state = ObjectVersionState::Aborted; + let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); + garage.object_table.insert(&final_object).await?; + + return Err(e); + } + + // Write final object version + object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( + ObjectVersionMeta { + headers, + size: total_size, + etag: etag.clone(), + }, + final_version.blocks.items()[0].1.hash, + )); + + let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); + garage.object_table.insert(&final_object).await?; + + // Send response 
saying ok we're done + let result = s3_xml::CompleteMultipartUploadResult { + xmlns: (), + location: None, + bucket: s3_xml::Value(bucket_name.to_string()), + key: s3_xml::Value(key), + etag: s3_xml::Value(format!("\"{}\"", etag)), + }; + let xml = s3_xml::to_xml_with_header(&result)?; + + Ok(Response::new(Body::from(xml.into_bytes()))) +} + +pub async fn handle_abort_multipart_upload( + garage: Arc, + bucket_id: Uuid, + key: &str, + upload_id: &str, +) -> Result, Error> { + let upload_id = decode_upload_id(upload_id)?; + + let (_, mut object_version, _) = + get_upload(&garage, &bucket_id, &key.to_string(), &upload_id).await?; + + object_version.state = ObjectVersionState::Aborted; + let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); + garage.object_table.insert(&final_object).await?; + + Ok(Response::new(Body::from(vec![]))) +} + +// ======== helpers ============ + +pub(crate) async fn get_upload( + garage: &Garage, + bucket_id: &Uuid, + key: &String, + upload_id: &Uuid, +) -> Result<(Object, ObjectVersion, MultipartUpload), Error> { + let (object, mpu) = futures::try_join!( + garage + .object_table + .get(&bucket_id, &key) + .map_err(Error::from), + garage + .mpu_table + .get(&upload_id, &EmptyKey) + .map_err(Error::from), + )?; + + let object = object.ok_or(Error::NoSuchUpload)?; + let mpu = mpu.ok_or(Error::NoSuchUpload)?; + + let object_version = object + .versions() + .iter() + .find(|v| v.uuid == *upload_id && v.is_uploading(Some(true))) + .ok_or(Error::NoSuchUpload)? + .clone(); + + Ok((object, object_version, mpu)) +} + +pub fn decode_upload_id(id: &str) -> Result { + let id_bin = hex::decode(id).map_err(|_| Error::NoSuchUpload)?; + if id_bin.len() != 32 { + return Err(Error::NoSuchUpload); + } + let mut uuid = [0u8; 32]; + uuid.copy_from_slice(&id_bin[..]); + Ok(Uuid::from(uuid)) +} + +#[derive(Debug)] +struct CompleteMultipartUploadPart { + etag: String, + part_number: u64, +} + +fn parse_complete_multipart_upload_body( + xml: &roxmltree::Document, +) -> Option> { + let mut parts = vec![]; + + let root = xml.root(); + let cmu = root.first_child()?; + if !cmu.has_tag_name("CompleteMultipartUpload") { + return None; + } + + for item in cmu.children() { + // Only parse nodes + if !item.is_element() { + continue; + } + + if item.has_tag_name("Part") { + let etag = item.children().find(|e| e.has_tag_name("ETag"))?.text()?; + let part_number = item + .children() + .find(|e| e.has_tag_name("PartNumber"))? 
+ .text()?; + parts.push(CompleteMultipartUploadPart { + etag: etag.trim_matches('"').to_string(), + part_number: part_number.parse().ok()?, + }); + } else { + return None; + } + } + + Some(parts) +} diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 350ab884..804e1087 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use base64::prelude::*; @@ -30,8 +30,6 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use crate::s3::error::*; -use crate::s3::xml as s3_xml; -use crate::signature::verify_signed_content; pub async fn handle_put( garage: Arc, @@ -136,7 +134,10 @@ pub(crate) async fn save_stream> + Unpin>( let mut object_version = ObjectVersion { uuid: version_uuid, timestamp: version_timestamp, - state: ObjectVersionState::Uploading(headers.clone()), + state: ObjectVersionState::Uploading { + headers: headers.clone(), + multipart: false, + }, }; let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; @@ -145,7 +146,14 @@ pub(crate) async fn save_stream> + Unpin>( // Write this entry now, even with empty block list, // to prevent block_ref entries from being deleted (they can be deleted // if the reference a version that isn't found in the version table) - let version = Version::new(version_uuid, bucket.id, key.into(), false); + let version = Version::new( + version_uuid, + VersionBacklink::Object { + bucket_id: bucket.id, + key: key.into(), + }, + false, + ); garage.version_table.insert(&version).await?; // Transfer data and verify checksum @@ -192,7 +200,7 @@ pub(crate) async fn save_stream> + Unpin>( /// Validate MD5 sum against content-md5 header /// and sha256sum against signed content-sha256 -fn ensure_checksum_matches( +pub(crate) fn ensure_checksum_matches( data_md5sum: &[u8], data_sha256sum: garage_util::data::FixedBytes32, content_md5: Option<&str>, @@ -218,7 +226,7 @@ fn ensure_checksum_matches( } /// Check that inserting this object with this size doesn't exceed bucket quotas -async fn check_quotas( +pub(crate) async fn check_quotas( garage: &Arc, bucket: &Bucket, key: &str, @@ -275,7 +283,7 @@ async fn check_quotas( Ok(()) } -async fn read_and_put_blocks> + Unpin>( +pub(crate) async fn read_and_put_blocks> + Unpin>( garage: &Garage, version: &Version, part_number: u64, @@ -381,7 +389,7 @@ async fn put_block_meta( Ok(()) } -struct StreamChunker>> { +pub(crate) struct StreamChunker>> { stream: S, read_all: bool, block_size: usize, @@ -389,7 +397,7 @@ struct StreamChunker>> { } impl> + Unpin> StreamChunker { - fn new(stream: S, block_size: usize) -> Self { + pub(crate) fn new(stream: S, block_size: usize) -> Self { Self { stream, read_all: false, @@ -398,7 +406,7 @@ impl> + Unpin> StreamChunker { } } - async fn next(&mut self) -> Result, Error> { + pub(crate) async fn next(&mut self) -> Result, Error> { while !self.read_all && self.buf.len() < self.block_size { if let Some(block) = self.stream.next().await { let bytes = block?; @@ -450,326 +458,9 @@ impl Drop for InterruptedCleanup { } } -// ---- - -pub async fn handle_create_multipart_upload( - garage: Arc, - req: &Request, - bucket_name: &str, - bucket_id: Uuid, - key: &str, -) -> Result, Error> { - let version_uuid = gen_uuid(); - let headers = get_headers(req.headers())?; - - // Create object in object table - let object_version = ObjectVersion { - uuid: version_uuid, - timestamp: 
now_msec(), - state: ObjectVersionState::Uploading(headers), - }; - let object = Object::new(bucket_id, key.to_string(), vec![object_version]); - garage.object_table.insert(&object).await?; - - // Insert empty version so that block_ref entries refer to something - // (they are inserted concurrently with blocks in the version table, so - // there is the possibility that they are inserted before the version table - // is created, in which case it is allowed to delete them, e.g. in repair_*) - let version = Version::new(version_uuid, bucket_id, key.into(), false); - garage.version_table.insert(&version).await?; - - // Send success response - let result = s3_xml::InitiateMultipartUploadResult { - xmlns: (), - bucket: s3_xml::Value(bucket_name.to_string()), - key: s3_xml::Value(key.to_string()), - upload_id: s3_xml::Value(hex::encode(version_uuid)), - }; - let xml = s3_xml::to_xml_with_header(&result)?; - - Ok(Response::new(Body::from(xml.into_bytes()))) -} - -pub async fn handle_put_part( - garage: Arc, - req: Request, - bucket_id: Uuid, - key: &str, - part_number: u64, - upload_id: &str, - content_sha256: Option, -) -> Result, Error> { - let version_uuid = decode_upload_id(upload_id)?; - - let content_md5 = match req.headers().get("content-md5") { - Some(x) => Some(x.to_str()?.to_string()), - None => None, - }; - - // Read first chuck, and at the same time try to get object to see if it exists - let key = key.to_string(); - - let body = req.into_body().map_err(Error::from); - let mut chunker = StreamChunker::new(body, garage.config.block_size); +// ============ helpers ============ - let (object, version, first_block) = futures::try_join!( - garage - .object_table - .get(&bucket_id, &key) - .map_err(Error::from), - garage - .version_table - .get(&version_uuid, &EmptyKey) - .map_err(Error::from), - chunker.next(), - )?; - - // Check object is valid and multipart block can be accepted - let first_block = first_block.ok_or_bad_request("Empty body")?; - let object = object.ok_or_bad_request("Object not found")?; - - if !object - .versions() - .iter() - .any(|v| v.uuid == version_uuid && v.is_uploading()) - { - return Err(Error::NoSuchUpload); - } - - // Check part hasn't already been uploaded - if let Some(v) = version { - if v.has_part_number(part_number) { - return Err(Error::bad_request(format!( - "Part number {} has already been uploaded", - part_number - ))); - } - } - - // Copy block to store - let version = Version::new(version_uuid, bucket_id, key, false); - - let first_block_hash = async_blake2sum(first_block.clone()).await; - - let (_, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - part_number, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; - - // Verify that checksums map - ensure_checksum_matches( - data_md5sum.as_slice(), - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; - - // Store part etag in version - let data_md5sum_hex = hex::encode(data_md5sum); - let mut version = version; - version - .parts_etags - .put(part_number, data_md5sum_hex.clone()); - garage.version_table.insert(&version).await?; - - let response = Response::builder() - .header("ETag", format!("\"{}\"", data_md5sum_hex)) - .body(Body::empty()) - .unwrap(); - Ok(response) -} - -pub async fn handle_complete_multipart_upload( - garage: Arc, - req: Request, - bucket_name: &str, - bucket: &Bucket, - key: &str, - upload_id: &str, - content_sha256: Option, -) -> Result, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; - - if let 
Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } - - let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; - let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml) - .ok_or_bad_request("Invalid CompleteMultipartUpload XML")?; - debug!( - "CompleteMultipartUpload list of parts: {:?}", - body_list_of_parts - ); - - let version_uuid = decode_upload_id(upload_id)?; - - // Get object and version - let key = key.to_string(); - let (object, version) = futures::try_join!( - garage.object_table.get(&bucket.id, &key), - garage.version_table.get(&version_uuid, &EmptyKey), - )?; - - let object = object.ok_or(Error::NoSuchKey)?; - let mut object_version = object - .versions() - .iter() - .find(|v| v.uuid == version_uuid && v.is_uploading()) - .cloned() - .ok_or(Error::NoSuchUpload)?; - - let version = version.ok_or(Error::NoSuchKey)?; - if version.blocks.is_empty() { - return Err(Error::bad_request("No data was uploaded")); - } - - let headers = match object_version.state { - ObjectVersionState::Uploading(headers) => headers, - _ => unreachable!(), - }; - - // Check that part numbers are an increasing sequence. - // (it doesn't need to start at 1 nor to be a continuous sequence, - // see discussion in #192) - if body_list_of_parts.is_empty() { - return Err(Error::EntityTooSmall); - } - if !body_list_of_parts - .iter() - .zip(body_list_of_parts.iter().skip(1)) - .all(|(p1, p2)| p1.part_number < p2.part_number) - { - return Err(Error::InvalidPartOrder); - } - - // Garage-specific restriction, see #204: part numbers must be - // consecutive starting at 1 - if body_list_of_parts[0].part_number != 1 - || !body_list_of_parts - .iter() - .zip(body_list_of_parts.iter().skip(1)) - .all(|(p1, p2)| p1.part_number + 1 == p2.part_number) - { - return Err(Error::NotImplemented("Garage does not support completing a Multipart upload with non-consecutive part numbers. This is a restriction of Garage's data model, which might be fixed in a future release. See issue #204 for more information on this topic.".into())); - } - - // Check that the list of parts they gave us corresponds to the parts we have here - debug!("Expected parts from request: {:?}", body_list_of_parts); - debug!("Parts stored in version: {:?}", version.parts_etags.items()); - let parts = version - .parts_etags - .items() - .iter() - .map(|pair| (&pair.0, &pair.1)); - let same_parts = body_list_of_parts - .iter() - .map(|x| (&x.part_number, &x.etag)) - .eq(parts); - if !same_parts { - return Err(Error::InvalidPart); - } - - // Check that all blocks belong to one of the parts - let block_parts = version - .blocks - .items() - .iter() - .map(|(bk, _)| bk.part_number) - .collect::>(); - let same_parts = body_list_of_parts - .iter() - .map(|x| x.part_number) - .eq(block_parts.into_iter()); - if !same_parts { - return Err(Error::bad_request( - "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again." 
- )); - } - - // Calculate etag of final object - // To understand how etags are calculated, read more here: - // https://teppen.io/2018/06/23/aws_s3_etags/ - let num_parts = body_list_of_parts.len(); - let mut etag_md5_hasher = Md5::new(); - for (_, etag) in version.parts_etags.items().iter() { - etag_md5_hasher.update(etag.as_bytes()); - } - let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts); - - // Calculate total size of final object - let total_size = version.blocks.items().iter().map(|x| x.1.size).sum(); - - if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await { - object_version.state = ObjectVersionState::Aborted; - let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); - garage.object_table.insert(&final_object).await?; - - return Err(e); - } - - // Write final object version - object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( - ObjectVersionMeta { - headers, - size: total_size, - etag: etag.clone(), - }, - version.blocks.items()[0].1.hash, - )); - - let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); - garage.object_table.insert(&final_object).await?; - - // Send response saying ok we're done - let result = s3_xml::CompleteMultipartUploadResult { - xmlns: (), - location: None, - bucket: s3_xml::Value(bucket_name.to_string()), - key: s3_xml::Value(key), - etag: s3_xml::Value(format!("\"{}\"", etag)), - }; - let xml = s3_xml::to_xml_with_header(&result)?; - - Ok(Response::new(Body::from(xml.into_bytes()))) -} - -pub async fn handle_abort_multipart_upload( - garage: Arc, - bucket_id: Uuid, - key: &str, - upload_id: &str, -) -> Result, Error> { - let version_uuid = decode_upload_id(upload_id)?; - - let object = garage - .object_table - .get(&bucket_id, &key.to_string()) - .await?; - let object = object.ok_or(Error::NoSuchKey)?; - - let object_version = object - .versions() - .iter() - .find(|v| v.uuid == version_uuid && v.is_uploading()); - let mut object_version = match object_version { - None => return Err(Error::NoSuchUpload), - Some(x) => x.clone(), - }; - - object_version.state = ObjectVersionState::Aborted; - let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); - garage.object_table.insert(&final_object).await?; - - Ok(Response::new(Body::from(vec![]))) -} - -fn get_mime_type(headers: &HeaderMap) -> Result { +pub(crate) fn get_mime_type(headers: &HeaderMap) -> Result { Ok(headers .get(hyper::header::CONTENT_TYPE) .map(|x| x.to_str()) @@ -821,54 +512,3 @@ pub(crate) fn get_headers(headers: &HeaderMap) -> Result Result { - let id_bin = hex::decode(id).map_err(|_| Error::NoSuchUpload)?; - if id_bin.len() != 32 { - return Err(Error::NoSuchUpload); - } - let mut uuid = [0u8; 32]; - uuid.copy_from_slice(&id_bin[..]); - Ok(Uuid::from(uuid)) -} - -#[derive(Debug)] -struct CompleteMultipartUploadPart { - etag: String, - part_number: u64, -} - -fn parse_complete_multipart_upload_body( - xml: &roxmltree::Document, -) -> Option> { - let mut parts = vec![]; - - let root = xml.root(); - let cmu = root.first_child()?; - if !cmu.has_tag_name("CompleteMultipartUpload") { - return None; - } - - for item in cmu.children() { - // Only parse nodes - if !item.is_element() { - continue; - } - - if item.has_tag_name("Part") { - let etag = item.children().find(|e| e.has_tag_name("ETag"))?.text()?; - let part_number = item - .children() - .find(|e| e.has_tag_name("PartNumber"))? 
- .text()?; - parts.push(CompleteMultipartUploadPart { - etag: etag.trim_matches('"').to_string(), - part_number: part_number.parse().ok()?, - }); - } else { - return None; - } - } - - Some(parts) -} -- cgit v1.2.3 From 7ad7dae5d46c76432edd68c152531c65443cad7a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 3 May 2023 19:49:36 +0200 Subject: fix s3 list test --- src/api/s3/list.rs | 195 ++++++++++++++++++++++++----------------------------- 1 file changed, 89 insertions(+), 106 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 5a9eb133..e6f0daac 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -484,27 +484,25 @@ fn fetch_part_info<'a>( } } - // Cut the beginning and end - match &query.part_number_marker { - Some(marker) => { - let next = marker + 1; - let part_idx = - into_ok_or_err(parts.binary_search_by(|part| part.part_number.cmp(&next))); - parts.truncate(part_idx + query.max_parts as usize); - parts = parts.split_off(part_idx); - } - None => { - parts.truncate(query.max_parts as usize); - } - }; + // Cut the beginning if we have a marker + if let Some(marker) = &query.part_number_marker { + let next = marker + 1; + let part_idx = parts + .binary_search_by(|part| part.part_number.cmp(&next)) + .unwrap_or_else(|x| x); + parts = parts.split_off(part_idx); + } - match parts.last() { - Some(part_info) => { + // Cut the end if we have too many parts + if parts.len() > query.max_parts as usize { + parts.truncate(query.max_parts as usize); + if let Some(part_info) = parts.last() { let pagination = Some(part_info.part_number); - Ok((parts, pagination)) + return Ok((parts, pagination)); } - None => Ok((parts, None)), } + + Ok((parts, None)) } /* @@ -866,14 +864,6 @@ impl ExtractAccumulator for UploadAccumulator { * Utility functions */ -/// This is a stub for Result::into_ok_or_err that is not yet in Rust stable -fn into_ok_or_err(r: Result) -> T { - match r { - Ok(r) => r, - Err(r) => r, - } -} - /// Returns the common prefix of the object given the query prefix and delimiter fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> { match &query.delimiter { @@ -899,7 +889,6 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value { #[cfg(test)] mod tests { use super::*; - use garage_model::s3::version_table::*; use garage_util::*; use std::iter::FromIterator; @@ -1120,85 +1109,76 @@ mod tests { Ok(()) } - fn version() -> Version { + fn mpu() -> MultipartUpload { let uuid = Uuid::from([0x08; 32]); - let blocks = vec![ + let parts = vec![ ( - VersionBlockKey { + MpuPartKey { part_number: 1, - offset: 1, + timestamp: TS, }, - VersionBlock { - hash: uuid, - size: 3, + MpuPart { + version: uuid, + size: Some(3), + etag: Some("etag1".into()), }, ), ( - VersionBlockKey { - part_number: 1, - offset: 2, + MpuPartKey { + part_number: 2, + timestamp: TS, }, - VersionBlock { - hash: uuid, - size: 2, + MpuPart { + version: uuid, + size: None, + etag: None, }, ), ( - VersionBlockKey { - part_number: 2, - offset: 1, + MpuPartKey { + part_number: 3, + timestamp: TS, }, - VersionBlock { - hash: uuid, - size: 8, + MpuPart { + version: uuid, + size: Some(10), + etag: Some("etag2".into()), }, ), ( - VersionBlockKey { + MpuPartKey { part_number: 5, - offset: 1, + timestamp: TS, }, - VersionBlock { - hash: uuid, - size: 7, + MpuPart { + version: uuid, + size: Some(7), + etag: Some("etag3".into()), }, ), ( - VersionBlockKey { + MpuPartKey { part_number: 8, - offset: 1, + timestamp: TS, }, - VersionBlock { - hash: uuid, - 
size: 5, + MpuPart { + version: uuid, + size: Some(5), + etag: Some("etag4".into()), }, ), ]; - let etags = vec![ - (1, "etag1".to_string()), - (3, "etag2".to_string()), - (5, "etag3".to_string()), - (8, "etag4".to_string()), - (9, "etag5".to_string()), - ]; - Version { - uuid, + MultipartUpload { + upload_id: uuid, deleted: false.into(), - blocks: crdt::Map::::from_iter(blocks), - backlink: VersionBacklink::Object { - bucket_id: uuid, - key: "a".to_string(), - }, - parts_etags: crdt::Map::::from_iter(etags), + parts: crdt::Map::::from_iter(parts), + bucket_id: uuid, + key: "a".into(), } } - fn obj() -> Object { - Object::new(bucket(), "d".to_string(), vec![objup_version([0x08; 32])]) - } - #[test] fn test_fetch_part_info() -> Result<(), Error> { let uuid = Uuid::from([0x08; 32]); @@ -1211,82 +1191,85 @@ mod tests { max_parts: 2, }; - assert!( - fetch_part_info(&query, None, None, uuid).is_err(), - "No object and version should fail" - ); - assert!( - fetch_part_info(&query, Some(obj()), None, uuid).is_err(), - "No version should faild" - ); - assert!( - fetch_part_info(&query, None, Some(version()), uuid).is_err(), - "No object should fail" - ); + let mpu = mpu(); // Start from the beginning but with limited size to trigger pagination - let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; - assert_eq!(pagination.unwrap(), 5); + let (info, pagination) = fetch_part_info(&query, &mpu)?; + assert_eq!(pagination.unwrap(), 3); assert_eq!( info, vec![ PartInfo { - etag: "etag1".to_string(), + etag: "etag1", timestamp: TS, part_number: 1, - size: 5 + size: 3 }, PartInfo { - etag: "etag3".to_string(), + etag: "etag2", timestamp: TS, - part_number: 5, - size: 7 + part_number: 3, + size: 10 }, ] ); // Use previous pagination to make a new request query.part_number_marker = Some(pagination.unwrap()); - let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + let (info, pagination) = fetch_part_info(&query, &mpu)?; assert!(pagination.is_none()); assert_eq!( info, - vec![PartInfo { - etag: "etag4".to_string(), - timestamp: TS, - part_number: 8, - size: 5 - },] + vec![ + PartInfo { + etag: "etag3", + timestamp: TS, + part_number: 5, + size: 7 + }, + PartInfo { + etag: "etag4", + timestamp: TS, + part_number: 8, + size: 5 + }, + ] ); // Trying to access a part that is way larger than registered ones query.part_number_marker = Some(9999); - let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + let (info, pagination) = fetch_part_info(&query, &mpu)?; assert!(pagination.is_none()); assert_eq!(info, vec![]); // Try without any limitation query.max_parts = 1000; query.part_number_marker = None; - let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + let (info, pagination) = fetch_part_info(&query, &mpu)?; assert!(pagination.is_none()); assert_eq!( info, vec![ PartInfo { - etag: "etag1".to_string(), + etag: "etag1", timestamp: TS, part_number: 1, - size: 5 + size: 3 + }, + PartInfo { + etag: "etag2", + timestamp: TS, + part_number: 3, + size: 10 }, PartInfo { - etag: "etag3".to_string(), + etag: "etag3", timestamp: TS, part_number: 5, size: 7 }, PartInfo { - etag: "etag4".to_string(), + etag: "etag4", timestamp: TS, part_number: 8, size: 5 -- cgit v1.2.3 From 8644376ac2dd8015e9212c19f30df63811426e1c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 4 May 2023 10:09:52 +0200 Subject: fix test; simplify code --- src/api/s3/api_server.rs | 8 ++++---- src/api/s3/list.rs | 13 
++++++++----- 2 files changed, 12 insertions(+), 9 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 7c23de19..5e793082 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -257,7 +257,7 @@ impl ApiHandler for S3ApiServer { bucket_name, bucket_id, delimiter: delimiter.map(|d| d.to_string()), - page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + page_size: max_keys.unwrap_or(1000).clamp(1, 1000), prefix: prefix.unwrap_or_default(), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), }, @@ -287,7 +287,7 @@ impl ApiHandler for S3ApiServer { bucket_name, bucket_id, delimiter: delimiter.map(|d| d.to_string()), - page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + page_size: max_keys.unwrap_or(1000).clamp(1, 1000), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), prefix: prefix.unwrap_or_default(), }, @@ -320,7 +320,7 @@ impl ApiHandler for S3ApiServer { bucket_name, bucket_id, delimiter: delimiter.map(|d| d.to_string()), - page_size: max_uploads.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + page_size: max_uploads.unwrap_or(1000).clamp(1, 1000), prefix: prefix.unwrap_or_default(), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), }, @@ -344,7 +344,7 @@ impl ApiHandler for S3ApiServer { key, upload_id, part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)), - max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + max_parts: max_parts.unwrap_or(1000).clamp(1, 1000), }, ) .await diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index e6f0daac..7408d4d3 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -282,15 +282,17 @@ pub async fn handle_list_parts( let result = s3_xml::ListPartsResult { xmlns: (), + // Query parameters bucket: s3_xml::Value(query.bucket_name.to_string()), key: s3_xml::Value(query.key.to_string()), upload_id: s3_xml::Value(query.upload_id.to_string()), part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)), max_parts: s3_xml::IntValue(query.max_parts as i64), + // Result values next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)), - is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()), + is_truncated: s3_xml::Value(format!("{}", next.is_some())), parts: info .iter() .map(|part| s3_xml::PartItem { @@ -300,6 +302,7 @@ pub async fn handle_list_parts( size: s3_xml::IntValue(part.size as i64), }) .collect(), + // Dummy result values (unsupported features) initiator: s3_xml::Initiator { display_name: s3_xml::Value(DUMMY_NAME.to_string()), @@ -462,6 +465,8 @@ fn fetch_part_info<'a>( query: &ListPartsQuery, mpu: &'a MultipartUpload, ) -> Result<(Vec>, Option), Error> { + assert!((1..=1000).contains(&query.max_parts)); // see s3/api_server.rs + // Parse multipart upload part list, removing parts not yet finished // and failed part uploads that were overwritten let mut parts: Vec> = Vec::with_capacity(mpu.parts.items().len()); @@ -496,10 +501,8 @@ fn fetch_part_info<'a>( // Cut the end if we have too many parts if parts.len() > query.max_parts as usize { parts.truncate(query.max_parts as usize); - if let Some(part_info) = parts.last() { - let pagination = Some(part_info.part_number); - return Ok((parts, pagination)); - } + let pagination = Some(parts.last().unwrap().part_number); + return Ok((parts, pagination)); } Ok((parts, None)) -- cgit v1.2.3 From 412ab77b0815f165539fe41713c0155a9878672f Mon Sep 17 00:00:00 2001 From: Alex Auvolat 
Date: Thu, 4 May 2023 19:44:01 +0200 Subject: comments and clippy lint fixes --- src/api/s3/get.rs | 4 ++-- src/api/s3/multipart.rs | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index aa391745..5e682726 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -441,7 +441,7 @@ fn body_from_blocks_range( // block.part_number, which is not the same in the case of a multipart upload) let mut blocks: Vec<(VersionBlock, u64)> = Vec::with_capacity(std::cmp::min( all_blocks.len(), - 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize, + 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size, 1024)) as usize, )); let mut block_offset: u64 = 0; for (_, b) in all_blocks.iter() { @@ -452,7 +452,7 @@ fn body_from_blocks_range( if block_offset < end && block_offset + b.size > begin { blocks.push((*b, block_offset)); } - block_offset += b.size as u64; + block_offset += b.size; } let order_stream = OrderTag::stream(); diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index ecd7a212..611cfd47 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -340,6 +340,7 @@ pub async fn handle_abort_multipart_upload( // ======== helpers ============ +#[allow(clippy::ptr_arg)] pub(crate) async fn get_upload( garage: &Garage, bucket_id: &Uuid, @@ -347,13 +348,10 @@ pub(crate) async fn get_upload( upload_id: &Uuid, ) -> Result<(Object, ObjectVersion, MultipartUpload), Error> { let (object, mpu) = futures::try_join!( - garage - .object_table - .get(&bucket_id, &key) - .map_err(Error::from), + garage.object_table.get(bucket_id, key).map_err(Error::from), garage .mpu_table - .get(&upload_id, &EmptyKey) + .get(upload_id, &EmptyKey) .map_err(Error::from), )?; -- cgit v1.2.3 From c14d3735e5514c395a691a2ab4bb93aef57035e2 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 9 May 2023 13:02:39 +0200 Subject: Add test for multipart uploads and fix part renumbering --- src/api/s3/multipart.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api/s3') diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 611cfd47..0e82f3d0 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -254,7 +254,7 @@ pub async fn handle_complete_multipart_upload( for (vbk, vb) in part_version.blocks.items().iter() { final_version.blocks.put( VersionBlockKey { - part_number: part_number as u64, + part_number: (part_number + 1) as u64, offset: vbk.offset, }, *vb, -- cgit v1.2.3 From a6cc563bdd1caab11892f9b7a2f538a2f33e375b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Jun 2023 15:18:45 +0200 Subject: UploadPart: automatic cleanup of version (and reference blocked) when interrupted --- src/api/s3/multipart.rs | 52 +++++++++++++++++++++++++++++++++++++++++++++---- src/api/s3/put.rs | 29 ++++++++++++++++----------- 2 files changed, 66 insertions(+), 15 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 0e82f3d0..7df0dafc 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -96,18 +96,27 @@ pub async fn handle_put_part( let first_block = first_block.ok_or_bad_request("Empty body")?; // Calculate part identity: timestamp, version id - let version_id = gen_uuid(); + let version_uuid = gen_uuid(); let mpu_part_key = MpuPartKey { part_number, timestamp: mpu.next_timestamp(part_number), }; + // The following consists in many steps that can each fail. 
+ // Keep track that some cleanup will be needed if things fail + // before everything is finished (cleanup is done using the Drop trait). + let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { + garage: garage.clone(), + upload_id, + version_uuid, + })); + // Create version and link version from MPU mpu.parts.clear(); mpu.parts.put( mpu_part_key, MpuPart { - version: version_id, + version: version_uuid, etag: None, size: None, }, @@ -115,7 +124,7 @@ pub async fn handle_put_part( garage.mpu_table.insert(&mpu).await?; let version = Version::new( - version_id, + version_uuid, VersionBacklink::MultipartUpload { upload_id }, false, ); @@ -147,13 +156,17 @@ pub async fn handle_put_part( mpu.parts.put( mpu_part_key, MpuPart { - version: version_id, + version: version_uuid, etag: Some(data_md5sum_hex.clone()), size: Some(total_size), }, ); garage.mpu_table.insert(&mpu).await?; + // We were not interrupted, everything went fine. + // We won't have to clean up on drop. + interrupted_cleanup.cancel(); + let response = Response::builder() .header("ETag", format!("\"{}\"", data_md5sum_hex)) .body(Body::empty()) @@ -161,6 +174,37 @@ pub async fn handle_put_part( Ok(response) } +struct InterruptedCleanup(Option); +struct InterruptedCleanupInner { + garage: Arc, + upload_id: Uuid, + version_uuid: Uuid, +} + +impl InterruptedCleanup { + fn cancel(&mut self) { + drop(self.0.take()); + } +} +impl Drop for InterruptedCleanup { + fn drop(&mut self) { + if let Some(info) = self.0.take() { + tokio::spawn(async move { + let version = Version::new( + info.version_uuid, + VersionBacklink::MultipartUpload { + upload_id: info.upload_id, + }, + true, + ); + if let Err(e) = info.garage.version_table.insert(&version).await { + warn!("Cannot cleanup after aborted UploadPart: {}", e); + } + }); + } + } +} + pub async fn handle_complete_multipart_upload( garage: Arc, req: Request, diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 804e1087..c7ac5030 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -121,13 +121,13 @@ pub(crate) async fn save_stream> + Unpin>( // The following consists in many steps that can each fail. // Keep track that some cleanup will be needed if things fail // before everything is finished (cleanup is done using the Drop trait). 
- let mut interrupted_cleanup = InterruptedCleanup(Some(( - garage.clone(), - bucket.id, - key.into(), + let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { + garage: garage.clone(), + bucket_id: bucket.id, + key: key.into(), version_uuid, version_timestamp, - ))); + })); // Write version identifier in object table so that we have a trace // that we are uploading something @@ -433,7 +433,14 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response { .unwrap() } -struct InterruptedCleanup(Option<(Arc, Uuid, String, Uuid, u64)>); +struct InterruptedCleanup(Option); +struct InterruptedCleanupInner { + garage: Arc, + bucket_id: Uuid, + key: String, + version_uuid: Uuid, + version_timestamp: u64, +} impl InterruptedCleanup { fn cancel(&mut self) { @@ -442,15 +449,15 @@ impl InterruptedCleanup { } impl Drop for InterruptedCleanup { fn drop(&mut self) { - if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() { + if let Some(info) = self.0.take() { tokio::spawn(async move { let object_version = ObjectVersion { - uuid: version_uuid, - timestamp: version_ts, + uuid: info.version_uuid, + timestamp: info.version_timestamp, state: ObjectVersionState::Aborted, }; - let object = Object::new(bucket_id, key, vec![object_version]); - if let Err(e) = garage.object_table.insert(&object).await { + let object = Object::new(info.bucket_id, info.key, vec![object_version]); + if let Err(e) = info.garage.object_table.insert(&object).await { warn!("Cannot cleanup after aborted PutObject: {}", e); } }); -- cgit v1.2.3 From 942c1f1bfe138cbc4e49540cede852e4d462590e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Jun 2023 10:48:22 +0200 Subject: multipart uploads: save timestamp --- src/api/s3/multipart.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 7df0dafc..52ea8e78 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -33,12 +33,13 @@ pub async fn handle_create_multipart_upload( key: &str, ) -> Result, Error> { let upload_id = gen_uuid(); + let timestamp = now_msec(); let headers = get_headers(req.headers())?; // Create object in object table let object_version = ObjectVersion { uuid: upload_id, - timestamp: now_msec(), + timestamp, state: ObjectVersionState::Uploading { multipart: true, headers, @@ -50,7 +51,7 @@ pub async fn handle_create_multipart_upload( // Create multipart upload in mpu table // This multipart upload will hold references to uploaded parts // (which are entries in the Version table) - let mpu = MultipartUpload::new(upload_id, bucket_id, key.into(), false); + let mpu = MultipartUpload::new(upload_id, timestamp, bucket_id, key.into(), false); garage.mpu_table.insert(&mpu).await?; // Send success response -- cgit v1.2.3 From 567036712616c39563b5ebf1ae6210245dbd7cc4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Jun 2023 23:10:46 +0200 Subject: multipartupload in test: add forgotten timestamp --- src/api/s3/list.rs | 1 + 1 file changed, 1 insertion(+) (limited to 'src/api/s3') diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 7408d4d3..33d62518 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -1175,6 +1175,7 @@ mod tests { MultipartUpload { upload_id: uuid, + timestamp: TS, deleted: false.into(), parts: crdt::Map::::from_iter(parts), bucket_id: uuid, -- cgit v1.2.3 From 8041d9a8274619b9a7cb66735ed560bcfba16078 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 29 
Aug 2023 17:44:17 +0200 Subject: s3: add xml structures to serialize/deserialize lifecycle configs --- src/api/s3/api_server.rs | 6 ++ src/api/s3/lifecycle.rs | 256 +++++++++++++++++++++++++++++++++++++++++++++++ src/api/s3/mod.rs | 1 + 3 files changed, 263 insertions(+) create mode 100644 src/api/s3/lifecycle.rs (limited to 'src/api/s3') diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 5e793082..06fef6d5 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -26,6 +26,7 @@ use crate::s3::copy::*; use crate::s3::cors::*; use crate::s3::delete::*; use crate::s3::get::*; +use crate::s3::lifecycle::*; use crate::s3::list::*; use crate::s3::multipart::*; use crate::s3::post_object::handle_post_object; @@ -362,6 +363,11 @@ impl ApiHandler for S3ApiServer { handle_put_cors(garage, bucket_id, req, content_sha256).await } Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await, + Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(&bucket).await, + Endpoint::PutBucketLifecycleConfiguration {} => { + handle_put_lifecycle(garage, bucket_id, req, content_sha256).await + } + Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(garage, bucket_id).await, endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), }; diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs new file mode 100644 index 00000000..cb0cc83a --- /dev/null +++ b/src/api/s3/lifecycle.rs @@ -0,0 +1,256 @@ +use quick_xml::de::from_reader; +use std::sync::Arc; + +use hyper::{Body, Request, Response, StatusCode}; + +use serde::{Deserialize, Serialize}; + +use crate::s3::error::*; +use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; +use crate::signature::verify_signed_content; + +use garage_model::bucket_table::{ + Bucket, LifecycleExpiration as GarageLifecycleExpiration, + LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule, +}; +use garage_model::garage::Garage; +use garage_util::data::*; + +pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result, Error> { + let param = bucket + .params() + .ok_or_internal_error("Bucket should not be deleted at this point")?; + + if let Some(lifecycle) = param.lifecycle_config.get() { + let wc = LifecycleConfiguration { + xmlns: (), + lifecycle_rules: lifecycle + .iter() + .map(LifecycleRule::from_garage_lifecycle_rule) + .collect::>(), + }; + let xml = to_xml_with_header(&wc)?; + Ok(Response::builder() + .status(StatusCode::OK) + .header(http::header::CONTENT_TYPE, "application/xml") + .body(Body::from(xml))?) + } else { + Ok(Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty())?) + } +} + +pub async fn handle_delete_lifecycle( + garage: Arc, + bucket_id: Uuid, +) -> Result, Error> { + let mut bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + let param = bucket.params_mut().unwrap(); + + param.lifecycle_config.update(None); + garage.bucket_table.insert(&bucket).await?; + + Ok(Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty())?) 
+} + +pub async fn handle_put_lifecycle( + garage: Arc, + bucket_id: Uuid, + req: Request, + content_sha256: Option, +) -> Result, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } + + let mut bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + + let param = bucket.params_mut().unwrap(); + + let conf: LifecycleConfiguration = from_reader(&body as &[u8])?; + + param + .lifecycle_config + .update(Some(conf.validate_into_garage_lifecycle_config()?)); + garage.bucket_table.insert(&bucket).await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) +} + +// ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ---- + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +#[serde(rename = "LifecycleConfiguration")] +pub struct LifecycleConfiguration { + #[serde(serialize_with = "xmlns_tag", skip_deserializing)] + pub xmlns: (), + #[serde(rename = "Rule")] + pub lifecycle_rules: Vec, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub struct LifecycleRule { + #[serde(rename = "ID")] + pub id: Option, + #[serde(rename = "Status")] + pub status: Value, + #[serde(rename = "Filter", default)] + pub filter: Filter, + #[serde(rename = "Expiration", default)] + pub expiration: Option, + #[serde(rename = "AbortIncompleteMultipartUpload", default)] + pub abort_incomplete_mpu: Option, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Default)] +pub struct Filter { + #[serde(rename = "And")] + pub and: Option>, + #[serde(rename = "Prefix")] + pub prefix: Option, + #[serde(rename = "ObjectSizeGreaterThan")] + pub size_gt: Option, + #[serde(rename = "ObjectSizeLessThan")] + pub size_lt: Option, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub struct Expiration { + #[serde(rename = "Days")] + pub days: Option, + #[serde(rename = "Date")] + pub at_date: Option, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] +pub struct AbortIncompleteMpu { + #[serde(rename = "DaysAfterInitiation")] + pub days: Option, +} + +impl LifecycleConfiguration { + pub fn validate_into_garage_lifecycle_config(self) -> Result, Error> { + let mut ret = vec![]; + for rule in self.lifecycle_rules { + ret.push(rule.validate_into_garage_lifecycle_rule()?); + } + Ok(ret) + } + + pub fn from_garage_lifecycle_config(config: &[GarageLifecycleRule]) -> Self { + Self { + xmlns: (), + lifecycle_rules: config + .iter() + .map(LifecycleRule::from_garage_lifecycle_rule) + .collect(), + } + } +} + +impl LifecycleRule { + pub fn validate_into_garage_lifecycle_rule(self) -> Result { + todo!() + } + + pub fn from_garage_lifecycle_rule(rule: &GarageLifecycleRule) -> Self { + todo!() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use quick_xml::de::from_str; + + #[test] + fn test_deserialize_lifecycle_config() -> Result<(), Error> { + let message = r#" + + + id1 + Enabled + + documents/ + + + 7 + + + + id2 + Enabled + + + logs/ + 1000000 + + + + 365 + + +"#; + let conf: LifecycleConfiguration = from_str(message).unwrap(); + let ref_value = LifecycleConfiguration { + xmlns: (), + lifecycle_rules: vec![ + LifecycleRule { + id: Some("id1".into()), + status: "Enabled".into(), + filter: Filter { + prefix: Some("documents/".into()), + ..Default::default() + }, + expiration: None, + abort_incomplete_mpu: Some(AbortIncompleteMpu { + 
days: Some(IntValue(7)), + }), + }, + LifecycleRule { + id: Some("id2".into()), + status: "Enabled".into(), + filter: Filter { + and: Some(Box::new(Filter { + prefix: Some("logs/".into()), + size_gt: Some(IntValue(1000000)), + ..Default::default() + })), + ..Default::default() + }, + expiration: Some(Expiration { + days: Some(IntValue(365)), + at_date: None, + }), + abort_incomplete_mpu: None, + }, + ], + }; + assert_eq! { + ref_value, + conf + }; + + let message2 = to_xml_with_header(&ref_value)?; + + let cleanup = |c: &str| c.replace(char::is_whitespace, ""); + assert_eq!(cleanup(message), cleanup(&message2)); + + Ok(()) + } +} diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs index b5237bf7..cbdb94ab 100644 --- a/src/api/s3/mod.rs +++ b/src/api/s3/mod.rs @@ -6,6 +6,7 @@ mod copy; pub mod cors; mod delete; pub mod get; +mod lifecycle; mod list; mod multipart; mod post_object; -- cgit v1.2.3 From abf011c2906d04200bb39d7bc82f7ed973215500 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 29 Aug 2023 18:22:03 +0200 Subject: lifecycle: implement validation into garage's internal data structure --- src/api/s3/lifecycle.rs | 200 ++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 177 insertions(+), 23 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index cb0cc83a..48265870 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -22,13 +22,7 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result, Err .ok_or_internal_error("Bucket should not be deleted at this point")?; if let Some(lifecycle) = param.lifecycle_config.get() { - let wc = LifecycleConfiguration { - xmlns: (), - lifecycle_rules: lifecycle - .iter() - .map(LifecycleRule::from_garage_lifecycle_rule) - .collect::>(), - }; + let wc = LifecycleConfiguration::from_garage_lifecycle_config(lifecycle); let xml = to_xml_with_header(&wc)?; Ok(Response::builder() .status(StatusCode::OK) @@ -81,9 +75,10 @@ pub async fn handle_put_lifecycle( let conf: LifecycleConfiguration = from_reader(&body as &[u8])?; - param - .lifecycle_config - .update(Some(conf.validate_into_garage_lifecycle_config()?)); + let config = conf + .validate_into_garage_lifecycle_config() + .ok_or_bad_request("Invalid lifecycle configuration")?; + param.lifecycle_config.update(Some(config)); garage.bucket_table.insert(&bucket).await?; Ok(Response::builder() @@ -109,7 +104,7 @@ pub struct LifecycleRule { #[serde(rename = "Status")] pub status: Value, #[serde(rename = "Filter", default)] - pub filter: Filter, + pub filter: Option, #[serde(rename = "Expiration", default)] pub expiration: Option, #[serde(rename = "AbortIncompleteMultipartUpload", default)] @@ -139,11 +134,13 @@ pub struct Expiration { #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct AbortIncompleteMpu { #[serde(rename = "DaysAfterInitiation")] - pub days: Option, + pub days: IntValue, } impl LifecycleConfiguration { - pub fn validate_into_garage_lifecycle_config(self) -> Result, Error> { + pub fn validate_into_garage_lifecycle_config( + self, + ) -> Result, &'static str> { let mut ret = vec![]; for rule in self.lifecycle_rules { ret.push(rule.validate_into_garage_lifecycle_rule()?); @@ -163,12 +160,136 @@ impl LifecycleConfiguration { } impl LifecycleRule { - pub fn validate_into_garage_lifecycle_rule(self) -> Result { - todo!() + pub fn validate_into_garage_lifecycle_rule(self) -> Result { + let enabled = match self.status.0.as_str() { + "Enabled" => true, + "Disabled" => false, + _ => 
return Err("invalid value for "), + }; + + let filter = self + .filter + .map(Filter::validate_into_garage_lifecycle_filter) + .transpose()? + .unwrap_or_default(); + + let abort_incomplete_mpu_days = self.abort_incomplete_mpu.map(|x| x.days.0 as usize); + + let expiration = self + .expiration + .map(Expiration::validate_into_garage_lifecycle_expiration) + .transpose()?; + + Ok(GarageLifecycleRule { + id: self.id.map(|x| x.0), + enabled, + filter, + abort_incomplete_mpu_days, + expiration, + }) } pub fn from_garage_lifecycle_rule(rule: &GarageLifecycleRule) -> Self { - todo!() + Self { + id: rule.id.as_deref().map(Value::from), + status: if rule.enabled { + Value::from("Enabled") + } else { + Value::from("Disabled") + }, + filter: Filter::from_garage_lifecycle_filter(&rule.filter), + abort_incomplete_mpu: rule + .abort_incomplete_mpu_days + .map(|days| AbortIncompleteMpu { + days: IntValue(days as i64), + }), + expiration: rule + .expiration + .as_ref() + .map(Expiration::from_garage_lifecycle_expiration), + } + } +} + +impl Filter { + pub fn count(&self) -> i32 { + fn count(x: &Option) -> i32 { + x.as_ref().map(|_| 1).unwrap_or(0) + } + count(&self.prefix) + count(&self.size_gt) + count(&self.size_lt) + } + + pub fn validate_into_garage_lifecycle_filter( + self, + ) -> Result { + if self.count() > 0 && self.and.is_some() { + Err("Filter tag cannot contain both and another condition") + } else if let Some(and) = self.and { + if and.and.is_some() { + return Err("Nested tags"); + } + Ok(and.internal_into_garage_lifecycle_filter()) + } else if self.count() > 1 { + Err("Multiple Filter conditions must be wrapped in an tag") + } else { + Ok(self.internal_into_garage_lifecycle_filter()) + } + } + + fn internal_into_garage_lifecycle_filter(self) -> GarageLifecycleFilter { + GarageLifecycleFilter { + prefix: self.prefix.map(|x| x.0), + size_gt: self.size_gt.map(|x| x.0 as usize), + size_lt: self.size_lt.map(|x| x.0 as usize), + } + } + + pub fn from_garage_lifecycle_filter(rule: &GarageLifecycleFilter) -> Option { + let filter = Filter { + and: None, + prefix: rule.prefix.as_deref().map(Value::from), + size_gt: rule.size_gt.map(|x| IntValue(x as i64)), + size_lt: rule.size_lt.map(|x| IntValue(x as i64)), + }; + match filter.count() { + 0 => None, + 1 => Some(filter), + _ => Some(Filter { + and: Some(Box::new(filter)), + ..Default::default() + }), + } + } +} + +impl Expiration { + pub fn validate_into_garage_lifecycle_expiration( + self, + ) -> Result { + match (self.days, self.at_date) { + (Some(_), Some(_)) => Err("cannot have both and in "), + (None, None) => Err(" must contain either or "), + (Some(days), None) => Ok(GarageLifecycleExpiration::AfterDays(days.0 as usize)), + (None, Some(date)) => { + if date.0.parse::().is_err() { + return Err("Invalid expiration "); + } + Ok(GarageLifecycleExpiration::AtDate(date.0)) + } + } + } + + pub fn from_garage_lifecycle_expiration(exp: &GarageLifecycleExpiration) -> Self { + match exp { + GarageLifecycleExpiration::AfterDays(days) => Expiration { + days: Some(IntValue(*days as i64)), + at_date: None, + }, + GarageLifecycleExpiration::AtDate(days) => Expiration { + days: None, + at_date: Some(Value::from(days.as_str())), + }, + } } } @@ -213,26 +334,24 @@ mod tests { LifecycleRule { id: Some("id1".into()), status: "Enabled".into(), - filter: Filter { + filter: Some(Filter { prefix: Some("documents/".into()), ..Default::default() - }, - expiration: None, - abort_incomplete_mpu: Some(AbortIncompleteMpu { - days: Some(IntValue(7)), }), + expiration: None, + 
abort_incomplete_mpu: Some(AbortIncompleteMpu { days: IntValue(7) }), }, LifecycleRule { id: Some("id2".into()), status: "Enabled".into(), - filter: Filter { + filter: Some(Filter { and: Some(Box::new(Filter { prefix: Some("logs/".into()), size_gt: Some(IntValue(1000000)), ..Default::default() })), ..Default::default() - }, + }), expiration: Some(Expiration { days: Some(IntValue(365)), at_date: None, @@ -251,6 +370,41 @@ mod tests { let cleanup = |c: &str| c.replace(char::is_whitespace, ""); assert_eq!(cleanup(message), cleanup(&message2)); + // Check validation + let validated = ref_value + .validate_into_garage_lifecycle_config() + .ok_or_bad_request("invalid xml config")?; + + let ref_config = vec![ + GarageLifecycleRule { + id: Some("id1".into()), + enabled: true, + filter: GarageLifecycleFilter { + prefix: Some("documents/".into()), + ..Default::default() + }, + expiration: None, + abort_incomplete_mpu_days: Some(7), + }, + GarageLifecycleRule { + id: Some("id2".into()), + enabled: true, + filter: GarageLifecycleFilter { + prefix: Some("logs/".into()), + size_gt: Some(1000000), + ..Default::default() + }, + expiration: Some(GarageLifecycleExpiration::AfterDays(365)), + abort_incomplete_mpu_days: None, + }, + ]; + assert_eq!(validated, ref_config); + + let message3 = to_xml_with_header(&LifecycleConfiguration::from_garage_lifecycle_config( + &validated, + ))?; + assert_eq!(cleanup(message), cleanup(&message3)); + Ok(()) } } -- cgit v1.2.3 From f7b409f1140addd508c626b1e80f0f8de52a5639 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 11:24:01 +0200 Subject: use a NaiveDate in data model, it serializes to string (iso 8601 format) --- src/api/s3/lifecycle.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 48265870..278cf26d 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -270,12 +270,11 @@ impl Expiration { (Some(_), Some(_)) => Err("cannot have both and in "), (None, None) => Err(" must contain either or "), (Some(days), None) => Ok(GarageLifecycleExpiration::AfterDays(days.0 as usize)), - (None, Some(date)) => { - if date.0.parse::().is_err() { - return Err("Invalid expiration "); - } - Ok(GarageLifecycleExpiration::AtDate(date.0)) - } + (None, Some(date)) => date + .0 + .parse::() + .map(GarageLifecycleExpiration::AtDate) + .map_err(|_| "Invalid expiration "), } } @@ -285,9 +284,9 @@ impl Expiration { days: Some(IntValue(*days as i64)), at_date: None, }, - GarageLifecycleExpiration::AtDate(days) => Expiration { + GarageLifecycleExpiration::AtDate(date) => Expiration { days: None, - at_date: Some(Value::from(days.as_str())), + at_date: Some(Value(date.to_string())), }, } } -- cgit v1.2.3 From 2996dc875fc378ec3597bfa3bdb8ba8951e1865c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 14:28:48 +0200 Subject: lifecycle worker: implement main functionality --- src/api/s3/lifecycle.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 278cf26d..2d621eac 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -239,8 +239,8 @@ impl Filter { fn internal_into_garage_lifecycle_filter(self) -> GarageLifecycleFilter { GarageLifecycleFilter { prefix: self.prefix.map(|x| x.0), - size_gt: self.size_gt.map(|x| x.0 as usize), - size_lt: self.size_lt.map(|x| x.0 as usize), + size_gt: self.size_gt.map(|x| x.0 as u64), + 
size_lt: self.size_lt.map(|x| x.0 as u64), } } -- cgit v1.2.3 From 75ccc5a95c76f31235fcaab8a2c1795693733a4b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 20:02:07 +0200 Subject: lifecycle config: store date as given, try to debug --- src/api/s3/lifecycle.rs | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 2d621eac..f0fde083 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -10,7 +10,7 @@ use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; use garage_model::bucket_table::{ - Bucket, LifecycleExpiration as GarageLifecycleExpiration, + parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration, LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule, }; use garage_model::garage::Garage; @@ -21,6 +21,8 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result, Err .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; + trace!("bucket: {:#?}", bucket); + if let Some(lifecycle) = param.lifecycle_config.get() { let wc = LifecycleConfiguration::from_garage_lifecycle_config(lifecycle); let xml = to_xml_with_header(&wc)?; @@ -79,7 +81,15 @@ pub async fn handle_put_lifecycle( .validate_into_garage_lifecycle_config() .ok_or_bad_request("Invalid lifecycle configuration")?; param.lifecycle_config.update(Some(config)); + garage.bucket_table.insert(&bucket).await?; + trace!("new bucket: {:#?}", bucket); + + let bucket = garage + .bucket_helper() + .get_existing_bucket(bucket_id) + .await?; + trace!("new bucket again: {:#?}", bucket); Ok(Response::builder() .status(StatusCode::OK) @@ -270,11 +280,11 @@ impl Expiration { (Some(_), Some(_)) => Err("cannot have both and in "), (None, None) => Err(" must contain either or "), (Some(days), None) => Ok(GarageLifecycleExpiration::AfterDays(days.0 as usize)), - (None, Some(date)) => date - .0 - .parse::() - .map(GarageLifecycleExpiration::AtDate) - .map_err(|_| "Invalid expiration "), + (None, Some(date)) => { + trace!("date: {}", date.0); + parse_lifecycle_date(&date.0)?; + Ok(GarageLifecycleExpiration::AtDate(date.0)) + } } } -- cgit v1.2.3 From d2e94e36d64d4062ebe1fabac65ac1a6f265de17 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 20:05:53 +0200 Subject: lifecycle config: add missing line in merge() and remove tracing --- src/api/s3/lifecycle.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index f0fde083..9036f84c 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -21,8 +21,6 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result, Err .params() .ok_or_internal_error("Bucket should not be deleted at this point")?; - trace!("bucket: {:#?}", bucket); - if let Some(lifecycle) = param.lifecycle_config.get() { let wc = LifecycleConfiguration::from_garage_lifecycle_config(lifecycle); let xml = to_xml_with_header(&wc)?; @@ -76,20 +74,12 @@ pub async fn handle_put_lifecycle( let param = bucket.params_mut().unwrap(); let conf: LifecycleConfiguration = from_reader(&body as &[u8])?; - let config = conf .validate_into_garage_lifecycle_config() .ok_or_bad_request("Invalid lifecycle configuration")?; - param.lifecycle_config.update(Some(config)); + param.lifecycle_config.update(Some(config)); 
garage.bucket_table.insert(&bucket).await?; - trace!("new bucket: {:#?}", bucket); - - let bucket = garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - trace!("new bucket again: {:#?}", bucket); Ok(Response::builder() .status(StatusCode::OK) @@ -281,7 +271,6 @@ impl Expiration { (None, None) => Err(" must contain either or "), (Some(days), None) => Ok(GarageLifecycleExpiration::AfterDays(days.0 as usize)), (None, Some(date)) => { - trace!("date: {}", date.0); parse_lifecycle_date(&date.0)?; Ok(GarageLifecycleExpiration::AtDate(date.0)) } -- cgit v1.2.3 From f0a395e2e5db977caff0ea46e17061e02929178a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 30 Aug 2023 23:39:28 +0200 Subject: s3 bucket apis: remove redundant call --- src/api/s3/api_server.rs | 14 ++++++++------ src/api/s3/cors.rs | 22 ++++++++-------------- src/api/s3/lifecycle.rs | 22 ++++++++-------------- src/api/s3/website.rs | 22 ++++++++-------------- 4 files changed, 32 insertions(+), 48 deletions(-) (limited to 'src/api/s3') diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 06fef6d5..3f995d34 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -355,19 +355,21 @@ impl ApiHandler for S3ApiServer { } Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await, Endpoint::PutBucketWebsite {} => { - handle_put_website(garage, bucket_id, req, content_sha256).await + handle_put_website(garage, bucket.clone(), req, content_sha256).await } - Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket_id).await, + Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket.clone()).await, Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await, Endpoint::PutBucketCors {} => { - handle_put_cors(garage, bucket_id, req, content_sha256).await + handle_put_cors(garage, bucket.clone(), req, content_sha256).await } - Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket_id).await, + Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket.clone()).await, Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(&bucket).await, Endpoint::PutBucketLifecycleConfiguration {} => { - handle_put_lifecycle(garage, bucket_id, req, content_sha256).await + handle_put_lifecycle(garage, bucket.clone(), req, content_sha256).await + } + Endpoint::DeleteBucketLifecycle {} => { + handle_delete_lifecycle(garage, bucket.clone()).await } - Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(garage, bucket_id).await, endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), }; diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index c7273464..49097ad1 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -44,14 +44,11 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result, Error> { pub async fn handle_delete_cors( garage: Arc, - bucket_id: Uuid, + mut bucket: Bucket, ) -> Result, Error> { - let mut bucket = garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - - let param = bucket.params_mut().unwrap(); + let param = bucket + .params_mut() + .ok_or_internal_error("Bucket should not be deleted at this point")?; param.cors_config.update(None); garage.bucket_table.insert(&bucket).await?; @@ -63,7 +60,7 @@ pub async fn handle_delete_cors( pub async fn handle_put_cors( garage: Arc, - bucket_id: Uuid, + mut bucket: Bucket, req: Request, content_sha256: Option, ) -> Result, Error> { @@ -73,12 +70,9 @@ pub async fn handle_put_cors( verify_signed_content(content_sha256, &body[..])?; } - 
let mut bucket = garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - - let param = bucket.params_mut().unwrap(); + let param = bucket + .params_mut() + .ok_or_internal_error("Bucket should not be deleted at this point")?; let conf: CorsConfiguration = from_reader(&body as &[u8])?; conf.validate()?; diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 9036f84c..11199190 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -37,14 +37,11 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result, Err pub async fn handle_delete_lifecycle( garage: Arc, - bucket_id: Uuid, + mut bucket: Bucket, ) -> Result, Error> { - let mut bucket = garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - - let param = bucket.params_mut().unwrap(); + let param = bucket + .params_mut() + .ok_or_internal_error("Bucket should not be deleted at this point")?; param.lifecycle_config.update(None); garage.bucket_table.insert(&bucket).await?; @@ -56,7 +53,7 @@ pub async fn handle_delete_lifecycle( pub async fn handle_put_lifecycle( garage: Arc, - bucket_id: Uuid, + mut bucket: Bucket, req: Request, content_sha256: Option, ) -> Result, Error> { @@ -66,12 +63,9 @@ pub async fn handle_put_lifecycle( verify_signed_content(content_sha256, &body[..])?; } - let mut bucket = garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - - let param = bucket.params_mut().unwrap(); + let param = bucket + .params_mut() + .ok_or_internal_error("Bucket should not be deleted at this point")?; let conf: LifecycleConfiguration = from_reader(&body as &[u8])?; let config = conf diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index 77738971..7f2ab925 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -43,14 +43,11 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result, Error pub async fn handle_delete_website( garage: Arc, - bucket_id: Uuid, + mut bucket: Bucket, ) -> Result, Error> { - let mut bucket = garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - - let param = bucket.params_mut().unwrap(); + let param = bucket + .params_mut() + .ok_or_internal_error("Bucket should not be deleted at this point")?; param.website_config.update(None); garage.bucket_table.insert(&bucket).await?; @@ -62,7 +59,7 @@ pub async fn handle_delete_website( pub async fn handle_put_website( garage: Arc, - bucket_id: Uuid, + mut bucket: Bucket, req: Request, content_sha256: Option, ) -> Result, Error> { @@ -72,12 +69,9 @@ pub async fn handle_put_website( verify_signed_content(content_sha256, &body[..])?; } - let mut bucket = garage - .bucket_helper() - .get_existing_bucket(bucket_id) - .await?; - - let param = bucket.params_mut().unwrap(); + let param = bucket + .params_mut() + .ok_or_internal_error("Bucket should not be deleted at this point")?; let conf: WebsiteConfiguration = from_reader(&body as &[u8])?; conf.validate()?; -- cgit v1.2.3 From be03a4610f4a6e3863e6113491e308bbcea9ca94 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 31 Aug 2023 00:00:26 +0200 Subject: s3api: remove redundant serde rename attribute --- src/api/s3/lifecycle.rs | 1 - 1 file changed, 1 deletion(-) (limited to 'src/api/s3') diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 11199190..1e7d6755 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -83,7 +83,6 @@ pub async fn handle_put_lifecycle( // ---- SERIALIZATION AND DESERIALIZATION TO/FROM S3 XML ---- #[derive(Debug, Serialize, Deserialize, PartialEq, 
Eq, PartialOrd, Ord)] -#[serde(rename = "LifecycleConfiguration")] pub struct LifecycleConfiguration { #[serde(serialize_with = "xmlns_tag", skip_deserializing)] pub xmlns: (), -- cgit v1.2.3
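
Note on the Drop-based cleanup introduced in the UploadPart and PutObject commits above: the handler arms a guard before the fallible steps, disarms it with cancel() once everything has been written, and relies on Drop to schedule an asynchronous cleanup task whenever the guard is dropped while still armed (early return via ?, client disconnect, panic unwind). The sketch below is a minimal standalone illustration of that pattern, not the Garage code itself: CleanupInfo, the shared Vec<String> "table" and the upload() function are invented stand-ins, and it assumes the tokio crate with its full feature set.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::Mutex;

// Stand-in for the state needed to undo a half-finished upload; in Garage this
// would hold the Garage handle plus the identifiers of the version to mark as aborted.
struct CleanupInfo {
    table: Arc<Mutex<Vec<String>>>,
    version_uuid: String,
}

// Guard armed before the fallible steps and disarmed on success.
// If it is dropped while still armed, Drop schedules the cleanup.
struct InterruptedCleanup(Option<CleanupInfo>);

impl InterruptedCleanup {
    fn cancel(&mut self) {
        // Success path: take the info out so Drop has nothing left to do.
        drop(self.0.take());
    }
}

impl Drop for InterruptedCleanup {
    fn drop(&mut self) {
        if let Some(info) = self.0.take() {
            // Drop cannot be async, so hand the async cleanup to the runtime.
            tokio::spawn(async move {
                info.table
                    .lock()
                    .await
                    .push(format!("aborted marker for {}", info.version_uuid));
            });
        }
    }
}

async fn upload(table: Arc<Mutex<Vec<String>>>, fail: bool) -> Result<(), &'static str> {
    let mut cleanup = InterruptedCleanup(Some(CleanupInfo {
        table,
        version_uuid: "v1".into(),
    }));

    if fail {
        return Err("interrupted"); // guard still armed -> cleanup task is spawned
    }

    cleanup.cancel(); // everything succeeded -> no cleanup on drop
    Ok(())
}

#[tokio::main]
async fn main() {
    let table = Arc::new(Mutex::new(Vec::new()));
    let _ = upload(table.clone(), true).await;
    tokio::time::sleep(Duration::from_millis(10)).await; // let the spawned cleanup run
    println!("{:?}", table.lock().await); // ["aborted marker for v1"]
}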
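
On the part-renumbering fix (part_number + 1 in handle_complete_multipart_upload): assuming, as the surrounding loop suggests, that part_number there is a zero-based enumerate() index over the retained parts, the completed object ends up with contiguous 1-based part numbers even when the client chose sparse ones (S3 only requires part numbers between 1 and 10000, in increasing order). A toy sketch of that renumbering, with invented types (UploadedPart, final_blocks) rather than Garage's Version and VersionBlockKey:

// Toy model: each uploaded part carries the client-chosen part number and its blocks.
struct UploadedPart {
    client_part_number: u64,
    blocks: Vec<&'static str>,
}

fn main() {
    // The client used sparse part numbers 2, 7 and 11.
    let parts = vec![
        UploadedPart { client_part_number: 2, blocks: vec!["a0", "a1"] },
        UploadedPart { client_part_number: 7, blocks: vec!["b0"] },
        UploadedPart { client_part_number: 11, blocks: vec!["c0", "c1"] },
    ];

    // CompleteMultipartUpload walks the parts in order and renumbers them 1..=N:
    // enumerate() is 0-based, hence the `+ 1` so the first part becomes part 1.
    let mut final_blocks: Vec<(u64, &str)> = Vec::new();
    for (part_number, part) in parts.iter().enumerate() {
        for &block in &part.blocks {
            final_blocks.push(((part_number as u64) + 1, block));
        }
    }

    assert_eq!(
        final_blocks,
        vec![(1, "a0"), (1, "a1"), (2, "b0"), (3, "c0"), (3, "c1")]
    );
    println!("{:?}", final_blocks);
}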
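
The Filter validation added in the lifecycle commits accepts either a single bare condition directly under Filter, or several conditions grouped in exactly one And, and rejects mixed or nested forms. A self-contained sketch of the same decision logic on a simplified struct (plain String/u64 fields instead of the crate's Value/IntValue wrappers, and a validate() name that is not the real method):

// Simplified stand-in for the S3 <Filter> element: a few conditions plus an
// optional single <And> wrapper.
#[derive(Debug, Default)]
struct Filter {
    and: Option<Box<Filter>>,
    prefix: Option<String>,
    size_gt: Option<u64>,
    size_lt: Option<u64>,
}

impl Filter {
    // Number of bare conditions set directly on this filter (ignoring <And>).
    fn count(&self) -> usize {
        self.prefix.is_some() as usize
            + self.size_gt.is_some() as usize
            + self.size_lt.is_some() as usize
    }

    // Mirrors the acceptance rules: at most one bare condition, several
    // conditions only inside exactly one <And>, and no nested <And>.
    fn validate(self) -> Result<Filter, &'static str> {
        if self.count() > 0 && self.and.is_some() {
            Err("<Filter> cannot mix <And> with another condition")
        } else if let Some(and) = self.and {
            if and.and.is_some() {
                return Err("nested <And> tags");
            }
            Ok(*and)
        } else if self.count() > 1 {
            Err("multiple conditions must be wrapped in an <And>")
        } else {
            Ok(self)
        }
    }
}

fn main() {
    // A single bare condition is accepted as-is.
    let single = Filter { prefix: Some("logs/".into()), ..Default::default() };
    assert!(single.validate().is_ok());

    // Two bare conditions outside <And> are rejected.
    let two_bare = Filter {
        prefix: Some("logs/".into()),
        size_gt: Some(1_000_000),
        ..Default::default()
    };
    assert!(two_bare.validate().is_err());

    // The same two conditions grouped under one <And> are accepted.
    let wrapped = Filter {
        and: Some(Box::new(Filter {
            prefix: Some("logs/".into()),
            size_gt: Some(1_000_000),
            ..Default::default()
        })),
        ..Default::default()
    };
    assert!(wrapped.validate().is_ok());
}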