diff options
Diffstat (limited to 'src/api/s3_put.rs')
-rw-r--r-- | src/api/s3_put.rs | 137 |
1 files changed, 69 insertions, 68 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 70a467a8..e6df5bc0 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -5,9 +5,9 @@ use std::sync::Arc; use futures::stream::*; use hyper::{Body, Request, Response}; +use garage_table::*; use garage_util::data::*; use garage_util::error::Error; -use garage_table::*; use garage_core::block::INLINE_THRESHOLD; use garage_core::block_ref_table::*; @@ -15,6 +15,7 @@ use garage_core::garage::Garage; use garage_core::object_table::*; use garage_core::version_table::*; +use crate::encoding::*; use crate::http_util::*; pub async fn handle_put( @@ -30,7 +31,7 @@ pub async fn handle_put( let mut chunker = BodyChunker::new(body, garage.config.block_size); let first_block = match chunker.next().await? { Some(x) => x, - None => return Err(Error::BadRequest(format!("Empty body"))), + None => vec![], }; let mut object_version = ObjectVersion { @@ -58,7 +59,15 @@ pub async fn handle_put( let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; - let total_size = read_and_put_blocks(&garage, version, 1, first_block, first_block_hash, &mut chunker).await?; + let total_size = read_and_put_blocks( + &garage, + version, + 1, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; // TODO: if at any step we have an error, we should undo everything we did @@ -80,8 +89,14 @@ async fn read_and_put_blocks( chunker: &mut BodyChunker, ) -> Result<u64, Error> { let mut next_offset = first_block.len(); - let mut put_curr_version_block = - put_block_meta(garage.clone(), &version, part_number, 0, first_block_hash, first_block.len() as u64); + let mut put_curr_version_block = put_block_meta( + garage.clone(), + &version, + part_number, + 0, + first_block_hash, + first_block.len() as u64, + ); let mut put_curr_block = garage .block_manager .rpc_put_block(first_block_hash, first_block); @@ -92,8 +107,14 @@ async fn read_and_put_blocks( if let Some(block) = next_block { let block_hash = hash(&block[..]); let block_len = block.len(); - put_curr_version_block = - put_block_meta(garage.clone(), &version, part_number, next_offset as u64, block_hash, block_len as u64); + put_curr_version_block = put_block_meta( + garage.clone(), + &version, + part_number, + next_offset as u64, + block_hash, + block_len as u64, + ); put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); next_offset += block_len; } else { @@ -232,8 +253,9 @@ pub async fn handle_put_part( .parse::<u64>() .map_err(|e| Error::BadRequest(format!("Invalid part number: {}", e)))?; - let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; - + let version_uuid = + uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; + // Read first chuck, and at the same time try to get object to see if it exists let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size); @@ -265,7 +287,15 @@ pub async fn handle_put_part( // Copy block to store let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); let first_block_hash = hash(&first_block[..]); - read_and_put_blocks(&garage, version, part_number, first_block, first_block_hash, &mut chunker).await?; + read_and_put_blocks( + &garage, + version, + part_number, + first_block, + first_block_hash, + &mut chunker, + ) + .await?; Ok(Response::new(Box::new(BytesBody::from(vec![])))) } @@ -277,7 +307,8 @@ pub async fn handle_complete_multipart_upload( key: &str, upload_id: &str, ) -> Result<Response<BodyType>, Error> { - let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; + let version_uuid = + uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; let bucket = bucket.to_string(); let key = key.to_string(); @@ -295,9 +326,11 @@ pub async fn handle_complete_multipart_upload( && v.data == ObjectVersionData::Uploading }); let mut object_version = match object_version { - None => return Err(Error::BadRequest(format!( - "Multipart upload does not exist or has already been completed" - ))), + None => { + return Err(Error::BadRequest(format!( + "Multipart upload does not exist or has already been completed" + ))) + } Some(x) => x.clone(), }; let version = match version { @@ -311,7 +344,11 @@ pub async fn handle_complete_multipart_upload( // TODO: check that all the parts that they pretend they gave us are indeed there // TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere... - let total_size = version.blocks().iter().map(|x| x.size).fold(0, |x, y| x+y); + let total_size = version + .blocks() + .iter() + .map(|x| x.size) + .fold(0, |x, y| x + y); object_version.size = total_size; object_version.state = ObjectVersionState::Complete; object_version.data = ObjectVersionData::FirstBlock(version.blocks()[0].hash); @@ -325,7 +362,12 @@ pub async fn handle_complete_multipart_upload( r#"<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"# ) .unwrap(); - writeln!(&mut xml, "\t<Location>{}</Location>", garage.config.s3_api.s3_region).unwrap(); + writeln!( + &mut xml, + "\t<Location>{}</Location>", + garage.config.s3_api.s3_region + ) + .unwrap(); writeln!(&mut xml, "\t<Bucket>{}</Bucket>", bucket).unwrap(); writeln!(&mut xml, "\t<Key>{}</Key>", xml_escape(&key)).unwrap(); writeln!(&mut xml, "</CompleteMultipartUploadResult>").unwrap(); @@ -339,9 +381,13 @@ pub async fn handle_abort_multipart_upload( key: &str, upload_id: &str, ) -> Result<Response<BodyType>, Error> { - let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; + let version_uuid = + uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?; - let object = garage.object_table.get(&bucket.to_string(), &key.to_string()).await?; + let object = garage + .object_table + .get(&bucket.to_string(), &key.to_string()) + .await?; let object = match object { None => return Err(Error::BadRequest(format!("Object not found"))), Some(x) => x, @@ -352,9 +398,11 @@ pub async fn handle_abort_multipart_upload( && v.data == ObjectVersionData::Uploading }); let mut object_version = match object_version { - None => return Err(Error::BadRequest(format!( - "Multipart upload does not exist or has already been completed" - ))), + None => { + return Err(Error::BadRequest(format!( + "Multipart upload does not exist or has already been completed" + ))) + } Some(x) => x.clone(), }; @@ -383,50 +431,3 @@ fn uuid_from_str(id: &str) -> Result<UUID, ()> { uuid.copy_from_slice(&id_bin[..]); Ok(UUID::from(uuid)) } - -pub async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<UUID, Error> { - let object = match garage - .object_table - .get(&bucket.to_string(), &key.to_string()) - .await? - { - None => { - // No need to delete - return Ok([0u8; 32].into()); - } - Some(o) => o, - }; - - let interesting_versions = object.versions().iter().filter(|v| { - v.data != ObjectVersionData::DeleteMarker && v.state != ObjectVersionState::Aborted - }); - - let mut must_delete = false; - let mut timestamp = now_msec(); - for v in interesting_versions { - must_delete = true; - timestamp = std::cmp::max(timestamp, v.timestamp + 1); - } - - if !must_delete { - return Ok([0u8; 32].into()); - } - - let version_uuid = gen_uuid(); - - let object = Object::new( - bucket.into(), - key.into(), - vec![ObjectVersion { - uuid: version_uuid, - timestamp: now_msec(), - mime_type: "application/x-delete-marker".into(), - size: 0, - state: ObjectVersionState::Complete, - data: ObjectVersionData::DeleteMarker, - }], - ); - - garage.object_table.insert(&object).await?; - return Ok(version_uuid); -} |