diff options
author | Alex <alex@adnab.me> | 2023-06-09 15:34:09 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2023-06-09 15:34:09 +0000 |
commit | 0a06fda0da35f4018c39bf6aec90e55bdf42d241 (patch) | |
tree | d2b758400f336b63e236c431a965d191954322a8 /src | |
parent | e7e164a280dfc1c4adf9d6da6f3b2a9674eca4bd (diff) | |
parent | 3d477906d4ff418de10973db7bd3e940f2e47b2d (diff) | |
download | garage-0a06fda0da35f4018c39bf6aec90e55bdf42d241.tar.gz garage-0a06fda0da35f4018c39bf6aec90e55bdf42d241.zip |
Merge pull request 'Fix #204 (full Multipart Uploads semantics)' (#553) from nlnet-task1 into next
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/553
Diffstat (limited to 'src')
-rw-r--r-- | src/api/admin/bucket.rs | 28 | ||||
-rw-r--r-- | src/api/admin/key.rs | 4 | ||||
-rw-r--r-- | src/api/s3/api_server.rs | 9 | ||||
-rw-r--r-- | src/api/s3/copy.rs | 121 | ||||
-rw-r--r-- | src/api/s3/get.rs | 10 | ||||
-rw-r--r-- | src/api/s3/list.rs | 341 | ||||
-rw-r--r-- | src/api/s3/mod.rs | 1 | ||||
-rw-r--r-- | src/api/s3/multipart.rs | 464 | ||||
-rw-r--r-- | src/api/s3/put.rs | 431 | ||||
-rw-r--r-- | src/garage/admin/block.rs | 106 | ||||
-rw-r--r-- | src/garage/admin/bucket.rs | 10 | ||||
-rw-r--r-- | src/garage/admin/mod.rs | 3 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 6 | ||||
-rw-r--r-- | src/garage/cli/structs.rs | 13 | ||||
-rw-r--r-- | src/garage/cli/util.rs | 66 | ||||
-rw-r--r-- | src/garage/repair/online.rs | 226 | ||||
-rw-r--r-- | src/garage/tests/s3/multipart.rs | 223 | ||||
-rw-r--r-- | src/model/garage.rs | 24 | ||||
-rw-r--r-- | src/model/helper/bucket.rs | 10 | ||||
-rw-r--r-- | src/model/s3/mod.rs | 1 | ||||
-rw-r--r-- | src/model/s3/mpu_table.rs | 245 | ||||
-rw-r--r-- | src/model/s3/object_table.rs | 167 | ||||
-rw-r--r-- | src/model/s3/version_table.rs | 95 | ||||
-rw-r--r-- | src/rpc/layout.rs | 2 | ||||
-rw-r--r-- | src/table/data.rs | 4 | ||||
-rw-r--r-- | src/table/schema.rs | 6 | ||||
-rw-r--r-- | src/web/web_server.rs | 2 |
27 files changed, 1760 insertions, 858 deletions
diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index e60f07ca..17f46c30 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -14,6 +14,7 @@ use garage_model::bucket_alias_table::*; use garage_model::bucket_table::*; use garage_model::garage::Garage; use garage_model::permission::*; +use garage_model::s3::mpu_table; use garage_model::s3::object_table::*; use crate::admin::error::*; @@ -124,6 +125,14 @@ async fn bucket_info_results( .map(|x| x.filtered_values(&garage.system.ring.borrow())) .unwrap_or_default(); + let mpu_counters = garage + .mpu_counter_table + .table + .get(&bucket_id, &EmptyKey) + .await? + .map(|x| x.filtered_values(&garage.system.ring.borrow())) + .unwrap_or_default(); + let mut relevant_keys = HashMap::new(); for (k, _) in bucket .state @@ -183,8 +192,8 @@ async fn bucket_info_results( } }), keys: relevant_keys - .into_iter() - .map(|(_, key)| { + .into_values() + .map(|key| { let p = key.state.as_option().unwrap(); GetBucketInfoKey { access_key_id: key.key_id, @@ -208,12 +217,12 @@ async fn bucket_info_results( } }) .collect::<Vec<_>>(), - objects: counters.get(OBJECTS).cloned().unwrap_or_default(), - bytes: counters.get(BYTES).cloned().unwrap_or_default(), - unfinished_uploads: counters - .get(UNFINISHED_UPLOADS) - .cloned() - .unwrap_or_default(), + objects: *counters.get(OBJECTS).unwrap_or(&0), + bytes: *counters.get(BYTES).unwrap_or(&0), + unfinished_uploads: *counters.get(UNFINISHED_UPLOADS).unwrap_or(&0), + unfinished_multipart_uploads: *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0), + unfinished_multipart_upload_parts: *mpu_counters.get(mpu_table::PARTS).unwrap_or(&0), + unfinished_multipart_upload_bytes: *mpu_counters.get(mpu_table::BYTES).unwrap_or(&0), quotas: ApiBucketQuotas { max_size: quotas.max_size, max_objects: quotas.max_objects, @@ -235,6 +244,9 @@ struct GetBucketInfoResult { objects: i64, bytes: i64, unfinished_uploads: i64, + unfinished_multipart_uploads: i64, + unfinished_multipart_upload_parts: i64, + unfinished_multipart_upload_bytes: i64, quotas: ApiBucketQuotas, } diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 2bbabb7b..d74ca361 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -183,8 +183,8 @@ async fn key_info_results(garage: &Arc<Garage>, key: Key) -> Result<Response<Bod create_bucket: *key_state.allow_create_bucket.get(), }, buckets: relevant_buckets - .into_iter() - .map(|(_, bucket)| { + .into_values() + .map(|bucket| { let state = bucket.state.as_option().unwrap(); KeyInfoBucketResult { id: hex::encode(bucket.id), diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 27837297..5e793082 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -27,6 +27,7 @@ use crate::s3::cors::*; use crate::s3::delete::*; use crate::s3::get::*; use crate::s3::list::*; +use crate::s3::multipart::*; use crate::s3::post_object::handle_post_object; use crate::s3::put::*; use crate::s3::router::Endpoint; @@ -256,7 +257,7 @@ impl ApiHandler for S3ApiServer { bucket_name, bucket_id, delimiter: delimiter.map(|d| d.to_string()), - page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + page_size: max_keys.unwrap_or(1000).clamp(1, 1000), prefix: prefix.unwrap_or_default(), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), }, @@ -286,7 +287,7 @@ impl ApiHandler for S3ApiServer { bucket_name, bucket_id, delimiter: delimiter.map(|d| d.to_string()), - page_size: max_keys.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + page_size: max_keys.unwrap_or(1000).clamp(1, 1000), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), prefix: prefix.unwrap_or_default(), }, @@ -319,7 +320,7 @@ impl ApiHandler for S3ApiServer { bucket_name, bucket_id, delimiter: delimiter.map(|d| d.to_string()), - page_size: max_uploads.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + page_size: max_uploads.unwrap_or(1000).clamp(1, 1000), prefix: prefix.unwrap_or_default(), urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), }, @@ -343,7 +344,7 @@ impl ApiHandler for S3ApiServer { key, upload_id, part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)), - max_parts: max_parts.map(|p| p.clamp(1, 1000)).unwrap_or(1000), + max_parts: max_parts.unwrap_or(1000).clamp(1, 1000), }, ) .await diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 7eb6459d..68b4f0c9 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -2,7 +2,7 @@ use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use futures::{stream, stream::Stream, StreamExt, TryFutureExt}; +use futures::{stream, stream::Stream, StreamExt}; use md5::{Digest as Md5Digest, Md5}; use bytes::Bytes; @@ -18,12 +18,14 @@ use garage_util::time::*; use garage_model::garage::Garage; use garage_model::key_table::Key; use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use crate::helpers::parse_bucket_key; use crate::s3::error::*; -use crate::s3::put::{decode_upload_id, get_headers}; +use crate::s3::multipart; +use crate::s3::put::get_headers; use crate::s3::xml::{self as s3_xml, xmlns_tag}; pub async fn handle_copy( @@ -92,7 +94,10 @@ pub async fn handle_copy( let tmp_dest_object_version = ObjectVersion { uuid: new_uuid, timestamp: new_timestamp, - state: ObjectVersionState::Uploading(new_meta.headers.clone()), + state: ObjectVersionState::Uploading { + headers: new_meta.headers.clone(), + multipart: false, + }, }; let tmp_dest_object = Object::new( dest_bucket_id, @@ -105,8 +110,14 @@ pub async fn handle_copy( // this means that the BlockRef entries linked to this version cannot be // marked as deleted (they are marked as deleted only if the Version // doesn't exist or is marked as deleted). - let mut dest_version = - Version::new(new_uuid, dest_bucket_id, dest_key.to_string(), false); + let mut dest_version = Version::new( + new_uuid, + VersionBacklink::Object { + bucket_id: dest_bucket_id, + key: dest_key.to_string(), + }, + false, + ); garage.version_table.insert(&dest_version).await?; // Fill in block list for version and insert block refs @@ -179,17 +190,13 @@ pub async fn handle_upload_part_copy( ) -> Result<Response<Body>, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; - let dest_version_uuid = decode_upload_id(upload_id)?; + let dest_upload_id = multipart::decode_upload_id(upload_id)?; let dest_key = dest_key.to_string(); - let (source_object, dest_object) = futures::try_join!( + let (source_object, (_, _, mut dest_mpu)) = futures::try_join!( get_copy_source(&garage, api_key, req), - garage - .object_table - .get(&dest_bucket_id, &dest_key) - .map_err(Error::from), + multipart::get_upload(&garage, &dest_bucket_id, &dest_key, &dest_upload_id) )?; - let dest_object = dest_object.ok_or(Error::NoSuchKey)?; let (source_object_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; @@ -217,15 +224,6 @@ pub async fn handle_upload_part_copy( }, }; - // Check destination version is indeed in uploading state - if !dest_object - .versions() - .iter() - .any(|v| v.uuid == dest_version_uuid && v.is_uploading()) - { - return Err(Error::NoSuchUpload); - } - // Check source version is not inlined match source_version_data { ObjectVersionData::DeleteMarker => unreachable!(), @@ -242,23 +240,11 @@ pub async fn handle_upload_part_copy( // Fetch source versin with its block list, // and destination version to check part hasn't yet been uploaded - let (source_version, dest_version) = futures::try_join!( - garage - .version_table - .get(&source_object_version.uuid, &EmptyKey), - garage.version_table.get(&dest_version_uuid, &EmptyKey), - )?; - let source_version = source_version.ok_or(Error::NoSuchKey)?; - - // Check this part number hasn't yet been uploaded - if let Some(dv) = dest_version { - if dv.has_part_number(part_number) { - return Err(Error::bad_request(format!( - "Part number {} has already been uploaded", - part_number - ))); - } - } + let source_version = garage + .version_table + .get(&source_object_version.uuid, &EmptyKey) + .await? + .ok_or(Error::NoSuchKey)?; // We want to reuse blocks from the source version as much as possible. // However, we still need to get the data from these blocks @@ -299,6 +285,33 @@ pub async fn handle_upload_part_copy( current_offset = block_end; } + // Calculate the identity of destination part: timestamp, version id + let dest_version_id = gen_uuid(); + let dest_mpu_part_key = MpuPartKey { + part_number, + timestamp: dest_mpu.next_timestamp(part_number), + }; + + // Create the uploaded part + dest_mpu.parts.clear(); + dest_mpu.parts.put( + dest_mpu_part_key, + MpuPart { + version: dest_version_id, + etag: None, + size: None, + }, + ); + garage.mpu_table.insert(&dest_mpu).await?; + + let mut dest_version = Version::new( + dest_version_id, + VersionBacklink::MultipartUpload { + upload_id: dest_upload_id, + }, + false, + ); + // Now, actually copy the blocks let mut md5hasher = Md5::new(); @@ -348,8 +361,8 @@ pub async fn handle_upload_part_copy( let must_upload = existing_block_hash.is_none(); let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..])); - let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); - version.blocks.put( + dest_version.blocks.clear(); + dest_version.blocks.put( VersionBlockKey { part_number, offset: current_offset, @@ -363,7 +376,7 @@ pub async fn handle_upload_part_copy( let block_ref = BlockRef { block: final_hash, - version: dest_version_uuid, + version: dest_version_id, deleted: false.into(), }; @@ -378,23 +391,33 @@ pub async fn handle_upload_part_copy( Ok(()) } }, - // Thing 2: we need to insert the block in the version - garage.version_table.insert(&version), - // Thing 3: we need to add a block reference - garage.block_ref_table.insert(&block_ref), + async { + // Thing 2: we need to insert the block in the version + garage.version_table.insert(&dest_version).await?; + // Thing 3: we need to add a block reference + garage.block_ref_table.insert(&block_ref).await + }, // Thing 4: we need to prefetch the next block defragmenter.next(), )?; - next_block = res.3; + next_block = res.2; } + assert_eq!(current_offset, source_range.length); + let data_md5sum = md5hasher.finalize(); let etag = hex::encode(data_md5sum); // Put the part's ETag in the Versiontable - let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); - version.parts_etags.put(part_number, etag.clone()); - garage.version_table.insert(&version).await?; + dest_mpu.parts.put( + dest_mpu_part_key, + MpuPart { + version: dest_version_id, + etag: Some(etag.clone()), + size: Some(current_offset), + }, + ); + garage.mpu_table.insert(&dest_mpu).await?; // LGTM let resp_xml = s3_xml::to_xml_with_header(&CopyPartResult { diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 2a99551a..5e682726 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -149,7 +149,6 @@ pub async fn handle_head( let (part_offset, part_end) = calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?; - let n_parts = version.parts_etags.items().len(); Ok(object_headers(object_version, version_meta) .header(CONTENT_LENGTH, format!("{}", part_end - part_offset)) @@ -162,7 +161,7 @@ pub async fn handle_head( version_meta.size ), ) - .header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts)) + .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) .status(StatusCode::PARTIAL_CONTENT) .body(Body::empty())?) } @@ -376,7 +375,6 @@ async fn handle_get_part( let (begin, end) = calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?; - let n_parts = version.parts_etags.items().len(); let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); @@ -386,7 +384,7 @@ async fn handle_get_part( CONTENT_RANGE, format!("bytes {}-{}/{}", begin, end - 1, version_meta.size), ) - .header(X_AMZ_MP_PARTS_COUNT, format!("{}", n_parts)) + .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) .body(body)?) } _ => unreachable!(), @@ -443,7 +441,7 @@ fn body_from_blocks_range( // block.part_number, which is not the same in the case of a multipart upload) let mut blocks: Vec<(VersionBlock, u64)> = Vec::with_capacity(std::cmp::min( all_blocks.len(), - 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize, + 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size, 1024)) as usize, )); let mut block_offset: u64 = 0; for (_, b) in all_blocks.iter() { @@ -454,7 +452,7 @@ fn body_from_blocks_range( if block_offset < end && block_offset + b.size > begin { blocks.push((*b, block_offset)); } - block_offset += b.size as u64; + block_offset += b.size; } let order_stream = OrderTag::stream(); diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index 5cb0d65a..7408d4d3 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -1,4 +1,3 @@ -use std::cmp::Ordering; use std::collections::{BTreeMap, BTreeSet}; use std::iter::{Iterator, Peekable}; use std::sync::Arc; @@ -11,15 +10,15 @@ use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_model::garage::Garage; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; -use garage_model::s3::version_table::Version; -use garage_table::{EmptyKey, EnumerationOrder}; +use garage_table::EnumerationOrder; use crate::encoding::*; use crate::helpers::key_after_prefix; use crate::s3::error::*; -use crate::s3::put as s3_put; +use crate::s3::multipart as s3_multipart; use crate::s3::xml as s3_xml; const DUMMY_NAME: &str = "Dummy Key"; @@ -176,7 +175,9 @@ pub async fn handle_list_multipart_upload( t.get_range( &bucket, key, - Some(ObjectFilter::IsUploading), + Some(ObjectFilter::IsUploading { + check_multipart: Some(true), + }), count, EnumerationOrder::Forward, ) @@ -272,24 +273,26 @@ pub async fn handle_list_parts( ) -> Result<Response<Body>, Error> { debug!("ListParts {:?}", query); - let upload_id = s3_put::decode_upload_id(&query.upload_id)?; + let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?; - let (object, version) = futures::try_join!( - garage.object_table.get(&query.bucket_id, &query.key), - garage.version_table.get(&upload_id, &EmptyKey), - )?; + let (_, _, mpu) = + s3_multipart::get_upload(&garage, &query.bucket_id, &query.key, &upload_id).await?; - let (info, next) = fetch_part_info(query, object, version, upload_id)?; + let (info, next) = fetch_part_info(query, &mpu)?; let result = s3_xml::ListPartsResult { xmlns: (), + + // Query parameters bucket: s3_xml::Value(query.bucket_name.to_string()), key: s3_xml::Value(query.key.to_string()), upload_id: s3_xml::Value(query.upload_id.to_string()), part_number_marker: query.part_number_marker.map(|e| s3_xml::IntValue(e as i64)), - next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)), max_parts: s3_xml::IntValue(query.max_parts as i64), - is_truncated: s3_xml::Value(next.map(|_| "true").unwrap_or("false").to_string()), + + // Result values + next_part_number_marker: next.map(|e| s3_xml::IntValue(e as i64)), + is_truncated: s3_xml::Value(format!("{}", next.is_some())), parts: info .iter() .map(|part| s3_xml::PartItem { @@ -299,6 +302,8 @@ pub async fn handle_list_parts( size: s3_xml::IntValue(part.size as i64), }) .collect(), + + // Dummy result values (unsupported features) initiator: s3_xml::Initiator { display_name: s3_xml::Value(DUMMY_NAME.to_string()), id: s3_xml::Value(DUMMY_KEY.to_string()), @@ -335,8 +340,8 @@ struct UploadInfo { } #[derive(Debug, PartialEq)] -struct PartInfo { - etag: String, +struct PartInfo<'a> { + etag: &'a str, timestamp: u64, part_number: u64, size: u64, @@ -456,107 +461,51 @@ where } } -fn fetch_part_info( +fn fetch_part_info<'a>( query: &ListPartsQuery, - object: Option<Object>, - version: Option<Version>, - upload_id: Uuid, -) -> Result<(Vec<PartInfo>, Option<u64>), Error> { - // Check results - let object = object.ok_or(Error::NoSuchKey)?; - - let obj_version = object - .versions() - .iter() - .find(|v| v.uuid == upload_id && v.is_uploading()) - .ok_or(Error::NoSuchUpload)?; - - let version = version.ok_or(Error::NoSuchKey)?; - - // Cut the beginning of our 2 vectors if required - let (etags, blocks) = match &query.part_number_marker { - Some(marker) => { - let next = marker + 1; - - let part_idx = into_ok_or_err( - version - .parts_etags - .items() - .binary_search_by(|(part_num, _)| part_num.cmp(&next)), - ); - let parts = &version.parts_etags.items()[part_idx..]; - - let block_idx = into_ok_or_err( - version - .blocks - .items() - .binary_search_by(|(vkey, _)| vkey.part_number.cmp(&next)), - ); - let blocks = &version.blocks.items()[block_idx..]; - - (parts, blocks) - } - None => (version.parts_etags.items(), version.blocks.items()), - }; - - // Use the block vector to compute a (part_number, size) vector - let mut size = Vec::<(u64, u64)>::new(); - blocks.iter().for_each(|(key, val)| { - let mut new_size = val.size; - match size.pop() { - Some((part_number, size)) if part_number == key.part_number => new_size += size, - Some(v) => size.push(v), - None => (), - } - size.push((key.part_number, new_size)) - }); - - // Merge the etag vector and size vector to build a PartInfo vector - let max_parts = query.max_parts as usize; - let (mut etag_iter, mut size_iter) = (etags.iter().peekable(), size.iter().peekable()); - - let mut info = Vec::<PartInfo>::with_capacity(max_parts); - - while info.len() < max_parts { - match (etag_iter.peek(), size_iter.peek()) { - (Some((ep, etag)), Some((sp, size))) => match ep.cmp(sp) { - Ordering::Less => { - debug!("ETag information ignored due to missing corresponding block information. Query: {:?}", query); - etag_iter.next(); - } - Ordering::Equal => { - info.push(PartInfo { - etag: etag.to_string(), - timestamp: obj_version.timestamp, - part_number: *ep, - size: *size, - }); - etag_iter.next(); - size_iter.next(); + mpu: &'a MultipartUpload, +) -> Result<(Vec<PartInfo<'a>>, Option<u64>), Error> { + assert!((1..=1000).contains(&query.max_parts)); // see s3/api_server.rs + + // Parse multipart upload part list, removing parts not yet finished + // and failed part uploads that were overwritten + let mut parts: Vec<PartInfo<'a>> = Vec::with_capacity(mpu.parts.items().len()); + for (pk, p) in mpu.parts.items().iter() { + if let (Some(etag), Some(size)) = (&p.etag, p.size) { + let part_info = PartInfo { + part_number: pk.part_number, + timestamp: pk.timestamp, + etag, + size, + }; + match parts.last_mut() { + Some(lastpart) if lastpart.part_number == pk.part_number => { + *lastpart = part_info; } - Ordering::Greater => { - debug!("Block information ignored due to missing corresponding ETag information. Query: {:?}", query); - size_iter.next(); + _ => { + parts.push(part_info); } - }, - (None, None) => return Ok((info, None)), - _ => { - debug!( - "Additional block or ETag information ignored. Query: {:?}", - query - ); - return Ok((info, None)); } } } - match info.last() { - Some(part_info) => { - let pagination = Some(part_info.part_number); - Ok((info, pagination)) - } - None => Ok((info, None)), + // Cut the beginning if we have a marker + if let Some(marker) = &query.part_number_marker { + let next = marker + 1; + let part_idx = parts + .binary_search_by(|part| part.part_number.cmp(&next)) + .unwrap_or_else(|x| x); + parts = parts.split_off(part_idx); + } + + // Cut the end if we have too many parts + if parts.len() > query.max_parts as usize { + parts.truncate(query.max_parts as usize); + let pagination = Some(parts.last().unwrap().part_number); + return Ok((parts, pagination)); } + + Ok((parts, None)) } /* @@ -651,7 +600,7 @@ impl ListMultipartUploadsQuery { }), uuid => Ok(RangeBegin::AfterUpload { key: key_marker.to_string(), - upload: s3_put::decode_upload_id(uuid)?, + upload: s3_multipart::decode_upload_id(uuid)?, }), }, @@ -843,7 +792,7 @@ impl ExtractAccumulator for UploadAccumulator { let mut uploads_for_key = object .versions() .iter() - .filter(|x| x.is_uploading()) + .filter(|x| x.is_uploading(Some(true))) .collect::<Vec<&ObjectVersion>>(); // S3 logic requires lexicographically sorted upload ids. @@ -918,14 +867,6 @@ impl ExtractAccumulator for UploadAccumulator { * Utility functions */ -/// This is a stub for Result::into_ok_or_err that is not yet in Rust stable -fn into_ok_or_err<T>(r: Result<T, T>) -> T { - match r { - Ok(r) => r, - Err(r) => r, - } -} - /// Returns the common prefix of the object given the query prefix and delimiter fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> { match &query.delimiter { @@ -951,7 +892,6 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value { #[cfg(test)] mod tests { use super::*; - use garage_model::s3::version_table::*; use garage_util::*; use std::iter::FromIterator; @@ -991,10 +931,13 @@ mod tests { ObjectVersion { uuid: Uuid::from(uuid), timestamp: TS, - state: ObjectVersionState::Uploading(ObjectVersionHeaders { - content_type: "text/plain".to_string(), - other: BTreeMap::<String, String>::new(), - }), + state: ObjectVersionState::Uploading { + multipart: true, + headers: ObjectVersionHeaders { + content_type: "text/plain".to_string(), + other: BTreeMap::<String, String>::new(), + }, + }, } } @@ -1169,83 +1112,76 @@ mod tests { Ok(()) } - fn version() -> Version { + fn mpu() -> MultipartUpload { let uuid = Uuid::from([0x08; 32]); - let blocks = vec![ + let parts = vec![ ( - VersionBlockKey { + MpuPartKey { part_number: 1, - offset: 1, + timestamp: TS, }, - VersionBlock { - hash: uuid, - size: 3, + MpuPart { + version: uuid, + size: Some(3), + etag: Some("etag1".into()), }, ), ( - VersionBlockKey { - part_number: 1, - offset: 2, + MpuPartKey { + part_number: 2, + timestamp: TS, }, - VersionBlock { - hash: uuid, - size: 2, + MpuPart { + version: uuid, + size: None, + etag: None, }, ), ( - VersionBlockKey { - part_number: 2, - offset: 1, + MpuPartKey { + part_number: 3, + timestamp: TS, }, - VersionBlock { - hash: uuid, - size: 8, + MpuPart { + version: uuid, + size: Some(10), + etag: Some("etag2".into()), }, ), ( - VersionBlockKey { + MpuPartKey { part_number: 5, - offset: 1, + timestamp: TS, }, - VersionBlock { - hash: uuid, - size: 7, + MpuPart { + version: uuid, + size: Some(7), + etag: Some("etag3".into()), }, ), ( - VersionBlockKey { + MpuPartKey { part_number: 8, - offset: 1, + timestamp: TS, }, - VersionBlock { - hash: uuid, - size: 5, + MpuPart { + version: uuid, + size: Some(5), + etag: Some("etag4".into()), }, ), ]; - let etags = vec![ - (1, "etag1".to_string()), - (3, "etag2".to_string()), - (5, "etag3".to_string()), - (8, "etag4".to_string()), - (9, "etag5".to_string()), - ]; - Version { - bucket_id: uuid, - key: "a".to_string(), - uuid, + MultipartUpload { + upload_id: uuid, deleted: false.into(), - blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks), - parts_etags: crdt::Map::<u64, String>::from_iter(etags), + parts: crdt::Map::<MpuPartKey, MpuPart>::from_iter(parts), + bucket_id: uuid, + key: "a".into(), } } - fn obj() -> Object { - Object::new(bucket(), "d".to_string(), vec![objup_version([0x08; 32])]) - } - #[test] fn test_fetch_part_info() -> Result<(), Error> { let uuid = Uuid::from([0x08; 32]); @@ -1258,82 +1194,85 @@ mod tests { max_parts: 2, }; - assert!( - fetch_part_info(&query, None, None, uuid).is_err(), - "No object and version should fail" - ); - assert!( - fetch_part_info(&query, Some(obj()), None, uuid).is_err(), - "No version should faild" - ); - assert!( - fetch_part_info(&query, None, Some(version()), uuid).is_err(), - "No object should fail" - ); + let mpu = mpu(); // Start from the beginning but with limited size to trigger pagination - let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; - assert_eq!(pagination.unwrap(), 5); + let (info, pagination) = fetch_part_info(&query, &mpu)?; + assert_eq!(pagination.unwrap(), 3); assert_eq!( info, vec![ PartInfo { - etag: "etag1".to_string(), + etag: "etag1", timestamp: TS, part_number: 1, - size: 5 + size: 3 }, PartInfo { - etag: "etag3".to_string(), + etag: "etag2", timestamp: TS, - part_number: 5, - size: 7 + part_number: 3, + size: 10 }, ] ); // Use previous pagination to make a new request query.part_number_marker = Some(pagination.unwrap()); - let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + let (info, pagination) = fetch_part_info(&query, &mpu)?; assert!(pagination.is_none()); assert_eq!( info, - vec![PartInfo { - etag: "etag4".to_string(), - timestamp: TS, - part_number: 8, - size: 5 - },] + vec![ + PartInfo { + etag: "etag3", + timestamp: TS, + part_number: 5, + size: 7 + }, + PartInfo { + etag: "etag4", + timestamp: TS, + part_number: 8, + size: 5 + }, + ] ); // Trying to access a part that is way larger than registered ones query.part_number_marker = Some(9999); - let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + let (info, pagination) = fetch_part_info(&query, &mpu)?; assert!(pagination.is_none()); assert_eq!(info, vec![]); // Try without any limitation query.max_parts = 1000; query.part_number_marker = None; - let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; + let (info, pagination) = fetch_part_info(&query, &mpu)?; assert!(pagination.is_none()); assert_eq!( info, vec![ PartInfo { - etag: "etag1".to_string(), + etag: "etag1", timestamp: TS, part_number: 1, - size: 5 + size: 3 + }, + PartInfo { + etag: "etag2", + timestamp: TS, + part_number: 3, + size: 10 }, PartInfo { - etag: "etag3".to_string(), + etag: "etag3", timestamp: TS, part_number: 5, size: 7 }, PartInfo { - etag: "etag4".to_string(), + etag: "etag4", timestamp: TS, part_number: 8, size: 5 diff --git a/src/api/s3/mod.rs b/src/api/s3/mod.rs index 7b56d4d8..b5237bf7 100644 --- a/src/api/s3/mod.rs +++ b/src/api/s3/mod.rs @@ -7,6 +7,7 @@ pub mod cors; mod delete; pub mod get; mod list; +mod multipart; mod post_object; mod put; mod website; diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs new file mode 100644 index 00000000..7df0dafc --- /dev/null +++ b/src/api/s3/multipart.rs @@ -0,0 +1,464 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use futures::prelude::*; +use hyper::body::Body; +use hyper::{Request, Response}; +use md5::{Digest as Md5Digest, Md5}; + +use garage_table::*; +use garage_util::async_hash::*; +use garage_util::data::*; +use garage_util::time::*; + +use garage_model::bucket_table::Bucket; +use garage_model::garage::Garage; +use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; + +use crate::s3::error::*; +use crate::s3::put::*; +use crate::s3::xml as s3_xml; +use crate::signature::verify_signed_content; + +// ---- + +pub async fn handle_create_multipart_upload( + garage: Arc<Garage>, + req: &Request<Body>, + bucket_name: &str, + bucket_id: Uuid, + key: &str, +) -> Result<Response<Body>, Error> { + let upload_id = gen_uuid(); + let headers = get_headers(req.headers())?; + + // Create object in object table + let object_version = ObjectVersion { + uuid: upload_id, + timestamp: now_msec(), + state: ObjectVersionState::Uploading { + multipart: true, + headers, + }, + }; + let object = Object::new(bucket_id, key.to_string(), vec![object_version]); + garage.object_table.insert(&object).await?; + + // Create multipart upload in mpu table + // This multipart upload will hold references to uploaded parts + // (which are entries in the Version table) + let mpu = MultipartUpload::new(upload_id, bucket_id, key.into(), false); + garage.mpu_table.insert(&mpu).await?; + + // Send success response + let result = s3_xml::InitiateMultipartUploadResult { + xmlns: (), + bucket: s3_xml::Value(bucket_name.to_string()), + key: s3_xml::Value(key.to_string()), + upload_id: s3_xml::Value(hex::encode(upload_id)), + }; + let xml = s3_xml::to_xml_with_header(&result)?; + + Ok(Response::new(Body::from(xml.into_bytes()))) +} + +pub async fn handle_put_part( + garage: Arc<Garage>, + req: Request<Body>, + bucket_id: Uuid, + key: &str, + part_number: u64, + upload_id: &str, + content_sha256: Option<Hash>, +) -> Result<Response<Body>, Error> { + let upload_id = decode_upload_id(upload_id)?; + + let content_md5 = match req.headers().get("content-md5") { + Some(x) => Some(x.to_str()?.to_string()), + None => None, + }; + + // Read first chuck, and at the same time try to get object to see if it exists + let key = key.to_string(); + + let body = req.into_body().map_err(Error::from); + let mut chunker = StreamChunker::new(body, garage.config.block_size); + + let ((_, _, mut mpu), first_block) = futures::try_join!( + get_upload(&garage, &bucket_id, &key, &upload_id), + chunker.next(), + )?; + + // Check object is valid and part can be accepted + let first_block = first_block.ok_or_bad_request("Empty body")?; + + // Calculate part identity: timestamp, version id + let version_uuid = gen_uuid(); + let mpu_part_key = MpuPartKey { + part_number, + timestamp: mpu.next_timestamp(part_number), + }; + + // The following consists in many steps that can each fail. + // Keep track that some cleanup will be needed if things fail + // before everything is finished (cleanup is done using the Drop trait). + let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { + garage: garage.clone(), + upload_id, + version_uuid, + })); + + // Create version and link version from MPU + mpu.parts.clear(); + mpu.parts.put( + mpu_part_key, + MpuPart { + version: version_uuid, + etag: None, + size: None, + }, + ); + garage.mpu_table.insert(&mpu).await?; + + let version = Version::new( + version_uuid, + VersionBacklink::MultipartUpload { upload_id }, + false, + ); + garage.version_table.insert(&version).await?; + + // Copy data to version + let first_block_hash = async_blake2sum(first_block.clone()).await; + + let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( + &garage, + &version, + part_number, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; + + // Verify that checksums map + ensure_checksum_matches( + data_md5sum.as_slice(), + data_sha256sum, + content_md5.as_deref(), + content_sha256, + )?; + + // Store part etag in version + let data_md5sum_hex = hex::encode(data_md5sum); + mpu.parts.put( + mpu_part_key, + MpuPart { + version: version_uuid, + etag: Some(data_md5sum_hex.clone()), + size: Some(total_size), + }, + ); + garage.mpu_table.insert(&mpu).await?; + + // We were not interrupted, everything went fine. + // We won't have to clean up on drop. + interrupted_cleanup.cancel(); + + let response = Response::builder() + .header("ETag", format!("\"{}\"", data_md5sum_hex)) + .body(Body::empty()) + .unwrap(); + Ok(response) +} + +struct InterruptedCleanup(Option<InterruptedCleanupInner>); +struct InterruptedCleanupInner { + garage: Arc<Garage>, + upload_id: Uuid, + version_uuid: Uuid, +} + +impl InterruptedCleanup { + fn cancel(&mut self) { + drop(self.0.take()); + } +} +impl Drop for InterruptedCleanup { + fn drop(&mut self) { + if let Some(info) = self.0.take() { + tokio::spawn(async move { + let version = Version::new( + info.version_uuid, + VersionBacklink::MultipartUpload { + upload_id: info.upload_id, + }, + true, + ); + if let Err(e) = info.garage.version_table.insert(&version).await { + warn!("Cannot cleanup after aborted UploadPart: {}", e); + } + }); + } + } +} + +pub async fn handle_complete_multipart_upload( + garage: Arc<Garage>, + req: Request<Body>, + bucket_name: &str, + bucket: &Bucket, + key: &str, + upload_id: &str, + content_sha256: Option<Hash>, +) -> Result<Response<Body>, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + + if let Some(content_sha256) = content_sha256 { + verify_signed_content(content_sha256, &body[..])?; + } + + let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; + let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml) + .ok_or_bad_request("Invalid CompleteMultipartUpload XML")?; + debug!( + "CompleteMultipartUpload list of parts: {:?}", + body_list_of_parts + ); + + let upload_id = decode_upload_id(upload_id)?; + + // Get object and multipart upload + let key = key.to_string(); + let (_, mut object_version, mpu) = get_upload(&garage, &bucket.id, &key, &upload_id).await?; + + if mpu.parts.is_empty() { + return Err(Error::bad_request("No data was uploaded")); + } + + let headers = match object_version.state { + ObjectVersionState::Uploading { headers, .. } => headers, + _ => unreachable!(), + }; + + // Check that part numbers are an increasing sequence. + // (it doesn't need to start at 1 nor to be a continuous sequence, + // see discussion in #192) + if body_list_of_parts.is_empty() { + return Err(Error::EntityTooSmall); + } + if !body_list_of_parts + .iter() + .zip(body_list_of_parts.iter().skip(1)) + .all(|(p1, p2)| p1.part_number < p2.part_number) + { + return Err(Error::InvalidPartOrder); + } + + // Check that the list of parts they gave us corresponds to parts we have here + debug!("Parts stored in multipart upload: {:?}", mpu.parts.items()); + let mut have_parts = HashMap::new(); + for (pk, pv) in mpu.parts.items().iter() { + have_parts.insert(pk.part_number, pv); + } + let mut parts = vec![]; + for req_part in body_list_of_parts.iter() { + match have_parts.get(&req_part.part_number) { + Some(part) if part.etag.as_ref() == Some(&req_part.etag) && part.size.is_some() => { + parts.push(*part) + } + _ => return Err(Error::InvalidPart), + } + } + + let grg = &garage; + let parts_versions = futures::future::try_join_all(parts.iter().map(|p| async move { + grg.version_table + .get(&p.version, &EmptyKey) + .await? + .ok_or_internal_error("Part version missing from version table") + })) + .await?; + + // Create final version and block refs + let mut final_version = Version::new( + upload_id, + VersionBacklink::Object { + bucket_id: bucket.id, + key: key.to_string(), + }, + false, + ); + for (part_number, part_version) in parts_versions.iter().enumerate() { + if part_version.deleted.get() { + return Err(Error::InvalidPart); + } + for (vbk, vb) in part_version.blocks.items().iter() { + final_version.blocks.put( + VersionBlockKey { + part_number: (part_number + 1) as u64, + offset: vbk.offset, + }, + *vb, + ); + } + } + garage.version_table.insert(&final_version).await?; + + let block_refs = final_version.blocks.items().iter().map(|(_, b)| BlockRef { + block: b.hash, + version: upload_id, + deleted: false.into(), + }); + garage.block_ref_table.insert_many(block_refs).await?; + + // Calculate etag of final object + // To understand how etags are calculated, read more here: + // https://teppen.io/2018/06/23/aws_s3_etags/ + let mut etag_md5_hasher = Md5::new(); + for part in parts.iter() { + etag_md5_hasher.update(part.etag.as_ref().unwrap().as_bytes()); + } + let etag = format!( + "{}-{}", + hex::encode(etag_md5_hasher.finalize()), + parts.len() + ); + + // Calculate total size of final object + let total_size = parts.iter().map(|x| x.size.unwrap()).sum(); + + if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await { + object_version.state = ObjectVersionState::Aborted; + let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); + garage.object_table.insert(&final_object).await?; + + return Err(e); + } + + // Write final object version + object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( + ObjectVersionMeta { + headers, + size: total_size, + etag: etag.clone(), + }, + final_version.blocks.items()[0].1.hash, + )); + + let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); + garage.object_table.insert(&final_object).await?; + + // Send response saying ok we're done + let result = s3_xml::CompleteMultipartUploadResult { + xmlns: (), + location: None, + bucket: s3_xml::Value(bucket_name.to_string()), + key: s3_xml::Value(key), + etag: s3_xml::Value(format!("\"{}\"", etag)), + }; + let xml = s3_xml::to_xml_with_header(&result)?; + + Ok(Response::new(Body::from(xml.into_bytes()))) +} + +pub async fn handle_abort_multipart_upload( + garage: Arc<Garage>, + bucket_id: Uuid, + key: &str, + upload_id: &str, +) -> Result<Response<Body>, Error> { + let upload_id = decode_upload_id(upload_id)?; + + let (_, mut object_version, _) = + get_upload(&garage, &bucket_id, &key.to_string(), &upload_id).await?; + + object_version.state = ObjectVersionState::Aborted; + let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); + garage.object_table.insert(&final_object).await?; + + Ok(Response::new(Body::from(vec![]))) +} + +// ======== helpers ============ + +#[allow(clippy::ptr_arg)] +pub(crate) async fn get_upload( + garage: &Garage, + bucket_id: &Uuid, + key: &String, + upload_id: &Uuid, +) -> Result<(Object, ObjectVersion, MultipartUpload), Error> { + let (object, mpu) = futures::try_join!( + garage.object_table.get(bucket_id, key).map_err(Error::from), + garage + .mpu_table + .get(upload_id, &EmptyKey) + .map_err(Error::from), + )?; + + let object = object.ok_or(Error::NoSuchUpload)?; + let mpu = mpu.ok_or(Error::NoSuchUpload)?; + + let object_version = object + .versions() + .iter() + .find(|v| v.uuid == *upload_id && v.is_uploading(Some(true))) + .ok_or(Error::NoSuchUpload)? + .clone(); + + Ok((object, object_version, mpu)) +} + +pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> { + let id_bin = hex::decode(id).map_err(|_| Error::NoSuchUpload)?; + if id_bin.len() != 32 { + return Err(Error::NoSuchUpload); + } + let mut uuid = [0u8; 32]; + uuid.copy_from_slice(&id_bin[..]); + Ok(Uuid::from(uuid)) +} + +#[derive(Debug)] +struct CompleteMultipartUploadPart { + etag: String, + part_number: u64, +} + +fn parse_complete_multipart_upload_body( + xml: &roxmltree::Document, +) -> Option<Vec<CompleteMultipartUploadPart>> { + let mut parts = vec![]; + + let root = xml.root(); + let cmu = root.first_child()?; + if !cmu.has_tag_name("CompleteMultipartUpload") { + return None; + } + + for item in cmu.children() { + // Only parse <Part> nodes + if !item.is_element() { + continue; + } + + if item.has_tag_name("Part") { + let etag = item.children().find(|e| e.has_tag_name("ETag"))?.text()?; + let part_number = item + .children() + .find(|e| e.has_tag_name("PartNumber"))? + .text()?; + parts.push(CompleteMultipartUploadPart { + etag: etag.trim_matches('"').to_string(), + part_number: part_number.parse().ok()?, + }); + } else { + return None; + } + } + + Some(parts) +} diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 350ab884..c7ac5030 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use base64::prelude::*; @@ -30,8 +30,6 @@ use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use crate::s3::error::*; -use crate::s3::xml as s3_xml; -use crate::signature::verify_signed_content; pub async fn handle_put( garage: Arc<Garage>, @@ -123,20 +121,23 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // The following consists in many steps that can each fail. // Keep track that some cleanup will be needed if things fail // before everything is finished (cleanup is done using the Drop trait). - let mut interrupted_cleanup = InterruptedCleanup(Some(( - garage.clone(), - bucket.id, - key.into(), + let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { + garage: garage.clone(), + bucket_id: bucket.id, + key: key.into(), version_uuid, version_timestamp, - ))); + })); // Write version identifier in object table so that we have a trace // that we are uploading something let mut object_version = ObjectVersion { uuid: version_uuid, timestamp: version_timestamp, - state: ObjectVersionState::Uploading(headers.clone()), + state: ObjectVersionState::Uploading { + headers: headers.clone(), + multipart: false, + }, }; let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; @@ -145,7 +146,14 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( // Write this entry now, even with empty block list, // to prevent block_ref entries from being deleted (they can be deleted // if the reference a version that isn't found in the version table) - let version = Version::new(version_uuid, bucket.id, key.into(), false); + let version = Version::new( + version_uuid, + VersionBacklink::Object { + bucket_id: bucket.id, + key: key.into(), + }, + false, + ); garage.version_table.insert(&version).await?; // Transfer data and verify checksum @@ -192,7 +200,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( /// Validate MD5 sum against content-md5 header /// and sha256sum against signed content-sha256 -fn ensure_checksum_matches( +pub(crate) fn ensure_checksum_matches( data_md5sum: &[u8], data_sha256sum: garage_util::data::FixedBytes32, content_md5: Option<&str>, @@ -218,7 +226,7 @@ fn ensure_checksum_matches( } /// Check that inserting this object with this size doesn't exceed bucket quotas -async fn check_quotas( +pub(crate) async fn check_quotas( garage: &Arc<Garage>, bucket: &Bucket, key: &str, @@ -275,7 +283,7 @@ async fn check_quotas( Ok(()) } -async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( +pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( garage: &Garage, version: &Version, part_number: u64, @@ -381,7 +389,7 @@ async fn put_block_meta( Ok(()) } -struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> { +pub(crate) struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> { stream: S, read_all: bool, block_size: usize, @@ -389,7 +397,7 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> { } impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { - fn new(stream: S, block_size: usize) -> Self { + pub(crate) fn new(stream: S, block_size: usize) -> Self { Self { stream, read_all: false, @@ -398,7 +406,7 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> { } } - async fn next(&mut self) -> Result<Option<Bytes>, Error> { + pub(crate) async fn next(&mut self) -> Result<Option<Bytes>, Error> { while !self.read_all && self.buf.len() < self.block_size { if let Some(block) = self.stream.next().await { let bytes = block?; @@ -425,7 +433,14 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> { .unwrap() } -struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>); +struct InterruptedCleanup(Option<InterruptedCleanupInner>); +struct InterruptedCleanupInner { + garage: Arc<Garage>, + bucket_id: Uuid, + key: String, + version_uuid: Uuid, + version_timestamp: u64, +} impl InterruptedCleanup { fn cancel(&mut self) { @@ -434,15 +449,15 @@ impl InterruptedCleanup { } impl Drop for InterruptedCleanup { fn drop(&mut self) { - if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() { + if let Some(info) = self.0.take() { tokio::spawn(async move { let object_version = ObjectVersion { - uuid: version_uuid, - timestamp: version_ts, + uuid: info.version_uuid, + timestamp: info.version_timestamp, state: ObjectVersionState::Aborted, }; - let object = Object::new(bucket_id, key, vec![object_version]); - if let Err(e) = garage.object_table.insert(&object).await { + let object = Object::new(info.bucket_id, info.key, vec![object_version]); + if let Err(e) = info.garage.object_table.insert(&object).await { warn!("Cannot cleanup after aborted PutObject: {}", e); } }); @@ -450,326 +465,9 @@ impl Drop for InterruptedCleanup { } } -// ---- - -pub async fn handle_create_multipart_upload( - garage: Arc<Garage>, - req: &Request<Body>, - bucket_name: &str, - bucket_id: Uuid, - key: &str, -) -> Result<Response<Body>, Error> { - let version_uuid = gen_uuid(); - let headers = get_headers(req.headers())?; - - // Create object in object table - let object_version = ObjectVersion { - uuid: version_uuid, - timestamp: now_msec(), - state: ObjectVersionState::Uploading(headers), - }; - let object = Object::new(bucket_id, key.to_string(), vec![object_version]); - garage.object_table.insert(&object).await?; - - // Insert empty version so that block_ref entries refer to something - // (they are inserted concurrently with blocks in the version table, so - // there is the possibility that they are inserted before the version table - // is created, in which case it is allowed to delete them, e.g. in repair_*) - let version = Version::new(version_uuid, bucket_id, key.into(), false); - garage.version_table.insert(&version).await?; - - // Send success response - let result = s3_xml::InitiateMultipartUploadResult { - xmlns: (), - bucket: s3_xml::Value(bucket_name.to_string()), - key: s3_xml::Value(key.to_string()), - upload_id: s3_xml::Value(hex::encode(version_uuid)), - }; - let xml = s3_xml::to_xml_with_header(&result)?; - - Ok(Response::new(Body::from(xml.into_bytes()))) -} - -pub async fn handle_put_part( - garage: Arc<Garage>, - req: Request<Body>, - bucket_id: Uuid, - key: &str, - part_number: u64, - upload_id: &str, - content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let version_uuid = decode_upload_id(upload_id)?; - - let content_md5 = match req.headers().get("content-md5") { - Some(x) => Some(x.to_str()?.to_string()), - None => None, - }; - - // Read first chuck, and at the same time try to get object to see if it exists - let key = key.to_string(); - - let body = req.into_body().map_err(Error::from); - let mut chunker = StreamChunker::new(body, garage.config.block_size); - - let (object, version, first_block) = futures::try_join!( - garage - .object_table - .get(&bucket_id, &key) - .map_err(Error::from), - garage - .version_table - .get(&version_uuid, &EmptyKey) - .map_err(Error::from), - chunker.next(), - )?; - - // Check object is valid and multipart block can be accepted - let first_block = first_block.ok_or_bad_request("Empty body")?; - let object = object.ok_or_bad_request("Object not found")?; +// ============ helpers ============ - if !object - .versions() - .iter() - .any(|v| v.uuid == version_uuid && v.is_uploading()) - { - return Err(Error::NoSuchUpload); - } - - // Check part hasn't already been uploaded - if let Some(v) = version { - if v.has_part_number(part_number) { - return Err(Error::bad_request(format!( - "Part number {} has already been uploaded", - part_number - ))); - } - } - - // Copy block to store - let version = Version::new(version_uuid, bucket_id, key, false); - - let first_block_hash = async_blake2sum(first_block.clone()).await; - - let (_, data_md5sum, data_sha256sum) = read_and_put_blocks( - &garage, - &version, - part_number, - first_block, - first_block_hash, - &mut chunker, - ) - .await?; - - // Verify that checksums map - ensure_checksum_matches( - data_md5sum.as_slice(), - data_sha256sum, - content_md5.as_deref(), - content_sha256, - )?; - - // Store part etag in version - let data_md5sum_hex = hex::encode(data_md5sum); - let mut version = version; - version - .parts_etags - .put(part_number, data_md5sum_hex.clone()); - garage.version_table.insert(&version).await?; - - let response = Response::builder() - .header("ETag", format!("\"{}\"", data_md5sum_hex)) - .body(Body::empty()) - .unwrap(); - Ok(response) -} - -pub async fn handle_complete_multipart_upload( - garage: Arc<Garage>, - req: Request<Body>, - bucket_name: &str, - bucket: &Bucket, - key: &str, - upload_id: &str, - content_sha256: Option<Hash>, -) -> Result<Response<Body>, Error> { - let body = hyper::body::to_bytes(req.into_body()).await?; - - if let Some(content_sha256) = content_sha256 { - verify_signed_content(content_sha256, &body[..])?; - } - - let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?; - let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml) - .ok_or_bad_request("Invalid CompleteMultipartUpload XML")?; - debug!( - "CompleteMultipartUpload list of parts: {:?}", - body_list_of_parts - ); - - let version_uuid = decode_upload_id(upload_id)?; - - // Get object and version - let key = key.to_string(); - let (object, version) = futures::try_join!( - garage.object_table.get(&bucket.id, &key), - garage.version_table.get(&version_uuid, &EmptyKey), - )?; - - let object = object.ok_or(Error::NoSuchKey)?; - let mut object_version = object - .versions() - .iter() - .find(|v| v.uuid == version_uuid && v.is_uploading()) - .cloned() - .ok_or(Error::NoSuchUpload)?; - - let version = version.ok_or(Error::NoSuchKey)?; - if version.blocks.is_empty() { - return Err(Error::bad_request("No data was uploaded")); - } - - let headers = match object_version.state { - ObjectVersionState::Uploading(headers) => headers, - _ => unreachable!(), - }; - - // Check that part numbers are an increasing sequence. - // (it doesn't need to start at 1 nor to be a continuous sequence, - // see discussion in #192) - if body_list_of_parts.is_empty() { - return Err(Error::EntityTooSmall); - } - if !body_list_of_parts - .iter() - .zip(body_list_of_parts.iter().skip(1)) - .all(|(p1, p2)| p1.part_number < p2.part_number) - { - return Err(Error::InvalidPartOrder); - } - - // Garage-specific restriction, see #204: part numbers must be - // consecutive starting at 1 - if body_list_of_parts[0].part_number != 1 - || !body_list_of_parts - .iter() - .zip(body_list_of_parts.iter().skip(1)) - .all(|(p1, p2)| p1.part_number + 1 == p2.part_number) - { - return Err(Error::NotImplemented("Garage does not support completing a Multipart upload with non-consecutive part numbers. This is a restriction of Garage's data model, which might be fixed in a future release. See issue #204 for more information on this topic.".into())); - } - - // Check that the list of parts they gave us corresponds to the parts we have here - debug!("Expected parts from request: {:?}", body_list_of_parts); - debug!("Parts stored in version: {:?}", version.parts_etags.items()); - let parts = version - .parts_etags - .items() - .iter() - .map(|pair| (&pair.0, &pair.1)); - let same_parts = body_list_of_parts - .iter() - .map(|x| (&x.part_number, &x.etag)) - .eq(parts); - if !same_parts { - return Err(Error::InvalidPart); - } - - // Check that all blocks belong to one of the parts - let block_parts = version - .blocks - .items() - .iter() - .map(|(bk, _)| bk.part_number) - .collect::<BTreeSet<_>>(); - let same_parts = body_list_of_parts - .iter() - .map(|x| x.part_number) - .eq(block_parts.into_iter()); - if !same_parts { - return Err(Error::bad_request( - "Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again." - )); - } - - // Calculate etag of final object - // To understand how etags are calculated, read more here: - // https://teppen.io/2018/06/23/aws_s3_etags/ - let num_parts = body_list_of_parts.len(); - let mut etag_md5_hasher = Md5::new(); - for (_, etag) in version.parts_etags.items().iter() { - etag_md5_hasher.update(etag.as_bytes()); - } - let etag = format!("{}-{}", hex::encode(etag_md5_hasher.finalize()), num_parts); - - // Calculate total size of final object - let total_size = version.blocks.items().iter().map(|x| x.1.size).sum(); - - if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await { - object_version.state = ObjectVersionState::Aborted; - let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); - garage.object_table.insert(&final_object).await?; - - return Err(e); - } - - // Write final object version - object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( - ObjectVersionMeta { - headers, - size: total_size, - etag: etag.clone(), - }, - version.blocks.items()[0].1.hash, - )); - - let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); - garage.object_table.insert(&final_object).await?; - - // Send response saying ok we're done - let result = s3_xml::CompleteMultipartUploadResult { - xmlns: (), - location: None, - bucket: s3_xml::Value(bucket_name.to_string()), - key: s3_xml::Value(key), - etag: s3_xml::Value(format!("\"{}\"", etag)), - }; - let xml = s3_xml::to_xml_with_header(&result)?; - - Ok(Response::new(Body::from(xml.into_bytes()))) -} - -pub async fn handle_abort_multipart_upload( - garage: Arc<Garage>, - bucket_id: Uuid, - key: &str, - upload_id: &str, -) -> Result<Response<Body>, Error> { - let version_uuid = decode_upload_id(upload_id)?; - - let object = garage - .object_table - .get(&bucket_id, &key.to_string()) - .await?; - let object = object.ok_or(Error::NoSuchKey)?; - - let object_version = object - .versions() - .iter() - .find(|v| v.uuid == version_uuid && v.is_uploading()); - let mut object_version = match object_version { - None => return Err(Error::NoSuchUpload), - Some(x) => x.clone(), - }; - - object_version.state = ObjectVersionState::Aborted; - let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); - garage.object_table.insert(&final_object).await?; - - Ok(Response::new(Body::from(vec![]))) -} - -fn get_mime_type(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> { +pub(crate) fn get_mime_type(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> { Ok(headers .get(hyper::header::CONTENT_TYPE) .map(|x| x.to_str()) @@ -821,54 +519,3 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVers other, }) } - -pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> { - let id_bin = hex::decode(id).map_err(|_| Error::NoSuchUpload)?; - if id_bin.len() != 32 { - return Err(Error::NoSuchUpload); - } - let mut uuid = [0u8; 32]; - uuid.copy_from_slice(&id_bin[..]); - Ok(Uuid::from(uuid)) -} - -#[derive(Debug)] -struct CompleteMultipartUploadPart { - etag: String, - part_number: u64, -} - -fn parse_complete_multipart_upload_body( - xml: &roxmltree::Document, -) -> Option<Vec<CompleteMultipartUploadPart>> { - let mut parts = vec![]; - - let root = xml.root(); - let cmu = root.first_child()?; - if !cmu.has_tag_name("CompleteMultipartUpload") { - return None; - } - - for item in cmu.children() { - // Only parse <Part> nodes - if !item.is_element() { - continue; - } - - if item.has_tag_name("Part") { - let etag = item.children().find(|e| e.has_tag_name("ETag"))?.text()?; - let part_number = item - .children() - .find(|e| e.has_tag_name("PartNumber"))? - .text()?; - parts.push(CompleteMultipartUploadPart { - etag: etag.trim_matches('"').to_string(), - part_number: part_number.parse().ok()?, - }); - } else { - return None; - } - } - - Some(parts) -} diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index e9e3ff96..c4a45738 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -34,6 +34,7 @@ impl AdminRpcHandler { .get_range(&hash, None, None, 10000, Default::default()) .await?; let mut versions = vec![]; + let mut uploads = vec![]; for br in block_refs { if let Some(v) = self .garage @@ -41,6 +42,11 @@ impl AdminRpcHandler { .get(&br.version, &EmptyKey) .await? { + if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink { + if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { + uploads.push(u); + } + } versions.push(Ok(v)); } else { versions.push(Err(br.version)); @@ -50,6 +56,7 @@ impl AdminRpcHandler { hash, refcount, versions, + uploads, }) } @@ -93,6 +100,7 @@ impl AdminRpcHandler { } let mut obj_dels = 0; + let mut mpu_dels = 0; let mut ver_dels = 0; for hash in blocks { @@ -105,56 +113,80 @@ impl AdminRpcHandler { .await?; for br in block_refs { - let version = match self + if let Some(version) = self .garage .version_table .get(&br.version, &EmptyKey) .await? { - Some(v) => v, - None => continue, - }; + self.handle_block_purge_version_backlink( + &version, + &mut obj_dels, + &mut mpu_dels, + ) + .await?; - if let Some(object) = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await? - { - let ov = object.versions().iter().rev().find(|v| v.is_complete()); - if let Some(ov) = ov { - if ov.uuid == br.version { - let del_uuid = gen_uuid(); - let deleted_object = Object::new( - version.bucket_id, - version.key.clone(), - vec![ObjectVersion { - uuid: del_uuid, - timestamp: ov.timestamp + 1, - state: ObjectVersionState::Complete( - ObjectVersionData::DeleteMarker, - ), - }], - ); - self.garage.object_table.insert(&deleted_object).await?; - obj_dels += 1; - } + if !version.deleted.get() { + let deleted_version = Version::new(version.uuid, version.backlink, true); + self.garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; } } - - if !version.deleted.get() { - let deleted_version = - Version::new(version.uuid, version.bucket_id, version.key.clone(), true); - self.garage.version_table.insert(&deleted_version).await?; - ver_dels += 1; - } } } + Ok(AdminRpc::Ok(format!( - "{} blocks were purged: {} object deletion markers added, {} versions marked deleted", + "Purged {} blocks, {} versions, {} objects, {} multipart uploads", blocks.len(), + ver_dels, obj_dels, - ver_dels + mpu_dels, ))) } + + async fn handle_block_purge_version_backlink( + &self, + version: &Version, + obj_dels: &mut usize, + mpu_dels: &mut usize, + ) -> Result<(), Error> { + let (bucket_id, key, ov_id) = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), + VersionBacklink::MultipartUpload { upload_id } => { + if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { + if !mpu.deleted.get() { + mpu.parts.clear(); + mpu.deleted.set(); + self.garage.mpu_table.insert(&mpu).await?; + *mpu_dels += 1; + } + (mpu.bucket_id, mpu.key.clone(), *upload_id) + } else { + return Ok(()); + } + } + }; + + if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? { + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == ov_id { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + bucket_id, + key, + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), + }], + ); + self.garage.object_table.insert(&deleted_object).await?; + *obj_dels += 1; + } + } + } + + Ok(()) + } } diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs index 11bb8730..0781cb8b 100644 --- a/src/garage/admin/bucket.rs +++ b/src/garage/admin/bucket.rs @@ -73,6 +73,15 @@ impl AdminRpcHandler { .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) .unwrap_or_default(); + let mpu_counters = self + .garage + .mpu_counter_table + .table + .get(&bucket_id, &EmptyKey) + .await? + .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) + .unwrap_or_default(); + let mut relevant_keys = HashMap::new(); for (k, _) in bucket .state @@ -112,6 +121,7 @@ impl AdminRpcHandler { bucket, relevant_keys, counters, + mpu_counters, }) } diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index 93f6dd08..33c21eba 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -27,6 +27,7 @@ use garage_model::garage::Garage; use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::key_table::*; use garage_model::migrate::Migrate; +use garage_model::s3::mpu_table::MultipartUpload; use garage_model::s3::version_table::Version; use crate::cli::*; @@ -52,6 +53,7 @@ pub enum AdminRpc { bucket: Bucket, relevant_keys: HashMap<String, Key>, counters: HashMap<String, i64>, + mpu_counters: HashMap<String, i64>, }, KeyList(Vec<(String, String)>), KeyInfo(Key, HashMap<Uuid, Bucket>), @@ -66,6 +68,7 @@ pub enum AdminRpc { hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>, + uploads: Vec<MultipartUpload>, }, } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 905b14d3..045f050c 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -190,8 +190,9 @@ pub async fn cmd_admin( bucket, relevant_keys, counters, + mpu_counters, } => { - print_bucket_info(&bucket, &relevant_keys, &counters); + print_bucket_info(&bucket, &relevant_keys, &counters, &mpu_counters); } AdminRpc::KeyList(kl) => { print_key_list(kl); @@ -215,8 +216,9 @@ pub async fn cmd_admin( hash, refcount, versions, + uploads, } => { - print_block_info(hash, refcount, versions); + print_block_info(hash, refcount, versions, uploads); } r => { error!("Unexpected response: {:?}", r); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 986592ae..5dc99a0d 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -443,19 +443,22 @@ pub struct RepairOpt { #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum RepairWhat { - /// Only do a full sync of metadata tables + /// Do a full sync of metadata tables #[structopt(name = "tables", version = garage_version())] Tables, - /// Only repair (resync/rebalance) the set of stored blocks + /// Repair (resync/rebalance) the set of stored blocks #[structopt(name = "blocks", version = garage_version())] Blocks, - /// Only redo the propagation of object deletions to the version table (slow) + /// Repropagate object deletions to the version table #[structopt(name = "versions", version = garage_version())] Versions, - /// Only redo the propagation of version deletions to the block ref table (extremely slow) + /// Repropagate object deletions to the multipart upload table + #[structopt(name = "mpu", version = garage_version())] + MultipartUploads, + /// Repropagate version deletions to the block ref table #[structopt(name = "block_refs", version = garage_version())] BlockRefs, - /// Verify integrity of all blocks on disc (extremely slow, i/o intensive) + /// Verify integrity of all blocks on disc #[structopt(name = "scrub", version = garage_version())] Scrub { #[structopt(subcommand)] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 2c6be2f4..d87f9eab 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -12,8 +12,9 @@ use garage_block::manager::BlockResyncErrorInfo; use garage_model::bucket_table::*; use garage_model::key_table::*; -use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; -use garage_model::s3::version_table::Version; +use garage_model::s3::mpu_table::{self, MultipartUpload}; +use garage_model::s3::object_table; +use garage_model::s3::version_table::*; use crate::cli::structs::WorkerListOpt; @@ -135,6 +136,7 @@ pub fn print_bucket_info( bucket: &Bucket, relevant_keys: &HashMap<String, Key>, counters: &HashMap<String, i64>, + mpu_counters: &HashMap<String, i64>, ) { let key_name = |k| { relevant_keys @@ -148,7 +150,7 @@ pub fn print_bucket_info( Deletable::Deleted => println!("Bucket is deleted."), Deletable::Present(p) => { let size = - bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64); + bytesize::ByteSize::b(*counters.get(object_table::BYTES).unwrap_or(&0) as u64); println!( "\nSize: {} ({})", size.to_string_as(true), @@ -156,14 +158,22 @@ pub fn print_bucket_info( ); println!( "Objects: {}", - counters.get(OBJECTS).cloned().unwrap_or_default() + *counters.get(object_table::OBJECTS).unwrap_or(&0) + ); + println!( + "Unfinished uploads (multipart and non-multipart): {}", + *counters.get(object_table::UNFINISHED_UPLOADS).unwrap_or(&0) ); println!( "Unfinished multipart uploads: {}", - counters - .get(UNFINISHED_UPLOADS) - .cloned() - .unwrap_or_default() + *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0) + ); + let mpu_size = + bytesize::ByteSize::b(*mpu_counters.get(mpu_table::BYTES).unwrap_or(&0) as u64); + println!( + "Size of unfinished multipart uploads: {} ({})", + mpu_size.to_string_as(true), + mpu_size.to_string_as(false), ); println!("\nWebsite access: {}", p.website_config.get().is_some()); @@ -385,29 +395,49 @@ pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { format_table(table); } -pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) { +pub fn print_block_info( + hash: Hash, + refcount: u64, + versions: Vec<Result<Version, Uuid>>, + uploads: Vec<MultipartUpload>, +) { println!("Block hash: {}", hex::encode(hash.as_slice())); println!("Refcount: {}", refcount); println!(); - let mut table = vec!["Version\tBucket\tKey\tDeleted".into()]; + let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()]; let mut nondeleted_count = 0; for v in versions.iter() { match v { Ok(ver) => { - table.push(format!( - "{:?}\t{:?}\t{}\t{:?}", - ver.uuid, - ver.bucket_id, - ver.key, - ver.deleted.get() - )); + match &ver.backlink { + VersionBacklink::Object { bucket_id, key } => { + table.push(format!( + "{:?}\t{:?}\t{}\t\t{:?}", + ver.uuid, + bucket_id, + key, + ver.deleted.get() + )); + } + VersionBacklink::MultipartUpload { upload_id } => { + let upload = uploads.iter().find(|x| x.upload_id == *upload_id); + table.push(format!( + "{:?}\t{:?}\t{}\t{:?}\t{:?}", + ver.uuid, + upload.map(|u| u.bucket_id).unwrap_or_default(), + upload.map(|u| u.key.as_str()).unwrap_or_default(), + upload_id, + ver.deleted.get() + )); + } + } if !ver.deleted.get() { nondeleted_count += 1; } } Err(vh) => { - table.push(format!("{:?}\t\t\tyes", vh)); + table.push(format!("{:?}\t\t\t\tyes", vh)); } } } diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 0e14ed51..abfaf9f9 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -5,11 +5,16 @@ use async_trait::async_trait; use tokio::sync::watch; use garage_block::repair::ScrubWorkerCommand; + use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; +use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; + +use garage_table::replication::*; use garage_table::*; + use garage_util::background::*; use garage_util::error::Error; use garage_util::migrate::Migrate; @@ -32,11 +37,15 @@ pub async fn launch_online_repair( } RepairWhat::Versions => { info!("Repairing the versions table"); - bg.spawn_worker(RepairVersionsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); + } + RepairWhat::MultipartUploads => { + info!("Repairing the multipart uploads table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); } RepairWhat::BlockRefs => { info!("Repairing the block refs table"); - bg.spawn_worker(RepairBlockrefsWorker::new(garage.clone())); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); } RepairWhat::Blocks => { info!("Repairing the stored blocks"); @@ -67,70 +76,70 @@ pub async fn launch_online_repair( // ---- -struct RepairVersionsWorker { +#[async_trait] +trait TableRepair: Send + Sync + 'static { + type T: TableSchema; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>; + + async fn process( + &mut self, + garage: &Garage, + entry: <<Self as TableRepair>::T as TableSchema>::E, + ) -> Result<bool, Error>; +} + +struct TableRepairWorker<T: TableRepair> { garage: Arc<Garage>, pos: Vec<u8>, counter: usize, + repairs: usize, + inner: T, } -impl RepairVersionsWorker { - fn new(garage: Arc<Garage>) -> Self { +impl<R: TableRepair> TableRepairWorker<R> { + fn new(garage: Arc<Garage>, inner: R) -> Self { Self { garage, + inner, pos: vec![], counter: 0, + repairs: 0, } } } #[async_trait] -impl Worker for RepairVersionsWorker { +impl<R: TableRepair> Worker for TableRepairWorker<R> { fn name(&self) -> String { - "Version repair worker".into() + format!("{} repair worker", R::T::TABLE_NAME) } fn status(&self) -> WorkerStatus { WorkerStatus { - progress: Some(self.counter.to_string()), + progress: Some(format!("{} ({})", self.counter, self.repairs)), ..Default::default() } } async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? { + let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? { Some((k, v)) => (v, k), None => { - info!("repair_versions: finished, done {}", self.counter); + info!( + "{}: finished, done {}, fixed {}", + self.name(), + self.counter, + self.repairs + ); return Ok(WorkerState::Done); } }; - let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?; - if !version.deleted.get() { - let object = self - .garage - .object_table - .get(&version.bucket_id, &version.key) - .await?; - let version_exists = match object { - Some(o) => o - .versions() - .iter() - .any(|x| x.uuid == version.uuid && x.state != ObjectVersionState::Aborted), - None => false, - }; - if !version_exists { - info!("Repair versions: marking version as deleted: {:?}", version); - self.garage - .version_table - .insert(&Version::new( - version.uuid, - version.bucket_id, - version.key, - true, - )) - .await?; - } + let entry = <R::T as TableSchema>::E::decode(&item_bytes) + .ok_or_message("Cannot decode table entry")?; + if self.inner.process(&self.garage, entry).await? { + self.repairs += 1; } self.counter += 1; @@ -146,77 +155,124 @@ impl Worker for RepairVersionsWorker { // ---- -struct RepairBlockrefsWorker { - garage: Arc<Garage>, - pos: Vec<u8>, - counter: usize, -} +struct RepairVersions; -impl RepairBlockrefsWorker { - fn new(garage: Arc<Garage>) -> Self { - Self { - garage, - pos: vec![], - counter: 0, +#[async_trait] +impl TableRepair for RepairVersions { + type T = VersionTable; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.version_table + } + + async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> { + if !version.deleted.get() { + let ref_exists = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => garage + .object_table + .get(bucket_id, key) + .await? + .map(|o| { + o.versions().iter().any(|x| { + x.uuid == version.uuid && x.state != ObjectVersionState::Aborted + }) + }) + .unwrap_or(false), + VersionBacklink::MultipartUpload { upload_id } => garage + .mpu_table + .get(upload_id, &EmptyKey) + .await? + .map(|u| !u.deleted.get()) + .unwrap_or(false), + }; + + if !ref_exists { + info!("Repair versions: marking version as deleted: {:?}", version); + garage + .version_table + .insert(&Version::new(version.uuid, version.backlink, true)) + .await?; + return Ok(true); + } } + + Ok(false) } } +// ---- + +struct RepairBlockRefs; + #[async_trait] -impl Worker for RepairBlockrefsWorker { - fn name(&self) -> String { - "Block refs repair worker".into() - } +impl TableRepair for RepairBlockRefs { + type T = BlockRefTable; - fn status(&self) -> WorkerStatus { - WorkerStatus { - progress: Some(self.counter.to_string()), - ..Default::default() - } + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.block_ref_table } - async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { - let (item_bytes, next_pos) = - match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { - Some((k, v)) => (v, k), - None => { - info!("repair_block_ref: finished, done {}", self.counter); - return Ok(WorkerState::Done); - } - }; - - let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?; + async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result<bool, Error> { if !block_ref.deleted.get() { - let version = self - .garage + let ref_exists = garage .version_table .get(&block_ref.version, &EmptyKey) - .await?; - // The version might not exist if it has been GC'ed - let ref_exists = version.map(|v| !v.deleted.get()).unwrap_or(false); + .await? + .map(|v| !v.deleted.get()) + .unwrap_or(false); + if !ref_exists { info!( "Repair block ref: marking block_ref as deleted: {:?}", block_ref ); - self.garage - .block_ref_table - .insert(&BlockRef { - block: block_ref.block, - version: block_ref.version, - deleted: true.into(), - }) - .await?; + block_ref.deleted.set(); + garage.block_ref_table.insert(&block_ref).await?; + return Ok(true); } } - self.counter += 1; - self.pos = next_pos; + Ok(false) + } +} - Ok(WorkerState::Busy) +// ---- + +struct RepairMpu; + +#[async_trait] +impl TableRepair for RepairMpu { + type T = MultipartUploadTable; + + fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication> { + &garage.mpu_table } - async fn wait_for_work(&mut self) -> WorkerState { - unreachable!() + async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result<bool, Error> { + if !mpu.deleted.get() { + let ref_exists = garage + .object_table + .get(&mpu.bucket_id, &mpu.key) + .await? + .map(|o| { + o.versions() + .iter() + .any(|x| x.uuid == mpu.upload_id && x.is_uploading(Some(true))) + }) + .unwrap_or(false); + + if !ref_exists { + info!( + "Repair multipart uploads: marking mpu as deleted: {:?}", + mpu + ); + mpu.parts.clear(); + mpu.deleted.set(); + garage.mpu_table.insert(&mpu).await?; + return Ok(true); + } + } + + Ok(false) } } diff --git a/src/garage/tests/s3/multipart.rs b/src/garage/tests/s3/multipart.rs index 895a2993..8ae6b66e 100644 --- a/src/garage/tests/s3/multipart.rs +++ b/src/garage/tests/s3/multipart.rs @@ -6,6 +6,190 @@ const SZ_5MB: usize = 5 * 1024 * 1024; const SZ_10MB: usize = 10 * 1024 * 1024; #[tokio::test] +async fn test_multipart_upload() { + let ctx = common::context(); + let bucket = ctx.create_bucket("testmpu"); + + let u1 = vec![0x11; SZ_5MB]; + let u2 = vec![0x22; SZ_5MB]; + let u3 = vec![0x33; SZ_5MB]; + let u4 = vec![0x44; SZ_5MB]; + let u5 = vec![0x55; SZ_5MB]; + + let up = ctx + .client + .create_multipart_upload() + .bucket(&bucket) + .key("a") + .send() + .await + .unwrap(); + assert!(up.upload_id.is_some()); + + let uid = up.upload_id.as_ref().unwrap(); + + let p3 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(3) + .body(ByteStream::from(u3.clone())) + .send() + .await + .unwrap(); + + let _p1 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(1) + .body(ByteStream::from(u1)) + .send() + .await + .unwrap(); + + let _p4 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(4) + .body(ByteStream::from(u4)) + .send() + .await + .unwrap(); + + let p1bis = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(1) + .body(ByteStream::from(u2.clone())) + .send() + .await + .unwrap(); + + let p6 = ctx + .client + .upload_part() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .part_number(6) + .body(ByteStream::from(u5.clone())) + .send() + .await + .unwrap(); + + { + let r = ctx + .client + .list_parts() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .send() + .await + .unwrap(); + assert_eq!(r.parts.unwrap().len(), 4); + } + + let cmp = CompletedMultipartUpload::builder() + .parts( + CompletedPart::builder() + .part_number(1) + .e_tag(p1bis.e_tag.unwrap()) + .build(), + ) + .parts( + CompletedPart::builder() + .part_number(3) + .e_tag(p3.e_tag.unwrap()) + .build(), + ) + .parts( + CompletedPart::builder() + .part_number(6) + .e_tag(p6.e_tag.unwrap()) + .build(), + ) + .build(); + + ctx.client + .complete_multipart_upload() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .multipart_upload(cmp) + .send() + .await + .unwrap(); + + // The multipart upload must not appear anymore + assert!(ctx + .client + .list_parts() + .bucket(&bucket) + .key("a") + .upload_id(uid) + .send() + .await + .is_err()); + + { + // The object must appear as a regular object + let r = ctx + .client + .head_object() + .bucket(&bucket) + .key("a") + .send() + .await + .unwrap(); + + assert_eq!(r.content_length, (SZ_5MB * 3) as i64); + } + + { + let o = ctx + .client + .get_object() + .bucket(&bucket) + .key("a") + .send() + .await + .unwrap(); + + assert_bytes_eq!(o.body, &[&u2[..], &u3[..], &u5[..]].concat()); + } + + { + for (part_number, data) in [(1, &u2), (2, &u3), (3, &u5)] { + let o = ctx + .client + .get_object() + .bucket(&bucket) + .key("a") + .part_number(part_number) + .send() + .await + .unwrap(); + + eprintln!("get_object with part_number = {}", part_number); + assert_eq!(o.content_length, SZ_5MB as i64); + assert_bytes_eq!(o.body, data); + } + } +} + +#[tokio::test] async fn test_uploadlistpart() { let ctx = common::context(); let bucket = ctx.create_bucket("uploadpart"); @@ -65,7 +249,8 @@ async fn test_uploadlistpart() { let ps = r.parts.unwrap(); assert_eq!(ps.len(), 1); - let fp = ps.iter().find(|x| x.part_number == 2).unwrap(); + assert_eq!(ps[0].part_number, 2); + let fp = &ps[0]; assert!(fp.last_modified.is_some()); assert_eq!( fp.e_tag.as_ref().unwrap(), @@ -100,13 +285,24 @@ async fn test_uploadlistpart() { let ps = r.parts.unwrap(); assert_eq!(ps.len(), 2); - let fp = ps.iter().find(|x| x.part_number == 1).unwrap(); + + assert_eq!(ps[0].part_number, 1); + let fp = &ps[0]; assert!(fp.last_modified.is_some()); assert_eq!( fp.e_tag.as_ref().unwrap(), "\"3c484266f9315485694556e6c693bfa2\"" ); assert_eq!(fp.size, SZ_5MB as i64); + + assert_eq!(ps[1].part_number, 2); + let sp = &ps[1]; + assert!(sp.last_modified.is_some()); + assert_eq!( + sp.e_tag.as_ref().unwrap(), + "\"3366bb9dcf710d6801b5926467d02e19\"" + ); + assert_eq!(sp.size, SZ_5MB as i64); } { @@ -123,12 +319,19 @@ async fn test_uploadlistpart() { .unwrap(); assert!(r.part_number_marker.is_none()); - assert!(r.next_part_number_marker.is_some()); + assert_eq!(r.next_part_number_marker.as_deref(), Some("1")); assert_eq!(r.max_parts, 1_i32); assert!(r.is_truncated); assert_eq!(r.key.unwrap(), "a"); assert_eq!(r.upload_id.unwrap().as_str(), uid.as_str()); - assert_eq!(r.parts.unwrap().len(), 1); + let parts = r.parts.unwrap(); + assert_eq!(parts.len(), 1); + let fp = &parts[0]; + assert_eq!(fp.part_number, 1); + assert_eq!( + fp.e_tag.as_ref().unwrap(), + "\"3c484266f9315485694556e6c693bfa2\"" + ); let r2 = ctx .client @@ -147,10 +350,18 @@ async fn test_uploadlistpart() { r.next_part_number_marker.as_ref().unwrap() ); assert_eq!(r2.max_parts, 1_i32); - assert!(r2.is_truncated); assert_eq!(r2.key.unwrap(), "a"); assert_eq!(r2.upload_id.unwrap().as_str(), uid.as_str()); - assert_eq!(r2.parts.unwrap().len(), 1); + let parts = r2.parts.unwrap(); + assert_eq!(parts.len(), 1); + let fp = &parts[0]; + assert_eq!(fp.part_number, 2); + assert_eq!( + fp.e_tag.as_ref().unwrap(), + "\"3366bb9dcf710d6801b5926467d02e19\"" + ); + //assert!(r2.is_truncated); // WHY? (this was the test before) + assert!(!r2.is_truncated); } let cmp = CompletedMultipartUpload::builder() diff --git a/src/model/garage.rs b/src/model/garage.rs index 9b7121db..db2475ed 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -17,6 +17,7 @@ use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::s3::block_ref_table::*; +use crate::s3::mpu_table::*; use crate::s3::object_table::*; use crate::s3::version_table::*; @@ -57,6 +58,10 @@ pub struct Garage { pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>, /// Counting table containing object counters pub object_counter_table: Arc<IndexCounter<Object>>, + /// Table containing S3 multipart uploads + pub mpu_table: Arc<Table<MultipartUploadTable, TableShardedReplication>>, + /// Counting table containing multipart object counters + pub mpu_counter_table: Arc<IndexCounter<MultipartUpload>>, /// Table containing S3 object versions pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, /// Table containing S3 block references (not blocks themselves) @@ -261,6 +266,20 @@ impl Garage { &db, ); + info!("Initialize multipart upload counter table..."); + let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); + + info!("Initialize multipart upload table..."); + let mpu_table = Table::new( + MultipartUploadTable { + version_table: version_table.clone(), + mpu_counter_table: mpu_counter_table.clone(), + }, + meta_rep_param.clone(), + system.clone(), + &db, + ); + info!("Initialize object counter table..."); let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); @@ -269,6 +288,7 @@ impl Garage { let object_table = Table::new( ObjectTable { version_table: version_table.clone(), + mpu_table: mpu_table.clone(), object_counter_table: object_counter_table.clone(), }, meta_rep_param.clone(), @@ -297,6 +317,8 @@ impl Garage { key_table, object_table, object_counter_table, + mpu_table, + mpu_counter_table, version_table, block_ref_table, #[cfg(feature = "k2v")] @@ -313,6 +335,8 @@ impl Garage { self.object_table.spawn_workers(bg); self.object_counter_table.spawn_workers(bg); + self.mpu_table.spawn_workers(bg); + self.mpu_counter_table.spawn_workers(bg); self.version_table.spawn_workers(bg); self.block_ref_table.spawn_workers(bg); diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 4a488d7f..576d03f3 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -478,7 +478,9 @@ impl<'a> BucketHelper<'a> { // ---- /// Deletes all incomplete multipart uploads that are older than a certain time. - /// Returns the number of uploads aborted + /// Returns the number of uploads aborted. + /// This will also include non-multipart uploads, which may be lingering + /// after a node crash pub async fn cleanup_incomplete_uploads( &self, bucket_id: &Uuid, @@ -496,7 +498,9 @@ impl<'a> BucketHelper<'a> { .get_range( bucket_id, start, - Some(ObjectFilter::IsUploading), + Some(ObjectFilter::IsUploading { + check_multipart: None, + }), 1000, EnumerationOrder::Forward, ) @@ -508,7 +512,7 @@ impl<'a> BucketHelper<'a> { let aborted_versions = object .versions() .iter() - .filter(|v| v.is_uploading() && v.timestamp < older_than) + .filter(|v| v.is_uploading(None) && v.timestamp < older_than) .map(|v| ObjectVersion { state: ObjectVersionState::Aborted, uuid: v.uuid, diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs index 4e94337d..36d67093 100644 --- a/src/model/s3/mod.rs +++ b/src/model/s3/mod.rs @@ -1,3 +1,4 @@ pub mod block_ref_table; +pub mod mpu_table; pub mod object_table; pub mod version_table; diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs new file mode 100644 index 00000000..63a4f1af --- /dev/null +++ b/src/model/s3/mpu_table.rs @@ -0,0 +1,245 @@ +use std::sync::Arc; + +use garage_db as db; + +use garage_util::crdt::Crdt; +use garage_util::data::*; +use garage_util::time::*; + +use garage_table::replication::TableShardedReplication; +use garage_table::*; + +use crate::index_counter::*; +use crate::s3::version_table::*; + +pub const UPLOADS: &str = "uploads"; +pub const PARTS: &str = "parts"; +pub const BYTES: &str = "bytes"; + +mod v09 { + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + /// A part of a multipart upload + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct MultipartUpload { + /// Partition key = Upload id = UUID of the object version + pub upload_id: Uuid, + + /// Is this multipart upload deleted + /// The MultipartUpload is marked as deleted as soon as the + /// multipart upload is either completed or aborted + pub deleted: crdt::Bool, + /// List of uploaded parts, key = (part number, timestamp) + /// In case of retries, all versions for each part are kept + /// Everything is cleaned up only once the MultipartUpload is marked deleted + pub parts: crdt::Map<MpuPartKey, MpuPart>, + + // Back link to bucket+key so that we can find the object this mpu + // belongs to and check whether it is still valid + /// Bucket in which the related object is stored + pub bucket_id: Uuid, + /// Key in which the related object is stored + pub key: String, + } + + #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct MpuPartKey { + /// Number of the part + pub part_number: u64, + /// Timestamp of part upload + pub timestamp: u64, + } + + /// The version of an uploaded part + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct MpuPart { + /// Links to a Version in VersionTable + pub version: Uuid, + /// ETag of the content of this part (known only once done uploading) + pub etag: Option<String>, + /// Size of this part (known only once done uploading) + pub size: Option<u64>, + } + + impl garage_util::migrate::InitialFormat for MultipartUpload { + const VERSION_MARKER: &'static [u8] = b"G09s3mpu"; + } +} + +pub use v09::*; + +impl Ord for MpuPartKey { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.part_number + .cmp(&other.part_number) + .then(self.timestamp.cmp(&other.timestamp)) + } +} + +impl PartialOrd for MpuPartKey { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl MultipartUpload { + pub fn new(upload_id: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { + Self { + upload_id, + deleted: crdt::Bool::new(deleted), + parts: crdt::Map::new(), + bucket_id, + key, + } + } + + pub fn next_timestamp(&self, part_number: u64) -> u64 { + std::cmp::max( + now_msec(), + 1 + self + .parts + .items() + .iter() + .filter(|(x, _)| x.part_number == part_number) + .map(|(x, _)| x.timestamp) + .max() + .unwrap_or(0), + ) + } +} + +impl Entry<Uuid, EmptyKey> for MultipartUpload { + fn partition_key(&self) -> &Uuid { + &self.upload_id + } + fn sort_key(&self) -> &EmptyKey { + &EmptyKey + } + fn is_tombstone(&self) -> bool { + self.deleted.get() + } +} + +impl Crdt for MultipartUpload { + fn merge(&mut self, other: &Self) { + self.deleted.merge(&other.deleted); + + if self.deleted.get() { + self.parts.clear(); + } else { + self.parts.merge(&other.parts); + } + } +} + +impl Crdt for MpuPart { + fn merge(&mut self, other: &Self) { + self.etag = match (self.etag.take(), &other.etag) { + (None, Some(_)) => other.etag.clone(), + (Some(x), Some(y)) if x < *y => other.etag.clone(), + (x, _) => x, + }; + self.size = match (self.size, other.size) { + (None, Some(_)) => other.size, + (Some(x), Some(y)) if x < y => other.size, + (x, _) => x, + }; + } +} + +pub struct MultipartUploadTable { + pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, + pub mpu_counter_table: Arc<IndexCounter<MultipartUpload>>, +} + +impl TableSchema for MultipartUploadTable { + const TABLE_NAME: &'static str = "multipart_upload"; + + type P = Uuid; + type S = EmptyKey; + type E = MultipartUpload; + type Filter = DeletedFilter; + + fn updated( + &self, + tx: &mut db::Transaction, + old: Option<&Self::E>, + new: Option<&Self::E>, + ) -> db::TxOpResult<()> { + // 1. Count + let counter_res = self.mpu_counter_table.count(tx, old, new); + if let Err(e) = db::unabort(counter_res)? { + error!( + "Unable to update multipart object part counter: {}. Index values will be wrong!", + e + ); + } + + // 2. Propagate deletions to version table + if let (Some(old_mpu), Some(new_mpu)) = (old, new) { + if new_mpu.deleted.get() && !old_mpu.deleted.get() { + let deleted_versions = old_mpu.parts.items().iter().map(|(_k, p)| { + Version::new( + p.version, + VersionBacklink::MultipartUpload { + upload_id: old_mpu.upload_id, + }, + true, + ) + }); + for version in deleted_versions { + let res = self.version_table.queue_insert(tx, &version); + if let Err(e) = db::unabort(res)? { + error!("Unable to enqueue version deletion propagation: {}. A repair will be needed.", e); + } + } + } + } + + Ok(()) + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + filter.apply(entry.is_tombstone()) + } +} + +impl CountedItem for MultipartUpload { + const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_counter"; + + // Partition key = bucket id + type CP = Uuid; + // Sort key = nothing + type CS = EmptyKey; + + fn counter_partition_key(&self) -> &Uuid { + &self.bucket_id + } + fn counter_sort_key(&self) -> &EmptyKey { + &EmptyKey + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let uploads = if self.deleted.get() { 0 } else { 1 }; + let mut parts = self + .parts + .items() + .iter() + .map(|(k, _)| k.part_number) + .collect::<Vec<_>>(); + parts.dedup(); + let bytes = self + .parts + .items() + .iter() + .map(|(_, p)| p.size.unwrap_or(0)) + .sum::<u64>(); + vec![ + (UPLOADS, uploads), + (PARTS, parts.len() as i64), + (BYTES, bytes as i64), + ] + } +} diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 518acc95..db5ccf96 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -10,6 +10,7 @@ use garage_table::replication::TableShardedReplication; use garage_table::*; use crate::index_counter::*; +use crate::s3::mpu_table::*; use crate::s3::version_table::*; pub const OBJECTS: &str = "objects"; @@ -130,7 +131,86 @@ mod v08 { } } -pub use v08::*; +mod v09 { + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + use super::v08; + + pub use v08::{ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta}; + + /// An object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Object { + /// The bucket in which the object is stored, used as partition key + pub bucket_id: Uuid, + + /// The key at which the object is stored in its bucket, used as sorting key + pub key: String, + + /// The list of currenty stored versions of the object + pub(super) versions: Vec<ObjectVersion>, + } + + /// Informations about a version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct ObjectVersion { + /// Id of the version + pub uuid: Uuid, + /// Timestamp of when the object was created + pub timestamp: u64, + /// State of the version + pub state: ObjectVersionState, + } + + /// State of an object version + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum ObjectVersionState { + /// The version is being received + Uploading { + /// Indicates whether this is a multipart upload + multipart: bool, + /// Headers to be included in the final object + headers: ObjectVersionHeaders, + }, + /// The version is fully received + Complete(ObjectVersionData), + /// The version uploaded containded errors or the upload was explicitly aborted + Aborted, + } + + impl garage_util::migrate::Migrate for Object { + const VERSION_MARKER: &'static [u8] = b"G09s3o"; + + type Previous = v08::Object; + + fn migrate(old: v08::Object) -> Object { + let versions = old + .versions + .into_iter() + .map(|x| ObjectVersion { + uuid: x.uuid, + timestamp: x.timestamp, + state: match x.state { + v08::ObjectVersionState::Uploading(h) => ObjectVersionState::Uploading { + multipart: false, + headers: h, + }, + v08::ObjectVersionState::Complete(d) => ObjectVersionState::Complete(d), + v08::ObjectVersionState::Aborted => ObjectVersionState::Aborted, + }, + }) + .collect(); + Object { + bucket_id: old.bucket_id, + key: old.key, + versions, + } + } + } +} + +pub use v09::*; impl Object { /// Initialize an Object struct from parts @@ -180,11 +260,11 @@ impl Crdt for ObjectVersionState { Complete(a) => { a.merge(b); } - Uploading(_) => { + Uploading { .. } => { *self = Complete(b.clone()); } }, - Uploading(_) => {} + Uploading { .. } => {} } } } @@ -199,8 +279,17 @@ impl ObjectVersion { } /// Is the object version currently being uploaded - pub fn is_uploading(&self) -> bool { - matches!(self.state, ObjectVersionState::Uploading(_)) + /// + /// matches only multipart uploads if check_multipart is Some(true) + /// matches only non-multipart uploads if check_multipart is Some(false) + /// matches both if check_multipart is None + pub fn is_uploading(&self, check_multipart: Option<bool>) -> bool { + match &self.state { + ObjectVersionState::Uploading { multipart, .. } => { + check_multipart.map(|x| x == *multipart).unwrap_or(true) + } + _ => false, + } } /// Is the object version completely received @@ -267,13 +356,20 @@ impl Crdt for Object { pub struct ObjectTable { pub version_table: Arc<Table<VersionTable, TableShardedReplication>>, + pub mpu_table: Arc<Table<MultipartUploadTable, TableShardedReplication>>, pub object_counter_table: Arc<IndexCounter<Object>>, } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub enum ObjectFilter { + /// Is the object version available (received and not a tombstone) IsData, - IsUploading, + /// Is the object version currently being uploaded + /// + /// matches only multipart uploads if check_multipart is Some(true) + /// matches only non-multipart uploads if check_multipart is Some(false) + /// matches both if check_multipart is None + IsUploading { check_multipart: Option<bool> }, } impl TableSchema for ObjectTable { @@ -301,21 +397,28 @@ impl TableSchema for ObjectTable { // 2. Enqueue propagation deletions to version table if let (Some(old_v), Some(new_v)) = (old, new) { - // Propagate deletion of old versions for v in old_v.versions.iter() { - let newly_deleted = match new_v + let new_v_id = new_v .versions - .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())) - { + .binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key())); + + // Propagate deletion of old versions to the Version table + let delete_version = match new_v_id { Err(_) => true, Ok(i) => { new_v.versions[i].state == ObjectVersionState::Aborted && v.state != ObjectVersionState::Aborted } }; - if newly_deleted { - let deleted_version = - Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + if delete_version { + let deleted_version = Version::new( + v.uuid, + VersionBacklink::Object { + bucket_id: old_v.bucket_id, + key: old_v.key.clone(), + }, + true, + ); let res = self.version_table.queue_insert(tx, &deleted_version); if let Err(e) = db::unabort(res)? { error!( @@ -324,6 +427,34 @@ impl TableSchema for ObjectTable { ); } } + + // After abortion or completion of multipart uploads, delete MPU table entry + if matches!( + v.state, + ObjectVersionState::Uploading { + multipart: true, + .. + } + ) { + let delete_mpu = match new_v_id { + Err(_) => true, + Ok(i) => !matches!( + new_v.versions[i].state, + ObjectVersionState::Uploading { .. } + ), + }; + if delete_mpu { + let deleted_mpu = + MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true); + let res = self.mpu_table.queue_insert(tx, &deleted_mpu); + if let Err(e) = db::unabort(res)? { + error!( + "Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.", + e + ); + } + } + } } } @@ -333,7 +464,10 @@ impl TableSchema for ObjectTable { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { match filter { ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()), - ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()), + ObjectFilter::IsUploading { check_multipart } => entry + .versions + .iter() + .any(|v| v.is_uploading(*check_multipart)), } } } @@ -360,10 +494,7 @@ impl CountedItem for Object { } else { 0 }; - let n_unfinished_uploads = versions - .iter() - .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_))) - .count(); + let n_unfinished_uploads = versions.iter().filter(|v| v.is_uploading(None)).count(); let n_bytes = versions .iter() .map(|v| match &v.state { diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs index 6edc83f4..5c032f9f 100644 --- a/src/model/s3/version_table.rs +++ b/src/model/s3/version_table.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use garage_db as db; use garage_util::data::*; +use garage_util::error::*; use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; @@ -66,6 +67,8 @@ mod v08 { use super::v05; + pub use v05::{VersionBlock, VersionBlockKey}; + /// A version of an object #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct Version { @@ -90,8 +93,6 @@ mod v08 { pub key: String, } - pub use v05::{VersionBlock, VersionBlockKey}; - impl garage_util::migrate::Migrate for Version { type Previous = v05::Version; @@ -110,32 +111,94 @@ mod v08 { } } -pub use v08::*; +pub(crate) mod v09 { + use garage_util::crdt; + use garage_util::data::Uuid; + use serde::{Deserialize, Serialize}; + + use super::v08; + + pub use v08::{VersionBlock, VersionBlockKey}; + + /// A version of an object + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub struct Version { + /// UUID of the version, used as partition key + pub uuid: Uuid, + + // Actual data: the blocks for this version + // In the case of a multipart upload, also store the etags + // of individual parts and check them when doing CompleteMultipartUpload + /// Is this version deleted + pub deleted: crdt::Bool, + /// list of blocks of data composing the version + pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, + + // Back link to owner of this version (either an object or a multipart + // upload), used to find whether it has been deleted and this version + // should in turn be deleted (see versions repair procedure) + pub backlink: VersionBacklink, + } + + #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] + pub enum VersionBacklink { + Object { + /// Bucket in which the related object is stored + bucket_id: Uuid, + /// Key in which the related object is stored + key: String, + }, + MultipartUpload { + upload_id: Uuid, + }, + } + + impl garage_util::migrate::Migrate for Version { + const VERSION_MARKER: &'static [u8] = b"G09s3v"; + + type Previous = v08::Version; + + fn migrate(old: v08::Version) -> Version { + Version { + uuid: old.uuid, + deleted: old.deleted, + blocks: old.blocks, + backlink: VersionBacklink::Object { + bucket_id: old.bucket_id, + key: old.key, + }, + } + } + } +} + +pub use v09::*; impl Version { - pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { + pub fn new(uuid: Uuid, backlink: VersionBacklink, deleted: bool) -> Self { Self { uuid, deleted: deleted.into(), blocks: crdt::Map::new(), - parts_etags: crdt::Map::new(), - bucket_id, - key, + backlink, } } pub fn has_part_number(&self, part_number: u64) -> bool { - let case1 = self - .parts_etags + self.blocks .items() - .binary_search_by(|(k, _)| k.cmp(&part_number)) - .is_ok(); - let case2 = self + .binary_search_by(|(k, _)| k.part_number.cmp(&part_number)) + .is_ok() + } + + pub fn n_parts(&self) -> Result<u64, Error> { + Ok(self .blocks .items() - .binary_search_by(|(k, _)| k.part_number.cmp(&part_number)) - .is_ok(); - case1 || case2 + .last() + .ok_or_message("version has no parts")? + .0 + .part_number) } } @@ -175,10 +238,8 @@ impl Crdt for Version { if self.deleted.get() { self.blocks.clear(); - self.parts_etags.clear(); } else { self.blocks.merge(&other.blocks); - self.parts_etags.merge(&other.parts_etags); } } } diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index b6c2fd27..c2655e59 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -119,7 +119,7 @@ mod v09 { } impl garage_util::migrate::Migrate for ClusterLayout { - const VERSION_MARKER: &'static [u8] = b"Glayout09"; + const VERSION_MARKER: &'static [u8] = b"G09layout"; type Previous = v08::ClusterLayout; diff --git a/src/table/data.rs b/src/table/data.rs index 26cc3a5a..73fa93c8 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -347,9 +347,7 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> { // ---- Utility functions ---- pub fn tree_key(&self, p: &F::P, s: &F::S) -> Vec<u8> { - let mut ret = p.hash().to_vec(); - ret.extend(s.sort_key()); - ret + [p.hash().as_slice(), s.sort_key()].concat() } pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { diff --git a/src/table/schema.rs b/src/table/schema.rs index 5cbf6c95..fc1a465e 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -6,6 +6,8 @@ use garage_util::migrate::Migrate; use crate::crdt::Crdt; +// =================================== PARTITION KEYS + /// Trait for field used to partition data pub trait PartitionKey: Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static @@ -29,6 +31,8 @@ impl PartitionKey for FixedBytes32 { } } +// =================================== SORT KEYS + /// Trait for field used to sort data pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { /// Get the key used to sort @@ -47,6 +51,8 @@ impl SortKey for FixedBytes32 { } } +// =================================== SCHEMA + /// Trait for an entry in a table. It must be sortable and partitionnable. pub trait Entry<P: PartitionKey, S: SortKey>: Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 0c7edf23..de63b842 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -106,7 +106,7 @@ impl WebServer { addr: SocketAddr, ) -> Result<Response<Body>, Infallible> { if let Ok(forwarded_for_ip_addr) = - forwarded_headers::handle_forwarded_for_headers(&req.headers()) + forwarded_headers::handle_forwarded_for_headers(req.headers()) { info!( "{} (via {}) {} {}", |