aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3')
-rw-r--r--src/api/s3/api_server.rs2
-rw-r--r--src/api/s3/delete.rs42
-rw-r--r--src/api/s3/list.rs72
-rw-r--r--src/api/s3/multipart.rs13
-rw-r--r--src/api/s3/put.rs47
5 files changed, 101 insertions, 75 deletions
diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs
index d675ab61..887839dd 100644
--- a/src/api/s3/api_server.rs
+++ b/src/api/s3/api_server.rs
@@ -344,7 +344,7 @@ impl ApiHandler for S3ApiServer {
bucket_id,
key,
upload_id,
- part_number_marker: part_number_marker.map(|p| p.clamp(1, 10000)),
+ part_number_marker: part_number_marker.map(|p| p.min(10000)),
max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
},
)
diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs
index b337155f..1c491eac 100644
--- a/src/api/s3/delete.rs
+++ b/src/api/s3/delete.rs
@@ -3,12 +3,12 @@ use std::sync::Arc;
use hyper::{Body, Request, Response, StatusCode};
use garage_util::data::*;
-use garage_util::time::*;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use crate::s3::error::*;
+use crate::s3::put::next_timestamp;
use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content;
@@ -23,40 +23,36 @@ async fn handle_delete_internal(
.await?
.ok_or(Error::NoSuchKey)?; // No need to delete
- let interesting_versions = object.versions().iter().filter(|v| {
- !matches!(
- v.state,
- ObjectVersionState::Aborted
- | ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
- )
- });
-
- let mut version_to_delete = None;
- let mut timestamp = now_msec();
- for v in interesting_versions {
- if v.timestamp + 1 > timestamp || version_to_delete.is_none() {
- version_to_delete = Some(v.uuid);
+ let del_timestamp = next_timestamp(Some(&object));
+ let del_uuid = gen_uuid();
+
+ let deleted_version = object
+ .versions()
+ .iter()
+ .rev()
+ .find(|v| !matches!(&v.state, ObjectVersionState::Aborted))
+ .or_else(|| object.versions().iter().rev().next());
+ let deleted_version = match deleted_version {
+ Some(dv) => dv.uuid,
+ None => {
+ warn!("Object has no versions: {:?}", object);
+ Uuid::from([0u8; 32])
}
- timestamp = std::cmp::max(timestamp, v.timestamp + 1);
- }
-
- let deleted_version = version_to_delete.ok_or(Error::NoSuchKey)?;
-
- let version_uuid = gen_uuid();
+ };
let object = Object::new(
bucket_id,
key.into(),
vec![ObjectVersion {
- uuid: version_uuid,
- timestamp,
+ uuid: del_uuid,
+ timestamp: del_timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
garage.object_table.insert(&object).await?;
- Ok((deleted_version, version_uuid))
+ Ok((deleted_version, del_uuid))
}
pub async fn handle_delete(
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index 33d62518..1b9e8cd5 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -426,8 +426,10 @@ where
// Drop the first key if needed
// Only AfterKey requires it according to the S3 spec and our implem.
match (&cursor, iter.peek()) {
- (RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => iter.next(),
- (_, _) => None,
+ (RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => {
+ iter.next();
+ }
+ _ => (),
};
while let Some(object) = iter.peek() {
@@ -436,16 +438,22 @@ where
return Ok(None);
}
- cursor = match acc.extract(query, &cursor, &mut iter) {
- ExtractionResult::Extracted { key } => RangeBegin::AfterKey { key },
+ match acc.extract(query, &cursor, &mut iter) {
+ ExtractionResult::Extracted { key } => {
+ cursor = RangeBegin::AfterKey { key };
+ }
ExtractionResult::SkipTo { key, fallback_key } => {
- RangeBegin::IncludingKey { key, fallback_key }
+ cursor = RangeBegin::IncludingKey { key, fallback_key };
}
ExtractionResult::FilledAtUpload { key, upload } => {
- return Ok(Some(RangeBegin::AfterUpload { key, upload }))
+ return Ok(Some(RangeBegin::AfterUpload { key, upload }));
+ }
+ ExtractionResult::Filled => {
+ return Ok(Some(cursor));
+ }
+ ExtractionResult::NoMore => {
+ return Ok(None);
}
- ExtractionResult::Filled => return Ok(Some(cursor)),
- ExtractionResult::NoMore => return Ok(None),
};
}
@@ -519,8 +527,8 @@ fn fetch_part_info<'a>(
/// This key can be the prefix in the base case, or intermediate
/// points in the dataset if we are continuing a previous listing.
impl ListObjectsQuery {
- fn build_accumulator(&self) -> Accumulator<String, ObjectInfo> {
- Accumulator::<String, ObjectInfo>::new(self.common.page_size)
+ fn build_accumulator(&self) -> ObjectAccumulator {
+ ObjectAccumulator::new(self.common.page_size)
}
fn begin(&self) -> Result<RangeBegin, Error> {
@@ -529,9 +537,10 @@ impl ListObjectsQuery {
// In V2 mode, the continuation token is defined as an opaque
// string in the spec, so we can do whatever we want with it.
// In our case, it is defined as either [ or ] (for include
+ // or exclude), followed by a base64-encoded string
// representing the key to start with.
- (Some(token), _) => match &token[..1] {
- "[" => Ok(RangeBegin::IncludingKey {
+ (Some(token), _) => match &token.get(..1) {
+ Some("[") => Ok(RangeBegin::IncludingKey {
key: String::from_utf8(
BASE64_STANDARD
.decode(token[1..].as_bytes())
@@ -539,7 +548,7 @@ impl ListObjectsQuery {
)?,
fallback_key: None,
}),
- "]" => Ok(RangeBegin::AfterKey {
+ Some("]") => Ok(RangeBegin::AfterKey {
key: String::from_utf8(
BASE64_STANDARD
.decode(token[1..].as_bytes())
@@ -580,8 +589,8 @@ impl ListObjectsQuery {
}
impl ListMultipartUploadsQuery {
- fn build_accumulator(&self) -> Accumulator<Uuid, UploadInfo> {
- Accumulator::<Uuid, UploadInfo>::new(self.common.page_size)
+ fn build_accumulator(&self) -> UploadAccumulator {
+ UploadAccumulator::new(self.common.page_size)
}
fn begin(&self) -> Result<RangeBegin, Error> {
@@ -665,6 +674,7 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
Some(p) => p,
None => return None,
};
+ assert!(pfx.starts_with(&query.prefix));
// Try to register this prefix
// If not possible, we can return early
@@ -675,8 +685,11 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
// We consume the whole common prefix from the iterator
let mut last_pfx_key = &object.key;
loop {
- last_pfx_key = match objects.peek() {
- Some(o) if o.key.starts_with(pfx) => &o.key,
+ match objects.peek() {
+ Some(o) if o.key.starts_with(pfx) => {
+ last_pfx_key = &o.key;
+ objects.next();
+ }
Some(_) => {
return Some(ExtractionResult::Extracted {
key: last_pfx_key.to_owned(),
@@ -692,8 +705,6 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
}
}
};
-
- objects.next();
}
}
@@ -708,12 +719,11 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
}
// Otherwise, we need to check if we can add it
- match self.is_full() {
- true => false,
- false => {
- self.common_prefixes.insert(key);
- true
- }
+ if self.is_full() {
+ false
+ } else {
+ self.common_prefixes.insert(key);
+ true
}
}
@@ -721,12 +731,11 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
// It is impossible to add twice a key, this is an error
assert!(!self.keys.contains_key(&key));
- match self.is_full() {
- true => false,
- false => {
- self.keys.insert(key, value);
- true
- }
+ if self.is_full() {
+ false
+ } else {
+ self.keys.insert(key, value);
+ true
}
}
}
@@ -743,6 +752,7 @@ impl ExtractAccumulator for ObjectAccumulator {
}
let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");
+ assert!(object.key.starts_with(&query.prefix));
let version = match object.versions().iter().find(|x| x.is_data()) {
Some(v) => v,
diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs
index 52ea8e78..6b786318 100644
--- a/src/api/s3/multipart.rs
+++ b/src/api/s3/multipart.rs
@@ -9,7 +9,6 @@ use md5::{Digest as Md5Digest, Md5};
use garage_table::*;
use garage_util::async_hash::*;
use garage_util::data::*;
-use garage_util::time::*;
use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
@@ -30,10 +29,13 @@ pub async fn handle_create_multipart_upload(
req: &Request<Body>,
bucket_name: &str,
bucket_id: Uuid,
- key: &str,
+ key: &String,
) -> Result<Response<Body>, Error> {
+ let existing_object = garage.object_table.get(&bucket_id, &key).await?;
+
let upload_id = gen_uuid();
- let timestamp = now_msec();
+ let timestamp = next_timestamp(existing_object.as_ref());
+
let headers = get_headers(req.headers())?;
// Create object in object table
@@ -233,7 +235,8 @@ pub async fn handle_complete_multipart_upload(
// Get object and multipart upload
let key = key.to_string();
- let (_, mut object_version, mpu) = get_upload(&garage, &bucket.id, &key, &upload_id).await?;
+ let (object, mut object_version, mpu) =
+ get_upload(&garage, &bucket.id, &key, &upload_id).await?;
if mpu.parts.is_empty() {
return Err(Error::bad_request("No data was uploaded"));
@@ -331,7 +334,7 @@ pub async fn handle_complete_multipart_upload(
// Calculate total size of final object
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
- if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await {
+ if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await {
object_version.state = ObjectVersionState::Aborted;
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?;
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index c7ac5030..606facc4 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -3,6 +3,7 @@ use std::sync::Arc;
use base64::prelude::*;
use futures::prelude::*;
+use futures::try_join;
use hyper::body::{Body, Bytes};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response};
@@ -35,7 +36,7 @@ pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
bucket: &Bucket,
- key: &str,
+ key: &String,
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
// Retrieve interesting headers from request
@@ -68,16 +69,24 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
headers: ObjectVersionHeaders,
body: S,
bucket: &Bucket,
- key: &str,
+ key: &String,
content_md5: Option<String>,
content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> {
+ let mut chunker = StreamChunker::new(body, garage.config.block_size);
+ let (first_block_opt, existing_object) = try_join!(
+ chunker.next(),
+ garage
+ .object_table
+ .get(&bucket.id, key)
+ .map_err(Error::from),
+ )?;
+
+ let first_block = first_block_opt.unwrap_or_default();
+
// Generate identity of new version
let version_uuid = gen_uuid();
- let version_timestamp = now_msec();
-
- let mut chunker = StreamChunker::new(body, garage.config.block_size);
- let first_block = chunker.next().await?.unwrap_or_default();
+ let version_timestamp = next_timestamp(existing_object.as_ref());
// If body is small enough, store it directly in the object table
// as "inline data". We can then return immediately.
@@ -97,7 +106,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
- check_quotas(&garage, bucket, key, size).await?;
+ check_quotas(&garage, bucket, size, existing_object.as_ref()).await?;
let object_version = ObjectVersion {
uuid: version_uuid,
@@ -176,7 +185,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256,
)?;
- check_quotas(&garage, bucket, key, total_size).await?;
+ check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?;
// Save final object state, marked as Complete
let md5sum_hex = hex::encode(data_md5sum);
@@ -229,19 +238,19 @@ pub(crate) fn ensure_checksum_matches(
pub(crate) async fn check_quotas(
garage: &Arc<Garage>,
bucket: &Bucket,
- key: &str,
size: u64,
+ prev_object: Option<&Object>,
) -> Result<(), Error> {
let quotas = bucket.state.as_option().unwrap().quotas.get();
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
return Ok(());
};
- let key = key.to_string();
- let (prev_object, counters) = futures::try_join!(
- garage.object_table.get(&bucket.id, &key),
- garage.object_counter_table.table.get(&bucket.id, &EmptyKey),
- )?;
+ let counters = garage
+ .object_counter_table
+ .table
+ .get(&bucket.id, &EmptyKey)
+ .await?;
let counters = counters
.map(|x| x.filtered_values(&garage.system.ring.borrow()))
@@ -275,7 +284,7 @@ pub(crate) async fn check_quotas(
if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 {
return Err(Error::forbidden(format!(
"Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.",
- ms, current_size, size
+ ms, current_size, cnt_size_diff
)));
}
}
@@ -519,3 +528,11 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVers
other,
})
}
+
+pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 {
+ existing_object
+ .as_ref()
+ .and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max())
+ .map(|t| std::cmp::max(t + 1, now_msec()))
+ .unwrap_or_else(now_msec)
+}