From 71a13f366ea2b5c779bd2ea74385af1b03a2455b Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 16 Mar 2021 21:08:39 +0100 Subject: add content defined chuking --- src/api/Cargo.toml | 2 ++ src/api/s3_put.rs | 20 +++++++++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) (limited to 'src') diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 0b824ca3..5bc170a3 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -22,10 +22,12 @@ bytes = "1.0" chrono = "0.4" crypto-mac = "0.10" err-derive = "0.3" +hash-roll = "0.3.0" hex = "0.4" hmac = "0.10" log = "0.4" md-5 = "0.9" +rand = "0.7" sha2 = "0.9" futures = "0.3" diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index c4e3b818..e2a1b54d 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -3,6 +3,7 @@ use std::fmt::Write; use std::sync::Arc; use futures::stream::*; +use hash_roll::{ChunkIncr, fastcdc::{FastCdc, FastCdcIncr}, gear_table::GEAR_64}; use hyper::{Body, Request, Response}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; @@ -268,21 +269,26 @@ async fn put_block_meta( struct BodyChunker { body: Body, read_all: bool, - block_size: usize, + max_block_size: usize, buf: VecDeque, + chunker: FastCdcIncr<'static>, } impl BodyChunker { fn new(body: Body, block_size: usize) -> Self { + let max_block_size = block_size * 2; + let chunker = FastCdc::new(&GEAR_64, block_size as u64 / 2, block_size as u64, max_block_size as u64); + let chunker = (&chunker).into(); Self { body, read_all: false, - block_size, - buf: VecDeque::with_capacity(2 * block_size), + max_block_size, + buf: VecDeque::with_capacity(2 * max_block_size), + chunker, } } async fn next(&mut self) -> Result>, GarageError> { - while !self.read_all && self.buf.len() < self.block_size { + while !self.read_all && self.buf.len() < self.max_block_size { if let Some(block) = self.body.next().await { let bytes = block?; trace!("Body next: {} bytes", bytes.len()); @@ -293,11 +299,11 @@ impl BodyChunker { } if self.buf.len() == 0 { Ok(None) - } else if self.buf.len() <= self.block_size { - let block = self.buf.drain(..).collect::>(); + } else if let Some(index) = self.chunker.push(self.buf.make_contiguous()) { + let block = self.buf.drain(..index).collect::>(); Ok(Some(block)) } else { - let block = self.buf.drain(..self.block_size).collect::>(); + let block = self.buf.drain(..).collect::>(); Ok(Some(block)) } } -- cgit v1.2.3