aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-02-19 12:11:02 +0100
committerAlex Auvolat <alex@adnab.me>2021-02-19 12:11:02 +0100
commit76390085ef7cc0a8879e9edc039b536f90d5aa59 (patch)
treee4a9524dcf3d6613e1a881f938c6ad581e9d9e72
parent3b023c0c3b985dc1c04c8545351b5ba20cf9189e (diff)
downloadgarage-76390085ef7cc0a8879e9edc039b536f90d5aa59.tar.gz
garage-76390085ef7cc0a8879e9edc039b536f90d5aa59.zip
Small improvements in the S3 put workflow
-rw-r--r--src/api/s3_put.rs109
1 files changed, 70 insertions, 39 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index c42309b2..656d84a9 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -27,24 +27,25 @@ pub async fn handle_put(
key: &str,
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
+ // Generate identity of new version
let version_uuid = gen_uuid();
+ let version_timestamp = now_msec();
+
+ // Retrieve interesting headers from request
let headers = get_headers(&req)?;
let content_md5 = match req.headers().get("content-md5") {
Some(x) => Some(x.to_str()?.to_string()),
None => None,
};
+ // Parse body of uploaded file
let body = req.into_body();
let mut chunker = BodyChunker::new(body, garage.config.block_size);
let first_block = chunker.next().await?.unwrap_or(vec![]);
- let mut object_version = ObjectVersion {
- uuid: version_uuid,
- timestamp: now_msec(),
- state: ObjectVersionState::Uploading(headers.clone()),
- };
-
+ // If body is small enough, store it directly in the object table
+ // as "inline data". We can then return immediately.
if first_block.len() < INLINE_THRESHOLD {
let mut md5sum = Md5::new();
md5sum.update(&first_block[..]);
@@ -60,27 +61,41 @@ pub async fn handle_put(
content_sha256,
)?;
- object_version.state = ObjectVersionState::Complete(ObjectVersionData::Inline(
- ObjectVersionMeta {
- headers,
- size: first_block.len() as u64,
- etag: md5sum_hex.clone(),
- },
- first_block,
- ));
+ let object_version = ObjectVersion {
+ uuid: version_uuid,
+ timestamp: version_timestamp,
+ state: ObjectVersionState::Complete(ObjectVersionData::Inline(
+ ObjectVersionMeta {
+ headers,
+ size: first_block.len() as u64,
+ etag: md5sum_hex.clone(),
+ },
+ first_block,
+ )),
+ };
let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
+
return Ok(put_response(version_uuid, md5sum_hex));
}
- let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
-
- let first_block_hash = hash(&first_block[..]);
+ // Write version identifier in object table so that we have a trace
+ // that we are uploading something
+ let mut object_version = ObjectVersion {
+ uuid: version_uuid,
+ timestamp: now_msec(),
+ state: ObjectVersionState::Uploading(headers.clone()),
+ };
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
- let (total_size, md5sum_arr, sha256sum) = read_and_put_blocks(
+ // Initialize corresponding entry in version table
+ let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
+ let first_block_hash = hash(&first_block[..]);
+
+ // Transfer data and verify checksum
+ let tx_result = read_and_put_blocks(
&garage,
version,
1,
@@ -88,19 +103,31 @@ pub async fn handle_put(
first_block_hash,
&mut chunker,
)
- .await?;
-
- ensure_checksum_matches(
- md5sum_arr.as_slice(),
- sha256sum,
- content_md5.as_deref(),
- content_sha256,
- )?;
-
- // TODO: if at any step we have an error, we should undo everything we did
+ .await
+ .and_then(|(total_size, md5sum_arr, sha256sum)| {
+ ensure_checksum_matches(
+ md5sum_arr.as_slice(),
+ sha256sum,
+ content_md5.as_deref(),
+ content_sha256,
+ )
+ .map(|()| (total_size, md5sum_arr))
+ });
+
+ // If something went wrong, clean up
+ let (total_size, md5sum_arr) = match tx_result {
+ Ok(rv) => rv,
+ Err(e) => {
+ // Mark object as aborted, this will free the blocks further down
+ object_version.state = ObjectVersionState::Aborted;
+ let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
+ garage.object_table.insert(&object).await?;
+ return Err(e);
+ }
+ };
+ // Save final object state, marked as Complete
let md5sum_hex = hex::encode(md5sum_arr);
-
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
headers,
@@ -109,7 +136,6 @@ pub async fn handle_put(
},
first_block_hash,
));
-
let object = Object::new(bucket.into(), key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
@@ -340,13 +366,12 @@ pub async fn handle_put_part(
};
// Read first chuck, and at the same time try to get object to see if it exists
- let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
-
let bucket = bucket.to_string();
let key = key.to_string();
- let get_object_fut = garage.object_table.get(&bucket, &key);
- let get_first_block_fut = chunker.next();
- let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?;
+ let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
+
+ let (object, first_block) =
+ futures::try_join!(garage.object_table.get(&bucket, &key), chunker.next(),)?;
// Check object is valid and multipart block can be accepted
let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?;
@@ -404,8 +429,8 @@ pub async fn handle_complete_multipart_upload(
garage.object_table.get(&bucket, &key),
garage.version_table.get(&version_uuid, &EmptyKey),
)?;
- let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
+ let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
let object_version = object
.versions()
.iter()
@@ -418,11 +443,12 @@ pub async fn handle_complete_multipart_upload(
}
Some(x) => x.clone(),
};
- let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
+ let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
if version.blocks().len() == 0 {
return Err(Error::BadRequest(format!("No data was uploaded")));
}
+
let headers = match object_version.state {
ObjectVersionState::Uploading(headers) => headers.clone(),
_ => unreachable!(),
@@ -540,8 +566,13 @@ fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let mut other = BTreeMap::new();
for h in other_headers.iter() {
if let Some(v) = req.headers().get(h) {
- if let Ok(v_str) = v.to_str() {
- other.insert(h.to_string(), v_str.to_string());
+ match v.to_str() {
+ Ok(v_str) => {
+ other.insert(h.to_string(), v_str.to_string());
+ }
+ Err(e) => {
+ warn!("Discarding header {}, error in .to_str(): {}", h, e);
+ }
}
}
}