From 163ee977a0cca3e9995ee24863fbd5dc3d0f4778 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Apr 2021 23:37:41 +0200 Subject: Small improvements to compression code --- script/dev-cluster.sh | 3 +++ src/model/block.rs | 28 ++++++++++++++++++---------- src/util/config.rs | 3 +++ 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/script/dev-cluster.sh b/script/dev-cluster.sh index addbf4b4..67c44c7d 100755 --- a/script/dev-cluster.sh +++ b/script/dev-cluster.sh @@ -35,6 +35,9 @@ data_replication_factor = 3 meta_replication_factor = 3 meta_epidemic_fanout = 3 +enable_compression = true +compressin_level = 10 + [s3_api] api_bind_addr = "0.0.0.0:$((3910+$count))" # the S3 API port, HTTP without TLS. Add a reverse proxy for the TLS part. s3_region = "garage" # set this to anything. S3 API calls will fail if they are not made against the region set here. diff --git a/src/model/block.rs b/src/model/block.rs index 699ff32d..5e4347f6 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -196,12 +196,12 @@ impl BlockManager { let mut f = fs::File::create(path.clone()).await?; f.write_all(&buffer).await?; + drop(f); if clean_plain { path.set_extension(""); fs::remove_file(path).await?; } - drop(f); Ok(Message::Ok) } @@ -516,17 +516,25 @@ impl BlockManager { /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { let garage = self.garage.load_full().unwrap(); - let compressed = zstd_encode(&data[..], garage.config.compression_level); - let message = if compressed.is_ok() && compressed.as_ref().unwrap().len() < data.len() { - Message::PutBlock { - hash, - data: BlockData::Compressed(compressed.unwrap()), - } + + let compressed = if garage.config.enable_compression { + zstd_encode(&data[..], garage.config.compression_level).ok() + } else { + None + }; + + // If compressed data is not less than 7/8 of the size of the original data, i.e. if we + // don't gain a significant margin by compressing, then we store the plain data instead + // so that we don't lose time decompressing it on reads. + let block_data = if compressed.is_some() && compressed.as_ref().unwrap().len() < (data.len() * 7) / 8 { + BlockData::Compressed(compressed.unwrap()) } else { - Message::PutBlock { + BlockData::Plain(data) + }; + + let message = Message::PutBlock { hash, - data: BlockData::Plain(data), - } + data: block_data, }; let who = self.replication.write_nodes(&hash); self.rpc_client diff --git a/src/util/config.rs b/src/util/config.rs index 29901d46..a893cd66 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -45,6 +45,9 @@ pub struct Config { #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, + /// Enable Zstd compression of block data + pub enable_compression: bool, + /// Zstd compression level used on data blocks #[serde(default)] pub compression_level: i32, -- cgit v1.2.3