diff options
author | Alex Auvolat <alex@adnab.me> | 2021-02-19 12:11:02 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-02-19 12:11:02 +0100 |
commit | 76390085ef7cc0a8879e9edc039b536f90d5aa59 (patch) | |
tree | e4a9524dcf3d6613e1a881f938c6ad581e9d9e72 | |
parent | 3b023c0c3b985dc1c04c8545351b5ba20cf9189e (diff) | |
download | garage-76390085ef7cc0a8879e9edc039b536f90d5aa59.tar.gz garage-76390085ef7cc0a8879e9edc039b536f90d5aa59.zip |
Small improvements in the S3 put workflow
-rw-r--r-- | src/api/s3_put.rs | 109 |
1 files changed, 70 insertions, 39 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index c42309b2..656d84a9 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -27,24 +27,25 @@ pub async fn handle_put( key: &str, content_sha256: Option<Hash>, ) -> Result<Response<Body>, Error> { + // Generate identity of new version let version_uuid = gen_uuid(); + let version_timestamp = now_msec(); + + // Retrieve interesting headers from request let headers = get_headers(&req)?; let content_md5 = match req.headers().get("content-md5") { Some(x) => Some(x.to_str()?.to_string()), None => None, }; + // Parse body of uploaded file let body = req.into_body(); let mut chunker = BodyChunker::new(body, garage.config.block_size); let first_block = chunker.next().await?.unwrap_or(vec![]); - let mut object_version = ObjectVersion { - uuid: version_uuid, - timestamp: now_msec(), - state: ObjectVersionState::Uploading(headers.clone()), - }; - + // If body is small enough, store it directly in the object table + // as "inline data". We can then return immediately. if first_block.len() < INLINE_THRESHOLD { let mut md5sum = Md5::new(); md5sum.update(&first_block[..]); @@ -60,27 +61,41 @@ pub async fn handle_put( content_sha256, )?; - object_version.state = ObjectVersionState::Complete(ObjectVersionData::Inline( - ObjectVersionMeta { - headers, - size: first_block.len() as u64, - etag: md5sum_hex.clone(), - }, - first_block, - )); + let object_version = ObjectVersion { + uuid: version_uuid, + timestamp: version_timestamp, + state: ObjectVersionState::Complete(ObjectVersionData::Inline( + ObjectVersionMeta { + headers, + size: first_block.len() as u64, + etag: md5sum_hex.clone(), + }, + first_block, + )), + }; let object = Object::new(bucket.into(), key.into(), vec![object_version]); garage.object_table.insert(&object).await?; + return Ok(put_response(version_uuid, md5sum_hex)); } - let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); - - let first_block_hash = hash(&first_block[..]); + // Write version identifier in object table so that we have a trace + // that we are uploading something + let mut object_version = ObjectVersion { + uuid: version_uuid, + timestamp: now_msec(), + state: ObjectVersionState::Uploading(headers.clone()), + }; let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; - let (total_size, md5sum_arr, sha256sum) = read_and_put_blocks( + // Initialize corresponding entry in version table + let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); + let first_block_hash = hash(&first_block[..]); + + // Transfer data and verify checksum + let tx_result = read_and_put_blocks( &garage, version, 1, @@ -88,19 +103,31 @@ pub async fn handle_put( first_block_hash, &mut chunker, ) - .await?; - - ensure_checksum_matches( - md5sum_arr.as_slice(), - sha256sum, - content_md5.as_deref(), - content_sha256, - )?; - - // TODO: if at any step we have an error, we should undo everything we did + .await + .and_then(|(total_size, md5sum_arr, sha256sum)| { + ensure_checksum_matches( + md5sum_arr.as_slice(), + sha256sum, + content_md5.as_deref(), + content_sha256, + ) + .map(|()| (total_size, md5sum_arr)) + }); + + // If something went wrong, clean up + let (total_size, md5sum_arr) = match tx_result { + Ok(rv) => rv, + Err(e) => { + // Mark object as aborted, this will free the blocks further down + object_version.state = ObjectVersionState::Aborted; + let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); + garage.object_table.insert(&object).await?; + return Err(e); + } + }; + // Save final object state, marked as Complete let md5sum_hex = hex::encode(md5sum_arr); - object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( ObjectVersionMeta { headers, @@ -109,7 +136,6 @@ pub async fn handle_put( }, first_block_hash, )); - let object = Object::new(bucket.into(), key.into(), vec![object_version]); garage.object_table.insert(&object).await?; @@ -340,13 +366,12 @@ pub async fn handle_put_part( }; // Read first chuck, and at the same time try to get object to see if it exists - let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); - let bucket = bucket.to_string(); let key = key.to_string(); - let get_object_fut = garage.object_table.get(&bucket, &key); - let get_first_block_fut = chunker.next(); - let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?; + let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); + + let (object, first_block) = + futures::try_join!(garage.object_table.get(&bucket, &key), chunker.next(),)?; // Check object is valid and multipart block can be accepted let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?; @@ -404,8 +429,8 @@ pub async fn handle_complete_multipart_upload( garage.object_table.get(&bucket, &key), garage.version_table.get(&version_uuid, &EmptyKey), )?; - let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; + let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; let object_version = object .versions() .iter() @@ -418,11 +443,12 @@ pub async fn handle_complete_multipart_upload( } Some(x) => x.clone(), }; - let version = version.ok_or(Error::BadRequest(format!("Version not found")))?; + let version = version.ok_or(Error::BadRequest(format!("Version not found")))?; if version.blocks().len() == 0 { return Err(Error::BadRequest(format!("No data was uploaded"))); } + let headers = match object_version.state { ObjectVersionState::Uploading(headers) => headers.clone(), _ => unreachable!(), @@ -540,8 +566,13 @@ fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> { let mut other = BTreeMap::new(); for h in other_headers.iter() { if let Some(v) = req.headers().get(h) { - if let Ok(v_str) = v.to_str() { - other.insert(h.to_string(), v_str.to_string()); + match v.to_str() { + Ok(v_str) => { + other.insert(h.to_string(), v_str.to_string()); + } + Err(e) => { + warn!("Discarding header {}, error in .to_str(): {}", h, e); + } } } } |