aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3_put.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/s3_put.rs')
-rw-r--r--src/api/s3_put.rs137
1 files changed, 69 insertions, 68 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index 70a467a8..e6df5bc0 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -5,9 +5,9 @@ use std::sync::Arc;
use futures::stream::*;
use hyper::{Body, Request, Response};
+use garage_table::*;
use garage_util::data::*;
use garage_util::error::Error;
-use garage_table::*;
use garage_core::block::INLINE_THRESHOLD;
use garage_core::block_ref_table::*;
@@ -15,6 +15,7 @@ use garage_core::garage::Garage;
use garage_core::object_table::*;
use garage_core::version_table::*;
+use crate::encoding::*;
use crate::http_util::*;
pub async fn handle_put(
@@ -30,7 +31,7 @@ pub async fn handle_put(
let mut chunker = BodyChunker::new(body, garage.config.block_size);
let first_block = match chunker.next().await? {
Some(x) => x,
- None => return Err(Error::BadRequest(format!("Empty body"))),
+ None => vec![],
};
let mut object_version = ObjectVersion {
@@ -58,7 +59,15 @@ pub async fn handle_put(
let object = Object::new(bucket.into(), key.into(), vec![object_version.clone()]);
garage.object_table.insert(&object).await?;
- let total_size = read_and_put_blocks(&garage, version, 1, first_block, first_block_hash, &mut chunker).await?;
+ let total_size = read_and_put_blocks(
+ &garage,
+ version,
+ 1,
+ first_block,
+ first_block_hash,
+ &mut chunker,
+ )
+ .await?;
// TODO: if at any step we have an error, we should undo everything we did
@@ -80,8 +89,14 @@ async fn read_and_put_blocks(
chunker: &mut BodyChunker,
) -> Result<u64, Error> {
let mut next_offset = first_block.len();
- let mut put_curr_version_block =
- put_block_meta(garage.clone(), &version, part_number, 0, first_block_hash, first_block.len() as u64);
+ let mut put_curr_version_block = put_block_meta(
+ garage.clone(),
+ &version,
+ part_number,
+ 0,
+ first_block_hash,
+ first_block.len() as u64,
+ );
let mut put_curr_block = garage
.block_manager
.rpc_put_block(first_block_hash, first_block);
@@ -92,8 +107,14 @@ async fn read_and_put_blocks(
if let Some(block) = next_block {
let block_hash = hash(&block[..]);
let block_len = block.len();
- put_curr_version_block =
- put_block_meta(garage.clone(), &version, part_number, next_offset as u64, block_hash, block_len as u64);
+ put_curr_version_block = put_block_meta(
+ garage.clone(),
+ &version,
+ part_number,
+ next_offset as u64,
+ block_hash,
+ block_len as u64,
+ );
put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
next_offset += block_len;
} else {
@@ -232,8 +253,9 @@ pub async fn handle_put_part(
.parse::<u64>()
.map_err(|e| Error::BadRequest(format!("Invalid part number: {}", e)))?;
- let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
-
+ let version_uuid =
+ uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
+
// Read first chuck, and at the same time try to get object to see if it exists
let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
@@ -265,7 +287,15 @@ pub async fn handle_put_part(
// Copy block to store
let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]);
let first_block_hash = hash(&first_block[..]);
- read_and_put_blocks(&garage, version, part_number, first_block, first_block_hash, &mut chunker).await?;
+ read_and_put_blocks(
+ &garage,
+ version,
+ part_number,
+ first_block,
+ first_block_hash,
+ &mut chunker,
+ )
+ .await?;
Ok(Response::new(Box::new(BytesBody::from(vec![]))))
}
@@ -277,7 +307,8 @@ pub async fn handle_complete_multipart_upload(
key: &str,
upload_id: &str,
) -> Result<Response<BodyType>, Error> {
- let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
+ let version_uuid =
+ uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
let bucket = bucket.to_string();
let key = key.to_string();
@@ -295,9 +326,11 @@ pub async fn handle_complete_multipart_upload(
&& v.data == ObjectVersionData::Uploading
});
let mut object_version = match object_version {
- None => return Err(Error::BadRequest(format!(
- "Multipart upload does not exist or has already been completed"
- ))),
+ None => {
+ return Err(Error::BadRequest(format!(
+ "Multipart upload does not exist or has already been completed"
+ )))
+ }
Some(x) => x.clone(),
};
let version = match version {
@@ -311,7 +344,11 @@ pub async fn handle_complete_multipart_upload(
// TODO: check that all the parts that they pretend they gave us are indeed there
// TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere...
- let total_size = version.blocks().iter().map(|x| x.size).fold(0, |x, y| x+y);
+ let total_size = version
+ .blocks()
+ .iter()
+ .map(|x| x.size)
+ .fold(0, |x, y| x + y);
object_version.size = total_size;
object_version.state = ObjectVersionState::Complete;
object_version.data = ObjectVersionData::FirstBlock(version.blocks()[0].hash);
@@ -325,7 +362,12 @@ pub async fn handle_complete_multipart_upload(
r#"<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">"#
)
.unwrap();
- writeln!(&mut xml, "\t<Location>{}</Location>", garage.config.s3_api.s3_region).unwrap();
+ writeln!(
+ &mut xml,
+ "\t<Location>{}</Location>",
+ garage.config.s3_api.s3_region
+ )
+ .unwrap();
writeln!(&mut xml, "\t<Bucket>{}</Bucket>", bucket).unwrap();
writeln!(&mut xml, "\t<Key>{}</Key>", xml_escape(&key)).unwrap();
writeln!(&mut xml, "</CompleteMultipartUploadResult>").unwrap();
@@ -339,9 +381,13 @@ pub async fn handle_abort_multipart_upload(
key: &str,
upload_id: &str,
) -> Result<Response<BodyType>, Error> {
- let version_uuid = uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
+ let version_uuid =
+ uuid_from_str(upload_id).map_err(|_| Error::BadRequest(format!("Invalid upload ID")))?;
- let object = garage.object_table.get(&bucket.to_string(), &key.to_string()).await?;
+ let object = garage
+ .object_table
+ .get(&bucket.to_string(), &key.to_string())
+ .await?;
let object = match object {
None => return Err(Error::BadRequest(format!("Object not found"))),
Some(x) => x,
@@ -352,9 +398,11 @@ pub async fn handle_abort_multipart_upload(
&& v.data == ObjectVersionData::Uploading
});
let mut object_version = match object_version {
- None => return Err(Error::BadRequest(format!(
- "Multipart upload does not exist or has already been completed"
- ))),
+ None => {
+ return Err(Error::BadRequest(format!(
+ "Multipart upload does not exist or has already been completed"
+ )))
+ }
Some(x) => x.clone(),
};
@@ -383,50 +431,3 @@ fn uuid_from_str(id: &str) -> Result<UUID, ()> {
uuid.copy_from_slice(&id_bin[..]);
Ok(UUID::from(uuid))
}
-
-pub async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<UUID, Error> {
- let object = match garage
- .object_table
- .get(&bucket.to_string(), &key.to_string())
- .await?
- {
- None => {
- // No need to delete
- return Ok([0u8; 32].into());
- }
- Some(o) => o,
- };
-
- let interesting_versions = object.versions().iter().filter(|v| {
- v.data != ObjectVersionData::DeleteMarker && v.state != ObjectVersionState::Aborted
- });
-
- let mut must_delete = false;
- let mut timestamp = now_msec();
- for v in interesting_versions {
- must_delete = true;
- timestamp = std::cmp::max(timestamp, v.timestamp + 1);
- }
-
- if !must_delete {
- return Ok([0u8; 32].into());
- }
-
- let version_uuid = gen_uuid();
-
- let object = Object::new(
- bucket.into(),
- key.into(),
- vec![ObjectVersion {
- uuid: version_uuid,
- timestamp: now_msec(),
- mime_type: "application/x-delete-marker".into(),
- size: 0,
- state: ObjectVersionState::Complete,
- data: ObjectVersionData::DeleteMarker,
- }],
- );
-
- garage.object_table.insert(&object).await?;
- return Ok(version_uuid);
-}