From 6e0cb2dfb6db182a07debe4f7b89f7780bcdf5e4 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 16 Mar 2021 21:08:39 +0100 Subject: add content defined chuking --- src/api/s3_put.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) (limited to 'src/api/s3_put.rs') diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index c4e3b818..e2a1b54d 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -3,6 +3,7 @@ use std::fmt::Write; use std::sync::Arc; use futures::stream::*; +use hash_roll::{ChunkIncr, fastcdc::{FastCdc, FastCdcIncr}, gear_table::GEAR_64}; use hyper::{Body, Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; @@ -268,21 +269,26 @@ async fn put_block_meta( struct BodyChunker { body: Body, read_all: bool, - block_size: usize, + max_block_size: usize, buf: VecDeque<u8>, + chunker: FastCdcIncr<'static>, } impl BodyChunker { fn new(body: Body, block_size: usize) -> Self { + let max_block_size = block_size * 2; + let chunker = FastCdc::new(&GEAR_64, block_size as u64 / 2, block_size as u64, max_block_size as u64); + let chunker = (&chunker).into(); Self { body, read_all: false, - block_size, - buf: VecDeque::with_capacity(2 * block_size), + max_block_size, + buf: VecDeque::with_capacity(2 * max_block_size), + chunker, } } async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> { - while !self.read_all && self.buf.len() < self.block_size { + while !self.read_all && self.buf.len() < self.max_block_size { if let Some(block) = self.body.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); @@ -293,11 +299,11 @@ impl BodyChunker { } if self.buf.len() == 0 { Ok(None) - } else if self.buf.len() <= self.block_size { - let block = self.buf.drain(..).collect::<Vec<u8>>(); + } else if let Some(index) = self.chunker.push(self.buf.make_contiguous()) { + let block = self.buf.drain(..index).collect::<Vec<u8>>(); Ok(Some(block)) } else { - let block = 
self.buf.drain(..self.block_size).collect::<Vec<u8>>(); + let block = self.buf.drain(..).collect::<Vec<u8>>(); Ok(Some(block)) } } -- cgit v1.2.3 From 47d0aee9f807e0ded10a01d045a8930ff112224b Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Wed, 17 Mar 2021 01:31:35 +0100 Subject: change crate used for cdc previous one seemed to output incorrect results --- src/api/s3_put.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) (limited to 'src/api/s3_put.rs') diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index e2a1b54d..f5607c9f 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -2,8 +2,8 @@ use std::collections::{BTreeMap, VecDeque}; use std::fmt::Write; use std::sync::Arc; +use fastcdc::{Chunk, FastCDC}; use futures::stream::*; -use hash_roll::{ChunkIncr, fastcdc::{FastCdc, FastCdcIncr}, gear_table::GEAR_64}; use hyper::{Body, Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; @@ -269,22 +269,24 @@ async fn put_block_meta( struct BodyChunker { body: Body, read_all: bool, + min_block_size: usize, + avg_block_size: usize, max_block_size: usize, buf: VecDeque<u8>, - chunker: FastCdcIncr<'static>, } impl BodyChunker { fn new(body: Body, block_size: usize) -> Self { + let min_block_size = block_size / 4 * 3; + let avg_block_size = block_size; let max_block_size = block_size * 2; - let chunker = FastCdc::new(&GEAR_64, block_size as u64 / 2, block_size as u64, max_block_size as u64); - let chunker = (&chunker).into(); Self { body, read_all: false, + min_block_size, + avg_block_size, max_block_size, buf: VecDeque::with_capacity(2 * max_block_size), - chunker, } } async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> { @@ -299,12 +301,14 @@ impl BodyChunker { } if self.buf.len() == 0 { Ok(None) - } else if let Some(index) = self.chunker.push(self.buf.make_contiguous()) { - let block = self.buf.drain(..index).collect::<Vec<u8>>(); - Ok(Some(block)) } else { - let block = self.buf.drain(..).collect::<Vec<u8>>(); - 
Ok(Some(block)) + let mut iter = FastCDC::with_eof(self.buf.make_contiguous(), self.min_block_size, self.avg_block_size, self.max_block_size, self.read_all); + if let Some(Chunk {length, ..}) = iter.next() { + let block = self.buf.drain(..length).collect::<Vec<u8>>(); + Ok(Some(block)) + } else { + Ok(None) + } } } } -- cgit v1.2.3 From b3b0b20d722bd001ef971b029a2165a2407fec83 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 6 Apr 2021 02:54:00 +0200 Subject: run fmt --- src/api/s3_put.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'src/api/s3_put.rs') diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index f5607c9f..d2702940 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -302,8 +302,14 @@ impl BodyChunker { if self.buf.len() == 0 { Ok(None) } else { - let mut iter = FastCDC::with_eof(self.buf.make_contiguous(), self.min_block_size, self.avg_block_size, self.max_block_size, self.read_all); - if let Some(Chunk {length, ..}) = iter.next() { + let mut iter = FastCDC::with_eof( + self.buf.make_contiguous(), + self.min_block_size, + self.avg_block_size, + self.max_block_size, + self.read_all, + ); + if let Some(Chunk { length, .. }) = iter.next() { let block = self.buf.drain(..length).collect::<Vec<u8>>(); Ok(Some(block)) } else { -- cgit v1.2.3 From 6cbc8d6ec93b832a301a5402f1b1ae70b07a2be3 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 6 Apr 2021 16:53:39 +0200 Subject: mark branch as unreachable --- src/api/s3_put.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/api/s3_put.rs') diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index d2702940..d023bcef 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -313,7 +313,7 @@ impl BodyChunker { let block = self.buf.drain(..length).collect::<Vec<u8>>(); Ok(Some(block)) } else { - Ok(None) + unreachable!("FastCDC returned not chunk") } } } -- cgit v1.2.3