aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api/api_server.rs5
-rw-r--r--src/api/s3_copy.rs69
-rw-r--r--src/api/s3_delete.rs1
-rw-r--r--src/api/s3_list.rs6
-rw-r--r--src/api/s3_put.rs26
-rw-r--r--src/garage/cli.rs2
-rw-r--r--src/model/block.rs4
-rw-r--r--src/rpc/membership.rs1
-rw-r--r--src/table/crdt/lww.rs2
-rw-r--r--src/table/crdt/lww_map.rs2
-rw-r--r--src/table/gc.rs10
-rw-r--r--src/util/Cargo.toml1
-rw-r--r--src/util/data.rs8
-rw-r--r--src/util/lib.rs1
-rw-r--r--src/util/time.rs16
15 files changed, 110 insertions, 44 deletions
diff --git a/src/api/api_server.rs b/src/api/api_server.rs
index c6b1d483..bc98686d 100644
--- a/src/api/api_server.rs
+++ b/src/api/api_server.rs
@@ -137,7 +137,10 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
)));
}
let source_key = source_key.ok_or_bad_request("No source key specified")?;
- Ok(handle_copy(garage, &bucket, &key, &source_bucket, &source_key).await?)
+ Ok(
+ handle_copy(garage, &req, &bucket, &key, &source_bucket, &source_key)
+ .await?,
+ )
} else {
// PutObject query
Ok(handle_put(garage, req, &bucket, &key, content_sha256).await?)
diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs
index 8407faee..187fe347 100644
--- a/src/api/s3_copy.rs
+++ b/src/api/s3_copy.rs
@@ -1,11 +1,11 @@
use std::fmt::Write;
use std::sync::Arc;
-use chrono::{SecondsFormat, Utc};
-use hyper::{Body, Response};
+use hyper::{Body, Request, Response};
use garage_table::*;
use garage_util::data::*;
+use garage_util::time::*;
use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
@@ -13,9 +13,11 @@ use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::error::*;
+use crate::s3_put::get_headers;
pub async fn handle_copy(
garage: Arc<Garage>,
+ req: &Request<Body>,
dest_bucket: &str,
dest_key: &str,
source_bucket: &str,
@@ -42,25 +44,44 @@ pub async fn handle_copy(
let new_uuid = gen_uuid();
let new_timestamp = now_msec();
- let dest_object_version = ObjectVersion {
- uuid: new_uuid,
- timestamp: new_timestamp,
- state: ObjectVersionState::Complete(source_last_state.clone()),
- };
- let dest_object = Object::new(
- dest_bucket.to_string(),
- dest_key.to_string(),
- vec![dest_object_version],
- );
- match source_last_state {
+ // Implement x-amz-metadata-directive: REPLACE
+ let old_meta = match source_last_state {
ObjectVersionData::DeleteMarker => {
return Err(Error::NotFound);
}
- ObjectVersionData::Inline(_meta, _bytes) => {
+ ObjectVersionData::Inline(meta, _bytes) => meta,
+ ObjectVersionData::FirstBlock(meta, _fbh) => meta,
+ };
+ let new_meta = match req.headers().get("x-amz-metadata-directive") {
+ Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
+ headers: get_headers(req)?,
+ size: old_meta.size,
+ etag: old_meta.etag.clone(),
+ },
+ _ => old_meta.clone(),
+ };
+
+ // Save object copy
+ match source_last_state {
+ ObjectVersionData::DeleteMarker => unreachable!(),
+ ObjectVersionData::Inline(_meta, bytes) => {
+ let dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::Inline(
+ new_meta,
+ bytes.clone(),
+ )),
+ };
+ let dest_object = Object::new(
+ dest_bucket.to_string(),
+ dest_key.to_string(),
+ vec![dest_object_version],
+ );
garage.object_table.insert(&dest_object).await?;
}
- ObjectVersionData::FirstBlock(meta, _first_block_hash) => {
+ ObjectVersionData::FirstBlock(_meta, first_block_hash) => {
// Get block list from source version
let source_version = garage
.version_table
@@ -74,7 +95,7 @@ pub async fn handle_copy(
let tmp_dest_object_version = ObjectVersion {
uuid: new_uuid,
timestamp: new_timestamp,
- state: ObjectVersionState::Uploading(meta.headers.clone()),
+ state: ObjectVersionState::Uploading(new_meta.headers.clone()),
};
let tmp_dest_object = Object::new(
dest_bucket.to_string(),
@@ -120,12 +141,24 @@ pub async fn handle_copy(
// it to update the modification timestamp for instance). If we did this concurrently
// with the stuff before, the block's reference counts could be decremented before
// they are incremented again for the new version, leading to data being deleted.
+ let dest_object_version = ObjectVersion {
+ uuid: new_uuid,
+ timestamp: new_timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
+ new_meta,
+ *first_block_hash,
+ )),
+ };
+ let dest_object = Object::new(
+ dest_bucket.to_string(),
+ dest_key.to_string(),
+ vec![dest_object_version],
+ );
garage.object_table.insert(&dest_object).await?;
}
}
- let now = Utc::now(); // FIXME use the unix timestamp from above
- let last_modified = now.to_rfc3339_opts(SecondsFormat::Secs, true);
+ let last_modified = msec_to_rfc3339(new_timestamp);
let mut xml = String::new();
writeln!(&mut xml, r#"<?xml version="1.0" encoding="UTF-8"?>"#).unwrap();
writeln!(&mut xml, r#"<CopyObjectResult>"#).unwrap();
diff --git a/src/api/s3_delete.rs b/src/api/s3_delete.rs
index 7f752566..bb42d90c 100644
--- a/src/api/s3_delete.rs
+++ b/src/api/s3_delete.rs
@@ -4,6 +4,7 @@ use std::sync::Arc;
use hyper::{Body, Request, Response};
use garage_util::data::*;
+use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs
index 16d96a49..4d6c32bc 100644
--- a/src/api/s3_list.rs
+++ b/src/api/s3_list.rs
@@ -2,10 +2,10 @@ use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::fmt::Write;
use std::sync::Arc;
-use chrono::{DateTime, NaiveDateTime, SecondsFormat, Utc};
use hyper::{Body, Response};
use garage_util::error::Error as GarageError;
+use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::object_table::*;
@@ -247,9 +247,7 @@ pub async fn handle_list(
}
for (key, info) in result_keys.iter() {
- let last_modif = NaiveDateTime::from_timestamp(info.last_modified as i64 / 1000, 0);
- let last_modif = DateTime::<Utc>::from_utc(last_modif, Utc);
- let last_modif = last_modif.to_rfc3339_opts(SecondsFormat::Millis, true);
+ let last_modif = msec_to_rfc3339(info.last_modified);
writeln!(&mut xml, "\t<Contents>").unwrap();
writeln!(
&mut xml,
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index 6f675e37..ea3664bd 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -10,6 +10,7 @@ use sha2::{Digest as Sha256Digest, Sha256};
use garage_table::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
+use garage_util::time::*;
use garage_model::block::INLINE_THRESHOLD;
use garage_model::block_ref_table::*;
@@ -583,17 +584,19 @@ fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
.to_string())
}
-fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
+pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let content_type = get_mime_type(req)?;
- let other_headers = vec![
+ let mut other = BTreeMap::new();
+
+ // Preserve standard headers
+ let standard_header = vec![
hyper::header::CACHE_CONTROL,
hyper::header::CONTENT_DISPOSITION,
hyper::header::CONTENT_ENCODING,
hyper::header::CONTENT_LANGUAGE,
hyper::header::EXPIRES,
];
- let mut other = BTreeMap::new();
- for h in other_headers.iter() {
+ for h in standard_header.iter() {
if let Some(v) = req.headers().get(h) {
match v.to_str() {
Ok(v_str) => {
@@ -605,6 +608,21 @@ fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
}
}
}
+
+ // Preserve x-amz-meta- headers
+ for (k, v) in req.headers().iter() {
+ if k.as_str().starts_with("x-amz-meta-") {
+ match v.to_str() {
+ Ok(v_str) => {
+ other.insert(k.to_string(), v_str.to_string());
+ }
+ Err(e) => {
+ warn!("Discarding header {}, error in .to_str(): {}", k, e);
+ }
+ }
+ }
+ }
+
Ok(ObjectVersionHeaders {
content_type,
other,
diff --git a/src/garage/cli.rs b/src/garage/cli.rs
index 56f03c8b..b5c91ffc 100644
--- a/src/garage/cli.rs
+++ b/src/garage/cli.rs
@@ -5,8 +5,8 @@ use std::path::PathBuf;
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
-use garage_util::data::*;
use garage_util::error::Error;
+use garage_util::time::*;
use garage_rpc::membership::*;
use garage_rpc::ring::*;
diff --git a/src/model/block.rs b/src/model/block.rs
index 5934f20c..36ad867a 100644
--- a/src/model/block.rs
+++ b/src/model/block.rs
@@ -11,9 +11,9 @@ use tokio::fs;
use tokio::prelude::*;
use tokio::sync::{watch, Mutex, Notify};
-use garage_util::data;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_util::time::*;
use garage_rpc::membership::System;
use garage_rpc::rpc_client::*;
@@ -174,7 +174,7 @@ impl BlockManager {
f.read_to_end(&mut data).await?;
drop(f);
- if data::blake2sum(&data[..]) != *hash {
+ if blake2sum(&data[..]) != *hash {
let _lock = self.data_dir_lock.lock().await;
warn!(
"Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index e1dc297e..6636e50b 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -18,6 +18,7 @@ use tokio::sync::Mutex;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
+use garage_util::time::*;
use crate::consul::get_consul_nodes;
use crate::ring::*;
diff --git a/src/table/crdt/lww.rs b/src/table/crdt/lww.rs
index 9a3ab671..25ecdb07 100644
--- a/src/table/crdt/lww.rs
+++ b/src/table/crdt/lww.rs
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
-use garage_util::data::now_msec;
+use garage_util::time::now_msec;
use crate::crdt::crdt::*;
diff --git a/src/table/crdt/lww_map.rs b/src/table/crdt/lww_map.rs
index bd40f368..7b372191 100644
--- a/src/table/crdt/lww_map.rs
+++ b/src/table/crdt/lww_map.rs
@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
-use garage_util::data::now_msec;
+use garage_util::time::now_msec;
use crate::crdt::crdt::*;
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 5b7f1ee7..c13c8234 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -75,17 +75,19 @@ where
match self.gc_loop_iter().await {
Ok(true) => {
// Stuff was done, loop imediately
+ continue;
}
Ok(false) => {
- select! {
- _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (),
- _ = must_exit.recv().fuse() => (),
- }
+ // Nothing was done, sleep for some time (below)
}
Err(e) => {
warn!("({}) Error doing GC: {}", self.data.name, e);
}
}
+ select! {
+ _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (),
+ _ = must_exit.recv().fuse() => (),
+ }
}
Ok(())
}
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index 35130c96..7bb7cb31 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -27,6 +27,7 @@ toml = "0.5"
rmp-serde = "0.14.3"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0"
+chrono = "0.4"
futures = "0.3"
futures-util = "0.3"
diff --git a/src/util/data.rs b/src/util/data.rs
index 0dbd6df4..591b7605 100644
--- a/src/util/data.rs
+++ b/src/util/data.rs
@@ -2,7 +2,6 @@ use rand::Rng;
use serde::de::{self, Visitor};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::fmt;
-use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Default, PartialOrd, Ord, Clone, Hash, PartialEq, Copy)]
pub struct FixedBytes32([u8; 32]);
@@ -119,13 +118,6 @@ pub fn gen_uuid() -> UUID {
rand::thread_rng().gen::<[u8; 32]>().into()
}
-pub fn now_msec() -> u64 {
- SystemTime::now()
- .duration_since(UNIX_EPOCH)
- .expect("Fix your clock :o")
- .as_millis() as u64
-}
-
// RMP serialization with names of fields and variants
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
diff --git a/src/util/lib.rs b/src/util/lib.rs
index 0bf09bf6..e544a872 100644
--- a/src/util/lib.rs
+++ b/src/util/lib.rs
@@ -5,3 +5,4 @@ pub mod background;
pub mod config;
pub mod data;
pub mod error;
+pub mod time;
diff --git a/src/util/time.rs b/src/util/time.rs
new file mode 100644
index 00000000..148860e0
--- /dev/null
+++ b/src/util/time.rs
@@ -0,0 +1,16 @@
+use chrono::{SecondsFormat, TimeZone, Utc};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+pub fn now_msec() -> u64 {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .expect("Fix your clock :o")
+ .as_millis() as u64
+}
+
+pub fn msec_to_rfc3339(msecs: u64) -> String {
+ let secs = msecs as i64 / 1000;
+ let nanos = (msecs as i64 % 1000) as u32 * 1_000_000;
+ let timestamp = Utc.timestamp(secs, nanos);
+ timestamp.to_rfc3339_opts(SecondsFormat::Secs, true)
+}