use std::pin::Pin; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::{stream, stream::Stream, StreamExt, TryFutureExt}; use md5::{Digest as Md5Digest, Md5}; use bytes::Bytes; use hyper::{Body, Request, Response}; use serde::Serialize; use garage_rpc::netapp::bytes_buf::BytesBuf; use garage_rpc::rpc_helper::OrderTag; use garage_table::*; use garage_util::data::*; use garage_util::time::*; use garage_model::garage::Garage; use garage_model::key_table::Key; use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use crate::helpers::parse_bucket_key; use crate::s3::error::*; use crate::s3::put::{decode_upload_id, get_headers}; use crate::s3::xml::{self as s3_xml, xmlns_tag}; pub async fn handle_copy( garage: Arc, api_key: &Key, req: &Request, dest_bucket_id: Uuid, dest_key: &str, ) -> Result, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; let source_object = get_copy_source(&garage, api_key, req).await?; let (source_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; // Check precondition, e.g. x-amz-copy-source-if-match copy_precondition.check(source_version, &source_version_meta.etag)?; // Generate parameters for copied object let new_uuid = gen_uuid(); let new_timestamp = now_msec(); // Implement x-amz-metadata-directive: REPLACE let new_meta = match req.headers().get("x-amz-metadata-directive") { Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta { headers: get_headers(req.headers())?, size: source_version_meta.size, etag: source_version_meta.etag.clone(), }, _ => source_version_meta.clone(), }; let etag = new_meta.etag.to_string(); // Save object copy match source_version_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_meta, bytes) => { let dest_object_version = ObjectVersion { uuid: new_uuid, timestamp: new_timestamp, state: ObjectVersionState::Complete(ObjectVersionData::Inline( new_meta, bytes.clone(), )), }; let dest_object = Object::new( dest_bucket_id, dest_key.to_string(), vec![dest_object_version], ); garage.object_table.insert(&dest_object).await?; } ObjectVersionData::FirstBlock(_meta, first_block_hash) => { // Get block list from source version let source_version = garage .version_table .get(&source_version.uuid, &EmptyKey) .await?; let source_version = source_version.ok_or(Error::NoSuchKey)?; // Write an "uploading" marker in Object table // This holds a reference to the object in the Version table // so that it won't be deleted, e.g. by repair_versions. let tmp_dest_object_version = ObjectVersion { uuid: new_uuid, timestamp: new_timestamp, state: ObjectVersionState::Uploading(new_meta.headers.clone()), }; let tmp_dest_object = Object::new( dest_bucket_id, dest_key.to_string(), vec![tmp_dest_object_version], ); garage.object_table.insert(&tmp_dest_object).await?; // Write version in the version table. Even with empty block list, // this means that the BlockRef entries linked to this version cannot be // marked as deleted (they are marked as deleted only if the Version // doesn't exist or is marked as deleted). let mut dest_version = Version::new(new_uuid, dest_bucket_id, dest_key.to_string(), false); garage.version_table.insert(&dest_version).await?; // Fill in block list for version and insert block refs for (bk, bv) in source_version.blocks.items().iter() { dest_version.blocks.put(*bk, *bv); } let dest_block_refs = dest_version .blocks .items() .iter() .map(|b| BlockRef { block: b.1.hash, version: new_uuid, deleted: false.into(), }) .collect::>(); futures::try_join!( garage.version_table.insert(&dest_version), garage.block_ref_table.insert_many(&dest_block_refs[..]), )?; // Insert final object // We do this last because otherwise there is a race condition in the case where // the copy call has the same source and destination (this happens, rclone does // it to update the modification timestamp for instance). If we did this concurrently // with the stuff before, the block's reference counts could be decremented before // they are incremented again for the new version, leading to data being deleted. let dest_object_version = ObjectVersion { uuid: new_uuid, timestamp: new_timestamp, state: ObjectVersionState::Complete(ObjectVersionData::FirstBlock( new_meta, *first_block_hash, )), }; let dest_object = Object::new( dest_bucket_id, dest_key.to_string(), vec![dest_object_version], ); garage.object_table.insert(&dest_object).await?; } } let last_modified = msec_to_rfc3339(new_timestamp); let result = CopyObjectResult { last_modified: s3_xml::Value(last_modified), etag: s3_xml::Value(format!("\"{}\"", etag)), }; let xml = s3_xml::to_xml_with_header(&result)?; Ok(Response::builder() .header("Content-Type", "application/xml") .header("x-amz-version-id", hex::encode(new_uuid)) .header( "x-amz-copy-source-version-id", hex::encode(source_version.uuid), ) .body(Body::from(xml))?) } pub async fn handle_upload_part_copy( garage: Arc, api_key: &Key, req: &Request, dest_bucket_id: Uuid, dest_key: &str, part_number: u64, upload_id: &str, ) -> Result, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; let dest_version_uuid = decode_upload_id(upload_id)?; let dest_key = dest_key.to_string(); let (source_object, dest_object) = futures::try_join!( get_copy_source(&garage, api_key, req), garage .object_table .get(&dest_bucket_id, &dest_key) .map_err(Error::from), )?; let dest_object = dest_object.ok_or(Error::NoSuchKey)?; let (source_object_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; // Check precondition on source, e.g. x-amz-copy-source-if-match copy_precondition.check(source_object_version, &source_version_meta.etag)?; // Check source range is valid let source_range = match req.headers().get("x-amz-copy-source-range") { Some(range) => { let range_str = range.to_str()?; let mut ranges = http_range::HttpRange::parse(range_str, source_version_meta.size) .map_err(|e| (e, source_version_meta.size))?; if ranges.len() != 1 { return Err(Error::bad_request( "Invalid x-amz-copy-source-range header: exactly 1 range must be given", )); } else { ranges.pop().unwrap() } } None => http_range::HttpRange { start: 0, length: source_version_meta.size, }, }; // Check destination version is indeed in uploading state if !dest_object .versions() .iter() .any(|v| v.uuid == dest_version_uuid && v.is_uploading()) { return Err(Error::NoSuchUpload); } // Check source version is not inlined match source_version_data { ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::Inline(_meta, _bytes) => { // This is only for small files, we don't bother handling this. // (in AWS UploadPartCopy works for parts at least 5MB which // is never the case of an inline object) return Err(Error::bad_request( "Source object is too small (minimum part size is 5Mb)", )); } ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (), }; // Fetch source versin with its block list, // and destination version to check part hasn't yet been uploaded let (source_version, dest_version) = futures::try_join!( garage .version_table .get(&source_object_version.uuid, &EmptyKey), garage.version_table.get(&dest_version_uuid, &EmptyKey), )?; let source_version = source_version.ok_or(Error::NoSuchKey)?; // Check this part number hasn't yet been uploaded if let Some(dv) = dest_version { if dv.has_part_number(part_number) { return Err(Error::bad_request(format!( "Part number {} has already been uploaded", part_number ))); } } // We want to reuse blocks from the source version as much as possible. // However, we still need to get the data from these blocks // because we need to know it to calculate the MD5sum of the part // which is used as its ETag. // First, calculate what blocks we want to keep, // and the subrange of the block to take, if the bounds of the // requested range are in the middle. let (range_begin, range_end) = (source_range.start, source_range.start + source_range.length); let mut blocks_to_copy = vec![]; let mut current_offset = 0; for (_bk, block) in source_version.blocks.items().iter() { let (block_begin, block_end) = (current_offset, current_offset + block.size); if block_begin < range_end && block_end > range_begin { let subrange_begin = if block_begin < range_begin { Some(range_begin - block_begin) } else { None }; let subrange_end = if block_end > range_end { Some(range_end - block_begin) } else { None }; let range_to_copy = match (subrange_begin, subrange_end) { (Some(b), Some(e)) => Some(b as usize..e as usize), (None, Some(e)) => Some(0..e as usize), (Some(b), None) => Some(b as usize..block.size as usize), (None, None) => None, }; blocks_to_copy.push((block.hash, range_to_copy)); } current_offset = block_end; } // Now, actually copy the blocks let mut md5hasher = Md5::new(); // First, create a stream that is able to read the source blocks // and extract the subrange if necessary. // The second returned value is an Option, that is Some // if and only if the block returned is a block that already existed // in the Garage data store (thus we don't need to save it again). let garage2 = garage.clone(); let order_stream = OrderTag::stream(); let source_blocks = stream::iter(blocks_to_copy) .enumerate() .flat_map(|(i, (block_hash, range_to_copy))| { let garage3 = garage2.clone(); stream::once(async move { let data = garage3 .block_manager .rpc_get_block(&block_hash, Some(order_stream.order(i as u64))) .await?; match range_to_copy { Some(r) => Ok((data.slice(r), None)), None => Ok((data, Some(block_hash))), } }) }) .peekable(); // The defragmenter is a custom stream (defined below) that concatenates // consecutive block parts when they are too small. // It returns a series of (Vec, Option). // When it is done, it returns an empty vec. // Same as the previous iterator, the Option is Some(_) if and only if // it's an existing block of the Garage data store. let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks)); let mut current_offset = 0; let mut next_block = defragmenter.next().await?; loop { let (data, existing_block_hash) = next_block; if data.is_empty() { break; } md5hasher.update(&data[..]); let must_upload = existing_block_hash.is_none(); let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..])); let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); version.blocks.put( VersionBlockKey { part_number, offset: current_offset, }, VersionBlock { hash: final_hash, size: data.len() as u64, }, ); current_offset += data.len() as u64; let block_ref = BlockRef { block: final_hash, version: dest_version_uuid, deleted: false.into(), }; let garage2 = garage.clone(); let res = futures::try_join!( // Thing 1: if the block is not exactly a block that existed before, // we need to insert that data as a new block. async move { if must_upload { garage2.block_manager.rpc_put_block(final_hash, data).await } else { Ok(()) } }, // Thing 2: we need to insert the block in the version garage.version_table.insert(&version), // Thing 3: we need to add a block reference garage.block_ref_table.insert(&block_ref), // Thing 4: we need to prefetch the next block defragmenter.next(), )?; next_block = res.3; } let data_md5sum = md5hasher.finalize(); let etag = hex::encode(data_md5sum); // Put the part's ETag in the Versiontable let mut version = Version::new(dest_version_uuid, dest_bucket_id, dest_key.clone(), false); version.parts_etags.put(part_number, etag.clone()); garage.version_table.insert(&version).await?; // LGTM let resp_xml = s3_xml::to_xml_with_header(&CopyPartResult { xmlns: (), etag: s3_xml::Value(format!("\"{}\"", etag)), last_modified: s3_xml::Value(msec_to_rfc3339(source_object_version.timestamp)), })?; Ok(Response::builder() .header("Content-Type", "application/xml") .header( "x-amz-copy-source-version-id", hex::encode(source_object_version.uuid), ) .body(Body::from(resp_xml))?) } async fn get_copy_source( garage: &Garage, api_key: &Key, req: &Request, ) -> Result { let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; let (source_bucket, source_key) = parse_bucket_key(©_source, None)?; let source_bucket_id = garage .bucket_helper() .resolve_bucket(&source_bucket.to_string(), api_key) .await?; if !api_key.allow_read(&source_bucket_id) { return Err(Error::forbidden(format!( "Reading from bucket {} not allowed for this key", source_bucket ))); } let source_key = source_key.ok_or_bad_request("No source key specified")?; let source_object = garage .object_table .get(&source_bucket_id, &source_key.to_string()) .await? .ok_or(Error::NoSuchKey)?; Ok(source_object) } fn extract_source_info( source_object: &Object, ) -> Result<(&ObjectVersion, &ObjectVersionData, &ObjectVersionMeta), Error> { let source_version = source_object .versions() .iter() .rev() .find(|v| v.is_complete()) .ok_or(Error::NoSuchKey)?; let source_version_data = match &source_version.state { ObjectVersionState::Complete(x) => x, _ => unreachable!(), }; let source_version_meta = match source_version_data { ObjectVersionData::DeleteMarker => { return Err(Error::NoSuchKey); } ObjectVersionData::Inline(meta, _bytes) => meta, ObjectVersionData::FirstBlock(meta, _fbh) => meta, }; Ok((source_version, source_version_data, source_version_meta)) } struct CopyPreconditionHeaders { copy_source_if_match: Option>, copy_source_if_modified_since: Option, copy_source_if_none_match: Option>, copy_source_if_unmodified_since: Option, } impl CopyPreconditionHeaders { fn parse(req: &Request) -> Result { Ok(Self { copy_source_if_match: req .headers() .get("x-amz-copy-source-if-match") .map(|x| x.to_str()) .transpose()? .map(|x| { x.split(',') .map(|m| m.trim().trim_matches('"').to_string()) .collect::>() }), copy_source_if_modified_since: req .headers() .get("x-amz-copy-source-if-modified-since") .map(|x| x.to_str()) .transpose()? .map(httpdate::parse_http_date) .transpose() .ok_or_bad_request("Invalid date in x-amz-copy-source-if-modified-since")?, copy_source_if_none_match: req .headers() .get("x-amz-copy-source-if-none-match") .map(|x| x.to_str()) .transpose()? .map(|x| { x.split(',') .map(|m| m.trim().trim_matches('"').to_string()) .collect::>() }), copy_source_if_unmodified_since: req .headers() .get("x-amz-copy-source-if-unmodified-since") .map(|x| x.to_str()) .transpose()? .map(httpdate::parse_http_date) .transpose() .ok_or_bad_request("Invalid date in x-amz-copy-source-if-unmodified-since")?, }) } fn check(&self, v: &ObjectVersion, etag: &str) -> Result<(), Error> { let v_date = UNIX_EPOCH + Duration::from_millis(v.timestamp); let ok = match ( &self.copy_source_if_match, &self.copy_source_if_unmodified_since, &self.copy_source_if_none_match, &self.copy_source_if_modified_since, ) { // TODO I'm not sure all of the conditions are evaluated correctly here // If we have both if-match and if-unmodified-since, // basically we don't care about if-unmodified-since, // because in the spec it says that if if-match evaluates to // true but if-unmodified-since evaluates to false, // the copy is still done. (Some(im), _, None, None) => im.iter().any(|x| x == etag || x == "*"), (None, Some(ius), None, None) => v_date <= *ius, // If we have both if-none-match and if-modified-since, // then both of the two conditions must evaluate to true (None, None, Some(inm), Some(ims)) => { !inm.iter().any(|x| x == etag || x == "*") && v_date > *ims } (None, None, Some(inm), None) => !inm.iter().any(|x| x == etag || x == "*"), (None, None, None, Some(ims)) => v_date > *ims, (None, None, None, None) => true, _ => { return Err(Error::bad_request( "Invalid combination of x-amz-copy-source-if-xxxxx headers", )) } }; if ok { Ok(()) } else { Err(Error::PreconditionFailed) } } } type BlockStreamItemOk = (Bytes, Option); type BlockStreamItem = Result; struct Defragmenter> { block_size: usize, block_stream: Pin>>, buffer: BytesBuf, hash: Option, } impl> Defragmenter { fn new(block_size: usize, block_stream: Pin>>) -> Self { Self { block_size, block_stream, buffer: BytesBuf::new(), hash: None, } } async fn next(&mut self) -> BlockStreamItem { // Fill buffer while we can while let Some(res) = self.block_stream.as_mut().peek().await { let (peeked_next_block, _) = match res { Ok(t) => t, Err(_) => { self.block_stream.next().await.unwrap()?; unreachable!() } }; if self.buffer.is_empty() { let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?; self.buffer.extend(next_block); self.hash = next_block_hash; } else if self.buffer.len() + peeked_next_block.len() > self.block_size { break; } else { let (next_block, _) = self.block_stream.next().await.unwrap()?; self.buffer.extend(next_block); self.hash = None; } } Ok((self.buffer.take_all(), self.hash.take())) } } #[derive(Debug, Serialize, PartialEq)] pub struct CopyObjectResult { #[serde(rename = "LastModified")] pub last_modified: s3_xml::Value, #[serde(rename = "ETag")] pub etag: s3_xml::Value, } #[derive(Debug, Serialize, PartialEq)] pub struct CopyPartResult { #[serde(serialize_with = "xmlns_tag")] pub xmlns: (), #[serde(rename = "LastModified")] pub last_modified: s3_xml::Value, #[serde(rename = "ETag")] pub etag: s3_xml::Value, } #[cfg(test)] mod tests { use super::*; use crate::s3::xml::to_xml_with_header; #[test] fn copy_object_result() -> Result<(), Error> { let copy_result = CopyObjectResult { last_modified: s3_xml::Value(msec_to_rfc3339(0)), etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".to_string()), }; assert_eq!( to_xml_with_header(©_result)?, "\ \ 1970-01-01T00:00:00.000Z\ "9b2cf535f27731c974343645a3985328"\ \ " ); Ok(()) } #[test] fn serialize_copy_part_result() -> Result<(), Error> { let expected_retval = "\ \ 2011-04-11T20:34:56.000Z\ "9b2cf535f27731c974343645a3985328"\ "; let v = CopyPartResult { xmlns: (), last_modified: s3_xml::Value("2011-04-11T20:34:56.000Z".into()), etag: s3_xml::Value("\"9b2cf535f27731c974343645a3985328\"".into()), }; assert_eq!(to_xml_with_header(&v)?, expected_retval); Ok(()) } }