diff options
Diffstat (limited to 'src/api/s3_put.rs')
-rw-r--r-- | src/api/s3_put.rs | 72 |
1 files changed, 44 insertions, 28 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index c5d0a31c..75622168 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -1,4 +1,4 @@ -use std::collections::VecDeque; +use std::collections::{VecDeque, BTreeMap}; use std::fmt::Write; use std::sync::Arc; @@ -24,7 +24,11 @@ pub async fn handle_put( key: &str, ) -> Result<Response<Body>, Error> { let version_uuid = gen_uuid(); - let mime_type = get_mime_type(&req)?; + let headers = ObjectVersionHeaders{ + content_type: get_mime_type(&req)?, + other: BTreeMap::new(), // TODO + }; + let body = req.into_body(); let mut chunker = BodyChunker::new(body, garage.config.block_size); @@ -36,15 +40,17 @@ pub async fn handle_put( let mut object_version = ObjectVersion { uuid: version_uuid, timestamp: now_msec(), - mime_type, - size: first_block.len() as u64, - state: ObjectVersionState::Uploading, - data: ObjectVersionData::Uploading, + state: ObjectVersionState::Uploading(headers.clone()), }; if first_block.len() < INLINE_THRESHOLD { - object_version.data = ObjectVersionData::Inline(first_block); - object_version.state = ObjectVersionState::Complete; + object_version.state = ObjectVersionState::Complete(ObjectVersionData::Inline( + ObjectVersionMeta{ + headers, + size: first_block.len() as u64, + etag: "".to_string(), // TODO + }, + first_block)); let object = Object::new(bucket.into(), key.into(), vec![object_version]); garage.object_table.insert(&object).await?; @@ -54,7 +60,6 @@ pub async fn handle_put( let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); let first_block_hash = hash(&first_block[..]); - object_version.data = ObjectVersionData::FirstBlock(first_block_hash); let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; @@ -70,8 +75,13 @@ pub async fn handle_put( // TODO: if at any step we have an error, we should undo everything we did - object_version.state = ObjectVersionState::Complete; - object_version.size = total_size; + object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( + ObjectVersionMeta{ + headers, + size: total_size, + etag: "".to_string(), // TODO + }, + first_block_hash)); let object = Object::new(bucket.into(), key.into(), vec![object_version]); garage.object_table.insert(&object).await?; @@ -197,6 +207,7 @@ impl BodyChunker { pub fn put_response(version_uuid: UUID) -> Response<Body> { Response::builder() .header("x-amz-version-id", hex::encode(version_uuid)) + // TODO ETag .body(Body::from(vec![])) .unwrap() } @@ -208,15 +219,15 @@ pub async fn handle_create_multipart_upload( key: &str, ) -> Result<Response<Body>, Error> { let version_uuid = gen_uuid(); - let mime_type = get_mime_type(req)?; + let headers = ObjectVersionHeaders{ + content_type: get_mime_type(&req)?, + other: BTreeMap::new(), // TODO + }; let object_version = ObjectVersion { uuid: version_uuid, timestamp: now_msec(), - mime_type, - size: 0, - state: ObjectVersionState::Uploading, - data: ObjectVersionData::Uploading, + state: ObjectVersionState::Uploading(headers), }; let object = Object::new(bucket.to_string(), key.to_string(), vec![object_version]); garage.object_table.insert(&object).await?; @@ -276,9 +287,7 @@ pub async fn handle_put_part( Some(x) => x, }; if !object.versions().iter().any(|v| { - v.uuid == version_uuid - && v.state == ObjectVersionState::Uploading - && v.data == ObjectVersionData::Uploading + v.uuid == version_uuid && v.is_uploading() }) { return Err(Error::BadRequest(format!( "Multipart upload does not exist or is otherwise invalid" @@ -322,9 +331,7 @@ pub async fn handle_complete_multipart_upload( Some(x) => x, }; let object_version = object.versions().iter().find(|v| { - v.uuid == version_uuid - && v.state == ObjectVersionState::Uploading - && v.data == ObjectVersionData::Uploading + v.uuid == version_uuid && v.is_uploading() }); let mut object_version = match object_version { None => { @@ -341,6 +348,10 @@ pub async fn handle_complete_multipart_upload( if version.blocks().len() == 0 { return Err(Error::BadRequest(format!("No data was uploaded"))); } + let headers = match object_version.state { + ObjectVersionState::Uploading(headers) => headers.clone(), + _ => unreachable!(), + }; // TODO: check that all the parts that they pretend they gave us are indeed there // TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere... @@ -350,9 +361,16 @@ pub async fn handle_complete_multipart_upload( .iter() .map(|x| x.size) .fold(0, |x, y| x + y); - object_version.size = total_size; - object_version.state = ObjectVersionState::Complete; - object_version.data = ObjectVersionData::FirstBlock(version.blocks()[0].hash); + object_version.state = ObjectVersionState::Complete( + ObjectVersionData::FirstBlock( + ObjectVersionMeta{ + headers, + size: total_size, + etag: "".to_string(),// TODO + }, + version.blocks()[0].hash) + ); + let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; @@ -394,9 +412,7 @@ pub async fn handle_abort_multipart_upload( Some(x) => x, }; let object_version = object.versions().iter().find(|v| { - v.uuid == version_uuid - && v.state == ObjectVersionState::Uploading - && v.data == ObjectVersionData::Uploading + v.uuid == version_uuid && v.is_uploading() }); let mut object_version = match object_version { None => { |