aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3_copy.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3_copy.rs')
-rw-r--r--src/api/s3_copy.rs106
1 files changed, 84 insertions, 22 deletions
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index b6ec48b0..187fe347 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -1,11 +1,11 @@
use std::fmt::Write;
use std::sync::Arc;
-use chrono::{SecondsFormat, Utc};
-use hyper::{Body, Response};
+use hyper::{Body, Request, Response};
use garage_table::*;
use garage_util::data::*;
+use garage_util::time::*;
use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
@@ -13,9 +13,11 @@ use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::error::*;
+use crate::s3_put::get_headers;
pub async fn handle_copy(
garage: Arc<Garage>,
+ req: &Request<Body>,
dest_bucket: &str,
dest_key: &str,
source_bucket: &str,
@@ -41,17 +43,37 @@ pub async fn handle_copy(
};
let new_uuid = gen_uuid();
- let dest_object_version = ObjectVersion {
- uuid: new_uuid,
- timestamp: now_msec(),
- state: ObjectVersionState::Complete(source_last_state.clone()),
- };
+ let new_timestamp = now_msec();
- match &source_last_state {
+ // Implement x-amz-metadata-directive: REPLACE
+ let old_meta = match source_last_state {
ObjectVersionData::DeleteMarker => {
return Err(Error::NotFound);
}
- ObjectVersionData::Inline(_meta, _bytes) => {
+ ObjectVersionData::Inline(meta, _bytes) => meta,
+ ObjectVersionData::FirstBlock(meta, _fbh) => meta,
+ };
+ let new_meta = match req.headers().get("x-amz-metadata-directive") {
+ Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
+ headers: get_headers(req)?,
+ size: old_meta.size,
+ etag: old_meta.etag.clone(),
+ },
+ _ => old_meta.clone(),
+ };
+
+ // Save object copy
+ match source_last_state {
+ ObjectVersionData::DeleteMarker => unreachable!(),
+ ObjectVersionData::Inline(_meta, bytes) => {
+ let dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::Inline(
+ new_meta,
+ bytes.clone(),
+ )),
+ };
let dest_object = Object::new(
dest_bucket.to_string(),
dest_key.to_string(),
@@ -59,44 +81,84 @@ pub async fn handle_copy(
);
garage.object_table.insert(&dest_object).await?;
}
- ObjectVersionData::FirstBlock(_meta, _first_block_hash) => {
+ ObjectVersionData::FirstBlock(_meta, first_block_hash) => {
+ // Get block list from source version
let source_version = garage
.version_table
.get(&source_last_v.uuid, &EmptyKey)
.await?;
let source_version = source_version.ok_or(Error::NotFound)?;
- let dest_version = Version::new(
- new_uuid,
+ // Write an "uploading" marker in Object table
+ // This holds a reference to the object in the Version table
+ // so that it won't be deleted, e.g. by repair_versions.
+ let tmp_dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Uploading(new_meta.headers.clone()),
+ };
+ let tmp_dest_object = Object::new(
dest_bucket.to_string(),
dest_key.to_string(),
- false,
- source_version.blocks().to_vec(),
+ vec![tmp_dest_object_version],
);
- let dest_object = Object::new(
+ garage.object_table.insert(&tmp_dest_object).await?;
+
+ // Write version in the version table. Even with empty block list,
+ // this means that the BlockRef entries linked to this version cannot be
+ // marked as deleted (they are marked as deleted only if the Version
+ // doesn't exist or is marked as deleted).
+ let mut dest_version = Version::new(
+ new_uuid,
dest_bucket.to_string(),
dest_key.to_string(),
- vec![dest_object_version],
+ false,
);
+ garage.version_table.insert(&dest_version).await?;
+
+ // Fill in block list for version and insert block refs
+ for (bk, bv) in source_version.blocks.items().iter() {
+ dest_version.blocks.put(*bk, *bv);
+ }
let dest_block_refs = dest_version
- .blocks()
+ .blocks
+ .items()
.iter()
.map(|b| BlockRef {
- block: b.hash,
+ block: b.1.hash,
version: new_uuid,
- deleted: false,
+ deleted: false.into(),
})
.collect::<Vec<_>>();
futures::try_join!(
- garage.object_table.insert(&dest_object),
garage.version_table.insert(&dest_version),
garage.block_ref_table.insert_many(&dest_block_refs[..]),
)?;
+
+ // Insert final object
+ // We do this last because otherwise there is a race condition in the case where
+ // the copy call has the same source and destination (this happens, rclone does
+ // it to update the modification timestamp for instance). If we did this concurrently
+ // with the stuff before, the block's reference counts could be decremented before
+ // they are incremented again for the new version, leading to data being deleted.
+ let dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
+ new_meta,
+ *first_block_hash,
+ )),
+ };
+ let dest_object = Object::new(
+ dest_bucket.to_string(),
+ dest_key.to_string(),
+ vec![dest_object_version],
+ );
+ garage.object_table.insert(&dest_object).await?;
}
}
- let now = Utc::now();
- let last_modified = now.to_rfc3339_opts(SecondsFormat::Secs, true);
+ let last_modified = msec_to_rfc3339(new_timestamp);
let mut xml = String::new();
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
writeln!(&mut xml, r#"<CopyObjectResult>"#).unwrap();