aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3/put.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3/put.rs')
-rw-r--r--src/api/s3/put.rs47
1 files changed, 32 insertions, 15 deletions
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index c7ac5030..606facc4 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use base64::prelude::*;
use futures::prelude::*;
+use futures::try_join;
use hyper::body::{Body, Bytes};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response};
@@ -35,7 +36,7 @@ pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
bucket: &Bucket,
- key: &str,
+ key: &String,
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
// Retrieve interesting headers from request
@@ -68,16 +69,24 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
headers: ObjectVersionHeaders,
body: S,
bucket: &Bucket,
- key: &str,
+ key: &String,
content_md5: Option<String>,
content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> {
+ let mut chunker = StreamChunker::new(body, garage.config.block_size);
+ let (first_block_opt, existing_object) = try_join!(
+ chunker.next(),
+ garage
+ .object_table
+ .get(&bucket.id, key)
+ .map_err(Error::from),
+ )?;
+
+ let first_block = first_block_opt.unwrap_or_default();
+
// Generate identity of new version
let version_uuid = gen_uuid();
- let version_timestamp = now_msec();
-
- let mut chunker = StreamChunker::new(body, garage.config.block_size);
- let first_block = chunker.next().await?.unwrap_or_default();
+ let version_timestamp = next_timestamp(existing_object.as_ref());
// If body is small enough, store it directly in the object table
// as "inline data". We can then return immediately.
@@ -97,7 +106,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
- check_quotas(&garage, bucket, key, size).await?;
+ check_quotas(&garage, bucket, size, existing_object.as_ref()).await?;
let object_version = ObjectVersion {
uuid: version_uuid,
@@ -176,7 +185,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
- check_quotas(&garage, bucket, key, total_size).await?;
+ check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?;
// Save final object state, marked as Complete
let md5sum_hex = hex::encode(data_md5sum);
@@ -229,19 +238,19 @@ pub(crate) fn ensure_checksum_matches(
pub(crate) async fn check_quotas(
garage: &Arc<Garage>,
bucket: &Bucket,
- key: &str,
size: u64,
+ prev_object: Option<&Object>,
) -> Result<(), Error> {
let quotas = bucket.state.as_option().unwrap().quotas.get();
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
return Ok(());
};
- let key = key.to_string();
- let (prev_object, counters) = futures::try_join!(
- garage.object_table.get(&bucket.id, &key),
- garage.object_counter_table.table.get(&bucket.id, &EmptyKey),
- )?;
+ let counters = garage
+ .object_counter_table
+ .table
+ .get(&bucket.id, &EmptyKey)
+ .await?;
let counters = counters
.map(|x| x.filtered_values(&garage.system.ring.borrow()))
@@ -275,7 +284,7 @@ pub(crate) async fn check_quotas(
if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 {
return Err(Error::forbidden(format!(
"Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.",
- ms, current_size, size
+ ms, current_size, cnt_size_diff
)));
}
}
@@ -519,3 +528,11 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVers
other,
})
}
+
+pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 {
+ existing_object
+ .as_ref()
+ .and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max())
+ .map(|t| std::cmp::max(t + 1, now_msec()))
+ .unwrap_or_else(now_msec)
+}