diff options
author | Alex <alex@adnab.me> | 2021-04-06 22:18:41 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2021-04-06 22:18:41 +0200 |
commit | 7380f3855ca11d6ca0c55c2132f478e52f3fe9b8 (patch) | |
tree | ead4cd90d954db614d4c3e60ef06c2844108d6d6 /src/api | |
parent | c4c4b7dedc7b0faca508a0d4a921cb2cc97ba560 (diff) | |
parent | 6cbc8d6ec93b832a301a5402f1b1ae70b07a2be3 (diff) | |
download | garage-7380f3855ca11d6ca0c55c2132f478e52f3fe9b8.tar.gz garage-7380f3855ca11d6ca0c55c2132f478e52f3fe9b8.zip |
Merge pull request 'Use content defined chunking' (#43) from trinity-1686a/garage:content-defined-chunking into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/43
Diffstat (limited to 'src/api')
-rw-r--r-- | src/api/Cargo.toml | 2 | ||||
-rw-r--r-- | src/api/s3_put.rs | 34 |
2 files changed, 27 insertions, 9 deletions
diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 0b824ca3..b328f671 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -22,10 +22,12 @@ bytes = "1.0" chrono = "0.4" crypto-mac = "0.10" err-derive = "0.3" +fastcdc = "1.0.5" hex = "0.4" hmac = "0.10" log = "0.4" md-5 = "0.9" +rand = "0.7" sha2 = "0.9" futures = "0.3" diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index c4e3b818..d023bcef 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeMap, VecDeque}; use std::fmt::Write; use std::sync::Arc; +use fastcdc::{Chunk, FastCDC}; use futures::stream::*; use hyper::{Body, Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; @@ -268,21 +269,28 @@ async fn put_block_meta( struct BodyChunker { body: Body, read_all: bool, - block_size: usize, + min_block_size: usize, + avg_block_size: usize, + max_block_size: usize, buf: VecDeque<u8>, } impl BodyChunker { fn new(body: Body, block_size: usize) -> Self { + let min_block_size = block_size / 4 * 3; + let avg_block_size = block_size; + let max_block_size = block_size * 2; Self { body, read_all: false, - block_size, - buf: VecDeque::with_capacity(2 * block_size), + min_block_size, + avg_block_size, + max_block_size, + buf: VecDeque::with_capacity(2 * max_block_size), } } async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> { - while !self.read_all && self.buf.len() < self.block_size { + while !self.read_all && self.buf.len() < self.max_block_size { if let Some(block) = self.body.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); @@ -293,12 +301,20 @@ impl BodyChunker { } if self.buf.len() == 0 { Ok(None) - } else if self.buf.len() <= self.block_size { - let block = self.buf.drain(..).collect::<Vec<u8>>(); - Ok(Some(block)) } else { - let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>(); - Ok(Some(block)) + let mut iter = FastCDC::with_eof( + self.buf.make_contiguous(), + self.min_block_size, + self.avg_block_size, + self.max_block_size, + self.read_all, + ); + if let Some(Chunk { length, .. }) = iter.next() { + let block = self.buf.drain(..length).collect::<Vec<u8>>(); + Ok(Some(block)) + } else { + unreachable!("FastCDC returned not chunk") + } } } } |