diff options
Diffstat (limited to 'src/api/s3_copy.rs')
-rw-r--r-- | src/api/s3_copy.rs | 660 |
1 files changed, 0 insertions, 660 deletions
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs deleted file mode 100644 index fc4707e2..00000000 --- a/src/api/s3_copy.rs +++ /dev/null @@ -1,660 +0,0 @@ -use std::pin::Pin; -use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -use futures::{stream, stream::Stream, StreamExt, TryFutureExt}; -use md5::{Digest as Md5Digest, Md5}; - -use hyper::{Body, Request, Response}; -use serde::Serialize; - -use garage_table::*; -use garage_util::data::*; -use garage_util::time::*; - -use garage_model::block_ref_table::*; -use garage_model::garage::Garage; -use garage_model::key_table::Key; -use garage_model::object_table::*; -use garage_model::version_table::*; - -use crate::api_server::{parse_bucket_key, resolve_bucket}; -use crate::error::*; -use crate::s3_put::{decode_upload_id, get_headers}; -use crate::s3_xml::{self, xmlns_tag}; - -pub async fn handle_copy( - garage: Arc<Garage>, - api_key: &Key, - req: &Request<Body>, - dest_bucket_id: Uuid, - dest_key: &str, -) -> Result<Response<Body>, Error> { - let copy_precondition = CopyPreconditionHeaders::parse(req)?; - - let source_object = get_copy_source(&garage, api_key, req).await?; - - let (source_version, source_version_data, source_version_meta) = - extract_source_info(&source_object)?; - - // Check precondition, e.g. x-amz-copy-source-if-match - copy_precondition.check(source_version, &source_version_meta.etag)?; - - // Generate parameters for copied object - let new_uuid = gen_uuid(); - let new_timestamp = now_msec(); - - // Implement x-amz-metadata-directive: REPLACE - let new_meta = match req.headers().get("x-amz-metadata-directive") { - Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta { - headers: get_headers(req.headers())?, - size: source_version_meta.size, - etag: source_version_meta.etag.clone(), - }, - _ => source_version_meta.clone(), - }; - - let etag = new_meta.etag.to_string(); - - // Save object copy - match source_version_data { - ObjectVersionData::DeleteMarker => unreachable!(), - ObjectVersionData::Inline(_meta, bytes) => { - let dest_object_version = ObjectVersion { - uuid: new_uuid, - timestamp: new_timestamp, - state: ObjectVersionState::Complete(ObjectVersionData::Inline( - new_meta, - bytes.clone(), - )), - }; - let dest_object = Object::new( - dest_bucket_id, - dest_key.to_string(), - vec![dest_object_version], - ); - garage.object_table.insert(&dest_object).await?; - } - ObjectVersionData::FirstBlock(_meta, first_block_hash) => { - // Get block list from source version - let source_version = garage - .version_table - .get(&source_version.uuid, &EmptyKey) - .await?; - let source_version = source_version.ok_or(Error::NoSuchKey)?; - - // Write an "uploading" marker in Object table - // This holds a reference to the object in the Version table - // so that it won't be deleted, e.g. by repair_versions. - let tmp_dest_object_version = ObjectVersion { - uuid: new_uuid, - timestamp: new_timestamp, - state: ObjectVersionState::Uploading(new_meta.headers.clone()), - }; - let tmp_dest_object = Object::new( - dest_bucket_id, - dest_key.to_string(), - vec![tmp_dest_object_version], - ); - garage.object_table.insert(&tmp_dest_object).await?; - - // Write version in the version table. Even with empty block list, - // this means that the BlockRef entries linked to this version cannot be - // marked as deleted (they are marked as deleted only if the Version - // doesn't exist or is marked as deleted). - let mut dest_version = - Version::new(new_uuid, dest_bucket_id, dest_key.to_string(), false); - garage.version_table.insert(&dest_version).await?; - - // Fill in block list for version and insert block refs - for (bk, bv) in source_version.blocks.items().iter() { - dest_version.blocks.put(*bk, *bv); - } - let dest_block_refs = dest_version - .blocks - .items() - .iter() - .map(|b| BlockRef { - block: b.1.hash, - version: new_uuid, - deleted: false.into(), - }) - .collect::<Vec<_>>(); - futures::try_join!( - garage.version_table.insert(&dest_version), - garage.block_ref_table.insert_many(&dest_block_refs[..]), - )?; - - // Insert final object - // We do this last because otherwise there is a race condition in the case where - // the copy call has the same source and destination (this happens, rclone does - // it to update the modification timestamp for instance). If we did this concurrently - // with the stuff before, the block's reference counts could be decremented before - // they are incremented again for the new version, leading to data being deleted. - let dest_object_version = ObjectVersion { - uuid: new_uuid, - timestamp: new_timestamp, - state: ObjectVersionState::Complete(ObjectVersionData::FirstBlock( - new_meta, - *first_block_hash, - )), - }; - let dest_object = Object::new( - dest_bucket_id, - dest_key.to_string(), - vec![dest_object_version], - ); - garage.object_table.insert(&dest_object).await?; - } - } - - let last_modified = msec_to_rfc3339(new_timestamp); - let result = CopyObjectResult { - last_modified: s3_xml::Value(last_modified), - etag: s3_xml::Value(format!("\"{}\"", etag)), - }; - let xml = s3_xml::to_xml_with_header(&result)?; - - Ok(Response::builder() - .header("Content-Type", "application/xml") - .header("x-amz-version-id", hex::encode(new_uuid)) - .header( - "x-amz-copy-source-version-id", - hex::encode(source_version.uuid), - ) - .body(Body::from(xml))?) -} - -pub async fn handle_upload_part_copy( - garage: Arc<Garage>, - api_key: &Key, - req: &Request<Body>, - dest_bucket_id: Uuid, - dest_key: &str, - part_number: u64, - upload_id: &str, -) -> Result<Response<Body>, Error> { - let copy_precondition = CopyPreconditionHeaders::parse(req)?; - - let dest_version_uuid = decode_upload_id(upload_id)?; - - let dest_key = dest_key.to_string(); - let (source_object, dest_object) = futures::try_join!( - get_copy_source(&garage, api_key, req), - garage - .object_table - .get(&dest_bucket_id, &dest_key) - .map_err(Error::from), - )?; - let dest_object = dest_object.ok_or(Error::NoSuchKey)?; - - let (source_object_version, source_version_data, source_version_meta) = - extract_source_info(&source_object)?; - - // Check precondition on source, e.g. x-amz-copy-source-if-match - copy_precondition.check(source_object_version, &source_version_meta.etag)?; - - // Check source range is valid - let source_range = match req.headers().get("x-amz-copy-source-range") { - Some(range) => { - let range_str = range.to_str()?; - let mut ranges = http_range::HttpRange::parse(range_str, source_version_meta.size) - .map_err(|e| (e, source_version_meta.size))?; - if ranges.len() != 1 { - return Err(Error::BadRequest( - "Invalid x-amz-copy-source-range header: exactly 1 range must be given".into(), - )); - } else { - ranges.pop().unwrap() - } - } - None => http_range::HttpRange { - start: 0, - length: source_version_meta.size, - }, - }; - - // Check destination version is indeed in uploading state - if !dest_object - .versions() - .iter() - .any(|v| v.uuid == dest_version_uuid && v.is_uploading()) - { - return Err(Error::NoSuchUpload); - } - - // Check source version is not inlined - match source_version_data { - ObjectVersionData::DeleteMarker => unreachable!(), - ObjectVersionData::Inline(_meta, _bytes) => { - // This is only for small files, we don't bother handling this. - // (in AWS UploadPartCopy works for parts at least 5MB which - // is never the case of an inline object) - return Err(Error::BadRequest( - "Source object is too small (minimum part size is 5Mb)".into(), - )); - } - ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (), - }; - - // Fetch source versin with its block list, - // and destination version to check part hasn't yet been uploaded - let (source_version, dest_version) = futures::try_join!( - garage - .version_table - .get(&source_object_version.uuid, &EmptyKey), - garage.version_table.get(&dest_version_uuid, &EmptyKey), - )?; - let source_version = source_version.ok_or(Error::NoSuchKey)?; - - // Check this part number hasn't yet been uploaded - if let Some(dv) = dest_version { - if dv.has_part_number(part_number) { - return Err(Error::BadRequest(format!( - "Part number {} has already been uploaded", - part_number - ))); - } - } - - // We want to reuse blocks from the source version as much as possible. - // However, we still need to get the data from these blocks - // because we need to know it to calculate the MD5sum of the part - // which is used as its ETag. - - // First, calculate what blocks we want to keep, - // and the subrange of the block to take, if the bounds of the - // requested range are in the middle. - let (range_begin, range_end) = (source_range.start, source_range.start + source_range.length); - - let mut blocks_to_copy = vec![]; - let mut current_offset = 0; - for (_bk, block) in source_version.blocks.items().iter() { - let (block_begin, block_end) = (current_offset, current_offset + block.size); - - if block_begin < range_end && block_end > range_begin { - let subrange_begin = if block_begin < range_begin { - Some(range_begin - block_begin) - } else { - None - }; - let subrange_end = if block_end > range_end { - Some(range_end - block_begin) - } else { - None - }; - let range_to_copy = match (subrange_begin, subrange_end) { - (Some(b), Some(e)) => Some(b as usize..e as usize), - (None, Some(e)) => Some(0..e as usize), - (Some(b), None) => Some(b as usize..block.size as usize), - (None, None) => None, - }; - - blocks_to_copy.push((block.hash, range_to_copy)); - } - - current_offset = block_end; - } - - // Now, actually copy the blocks - let mut md5hasher = Md5::new(); - - // First, create a stream that is able to read the source blocks - // and extract the subrange if necessary. - // The second returned value is an Option<Hash>, that is Some - // if and only if the block returned is a block that already existed - // in the Garage data store (thus we don't need to save it again). - let garage2 = garage.clone(); - let source_blocks = stream::iter(blocks_to_copy) - .flat_map(|(block_hash, range_to_copy)| { - let garage3 = garage2.clone(); - stream::once(async move { - let data = garage3.block_manager.rpc_get_block(&block_hash).await?; - match range_to_copy { - Some(r) => Ok((data[r].to_vec(), None)), - None => Ok((data, Some(block_hash))), - } - }) - }) - .peekable(); - - // The defragmenter is a custom stream (defined below) that concatenates - // consecutive block parts when they are too small. - // It returns a series of (Vec<u8>, Option<Hash>). - // When it is done, it returns an empty vec. - // Same as the previous iterator, the Option is Some(_) if and only if - // it's an existing block of the Garage data store. - let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks)); - - let mut current_offset = 0; - let mut next_block = defragmenter.next().await?; - - loop { - let (data, existing_block_hash) = next_block; - if data.is_empty() { - break; - } - - md5hasher.update(&data[..]); - - let must_upload = existing_block_hash.is_none(); - let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..])); - - let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); - version.blocks.put( - VersionBlockKey { - part_number, - offset: current_offset, - }, - VersionBlock { - hash: final_hash, - size: data.len() as u64, - }, - ); - current_offset += data.len() as u64; - - let block_ref = BlockRef { - block: final_hash, - version: dest_version_uuid, - deleted: false.into(), - }; - - let garage2 = garage.clone(); - let res = futures::try_join!( - // Thing 1: if the block is not exactly a block that existed before, - // we need to insert that data as a new block. - async move { - if must_upload { - garage2.block_manager.rpc_put_block(final_hash, data).await - } else { - Ok(()) - } - }, - // Thing 2: we need to insert the block in the version - garage.version_table.insert(&version), - // Thing 3: we need to add a block reference - garage.block_ref_table.insert(&block_ref), - // Thing 4: we need to prefetch the next block - defragmenter.next(), - )?; - next_block = res.3; - } - - let data_md5sum = md5hasher.finalize(); - let etag = hex::encode(data_md5sum); - - // Put the part's ETag in the Versiontable - let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); - version.parts_etags.put(part_number, etag.clone()); - garage.version_table.insert(&version).await?; - - // LGTM - let resp_xml = s3_xml::to_xml_with_header(&CopyPartResult { - xmlns: (), - etag: s3_xml::Value(format!("\"{}\"", etag)), - last_modified: s3_xml::Value(msec_to_rfc3339(source_object_version.timestamp)), - })?; - - Ok(Response::builder() - .header("Content-Type", "application/xml") - .header( - "x-amz-copy-source-version-id", - hex::encode(source_object_version.uuid), - ) - .body(Body::from(resp_xml))?) -} - -async fn get_copy_source( - garage: &Garage, - api_key: &Key, - req: &Request<Body>, -) -> Result<Object, Error> { - let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; - let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; - - let (source_bucket, source_key) = parse_bucket_key(©_source, None)?; - let source_bucket_id = resolve_bucket(garage, &source_bucket.to_string(), api_key).await?; - - if !api_key.allow_read(&source_bucket_id) { - return Err(Error::Forbidden(format!( - "Reading from bucket {} not allowed for this key", - source_bucket - ))); - } - - let source_key = source_key.ok_or_bad_request("No source key specified")?; - - let source_object = garage - .object_table - .get(&source_bucket_id, &source_key.to_string()) - .await? - .ok_or(Error::NoSuchKey)?; - - Ok(source_object) -} - -fn extract_source_info( - source_object: &Object, -) -> Result<(&ObjectVersion, &ObjectVersionData, &ObjectVersionMeta), Error> { - let source_version = source_object - .versions() - .iter() - .rev() - .find(|v| v.is_complete()) - .ok_or(Error::NoSuchKey)?; - - let source_version_data = match &source_version.state { - ObjectVersionState::Complete(x) => x, - _ => unreachable!(), - }; - - let source_version_meta = match source_version_data { - ObjectVersionData::DeleteMarker => { - return Err(Error::NoSuchKey); - } - ObjectVersionData::Inline(meta, _bytes) => meta, - ObjectVersionData::FirstBlock(meta, _fbh) => meta, - }; - - Ok((source_version, source_version_data, source_version_meta)) -} - -struct CopyPreconditionHeaders { - copy_source_if_match: Option<Vec<String>>, - copy_source_if_modified_since: Option<SystemTime>, - copy_source_if_none_match: Option<Vec<String>>, - copy_source_if_unmodified_since: Option<SystemTime>, -} - -impl CopyPreconditionHeaders { - fn parse(req: &Request<Body>) -> Result<Self, Error> { - Ok(Self { - copy_source_if_match: req - .headers() - .get("x-amz-copy-source-if-match") - .map(|x| x.to_str()) - .transpose()? - .map(|x| { - x.split(',') - .map(|m| m.trim().trim_matches('"').to_string()) - .collect::<Vec<_>>() - }), - copy_source_if_modified_since: req - .headers() - .get("x-amz-copy-source-if-modified-since") - .map(|x| x.to_str()) - .transpose()? - .map(httpdate::parse_http_date) - .transpose() - .ok_or_bad_request("Invalid date in x-amz-copy-source-if-modified-since")?, - copy_source_if_none_match: req - .headers() - .get("x-amz-copy-source-if-none-match") - .map(|x| x.to_str()) - .transpose()? - .map(|x| { - x.split(',') - .map(|m| m.trim().trim_matches('"').to_string()) - .collect::<Vec<_>>() - }), - copy_source_if_unmodified_since: req - .headers() - .get("x-amz-copy-source-if-unmodified-since") - .map(|x| x.to_str()) - .transpose()? - .map(httpdate::parse_http_date) - .transpose() - .ok_or_bad_request("Invalid date in x-amz-copy-source-if-unmodified-since")?, - }) - } - - fn check(&self, v: &ObjectVersion, etag: &str) -> Result<(), Error> { - let v_date = UNIX_EPOCH + Duration::from_millis(v.timestamp); - - let ok = match ( - &self.copy_source_if_match, - &self.copy_source_if_unmodified_since, - &self.copy_source_if_none_match, - &self.copy_source_if_modified_since, - ) { - // TODO I'm not sure all of the conditions are evaluated correctly here - - // If we have both if-match and if-unmodified-since, - // basically we don't care about if-unmodified-since, - // because in the spec it says that if if-match evaluates to - // true but if-unmodified-since evaluates to false, - // the copy is still done. - (Some(im), _, None, None) => im.iter().any(|x| x == etag || x == "*"), - (None, Some(ius), None, None) => v_date <= *ius, - - // If we have both if-none-match and if-modified-since, - // then both of the two conditions must evaluate to true - (None, None, Some(inm), Some(ims)) => { - !inm.iter().any(|x| x == etag || x == "*") && v_date > *ims - } - (None, None, Some(inm), None) => !inm.iter().any(|x| x == etag || x == "*"), - (None, None, None, Some(ims)) => v_date > *ims, - (None, None, None, None) => true, - _ => { - return Err(Error::BadRequest( - "Invalid combination of x-amz-copy-source-if-xxxxx headers".into(), - )) - } - }; - - if ok { - Ok(()) - } else { - Err(Error::PreconditionFailed) - } - } -} - -type BlockStreamItemOk = (Vec<u8>, Option<Hash>); -type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>; - -struct Defragmenter<S: Stream<Item = BlockStreamItem>> { - block_size: usize, - block_stream: Pin<Box<stream::Peekable<S>>>, - buffer: Vec<u8>, - hash: Option<Hash>, -} - -impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> { - fn new(block_size: usize, block_stream: Pin<Box<stream::Peekable<S>>>) -> Self { - Self { - block_size, - block_stream, - buffer: vec![], - hash: None, - } - } - - async fn next(&mut self) -> BlockStreamItem { - // Fill buffer while we can - while let Some(res) = self.block_stream.as_mut().peek().await { - let (peeked_next_block, _) = match res { - Ok(t) => t, - Err(_) => { - self.block_stream.next().await.unwrap()?; - unreachable!() - } - }; - - if self.buffer.is_empty() { - let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?; - self.buffer = next_block; - self.hash = next_block_hash; - } else if self.buffer.len() + peeked_next_block.len() > self.block_size { - break; - } else { - let (next_block, _) = self.block_stream.next().await.unwrap()?; - self.buffer.extend(next_block); - self.hash = None; - } - } - - Ok((std::mem::take(&mut self.buffer), self.hash.take())) - } -} - -#[derive(Debug, Serialize, PartialEq)] -pub struct CopyObjectResult { - #[serde(rename = "LastModified")] - pub last_modified: s3_xml::Value, - #[serde(rename = "ETag")] - pub etag: s3_xml::Value, -} - -#[derive(Debug, Serialize, PartialEq)] -pub struct CopyPartResult { - #[serde(serialize_with = "xmlns_tag")] - pub xmlns: (), - #[serde(rename = "LastModified")] - pub last_modified: s3_xml::Value, - #[serde(rename = "ETag")] - pub etag: s3_xml::Value, -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::s3_xml::to_xml_with_header; - - #[test] - fn copy_object_result() -> Result<(), Error> { - let copy_result = CopyObjectResult { - last_modified: s3_xml::Value(msec_to_rfc3339(0)), - etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".to_string()), - }; - assert_eq!( - to_xml_with_header(©_result)?, - "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\ -<CopyObjectResult>\ - <LastModified>1970-01-01T00:00:00.000Z</LastModified>\ - <ETag>"9b2cf535f27731c974343645a3985328"</ETag>\ -</CopyObjectResult>\ - " - ); - Ok(()) - } - - #[test] - fn serialize_copy_part_result() -> Result<(), Error> { - let expected_retval = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\ -<CopyPartResult xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">\ - <LastModified>2011-04-11T20:34:56.000Z</LastModified>\ - <ETag>"9b2cf535f27731c974343645a3985328"</ETag>\ -</CopyPartResult>"; - let v = CopyPartResult { - xmlns: (), - last_modified: s3_xml::Value("2011-04-11T20:34:56.000Z".into()), - etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".into()), - }; - println!("{}", to_xml_with_header(&v)?); - - assert_eq!(to_xml_with_header(&v)?, expected_retval); - - Ok(()) - } -} |