aboutsummaryrefslogtreecommitdiff
path: root/src/api/s3_put.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2021-04-06 22:18:41 +0200
committerAlex <alex@adnab.me>2021-04-06 22:18:41 +0200
commit7380f3855ca11d6ca0c55c2132f478e52f3fe9b8 (patch)
treeead4cd90d954db614d4c3e60ef06c2844108d6d6 /src/api/s3_put.rs
parentc4c4b7dedc7b0faca508a0d4a921cb2cc97ba560 (diff)
parent6cbc8d6ec93b832a301a5402f1b1ae70b07a2be3 (diff)
downloadgarage-7380f3855ca11d6ca0c55c2132f478e52f3fe9b8.tar.gz
garage-7380f3855ca11d6ca0c55c2132f478e52f3fe9b8.zip
Merge pull request 'Use content defined chunking' (#43) from trinity-1686a/garage:content-defined-chunking into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/43
Diffstat (limited to 'src/api/s3_put.rs')
-rw-r--r--src/api/s3_put.rs34
1 files changed, 25 insertions, 9 deletions
diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs
index c4e3b818..d023bcef 100644
--- a/src/api/s3_put.rs
+++ b/src/api/s3_put.rs
@@ -2,6 +2,7 @@ use std::collections::{BTreeMap, VecDeque};
use std::fmt::Write;
use std::sync::Arc;
+use fastcdc::{Chunk, FastCDC};
use futures::stream::*;
use hyper::{Body, Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
@@ -268,21 +269,28 @@ async fn put_block_meta(
struct BodyChunker {
body: Body,
read_all: bool,
- block_size: usize,
+ min_block_size: usize,
+ avg_block_size: usize,
+ max_block_size: usize,
buf: VecDeque<u8>,
}
impl BodyChunker {
fn new(body: Body, block_size: usize) -> Self {
+ let min_block_size = block_size / 4 * 3;
+ let avg_block_size = block_size;
+ let max_block_size = block_size * 2;
Self {
body,
read_all: false,
- block_size,
- buf: VecDeque::with_capacity(2 * block_size),
+ min_block_size,
+ avg_block_size,
+ max_block_size,
+ buf: VecDeque::with_capacity(2 * max_block_size),
}
}
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
- while !self.read_all && self.buf.len() < self.block_size {
+ while !self.read_all && self.buf.len() < self.max_block_size {
if let Some(block) = self.body.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
@@ -293,12 +301,20 @@ impl BodyChunker {
}
if self.buf.len() == 0 {
Ok(None)
- } else if self.buf.len() <= self.block_size {
- let block = self.buf.drain(..).collect::<Vec<u8>>();
- Ok(Some(block))
} else {
- let block = self.buf.drain(..self.block_size).collect::<Vec<u8>>();
- Ok(Some(block))
+ let mut iter = FastCDC::with_eof(
+ self.buf.make_contiguous(),
+ self.min_block_size,
+ self.avg_block_size,
+ self.max_block_size,
+ self.read_all,
+ );
+ if let Some(Chunk { length, .. }) = iter.next() {
+ let block = self.buf.drain(..length).collect::<Vec<u8>>();
+ Ok(Some(block))
+ } else {
+ unreachable!("FastCDC returned not chunk")
+ }
}
}
}