From 6617a72220f2890fba0c0b7c099baf56142c494c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 11 Jan 2022 17:31:09 +0100 Subject: Implement UploadPartCopy --- src/api/api_server.rs | 35 ++-- src/api/s3_copy.rs | 387 +++++++++++++++++++++++++++++++++++++++++---- src/api/s3_put.rs | 21 ++- src/api/s3_xml.rs | 10 ++ src/model/version_table.rs | 14 ++ 5 files changed, 413 insertions(+), 54 deletions(-) (limited to 'src') diff --git a/src/api/api_server.rs b/src/api/api_server.rs index 16156e74..a38a3c5b 100644 --- a/src/api/api_server.rs +++ b/src/api/api_server.rs @@ -156,19 +156,24 @@ async fn handler_inner(garage: Arc, req: Request) -> Result { - let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; - let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; - let (source_bucket, source_key) = parse_bucket_key(©_source, None)?; - let source_bucket_id = - resolve_bucket(&garage, &source_bucket.to_string(), &api_key).await?; - if !api_key.allow_read(&source_bucket_id) { - return Err(Error::Forbidden(format!( - "Reading from bucket {} not allowed for this key", - source_bucket - ))); - } - let source_key = source_key.ok_or_bad_request("No source key specified")?; - handle_copy(garage, &req, bucket_id, &key, source_bucket_id, source_key).await + handle_copy(garage, &api_key, &req, bucket_id, &key).await + } + Endpoint::UploadPartCopy { + key, + part_number, + upload_id, + .. + } => { + handle_upload_part_copy( + garage, + &api_key, + &req, + bucket_id, + &key, + part_number, + &upload_id, + ) + .await } Endpoint::PutObject { key, .. } => { handle_put(garage, req, bucket_id, &key, content_sha256).await @@ -321,7 +326,7 @@ async fn handle_request_without_bucket( } #[allow(clippy::ptr_arg)] -async fn resolve_bucket( +pub async fn resolve_bucket( garage: &Garage, bucket_name: &String, api_key: &Key, @@ -347,7 +352,7 @@ async fn resolve_bucket( /// /// S3 internally manages only buckets and keys. This function splits /// an HTTP path to get the corresponding bucket name and key. -fn parse_bucket_key<'a>( +pub fn parse_bucket_key<'a>( path: &'a str, host_bucket: Option<&'a str>, ) -> Result<(&'a str, Option<&'a str>), Error> { diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs index cab173b1..c37bb138 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3_copy.rs @@ -1,7 +1,11 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use futures::TryFutureExt; +use md5::{Digest as Md5Digest, Md5}; + use hyper::{Body, Request, Response}; +use serde::Serialize; use garage_table::*; use garage_util::data::*; @@ -9,68 +13,50 @@ use garage_util::time::*; use garage_model::block_ref_table::*; use garage_model::garage::Garage; +use garage_model::key_table::Key; use garage_model::object_table::*; use garage_model::version_table::*; +use crate::api_server::{parse_bucket_key, resolve_bucket}; use crate::error::*; -use crate::s3_put::get_headers; -use crate::s3_xml; +use crate::s3_put::{decode_upload_id, get_headers}; +use crate::s3_xml::{self, xmlns_tag}; pub async fn handle_copy( garage: Arc, + api_key: &Key, req: &Request, dest_bucket_id: Uuid, dest_key: &str, - source_bucket_id: Uuid, - source_key: &str, ) -> Result, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; - let source_object = garage - .object_table - .get(&source_bucket_id, &source_key.to_string()) - .await? - .ok_or(Error::NoSuchKey)?; + let source_object = get_copy_source(&garage, api_key, req).await?; - let source_last_v = source_object - .versions() - .iter() - .rev() - .find(|v| v.is_complete()) - .ok_or(Error::NoSuchKey)?; + let (source_version, source_version_data, source_version_meta) = + extract_source_info(&source_object)?; - let source_last_state = match &source_last_v.state { - ObjectVersionState::Complete(x) => x, - _ => unreachable!(), - }; + // Check precondition, e.g. x-amz-copy-source-if-match + copy_precondition.check(source_version, &source_version_meta.etag)?; + // Generate parameters for copied object let new_uuid = gen_uuid(); let new_timestamp = now_msec(); // Implement x-amz-metadata-directive: REPLACE - let old_meta = match source_last_state { - ObjectVersionData::DeleteMarker => { - return Err(Error::NoSuchKey); - } - ObjectVersionData::Inline(meta, _bytes) => meta, - ObjectVersionData::FirstBlock(meta, _fbh) => meta, - }; let new_meta = match req.headers().get("x-amz-metadata-directive") { Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta { headers: get_headers(req)?, - size: old_meta.size, - etag: old_meta.etag.clone(), + size: source_version_meta.size, + etag: source_version_meta.etag.clone(), }, - _ => old_meta.clone(), + _ => source_version_meta.clone(), }; let etag = new_meta.etag.to_string(); - // Check precondition, e.g. x-amz-copy-source-if-match - copy_precondition.check(source_last_v, etag.as_str())?; - // Save object copy - match source_last_state { + match source_version_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_meta, bytes) => { let dest_object_version = ObjectVersion { @@ -92,7 +78,7 @@ pub async fn handle_copy( // Get block list from source version let source_version = garage .version_table - .get(&source_last_v.uuid, &EmptyKey) + .get(&source_version.uuid, &EmptyKey) .await?; let source_version = source_version.ok_or(Error::NoSuchKey)?; @@ -173,11 +159,309 @@ pub async fn handle_copy( .header("x-amz-version-id", hex::encode(new_uuid)) .header( "x-amz-copy-source-version-id", - hex::encode(source_last_v.uuid), + hex::encode(source_version.uuid), ) .body(Body::from(xml))?) } +pub async fn handle_upload_part_copy( + garage: Arc, + api_key: &Key, + req: &Request, + dest_bucket_id: Uuid, + dest_key: &str, + part_number: u64, + upload_id: &str, +) -> Result, Error> { + let copy_precondition = CopyPreconditionHeaders::parse(req)?; + + let dest_version_uuid = decode_upload_id(upload_id)?; + + let dest_key = dest_key.to_string(); + let (source_object, dest_object) = futures::try_join!( + get_copy_source(&garage, api_key, req), + garage + .object_table + .get(&dest_bucket_id, &dest_key) + .map_err(Error::from), + )?; + let dest_object = dest_object.ok_or(Error::NoSuchKey)?; + + let (source_object_version, source_version_data, source_version_meta) = + extract_source_info(&source_object)?; + + // Check precondition on source, e.g. x-amz-copy-source-if-match + copy_precondition.check(source_object_version, &source_version_meta.etag)?; + + // Check source range is valid + let source_range = match req.headers().get("x-amz-copy-source-range") { + Some(range) => { + let range_str = range.to_str()?; + let mut ranges = http_range::HttpRange::parse(range_str, source_version_meta.size) + .map_err(|e| (e, source_version_meta.size))?; + if ranges.len() != 1 { + return Err(Error::BadRequest( + "Invalid x-amz-copy-source-range header: exactly 1 range must be given".into(), + )); + } else { + ranges.pop().unwrap() + } + } + None => http_range::HttpRange { + start: 0, + length: source_version_meta.size, + }, + }; + + // Check destination version is indeed in uploading state + if !dest_object + .versions() + .iter() + .any(|v| v.uuid == dest_version_uuid && v.is_uploading()) + { + return Err(Error::NoSuchUpload); + } + + // Check source version is not inlined + match source_version_data { + ObjectVersionData::DeleteMarker => unreachable!(), + ObjectVersionData::Inline(_meta, _bytes) => { + // This is only for small files, we don't bother handling this. + // (in AWS UploadPartCopy works for parts at least 5MB which + // is never the case of an inline object) + return Err(Error::BadRequest( + "Source object is too small (minimum part size is 5Mb)".into(), + )); + } + ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (), + }; + + // Fetch source versin with its block list, + // and destination version to check part hasn't yet been uploaded + let (source_version, dest_version) = futures::try_join!( + garage + .version_table + .get(&source_object_version.uuid, &EmptyKey), + garage.version_table.get(&dest_version_uuid, &EmptyKey), + )?; + let source_version = source_version.ok_or(Error::NoSuchKey)?; + + // Check this part number hasn't yet been uploaded + if let Some(dv) = dest_version { + if dv.has_part_number(part_number) { + return Err(Error::BadRequest(format!( + "Part number {} has already been uploaded", + part_number + ))); + } + } + + // We want to reuse blocks from the source version as much as possible. + // However, we still need to get the data from these blocks + // because we need to know it to calculate the MD5sum of the part + // which is used as its ETag. + + // First, calculate what blocks we want to keep, + // and the subrange of the block to take, if the bounds of the + // requested range are in the middle. + let (range_begin, range_end) = (source_range.start, source_range.start + source_range.length); + + let mut blocks_to_copy = vec![]; + let mut current_offset = 0; + let mut size_to_copy = 0; + for (_bk, block) in source_version.blocks.items().iter() { + let (block_begin, block_end) = (current_offset, current_offset + block.size); + + if block_begin < range_end && block_end > range_begin { + let subrange_begin = if block_begin < range_begin { + Some(range_begin - block_begin) + } else { + None + }; + let subrange_end = if block_end > range_end { + Some(range_end - block_begin) + } else { + None + }; + let range_to_copy = match (subrange_begin, subrange_end) { + (Some(b), Some(e)) => Some(b as usize..e as usize), + (None, Some(e)) => Some(0..e as usize), + (Some(b), None) => Some(b as usize..block.size as usize), + (None, None) => None, + }; + size_to_copy += range_to_copy + .as_ref() + .map(|x| x.len() as u64) + .unwrap_or(block.size); + + blocks_to_copy.push((block.hash, range_to_copy)); + } + + current_offset = block_end; + } + + if size_to_copy < 1024 * 1024 { + return Err(Error::BadRequest(format!( + "Not enough data to copy: {} bytes (minimum: 1MB)", + size_to_copy + ))); + } + + // Now, actually copy the blocks + let mut md5hasher = Md5::new(); + + let mut block = Some( + garage + .block_manager + .rpc_get_block(&blocks_to_copy[0].0) + .await?, + ); + + let mut current_offset = 0; + for (i, (block_hash, range_to_copy)) in blocks_to_copy.iter().enumerate() { + let (current_block, subrange_hash) = match range_to_copy.clone() { + Some(r) => { + let subrange = block.take().unwrap()[r].to_vec(); + let hash = blake2sum(&subrange); + (subrange, hash) + } + None => (block.take().unwrap(), *block_hash), + }; + md5hasher.update(¤t_block[..]); + + let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); + version.blocks.put( + VersionBlockKey { + part_number, + offset: current_offset, + }, + VersionBlock { + hash: subrange_hash, + size: current_block.len() as u64, + }, + ); + current_offset += current_block.len() as u64; + + let block_ref = BlockRef { + block: subrange_hash, + version: dest_version_uuid, + deleted: false.into(), + }; + + let next_block_hash = blocks_to_copy.get(i + 1).map(|(h, _)| *h); + + let garage2 = garage.clone(); + let garage3 = garage.clone(); + let is_subrange = range_to_copy.is_some(); + + let (_, _, _, next_block) = futures::try_join!( + // Thing 1: if we are taking a subrange of the source block, + // we need to insert that subrange as a new block. + async move { + if is_subrange { + garage2 + .block_manager + .rpc_put_block(subrange_hash, current_block) + .await + } else { + Ok(()) + } + }, + // Thing 2: we need to insert the block in the version + garage.version_table.insert(&version), + // Thing 3: we need to add a block reference + garage.block_ref_table.insert(&block_ref), + // Thing 4: we need to prefetch the next block + async move { + match next_block_hash { + Some(h) => Ok(Some(garage3.block_manager.rpc_get_block(&h).await?)), + None => Ok(None), + } + }, + )?; + + block = next_block; + } + + let data_md5sum = md5hasher.finalize(); + let etag = hex::encode(data_md5sum); + + // Put the part's ETag in the Versiontable + let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); + version.parts_etags.put(part_number, etag.clone()); + garage.version_table.insert(&version).await?; + + // LGTM + let resp_xml = s3_xml::to_xml_with_header(&CopyPartResult { + xmlns: (), + etag: s3_xml::Value(etag), + last_modified: s3_xml::Value(msec_to_rfc3339(source_object_version.timestamp)), + })?; + + Ok(Response::builder() + .header("Content-Type", "application/xml") + .header( + "x-amz-copy-source-version-id", + hex::encode(source_object_version.uuid), + ) + .body(Body::from(resp_xml))?) +} + +async fn get_copy_source( + garage: &Garage, + api_key: &Key, + req: &Request, +) -> Result { + let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; + let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; + + let (source_bucket, source_key) = parse_bucket_key(©_source, None)?; + let source_bucket_id = resolve_bucket(garage, &source_bucket.to_string(), api_key).await?; + + if !api_key.allow_read(&source_bucket_id) { + return Err(Error::Forbidden(format!( + "Reading from bucket {} not allowed for this key", + source_bucket + ))); + } + + let source_key = source_key.ok_or_bad_request("No source key specified")?; + + let source_object = garage + .object_table + .get(&source_bucket_id, &source_key.to_string()) + .await? + .ok_or(Error::NoSuchKey)?; + + Ok(source_object) +} + +fn extract_source_info( + source_object: &Object, +) -> Result<(&ObjectVersion, &ObjectVersionData, &ObjectVersionMeta), Error> { + let source_version = source_object + .versions() + .iter() + .rev() + .find(|v| v.is_complete()) + .ok_or(Error::NoSuchKey)?; + + let source_version_data = match &source_version.state { + ObjectVersionState::Complete(x) => x, + _ => unreachable!(), + }; + + let source_version_meta = match source_version_data { + ObjectVersionData::DeleteMarker => { + return Err(Error::NoSuchKey); + } + ObjectVersionData::Inline(meta, _bytes) => meta, + ObjectVersionData::FirstBlock(meta, _fbh) => meta, + }; + + Ok((source_version, source_version_data, source_version_meta)) +} + struct CopyPreconditionHeaders { copy_source_if_match: Option>, copy_source_if_modified_since: Option, @@ -267,3 +551,36 @@ impl CopyPreconditionHeaders { } } } + +#[derive(Debug, Serialize, PartialEq)] +pub struct CopyPartResult { + #[serde(serialize_with = "xmlns_tag")] + pub xmlns: (), + #[serde(rename = "LastModified")] + pub last_modified: s3_xml::Value, + #[serde(rename = "ETag")] + pub etag: s3_xml::Value, +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::s3_xml::to_xml_with_header; + + #[test] + fn serialize_copy_part_result() -> Result<(), Error> { + // @FIXME: ETag should be quoted, but we can't add quotes + // because XML serializer replaces them by `"` + let expected_retval = r#"2011-04-11T20:34:56.000Z9b2cf535f27731c974343645a3985328"#; + let v = CopyPartResult { + xmlns: (), + last_modified: s3_xml::Value("2011-04-11T20:34:56.000Z".into()), + etag: s3_xml::Value("9b2cf535f27731c974343645a3985328".into()), + }; + println!("{}", to_xml_with_header(&v)?); + + assert_eq!(to_xml_with_header(&v)?, expected_retval); + + Ok(()) + } +} diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index d7ee5893..421b94a1 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -370,12 +370,15 @@ pub async fn handle_put_part( let key = key.to_string(); let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); - let (object, first_block) = - futures::try_join!(garage.object_table.get(&bucket_id, &key), chunker.next(),)?; + let (object, version, first_block) = futures::try_join!( + garage.object_table.get(&bucket_id, &key), + garage.version_table.get(&version_uuid, &EmptyKey), + chunker.next() + )?; // Check object is valid and multipart block can be accepted - let first_block = first_block.ok_or_else(|| Error::BadRequest("Empty body".to_string()))?; - let object = object.ok_or_else(|| Error::BadRequest("Object not found".to_string()))?; + let first_block = first_block.ok_or_bad_request("Empty body")?; + let object = object.ok_or_bad_request("Object not found")?; if !object .versions() @@ -385,6 +388,16 @@ pub async fn handle_put_part( return Err(Error::NoSuchUpload); } + // Check part hasn't already been uploaded + if let Some(v) = version { + if v.has_part_number(part_number) { + return Err(Error::BadRequest(format!( + "Part number {} has already been uploaded", + part_number + ))); + } + } + // Copy block to store let version = Version::new(version_uuid, bucket_id, key, false); let first_block_hash = blake2sum(&first_block[..]); diff --git a/src/api/s3_xml.rs b/src/api/s3_xml.rs index 98c63d57..962b4780 100644 --- a/src/api/s3_xml.rs +++ b/src/api/s3_xml.rs @@ -428,6 +428,8 @@ mod tests { #[test] fn copy_object_result() -> Result<(), ApiError> { + // @FIXME: ETag should be quoted, but we can't add quotes + // because XML serializer replaces them by `"` let copy_result = CopyObjectResult { last_modified: Value(msec_to_rfc3339(0)), etag: Value("9b2cf535f27731c974343645a3985328".to_string()), @@ -466,6 +468,8 @@ mod tests { #[test] fn complete_multipart_upload_result() -> Result<(), ApiError> { + // @FIXME: ETag should be quoted, but we can't add quotes + // because XML serializer replaces them by `"` let result = CompleteMultipartUploadResult { xmlns: (), location: Some(Value("https://garage.tld/mybucket/a/plop".to_string())), @@ -540,6 +544,8 @@ mod tests { #[test] fn list_objects_v1_1() -> Result<(), ApiError> { + // @FIXME: ETag should be quoted, but we can't add quotes + // because XML serializer replaces them by `"` let result = ListBucketResult { xmlns: (), name: Value("example-bucket".to_string()), @@ -639,6 +645,8 @@ mod tests { #[test] fn list_objects_v2_1() -> Result<(), ApiError> { + // @FIXME: ETag should be quoted, but we can't add quotes + // because XML serializer replaces them by `"` let result = ListBucketResult { xmlns: (), name: Value("quotes".to_string()), @@ -685,6 +693,8 @@ mod tests { #[test] fn list_objects_v2_2() -> Result<(), ApiError> { + // @FIXME: ETag should be quoted, but we can't add quotes + // because XML serializer replaces them by `"` let result = ListBucketResult { xmlns: (), name: Value("bucket".to_string()), diff --git a/src/model/version_table.rs b/src/model/version_table.rs index e0b99770..839b1f4f 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -47,6 +47,20 @@ impl Version { key, } } + + pub fn has_part_number(&self, part_number: u64) -> bool { + let case1 = self + .parts_etags + .items() + .binary_search_by(|(k, _)| k.cmp(&part_number)) + .is_ok(); + let case2 = self + .blocks + .items() + .binary_search_by(|(k, _)| k.part_number.cmp(&part_number)) + .is_ok(); + case1 || case2 + } } #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)] -- cgit v1.2.3