aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2021-04-07 10:00:47 +0200
committerAlex Auvolat <alex@adnab.me>2021-04-07 10:00:47 +0200
commitc3bd672d58d32c8fc3b3225bfc2bfb5330ec726e (patch)
tree739ce03151cd601a90ed24e23aa0788ea29a29cb
parent93a9f9613051385832e22f0c54e9682e814e468c (diff)
downloadgarage-c3bd672d58d32c8fc3b3225bfc2bfb5330ec726e.tar.gz
garage-c3bd672d58d32c8fc3b3225bfc2bfb5330ec726e.zip
Reorganize code in s3_put.rs
-rw-r--r--src/api/s3_put.rs419
1 files changed, 212 insertions, 207 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index d023bcef..50362b28 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -23,6 +23,8 @@ use crate::encoding::*;
use crate::error::*;
use crate::signature::verify_signed_content;
+// ---- PutObject call ----
+
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
@@ -151,174 +153,6 @@ pub async fn handle_put(
Ok(put_response(version_uuid, md5sum_hex))
}
-/// Validate MD5 sum against content-md5 header
-/// and sha256sum against signed content-sha256
-fn ensure_checksum_matches(
- data_md5sum: &[u8],
- data_sha256sum: garage_util::data::FixedBytes32,
- content_md5: Option<&str>,
- content_sha256: Option<garage_util::data::FixedBytes32>,
-) -> Result<(), Error> {
- if let Some(expected_sha256) = content_sha256 {
- if expected_sha256 != data_sha256sum {
- return Err(Error::BadRequest(format!(
- "Unable to validate x-amz-content-sha256"
- )));
- } else {
- trace!("Successfully validated x-amz-content-sha256");
- }
- }
- if let Some(expected_md5) = content_md5 {
- if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
- return Err(Error::BadRequest(format!("Unable to validate content-md5")));
- } else {
- trace!("Successfully validated content-md5");
- }
- }
- Ok(())
-}
-
-async fn read_and_put_blocks(
- garage: &Garage,
- version: &Version,
- part_number: u64,
- first_block: Vec<u8>,
- first_block_hash: Hash,
- chunker: &mut BodyChunker,
-) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
- let mut md5hasher = Md5::new();
- let mut sha256hasher = Sha256::new();
- md5hasher.update(&first_block[..]);
- sha256hasher.update(&first_block[..]);
-
- let mut next_offset = first_block.len();
- let mut put_curr_version_block = put_block_meta(
- &garage,
- &version,
- part_number,
- 0,
- first_block_hash,
- first_block.len() as u64,
- );
- let mut put_curr_block = garage
- .block_manager
- .rpc_put_block(first_block_hash, first_block);
-
- loop {
- let (_, _, next_block) =
- futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
- if let Some(block) = next_block {
- md5hasher.update(&block[..]);
- sha256hasher.update(&block[..]);
- let block_hash = blake2sum(&block[..]);
- let block_len = block.len();
- put_curr_version_block = put_block_meta(
- &garage,
- &version,
- part_number,
- next_offset as u64,
- block_hash,
- block_len as u64,
- );
- put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
- next_offset += block_len;
- } else {
- break;
- }
- }
-
- let total_size = next_offset as u64;
- let data_md5sum = md5hasher.finalize();
-
- let data_sha256sum = sha256hasher.finalize();
- let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
-
- Ok((total_size, data_md5sum, data_sha256sum))
-}
-
-async fn put_block_meta(
- garage: &Garage,
- version: &Version,
- part_number: u64,
- offset: u64,
- hash: Hash,
- size: u64,
-) -> Result<(), GarageError> {
- let mut version = version.clone();
- version.blocks.put(
- VersionBlockKey {
- part_number,
- offset,
- },
- VersionBlock { hash, size },
- );
-
- let block_ref = BlockRef {
- block: hash,
- version: version.uuid,
- deleted: false.into(),
- };
-
- futures::try_join!(
- garage.version_table.insert(&version),
- garage.block_ref_table.insert(&block_ref),
- )?;
- Ok(())
-}
-
-struct BodyChunker {
- body: Body,
- read_all: bool,
- min_block_size: usize,
- avg_block_size: usize,
- max_block_size: usize,
- buf: VecDeque<u8>,
-}
-
-impl BodyChunker {
- fn new(body: Body, block_size: usize) -> Self {
- let min_block_size = block_size / 4 * 3;
- let avg_block_size = block_size;
- let max_block_size = block_size * 2;
- Self {
- body,
- read_all: false,
- min_block_size,
- avg_block_size,
- max_block_size,
- buf: VecDeque::with_capacity(2 * max_block_size),
- }
- }
- async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
- while !self.read_all && self.buf.len() < self.max_block_size {
- if let Some(block) = self.body.next().await {
- let bytes = block?;
- trace!("Body next: {} bytes", bytes.len());
- self.buf.extend(&bytes[..]);
- } else {
- self.read_all = true;
- }
- }
- if self.buf.len() == 0 {
- Ok(None)
- } else {
- let mut iter = FastCDC::with_eof(
- self.buf.make_contiguous(),
- self.min_block_size,
- self.avg_block_size,
- self.max_block_size,
- self.read_all,
- );
- if let Some(Chunk { length, .. }) = iter.next() {
- let block = self.buf.drain(..length).collect::<Vec<u8>>();
- Ok(Some(block))
- } else {
- unreachable!("FastCDC returned not chunk")
- }
- }
- }
-}
-
pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid))
@@ -327,6 +161,8 @@ pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> {
.unwrap()
}
+// ---- Mutlipart upload calls ----
+
pub async fn handle_create_multipart_upload(
garage: Arc<Garage>,
req: &Request<Body>,
@@ -591,17 +427,64 @@ pub async fn handle_abort_multipart_upload(
Ok(Response::new(Body::from(vec![])))
}
-fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
- Ok(req
+// ---- Parsing input to multipart upload calls ----
+
+fn decode_upload_id(id: &str) -> Result<UUID, Error> {
+ let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?;
+ if id_bin.len() != 32 {
+ return None.ok_or_bad_request("Invalid upload ID");
+ }
+ let mut uuid = [0u8; 32];
+ uuid.copy_from_slice(&id_bin[..]);
+ Ok(UUID::from(uuid))
+}
+
+#[derive(Debug)]
+struct CompleteMultipartUploadPart {
+ etag: String,
+ part_number: u64,
+}
+
+fn parse_complete_multpart_upload_body(
+ xml: &roxmltree::Document,
+) -> Option<Vec<CompleteMultipartUploadPart>> {
+ let mut parts = vec![];
+
+ let root = xml.root();
+ let cmu = root.first_child()?;
+ if !cmu.has_tag_name("CompleteMultipartUpload") {
+ return None;
+ }
+
+ for item in cmu.children() {
+ if item.has_tag_name("Part") {
+ let etag = item.children().find(|e| e.has_tag_name("ETag"))?.text()?;
+ let part_number = item
+ .children()
+ .find(|e| e.has_tag_name("PartNumber"))?
+ .text()?;
+ parts.push(CompleteMultipartUploadPart {
+ etag: etag.trim_matches('"').to_string(),
+ part_number: part_number.parse().ok()?,
+ });
+ } else {
+ return None;
+ }
+ }
+
+ Some(parts)
+}
+
+// ---- Common code ----
+
+pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
+ let content_type = req
.headers()
.get(hyper::header::CONTENT_TYPE)
.map(|x| x.to_str())
.unwrap_or(Ok("blob"))?
- .to_string())
-}
+ .to_string();
-pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
- let content_type = get_mime_type(req)?;
let mut other = BTreeMap::new();
// Preserve standard headers
@@ -645,48 +528,170 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E
})
}
-fn decode_upload_id(id: &str) -> Result<UUID, Error> {
- let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?;
- if id_bin.len() != 32 {
- return None.ok_or_bad_request("Invalid upload ID");
+struct BodyChunker {
+ body: Body,
+ read_all: bool,
+ min_block_size: usize,
+ avg_block_size: usize,
+ max_block_size: usize,
+ buf: VecDeque<u8>,
+}
+
+impl BodyChunker {
+ fn new(body: Body, block_size: usize) -> Self {
+ let min_block_size = block_size / 4 * 3;
+ let avg_block_size = block_size;
+ let max_block_size = block_size * 2;
+ Self {
+ body,
+ read_all: false,
+ min_block_size,
+ avg_block_size,
+ max_block_size,
+ buf: VecDeque::with_capacity(2 * max_block_size),
+ }
+ }
+ async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
+ while !self.read_all && self.buf.len() < self.max_block_size {
+ if let Some(block) = self.body.next().await {
+ let bytes = block?;
+ trace!("Body next: {} bytes", bytes.len());
+ self.buf.extend(&bytes[..]);
+ } else {
+ self.read_all = true;
+ }
+ }
+ if self.buf.len() == 0 {
+ Ok(None)
+ } else {
+ let mut iter = FastCDC::with_eof(
+ self.buf.make_contiguous(),
+ self.min_block_size,
+ self.avg_block_size,
+ self.max_block_size,
+ self.read_all,
+ );
+ if let Some(Chunk { length, .. }) = iter.next() {
+ let block = self.buf.drain(..length).collect::<Vec<u8>>();
+ Ok(Some(block))
+ } else {
+ unreachable!("FastCDC returned not chunk")
+ }
+ }
}
- let mut uuid = [0u8; 32];
- uuid.copy_from_slice(&id_bin[..]);
- Ok(UUID::from(uuid))
}
-#[derive(Debug)]
-struct CompleteMultipartUploadPart {
- etag: String,
+async fn read_and_put_blocks(
+ garage: &Garage,
+ version: &Version,
part_number: u64,
-}
+ first_block: Vec<u8>,
+ first_block_hash: Hash,
+ chunker: &mut BodyChunker,
+) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
+ let mut md5hasher = Md5::new();
+ let mut sha256hasher = Sha256::new();
+ md5hasher.update(&first_block[..]);
+ sha256hasher.update(&first_block[..]);
-fn parse_complete_multpart_upload_body(
- xml: &roxmltree::Document,
-) -> Option<Vec<CompleteMultipartUploadPart>> {
- let mut parts = vec![];
+ let mut next_offset = first_block.len();
+ let mut put_curr_version_block = put_block_meta(
+ &garage,
+ &version,
+ part_number,
+ 0,
+ first_block_hash,
+ first_block.len() as u64,
+ );
+ let mut put_curr_block = garage
+ .block_manager
+ .rpc_put_block(first_block_hash, first_block);
- let root = xml.root();
- let cmu = root.first_child()?;
- if !cmu.has_tag_name("CompleteMultipartUpload") {
- return None;
+ loop {
+ let (_, _, next_block) =
+ futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
+ if let Some(block) = next_block {
+ md5hasher.update(&block[..]);
+ sha256hasher.update(&block[..]);
+ let block_hash = blake2sum(&block[..]);
+ let block_len = block.len();
+ put_curr_version_block = put_block_meta(
+ &garage,
+ &version,
+ part_number,
+ next_offset as u64,
+ block_hash,
+ block_len as u64,
+ );
+ put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
+ next_offset += block_len;
+ } else {
+ break;
+ }
}
- for item in cmu.children() {
- if item.has_tag_name("Part") {
- let etag = item.children().find(|e| e.has_tag_name("ETag"))?.text()?;
- let part_number = item
- .children()
- .find(|e| e.has_tag_name("PartNumber"))?
- .text()?;
- parts.push(CompleteMultipartUploadPart {
- etag: etag.trim_matches('"').to_string(),
- part_number: part_number.parse().ok()?,
- });
+ let total_size = next_offset as u64;
+ let data_md5sum = md5hasher.finalize();
+
+ let data_sha256sum = sha256hasher.finalize();
+ let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
+
+ Ok((total_size, data_md5sum, data_sha256sum))
+}
+
+async fn put_block_meta(
+ garage: &Garage,
+ version: &Version,
+ part_number: u64,
+ offset: u64,
+ hash: Hash,
+ size: u64,
+) -> Result<(), GarageError> {
+ let mut version = version.clone();
+ version.blocks.put(
+ VersionBlockKey {
+ part_number,
+ offset,
+ },
+ VersionBlock { hash, size },
+ );
+
+ let block_ref = BlockRef {
+ block: hash,
+ version: version.uuid,
+ deleted: false.into(),
+ };
+
+ futures::try_join!(
+ garage.version_table.insert(&version),
+ garage.block_ref_table.insert(&block_ref),
+ )?;
+ Ok(())
+}
+
+/// Validate MD5 sum against content-md5 header
+/// and sha256sum against signed content-sha256
+fn ensure_checksum_matches(
+ data_md5sum: &[u8],
+ data_sha256sum: garage_util::data::FixedBytes32,
+ content_md5: Option<&str>,
+ content_sha256: Option<garage_util::data::FixedBytes32>,
+) -> Result<(), Error> {
+ if let Some(expected_sha256) = content_sha256 {
+ if expected_sha256 != data_sha256sum {
+ return Err(Error::BadRequest(format!(
+ "Unable to validate x-amz-content-sha256"
+ )));
} else {
- return None;
+ trace!("Successfully validated x-amz-content-sha256");
}
}
-
- Some(parts)
+ if let Some(expected_md5) = content_md5 {
+ if expected_md5.trim_matches('"') != base64::encode(data_md5sum) {
+ return Err(Error::BadRequest(format!("Unable to validate content-md5")));
+ } else {
+ trace!("Successfully validated content-md5");
+ }
+ }
+ Ok(())
}