aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3')
-rw-r--r--src/api/s3/bucket.rs2
-rw-r--r--src/api/s3/error.rs2
-rw-r--r--src/api/s3/list.rs11
-rw-r--r--src/api/s3/post_object.rs5
-rw-r--r--src/api/s3/put.rs96
5 files changed, 74 insertions, 42 deletions
diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs
index 8471385f..733981e1 100644
--- a/src/api/s3/bucket.rs
+++ b/src/api/s3/bucket.rs
@@ -305,7 +305,7 @@ fn parse_create_bucket_xml(xml_bytes: &[u8]) -> Option<Option<String>> {
let mut ret = None;
for item in cbc.children() {
if item.has_tag_name("LocationConstraint") {
- if ret != None {
+ if ret.is_some() {
return None;
}
ret = Some(item.text()?.to_string());
diff --git a/src/api/s3/error.rs b/src/api/s3/error.rs
index 67009d63..c50cff9f 100644
--- a/src/api/s3/error.rs
+++ b/src/api/s3/error.rs
@@ -21,7 +21,7 @@ pub enum Error {
// Category: cannot process
/// Authorization Header Malformed
- #[error(display = "Authorization header malformed, expected scope: {}", _0)]
+ #[error(display = "Authorization header malformed, unexpected scope: {}", _0)]
AuthorizationHeaderMalformed(String),
/// The object requested don't exists
diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs
index e5f486c8..5cb0d65a 100644
--- a/src/api/s3/list.rs
+++ b/src/api/s3/list.rs
@@ -3,6 +3,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::iter::{Iterator, Peekable};
use std::sync::Arc;
+use base64::prelude::*;
use hyper::{Body, Response};
use garage_util::data::*;
@@ -129,11 +130,11 @@ pub async fn handle_list(
next_continuation_token: match (query.is_v2, &pagination) {
(true, Some(RangeBegin::AfterKey { key })) => Some(s3_xml::Value(format!(
"]{}",
- base64::encode(key.as_bytes())
+ BASE64_STANDARD.encode(key.as_bytes())
))),
(true, Some(RangeBegin::IncludingKey { key, .. })) => Some(s3_xml::Value(format!(
"[{}",
- base64::encode(key.as_bytes())
+ BASE64_STANDARD.encode(key.as_bytes())
))),
_ => None,
},
@@ -583,14 +584,16 @@ impl ListObjectsQuery {
(Some(token), _) => match &token[..1] {
"[" => Ok(RangeBegin::IncludingKey {
key: String::from_utf8(
- base64::decode(token[1..].as_bytes())
+ BASE64_STANDARD
+ .decode(token[1..].as_bytes())
.ok_or_bad_request("Invalid continuation token")?,
)?,
fallback_key: None,
}),
"]" => Ok(RangeBegin::AfterKey {
key: String::from_utf8(
- base64::decode(token[1..].as_bytes())
+ BASE64_STANDARD
+ .decode(token[1..].as_bytes())
.ok_or_bad_request("Invalid continuation token")?,
)?,
}),
diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs
index d063faa4..f2098ab0 100644
--- a/src/api/s3/post_object.rs
+++ b/src/api/s3/post_object.rs
@@ -4,6 +4,7 @@ use std::ops::RangeInclusive;
use std::sync::Arc;
use std::task::{Context, Poll};
+use base64::prelude::*;
use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
use futures::{Stream, StreamExt};
@@ -138,7 +139,9 @@ pub async fn handle_post_object(
.get_existing_bucket(bucket_id)
.await?;
- let decoded_policy = base64::decode(&policy).ok_or_bad_request("Invalid policy")?;
+ let decoded_policy = BASE64_STANDARD
+ .decode(policy)
+ .ok_or_bad_request("Invalid policy")?;
let decoded_policy: Policy =
serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?;
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index 97b8e4e3..350ab884 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -1,6 +1,7 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
+use base64::prelude::*;
use futures::prelude::*;
use hyper::body::{Body, Bytes};
use hyper::header::{HeaderMap, HeaderValue};
@@ -119,6 +120,17 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
return Ok((version_uuid, data_md5sum_hex));
}
+ // The following consists in many steps that can each fail.
+ // Keep track that some cleanup will be needed if things fail
+ // before everything is finished (cleanup is done using the Drop trait).
+ let mut interrupted_cleanup = InterruptedCleanup(Some((
+ garage.clone(),
+ bucket.id,
+ key.into(),
+ version_uuid,
+ version_timestamp,
+ )));
+
// Write version identifier in object table so that we have a trace
// that we are uploading something
let mut object_version = ObjectVersion {
@@ -139,44 +151,27 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// Transfer data and verify checksum
let first_block_hash = async_blake2sum(first_block.clone()).await;
- let tx_result = (|| async {
- let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
- &garage,
- &version,
- 1,
- first_block,
- first_block_hash,
- &mut chunker,
- )
- .await?;
-
- ensure_checksum_matches(
- data_md5sum.as_slice(),
- data_sha256sum,
- content_md5.as_deref(),
- content_sha256,
- )?;
-
- check_quotas(&garage, bucket, key, total_size).await?;
+ let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
+ &garage,
+ &version,
+ 1,
+ first_block,
+ first_block_hash,
+ &mut chunker,
+ )
+ .await?;
- Ok((total_size, data_md5sum))
- })()
- .await;
+ ensure_checksum_matches(
+ data_md5sum.as_slice(),
+ data_sha256sum,
+ content_md5.as_deref(),
+ content_sha256,
+ )?;
- // If something went wrong, clean up
- let (total_size, md5sum_arr) = match tx_result {
- Ok(rv) => rv,
- Err(e) => {
- // Mark object as aborted, this will free the blocks further down
- object_version.state = ObjectVersionState::Aborted;
- let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
- garage.object_table.insert(&object).await?;
- return Err(e);
- }
- };
+ check_quotas(&garage, bucket, key, total_size).await?;
// Save final object state, marked as Complete
- let md5sum_hex = hex::encode(md5sum_arr);
+ let md5sum_hex = hex::encode(data_md5sum);
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
headers,
@@ -188,6 +183,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let object = Object::new(bucket.id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
+ // We were not interrupted, everything went fine.
+ // We won't have to clean up on drop.
+ interrupted_cleanup.cancel();
+
Ok((version_uuid, md5sum_hex))
}
@@ -209,7 +208,7 @@ fn ensure_checksum_matches(
}
}
if let Some(expected_md5) = content_md5 {
- if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
+ if expected_md5.trim_matches('"') != BASE64_STANDARD.encode(data_md5sum) {
return Err(Error::bad_request("Unable to validate content-md5"));
} else {
trace!("Successfully validated content-md5");
@@ -426,6 +425,33 @@ pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<Body> {
.unwrap()
}
+struct InterruptedCleanup(Option<(Arc<Garage>, Uuid, String, Uuid, u64)>);
+
+impl InterruptedCleanup {
+ fn cancel(&mut self) {
+ drop(self.0.take());
+ }
+}
+impl Drop for InterruptedCleanup {
+ fn drop(&mut self) {
+ if let Some((garage, bucket_id, key, version_uuid, version_ts)) = self.0.take() {
+ tokio::spawn(async move {
+ let object_version = ObjectVersion {
+ uuid: version_uuid,
+ timestamp: version_ts,
+ state: ObjectVersionState::Aborted,
+ };
+ let object = Object::new(bucket_id, key, vec![object_version]);
+ if let Err(e) = garage.object_table.insert(&object).await {
+ warn!("Cannot cleanup after aborted PutObject: {}", e);
+ }
+ });
+ }
+ }
+}
+
+// ----
+
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
req: &Request<Body>,