diff options
-rw-r--r-- | Cargo.lock | 88 | ||||
-rw-r--r-- | src/api/Cargo.toml | 2 | ||||
-rw-r--r-- | src/api/s3_put.rs | 405 | ||||
-rw-r--r-- | src/rpc/rpc_client.rs | 1 |
4 files changed, 292 insertions, 204 deletions
@@ -222,6 +222,12 @@ dependencies = [ ] [[package]] +name = "fastcdc" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5afa29be46b12c8c380b997def8d1ac77c2665da93eb0a768fab0bf4db79333f" + +[[package]] name = "fnv" version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -365,7 +371,7 @@ dependencies = [ "hex", "log", "pretty_env_logger", - "rand", + "rand 0.8.3", "rmp-serde", "serde", "sled", @@ -383,6 +389,7 @@ dependencies = [ "chrono", "crypto-mac 0.10.0", "err-derive", + "fastcdc", "futures", "futures-util", "garage_model", @@ -397,6 +404,7 @@ dependencies = [ "log", "md-5", "percent-encoding", + "rand 0.7.3", "roxmltree", "sha2", "tokio", @@ -415,7 +423,7 @@ dependencies = [ "garage_util", "hex", "log", - "rand", + "rand 0.8.3", "rmp-serde", "serde", "serde_bytes", @@ -459,7 +467,7 @@ dependencies = [ "garage_util", "hexdump", "log", - "rand", + "rand 0.8.3", "rmp-serde", "serde", "serde_bytes", @@ -479,7 +487,7 @@ dependencies = [ "http", "hyper", "log", - "rand", + "rand 0.8.3", "rmp-serde", "rustls", "serde", @@ -531,13 +539,24 @@ dependencies = [ [[package]] name = "getrandom" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.9.0+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c9495705279e7140bf035dde1f6e750c162df8b625267cd52cc44e0b156732c8" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.10.2+wasi-snapshot-preview1", ] [[package]] @@ -1045,14 +1064,37 @@ dependencies = [ [[package]] name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom 0.1.16", + "libc", + "rand_chacha 0.2.2", + "rand_core 0.5.1", + "rand_hc 0.2.0", +] + +[[package]] +name = "rand" version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e" dependencies = [ "libc", - "rand_chacha", - "rand_core", - "rand_hc", + "rand_chacha 0.3.0", + "rand_core 0.6.2", + "rand_hc 0.3.0", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core 0.5.1", ] [[package]] @@ -1062,7 +1104,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.2", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom 0.1.16", ] [[package]] @@ -1071,7 +1122,16 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7" dependencies = [ - "getrandom", + "getrandom 0.2.2", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core 0.5.1", ] [[package]] @@ -1080,7 +1140,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" dependencies = [ - "rand_core", + "rand_core 0.6.2", ] [[package]] @@ -1583,6 +1643,12 @@ dependencies = [ [[package]] name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + +[[package]] +name = "wasi" version = "0.10.2+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 0b824ca3..b328f671 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -22,10 +22,12 @@ bytes = "1.0" chrono = "0.4" crypto-mac = "0.10" err-derive = "0.3" +fastcdc = "1.0.5" hex = "0.4" hmac = "0.10" log = "0.4" md-5 = "0.9" +rand = "0.7" sha2 = "0.9" futures = "0.3" diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index c4e3b818..50362b28 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeMap, VecDeque}; use std::fmt::Write; use std::sync::Arc; +use fastcdc::{Chunk, FastCDC}; use futures::stream::*; use hyper::{Body, Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; @@ -22,6 +23,8 @@ use crate::encoding::*; use crate::error::*; use crate::signature::verify_signed_content; +// ---- PutObject call ---- + pub async fn handle_put( garage: Arc<Garage>, req: Request<Body>, @@ -150,159 +153,6 @@ pub async fn handle_put( Ok(put_response(version_uuid, md5sum_hex)) } -/// Validate MD5 sum against content-md5 header -/// and sha256sum against signed content-sha256 -fn ensure_checksum_matches( - data_md5sum: &[u8], - data_sha256sum: garage_util::data::FixedBytes32, - content_md5: Option<&str>, - content_sha256: Option<garage_util::data::FixedBytes32>, -) -> Result<(), Error> { - if let Some(expected_sha256) = content_sha256 { - if expected_sha256 != data_sha256sum { - return Err(Error::BadRequest(format!( - "Unable to validate x-amz-content-sha256" - ))); - } else { - trace!("Successfully validated x-amz-content-sha256"); - } - } - if let Some(expected_md5) = content_md5 { - if expected_md5.trim_matches('"') != base64::encode(data_md5sum) { - return Err(Error::BadRequest(format!("Unable to validate content-md5"))); - } else { - trace!("Successfully validated content-md5"); - } - } - Ok(()) -} - -async fn read_and_put_blocks( - garage: &Garage, - version: &Version, - part_number: u64, - first_block: Vec<u8>, - first_block_hash: Hash, - chunker: &mut BodyChunker, -) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> { - let mut md5hasher = Md5::new(); - let mut sha256hasher = Sha256::new(); - md5hasher.update(&first_block[..]); - sha256hasher.update(&first_block[..]); - - let mut next_offset = first_block.len(); - let mut put_curr_version_block = put_block_meta( - &garage, - &version, - part_number, - 0, - first_block_hash, - first_block.len() as u64, - ); - let mut put_curr_block = garage - .block_manager - .rpc_put_block(first_block_hash, first_block); - - loop { - let (_, _, next_block) = - futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; - if let Some(block) = next_block { - md5hasher.update(&block[..]); - sha256hasher.update(&block[..]); - let block_hash = blake2sum(&block[..]); - let block_len = block.len(); - put_curr_version_block = put_block_meta( - &garage, - &version, - part_number, - next_offset as u64, - block_hash, - block_len as u64, - ); - put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); - next_offset += block_len; - } else { - break; - } - } - - let total_size = next_offset as u64; - let data_md5sum = md5hasher.finalize(); - - let data_sha256sum = sha256hasher.finalize(); - let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); - - Ok((total_size, data_md5sum, data_sha256sum)) -} - -async fn put_block_meta( - garage: &Garage, - version: &Version, - part_number: u64, - offset: u64, - hash: Hash, - size: u64, -) -> Result<(), GarageError> { - let mut version = version.clone(); - version.blocks.put( - VersionBlockKey { - part_number, - offset, - }, - VersionBlock { hash, size }, - ); - - let block_ref = BlockRef { - block: hash, - version: version.uuid, - deleted: false.into(), - }; - - futures::try_join!( - garage.version_table.insert(&version), - garage.block_ref_table.insert(&block_ref), - )?; - Ok(()) -} - -struct BodyChunker { - body: Body, - read_all: bool, - block_size: usize, - buf: VecDeque<u8>, -} - -impl BodyChunker { - fn new(body: Body, block_size: usize) -> Self { - Self { - body, - read_all: false, - block_size, - buf: VecDeque::with_capacity(2 * block_size), - } - } - async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> { - while !self.read_all && self.buf.len() < self.block_size { - if let Some(block) = self.body.next().await { - let bytes = block?; - trace!("Body next: {} bytes", bytes.len()); - self.buf.extend(&bytes[..]); - } else { - self.read_all = true; - } - } - if self.buf.len() == 0 { - Ok(None) - } else if self.buf.len() <= self.block_size { - let block = self.buf.drain(..).collect::<Vec<u8>>(); - Ok(Some(block)) - } else { - let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>(); - Ok(Some(block)) - } - } -} - pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> { Response::builder() .header("x-amz-version-id", hex::encode(version_uuid)) @@ -311,6 +161,8 @@ pub fn put_response(version_uuid: UUID, md5sum_hex: String) -> Response<Body> { .unwrap() } +// ---- Mutlipart upload calls ---- + pub async fn handle_create_multipart_upload( garage: Arc<Garage>, req: &Request<Body>, @@ -575,17 +427,64 @@ pub async fn handle_abort_multipart_upload( Ok(Response::new(Body::from(vec![]))) } -fn get_mime_type(req: &Request<Body>) -> Result<String, Error> { - Ok(req +// ---- Parsing input to multipart upload calls ---- + +fn decode_upload_id(id: &str) -> Result<UUID, Error> { + let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?; + if id_bin.len() != 32 { + return None.ok_or_bad_request("Invalid upload ID"); + } + let mut uuid = [0u8; 32]; + uuid.copy_from_slice(&id_bin[..]); + Ok(UUID::from(uuid)) +} + +#[derive(Debug)] +struct CompleteMultipartUploadPart { + etag: String, + part_number: u64, +} + +fn parse_complete_multpart_upload_body( + xml: &roxmltree::Document, +) -> Option<Vec<CompleteMultipartUploadPart>> { + let mut parts = vec![]; + + let root = xml.root(); + let cmu = root.first_child()?; + if !cmu.has_tag_name("CompleteMultipartUpload") { + return None; + } + + for item in cmu.children() { + if item.has_tag_name("Part") { + let etag = item.children().find(|e| e.has_tag_name("ETag"))?.text()?; + let part_number = item + .children() + .find(|e| e.has_tag_name("PartNumber"))? + .text()?; + parts.push(CompleteMultipartUploadPart { + etag: etag.trim_matches('"').to_string(), + part_number: part_number.parse().ok()?, + }); + } else { + return None; + } + } + + Some(parts) +} + +// ---- Common code ---- + +pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> { + let content_type = req .headers() .get(hyper::header::CONTENT_TYPE) .map(|x| x.to_str()) .unwrap_or(Ok("blob"))? - .to_string()) -} + .to_string(); -pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> { - let content_type = get_mime_type(req)?; let mut other = BTreeMap::new(); // Preserve standard headers @@ -629,48 +528,170 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E }) } -fn decode_upload_id(id: &str) -> Result<UUID, Error> { - let id_bin = hex::decode(id).ok_or_bad_request("Invalid upload ID")?; - if id_bin.len() != 32 { - return None.ok_or_bad_request("Invalid upload ID"); +struct BodyChunker { + body: Body, + read_all: bool, + min_block_size: usize, + avg_block_size: usize, + max_block_size: usize, + buf: VecDeque<u8>, +} + +impl BodyChunker { + fn new(body: Body, block_size: usize) -> Self { + let min_block_size = block_size / 4 * 3; + let avg_block_size = block_size; + let max_block_size = block_size * 2; + Self { + body, + read_all: false, + min_block_size, + avg_block_size, + max_block_size, + buf: VecDeque::with_capacity(2 * max_block_size), + } + } + async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> { + while !self.read_all && self.buf.len() < self.max_block_size { + if let Some(block) = self.body.next().await { + let bytes = block?; + trace!("Body next: {} bytes", bytes.len()); + self.buf.extend(&bytes[..]); + } else { + self.read_all = true; + } + } + if self.buf.len() == 0 { + Ok(None) + } else { + let mut iter = FastCDC::with_eof( + self.buf.make_contiguous(), + self.min_block_size, + self.avg_block_size, + self.max_block_size, + self.read_all, + ); + if let Some(Chunk { length, .. }) = iter.next() { + let block = self.buf.drain(..length).collect::<Vec<u8>>(); + Ok(Some(block)) + } else { + unreachable!("FastCDC returned not chunk") + } + } } - let mut uuid = [0u8; 32]; - uuid.copy_from_slice(&id_bin[..]); - Ok(UUID::from(uuid)) } -#[derive(Debug)] -struct CompleteMultipartUploadPart { - etag: String, +async fn read_and_put_blocks( + garage: &Garage, + version: &Version, part_number: u64, -} + first_block: Vec<u8>, + first_block_hash: Hash, + chunker: &mut BodyChunker, +) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> { + let mut md5hasher = Md5::new(); + let mut sha256hasher = Sha256::new(); + md5hasher.update(&first_block[..]); + sha256hasher.update(&first_block[..]); -fn parse_complete_multpart_upload_body( - xml: &roxmltree::Document, -) -> Option<Vec<CompleteMultipartUploadPart>> { - let mut parts = vec![]; + let mut next_offset = first_block.len(); + let mut put_curr_version_block = put_block_meta( + &garage, + &version, + part_number, + 0, + first_block_hash, + first_block.len() as u64, + ); + let mut put_curr_block = garage + .block_manager + .rpc_put_block(first_block_hash, first_block); - let root = xml.root(); - let cmu = root.first_child()?; - if !cmu.has_tag_name("CompleteMultipartUpload") { - return None; + loop { + let (_, _, next_block) = + futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?; + if let Some(block) = next_block { + md5hasher.update(&block[..]); + sha256hasher.update(&block[..]); + let block_hash = blake2sum(&block[..]); + let block_len = block.len(); + put_curr_version_block = put_block_meta( + &garage, + &version, + part_number, + next_offset as u64, + block_hash, + block_len as u64, + ); + put_curr_block = garage.block_manager.rpc_put_block(block_hash, block); + next_offset += block_len; + } else { + break; + } } - for item in cmu.children() { - if item.has_tag_name("Part") { - let etag = item.children().find(|e| e.has_tag_name("ETag"))?.text()?; - let part_number = item - .children() - .find(|e| e.has_tag_name("PartNumber"))? - .text()?; - parts.push(CompleteMultipartUploadPart { - etag: etag.trim_matches('"').to_string(), - part_number: part_number.parse().ok()?, - }); + let total_size = next_offset as u64; + let data_md5sum = md5hasher.finalize(); + + let data_sha256sum = sha256hasher.finalize(); + let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); + + Ok((total_size, data_md5sum, data_sha256sum)) +} + +async fn put_block_meta( + garage: &Garage, + version: &Version, + part_number: u64, + offset: u64, + hash: Hash, + size: u64, +) -> Result<(), GarageError> { + let mut version = version.clone(); + version.blocks.put( + VersionBlockKey { + part_number, + offset, + }, + VersionBlock { hash, size }, + ); + + let block_ref = BlockRef { + block: hash, + version: version.uuid, + deleted: false.into(), + }; + + futures::try_join!( + garage.version_table.insert(&version), + garage.block_ref_table.insert(&block_ref), + )?; + Ok(()) +} + +/// Validate MD5 sum against content-md5 header +/// and sha256sum against signed content-sha256 +fn ensure_checksum_matches( + data_md5sum: &[u8], + data_sha256sum: garage_util::data::FixedBytes32, + content_md5: Option<&str>, + content_sha256: Option<garage_util::data::FixedBytes32>, +) -> Result<(), Error> { + if let Some(expected_sha256) = content_sha256 { + if expected_sha256 != data_sha256sum { + return Err(Error::BadRequest(format!( + "Unable to validate x-amz-content-sha256" + ))); } else { - return None; + trace!("Successfully validated x-amz-content-sha256"); } } - - Some(parts) + if let Some(expected_md5) = content_md5 { + if expected_md5.trim_matches('"') != base64::encode(data_md5sum) { + return Err(Error::BadRequest(format!("Unable to validate content-md5"))); + } else { + trace!("Successfully validated content-md5"); + } + } + Ok(()) } diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index 3bb58c91..8a6cc721 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -145,7 +145,6 @@ impl<M: RpcMessage + 'static> RpcClient<M> { { Err(rpc_error) => { node_status.num_failures.fetch_add(1, Ordering::SeqCst); - // TODO: Save failure info somewhere Err(Error::from(rpc_error)) } Ok(x) => x, |