From 1eb972b1ac8951faac234f56a30b34100d6ecff2 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 15 Dec 2021 11:26:43 +0100 Subject: Add compression using zstd (#173) fix #27 Co-authored-by: Trinity Pointard Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/173 Co-authored-by: trinity-1686a Co-committed-by: trinity-1686a --- Cargo.lock | 42 ++++++ Cargo.nix | 58 +++++++ doc/book/src/cookbook/real_world.md | 2 + doc/book/src/reference_manual/configuration.md | 26 ++++ script/test-smoke.sh | 39 ++--- src/model/Cargo.toml | 1 + src/model/block.rs | 201 ++++++++++++++++++++----- src/util/config.rs | 61 ++++++++ 8 files changed, 377 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8f81c098..c07acac2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -106,6 +106,9 @@ name = "cc" version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd" +dependencies = [ + "jobserver", +] [[package]] name = "cfg-if" @@ -458,6 +461,7 @@ dependencies = [ "serde_bytes", "sled", "tokio", + "zstd", ] [[package]] @@ -752,6 +756,15 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +[[package]] +name = "jobserver" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] + [[package]] name = "kuska-handshake" version = "0.2.0" @@ -1652,3 +1665,32 @@ name = "xxhash-rust" version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e575e15bedf6e57b5c2d763ffc6c3c760143466cbd09d762d539680ab5992ded" + +[[package]] +name = "zstd" +version = "0.9.0+zstd.1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07749a5dc2cb6b36661290245e350f15ec3bbb304e493db54a1d354480522ccd" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "4.1.1+zstd.1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c91c90f2c593b003603e5e0493c837088df4469da25aafff8bce42ba48caf079" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "1.6.1+zstd.1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "615120c7a2431d16cf1cf979e7fc31ba7a5b5e5707b29c8a99e5dbf8a8392a33" +dependencies = [ + "cc", + "libc", +] diff --git a/Cargo.nix b/Cargo.nix index 2e0804d5..5b38c55e 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -196,6 +196,13 @@ in version = "1.0.71"; registry = "registry+https://github.com/rust-lang/crates.io-index"; src = fetchCratesIo { inherit name version; sha256 = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd"; }; + features = builtins.concatLists [ + [ "jobserver" ] + [ "parallel" ] + ]; + dependencies = { + jobserver = rustPackages."registry+https://github.com/rust-lang/crates.io-index".jobserver."0.1.24" { inherit profileName; }; + }; }); "registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" = overridableMkRustCrate (profileName: rec { @@ -695,6 +702,7 @@ in serde_bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }; sled = rustPackages."registry+https://github.com/rust-lang/crates.io-index".sled."0.34.7" { inherit profileName; }; tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.12.0" { inherit profileName; }; + zstd = rustPackages."registry+https://github.com/rust-lang/crates.io-index".zstd."0.9.0+zstd.1.5.0" { inherit profileName; }; }; }); @@ -1055,6 +1063,16 @@ in ]; }); + "registry+https://github.com/rust-lang/crates.io-index".jobserver."0.1.24" = overridableMkRustCrate (profileName: rec { + name = "jobserver"; + version = "0.1.24"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"; }; + dependencies = { + ${ if hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.103" { inherit profileName; }; + }; + }); + "registry+https://github.com/rust-lang/crates.io-index".kuska-handshake."0.2.0" = overridableMkRustCrate (profileName: rec { name = "kuska-handshake"; version = "0.2.0"; @@ -2351,4 +2369,44 @@ in ]; }); + "registry+https://github.com/rust-lang/crates.io-index".zstd."0.9.0+zstd.1.5.0" = overridableMkRustCrate (profileName: rec { + name = "zstd"; + version = "0.9.0+zstd.1.5.0"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "07749a5dc2cb6b36661290245e350f15ec3bbb304e493db54a1d354480522ccd"; }; + dependencies = { + zstd_safe = rustPackages."registry+https://github.com/rust-lang/crates.io-index".zstd-safe."4.1.1+zstd.1.5.0" { inherit profileName; }; + }; + }); + + "registry+https://github.com/rust-lang/crates.io-index".zstd-safe."4.1.1+zstd.1.5.0" = overridableMkRustCrate (profileName: rec { + name = "zstd-safe"; + version = "4.1.1+zstd.1.5.0"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "c91c90f2c593b003603e5e0493c837088df4469da25aafff8bce42ba48caf079"; }; + features = builtins.concatLists [ + [ "std" ] + ]; + dependencies = { + libc = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.103" { inherit profileName; }; + zstd_sys = rustPackages."registry+https://github.com/rust-lang/crates.io-index".zstd-sys."1.6.1+zstd.1.5.0" { inherit profileName; }; + }; + }); + + "registry+https://github.com/rust-lang/crates.io-index".zstd-sys."1.6.1+zstd.1.5.0" = overridableMkRustCrate (profileName: rec { + name = "zstd-sys"; + version = "1.6.1+zstd.1.5.0"; + registry = "registry+https://github.com/rust-lang/crates.io-index"; + src = fetchCratesIo { inherit name version; sha256 = "615120c7a2431d16cf1cf979e7fc31ba7a5b5e5707b29c8a99e5dbf8a8392a33"; }; + features = builtins.concatLists [ + [ "std" ] + ]; + dependencies = { + libc = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.103" { inherit profileName; }; + }; + buildDependencies = { + cc = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".cc."1.0.71" { profileName = "__noProfile"; }; + }; + }); + } diff --git a/doc/book/src/cookbook/real_world.md b/doc/book/src/cookbook/real_world.md index d1303d47..906fe31f 100644 --- a/doc/book/src/cookbook/real_world.md +++ b/doc/book/src/cookbook/real_world.md @@ -74,6 +74,8 @@ data_dir = "/var/lib/garage/data" replication_mode = "3" +compression_level = 2 + rpc_bind_addr = "[::]:3901" rpc_public_addr = ":3901" rpc_secret = "" diff --git a/doc/book/src/reference_manual/configuration.md b/doc/book/src/reference_manual/configuration.md index 0b1e7bc7..9f88b232 100644 --- a/doc/book/src/reference_manual/configuration.md +++ b/doc/book/src/reference_manual/configuration.md @@ -10,6 +10,8 @@ block_size = 1048576 replication_mode = "3" +compression_level = 1 + rpc_secret = "4425f5c26c5e11581d3223904324dcb5b5d5dfb14e5e7f35e38c595424f5f1e6" rpc_bind_addr = "[::]:3901" rpc_public_addr = "[fc00:1::1]:3901" @@ -98,6 +100,30 @@ Never run a Garage cluster where that is not the case.** Changing the `replication_mode` of a cluster might work (make sure to shut down all nodes and changing it everywhere at the time), but is not officially supported. +### `compression_level` + +Zstd compression level to use for storing blocks. + +Values between `1` (faster compression) and `19` (smaller file) are standard compression +levels for zstd. From `20` to `22`, compression levels are referred as "ultra" and must be +used with extra care as it will use lot of memory. A value of `0` will let zstd choose a +default value (currently `3`). Finally, zstd has also compression designed to be faster +than default compression levels, they range from `-1` (smaller file) to `-99` (faster +compression). + +If you do not specify a `compression_level` entry, garage will set it to `1` for you. With +this parameters, zstd consumes low amount of cpu and should work faster than line speed in +most situations, while saving some space and intra-cluster +bandwidth. + +If you want to totally deactivate zstd in garage, you can pass the special value `'none'`. No +zstd related code will be called, your chunks will be stored on disk without any processing. + +Compression is done synchronously, setting a value too high will add latency to write queries. + +This value can be different between nodes, compression is done by the node which receive the +API call. + #### `rpc_secret` Garage uses a secret key that is shared between all nodes of the cluster diff --git a/script/test-smoke.sh b/script/test-smoke.sh index 3146e637..1f900ece 100755 --- a/script/test-smoke.sh +++ b/script/test-smoke.sh @@ -30,6 +30,11 @@ dd if=/dev/urandom of=/tmp/garage.1.rnd bs=1k count=2 # No multipart, inline sto dd if=/dev/urandom of=/tmp/garage.2.rnd bs=1M count=5 # No multipart but file will be chunked dd if=/dev/urandom of=/tmp/garage.3.rnd bs=1M count=10 # by default, AWS starts using multipart at 8MB +# data of lower entropy, to test compression +dd if=/dev/urandom bs=1k count=2 | base64 -w0 > /tmp/garage.1.b64 +dd if=/dev/urandom bs=1M count=5 | base64 -w0 > /tmp/garage.2.b64 +dd if=/dev/urandom bs=1M count=10 | base64 -w0 > /tmp/garage.3.b64 + echo "🧪 S3 API testing..." # AWS @@ -37,11 +42,11 @@ if [ -z "$SKIP_AWS" ]; then echo "🛠️ Testing with awscli" source ${SCRIPT_FOLDER}/dev-env-aws.sh aws s3 ls - for idx in $(seq 1 3); do - aws s3 cp "/tmp/garage.$idx.rnd" "s3://eprouvette/&+-é\"/garage.$idx.aws" + for idx in {1..3}.{rnd,b64}; do + aws s3 cp "/tmp/garage.$idx" "s3://eprouvette/&+-é\"/garage.$idx.aws" aws s3 ls s3://eprouvette aws s3 cp "s3://eprouvette/&+-é\"/garage.$idx.aws" "/tmp/garage.$idx.dl" - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl aws s3 rm "s3://eprouvette/&+-é\"/garage.$idx.aws" done @@ -52,11 +57,11 @@ if [ -z "$SKIP_S3CMD" ]; then echo "🛠️ Testing with s3cmd" source ${SCRIPT_FOLDER}/dev-env-s3cmd.sh s3cmd ls - for idx in $(seq 1 3); do - s3cmd put "/tmp/garage.$idx.rnd" "s3://eprouvette/&+-é\"/garage.$idx.s3cmd" + for idx in {1..3}.{rnd,b64}; do + s3cmd put "/tmp/garage.$idx" "s3://eprouvette/&+-é\"/garage.$idx.s3cmd" s3cmd ls s3://eprouvette s3cmd get "s3://eprouvette/&+-é\"/garage.$idx.s3cmd" "/tmp/garage.$idx.dl" - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl s3cmd rm "s3://eprouvette/&+-é\"/garage.$idx.s3cmd" done @@ -67,11 +72,11 @@ if [ -z "$SKIP_MC" ]; then echo "🛠️ Testing with mc (minio client)" source ${SCRIPT_FOLDER}/dev-env-mc.sh mc ls garage/ - for idx in $(seq 1 3); do - mc cp "/tmp/garage.$idx.rnd" "garage/eprouvette/&+-é\"/garage.$idx.mc" + for idx in {1..3}.{rnd,b64}; do + mc cp "/tmp/garage.$idx" "garage/eprouvette/&+-é\"/garage.$idx.mc" mc ls garage/eprouvette mc cp "garage/eprouvette/&+-é\"/garage.$idx.mc" "/tmp/garage.$idx.dl" - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl mc rm "garage/eprouvette/&+-é\"/garage.$idx.mc" done @@ -82,13 +87,13 @@ if [ -z "$SKIP_RCLONE" ]; then echo "🛠️ Testing with rclone" source ${SCRIPT_FOLDER}/dev-env-rclone.sh rclone lsd garage: - for idx in $(seq 1 3); do - cp /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + for idx in {1..3}.{rnd,b64}; do + cp /tmp/garage.$idx /tmp/garage.$idx.dl rclone copy "/tmp/garage.$idx.dl" "garage:eprouvette/&+-é\"/" rm /tmp/garage.$idx.dl rclone ls garage:eprouvette rclone copy "garage:eprouvette/&+-é\"/garage.$idx.dl" "/tmp/" - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl rclone delete "garage:eprouvette/&+-é\"/garage.$idx.dl" done @@ -100,17 +105,17 @@ if [ -z "$SKIP_DUCK" ]; then source ${SCRIPT_FOLDER}/dev-env-duck.sh duck --list garage:/ duck --mkdir "garage:/eprouvette/duck" - for idx in $(seq 1 3); do - duck --verbose --upload "garage:/eprouvette/duck/" "/tmp/garage.$idx.rnd" + for idx in {1..3}.{rnd,b64}; do + duck --verbose --upload "garage:/eprouvette/duck/" "/tmp/garage.$idx" duck --list garage:/eprouvette/duck/ - duck --download "garage:/eprouvette/duck/garage.$idx.rnd" "/tmp/garage.$idx.dl" - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + duck --download "garage:/eprouvette/duck/garage.$idx" "/tmp/garage.$idx.dl" + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl duck --delete "garage:/eprouvette/duck/garage.$idx.dk" done fi -rm /tmp/garage.{1,2,3}.rnd +rm /tmp/garage.{1..3}.{rnd,b64} if [ -z "$SKIP_AWS" ]; then echo "🧪 Website Testing" diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 7979a79a..1d695192 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -23,6 +23,7 @@ arc-swap = "1.0" hex = "0.4" log = "0.4" rand = "0.8" +zstd = { version = "0.9", default-features = false } sled = "0.34" diff --git a/src/model/block.rs b/src/model/block.rs index 8b1919bb..6df8e265 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; +use zstd::stream::{decode_all as zstd_decode, Encoder}; use garage_util::data::*; use garage_util::error::*; @@ -55,22 +56,81 @@ pub enum BlockRpc { GetBlock(Hash), /// Message to send a block of data, either because requested, of for first delivery of new /// block - PutBlock(PutBlockMessage), + PutBlock { + hash: Hash, + data: DataBlock, + }, /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), /// Response : whether the node do require that block NeedBlockReply(bool), } -/// Structure used to send a block +/// A possibly compressed block of data #[derive(Debug, Serialize, Deserialize)] -pub struct PutBlockMessage { - /// Hash of the block - pub hash: Hash, +pub enum DataBlock { + /// Uncompressed data + Plain(#[serde(with = "serde_bytes")] Vec), + /// Data compressed with zstd + Compressed(#[serde(with = "serde_bytes")] Vec), +} + +impl DataBlock { + /// Query whether this block is compressed + pub fn is_compressed(&self) -> bool { + matches!(self, DataBlock::Compressed(_)) + } + + /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] + /// instead + pub fn inner_buffer(&self) -> &[u8] { + use DataBlock::*; + let (Plain(ref res) | Compressed(ref res)) = self; + res + } - /// Content of the block - #[serde(with = "serde_bytes")] - pub data: Vec, + /// Get the buffer, possibly decompressing it, and verify it's integrity. + /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system + /// is used instead. + pub fn verify_get(self, hash: Hash) -> Result, Error> { + match self { + DataBlock::Plain(data) => { + if blake2sum(&data) == hash { + Ok(data) + } else { + Err(Error::CorruptData(hash)) + } + } + DataBlock::Compressed(data) => { + zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) + } + } + } + + /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but + /// does not return the buffer content. + pub fn verify(&self, hash: Hash) -> Result<(), Error> { + match self { + DataBlock::Plain(data) => { + if blake2sum(data) == hash { + Ok(()) + } else { + Err(Error::CorruptData(hash)) + } + } + DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) + .map_err(|_| Error::CorruptData(hash)), + } + } + + pub fn from_buffer(data: Vec, level: Option) -> DataBlock { + if let Some(level) = level { + if let Ok(data) = zstd_encode(&data[..], level) { + return DataBlock::Compressed(data); + } + } + DataBlock::Plain(data) + } } impl Rpc for BlockRpc { @@ -138,10 +198,8 @@ impl BlockManager { block_manager } - // ---- Public interface ---- - - /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + /// Ask nodes that might have a (possibly compressed) block for it + async fn rpc_get_raw_block(&self, hash: &Hash) -> Result { let who = self.replication.read_nodes(hash); let resps = self .system @@ -158,8 +216,8 @@ impl BlockManager { .await?; for resp in resps { - if let BlockRpc::PutBlock(msg) = resp { - return Ok(msg.data); + if let BlockRpc::PutBlock { data, .. } = resp { + return Ok(data); } } Err(Error::Message(format!( @@ -168,15 +226,30 @@ impl BlockManager { ))) } + // ---- Public interface ---- + + /// Ask nodes that might have a block for it + pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + self.rpc_get_raw_block(hash).await?.verify_get(*hash) + } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); + let compression_level = self + .garage + .load() + .as_ref() + .unwrap() + .config + .compression_level; + let data = DataBlock::from_buffer(data, compression_level); self.system .rpc .try_call_many( &self.endpoint, &who[..], - BlockRpc::PutBlock(PutBlockMessage { hash, data }), + BlockRpc::PutBlock { hash, data }, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), @@ -306,7 +379,7 @@ impl BlockManager { // ---- Reading and writing blocks locally ---- /// Write a block to disk - async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result { + async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result { self.mutation_lock .lock() .await @@ -316,21 +389,31 @@ impl BlockManager { /// Read block from disk, verifying it's integrity async fn read_block(&self, hash: &Hash) -> Result { - let path = self.block_path(hash); - - let mut f = match fs::File::open(&path).await { - Ok(f) => f, + let mut path = self.block_path(hash); + let compressed = match self.is_block_compressed(hash).await { + Ok(c) => c, Err(e) => { // Not found but maybe we should have had it ?? self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?; return Err(Into::into(e)); } }; + if compressed { + path.set_extension("zst"); + } + let mut f = fs::File::open(&path).await?; + let mut data = vec![]; f.read_to_end(&mut data).await?; drop(f); - if blake2sum(&data[..]) != *hash { + let data = if compressed { + DataBlock::Compressed(data) + } else { + DataBlock::Plain(data) + }; + + if data.verify(*hash).is_err() { self.mutation_lock .lock() .await @@ -340,7 +423,7 @@ impl BlockManager { return Err(Error::CorruptData(*hash)); } - Ok(BlockRpc::PutBlock(PutBlockMessage { hash: *hash, data })) + Ok(BlockRpc::PutBlock { hash: *hash, data }) } /// Check if this node should have a block, but don't actually have it @@ -362,13 +445,25 @@ impl BlockManager { path } - /// Utility: give the full path where a block should be found + /// Utility: give the full path where a block should be found, minus extension if block is + /// compressed fn block_path(&self, hash: &Hash) -> PathBuf { let mut path = self.block_dir(hash); path.push(hex::encode(hash.as_ref())); path } + /// Utility: check if block is stored compressed. Error if block is not stored + async fn is_block_compressed(&self, hash: &Hash) -> Result { + let mut path = self.block_path(hash); + path.set_extension("zst"); + if fs::metadata(&path).await.is_ok() { + return Ok(true); + } + path.set_extension(""); + fs::metadata(&path).await.map(|_| false).map_err(Into::into) + } + // ---- Resync loop ---- pub fn spawn_background_worker(self: Arc) { @@ -550,8 +645,8 @@ impl BlockManager { hash ); - let block_data = self.rpc_get_block(hash).await?; - self.write_block(hash, &block_data[..]).await?; + let block_data = self.rpc_get_raw_block(hash).await?; + self.write_block(hash, &block_data).await?; } Ok(()) @@ -602,6 +697,7 @@ impl BlockManager { }; let ent_type = data_dir_ent.file_type().await?; + let name = name.strip_suffix(".zst").unwrap_or(&name); if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { state = self .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit) @@ -631,7 +727,7 @@ impl EndpointHandler for BlockManager { _from: NodeID, ) -> Result { match message { - BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await, BlockRpc::GetBlock(h) => self.read_block(h).await, BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), @@ -650,9 +746,7 @@ impl BlockManagerLocked { hash: &Hash, mgr: &BlockManager, ) -> Result { - let path = mgr.block_path(hash); - - let exists = fs::metadata(&path).await.is_ok(); + let exists = mgr.is_block_compressed(hash).await.is_ok(); let needed = mgr.get_block_rc(hash)?; Ok(BlockStatus { exists, needed }) @@ -661,16 +755,31 @@ impl BlockManagerLocked { async fn write_block( &self, hash: &Hash, - data: &[u8], + data: &DataBlock, mgr: &BlockManager, ) -> Result { + let compressed = data.is_compressed(); + let data = data.inner_buffer(); + let mut path = mgr.block_dir(hash); fs::create_dir_all(&path).await?; path.push(hex::encode(hash)); - if fs::metadata(&path).await.is_ok() { - return Ok(BlockRpc::Ok); - } + let to_delete = match (mgr.is_block_compressed(hash).await, compressed) { + (Ok(true), _) => return Ok(BlockRpc::Ok), + (Ok(false), false) => return Ok(BlockRpc::Ok), + (Ok(false), true) => { + let path_to_delete = path.clone(); + path.set_extension("zst"); + Some(path_to_delete) + } + (Err(_), compressed) => { + if compressed { + path.set_extension("zst"); + } + None + } + }; let mut path2 = path.clone(); path2.set_extension("tmp"); @@ -679,6 +788,9 @@ impl BlockManagerLocked { drop(f); fs::rename(path2, path).await?; + if let Some(to_delete) = to_delete { + fs::remove_file(to_delete).await?; + } Ok(BlockRpc::Ok) } @@ -688,9 +800,14 @@ impl BlockManagerLocked { "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash ); - let path = mgr.block_path(hash); + let mut path = mgr.block_path(hash); let mut path2 = path.clone(); - path2.set_extension("corrupted"); + if mgr.is_block_compressed(hash).await? { + path.set_extension("zst"); + path2.set_extension("zst.corrupted"); + } else { + path2.set_extension("corrupted"); + } fs::rename(path, path2).await?; Ok(()) } @@ -699,7 +816,10 @@ impl BlockManagerLocked { let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?; if exists && needed.is_deletable() { - let path = mgr.block_path(hash); + let mut path = mgr.block_path(hash); + if mgr.is_block_compressed(hash).await? { + path.set_extension("zst"); + } fs::remove_file(path).await?; } Ok(()) @@ -806,3 +926,12 @@ impl RcEntry { !self.is_deletable() } } + +fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { + let mut result = Vec::::new(); + let mut encoder = Encoder::new(&mut result, level)?; + encoder.include_checksum(true)?; + std::io::copy(&mut source, &mut encoder)?; + encoder.finish()?; + Ok(result) +} diff --git a/src/util/config.rs b/src/util/config.rs index 33802012..61a749e3 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -30,6 +30,13 @@ pub struct Config { // (we can add more aliases for this later) pub replication_mode: String, + /// Zstd compression level used on data blocks + #[serde( + deserialize_with = "deserialize_compression", + default = "default_compression" + )] + pub compression_level: Option, + /// RPC secret key: 32 bytes hex encoded pub rpc_secret: String, @@ -123,3 +130,57 @@ where Ok(ret) } + +fn default_compression() -> Option { + Some(1) +} + +fn deserialize_compression<'de, D>(deserializer: D) -> Result, D::Error> +where + D: de::Deserializer<'de>, +{ + use std::convert::TryFrom; + + struct OptionVisitor; + + impl<'de> serde::de::Visitor<'de> for OptionVisitor { + type Value = Option; + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("int or 'none'") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + if value.eq_ignore_ascii_case("none") { + Ok(None) + } else { + Err(E::custom(format!( + "Invalid compression level: '{}', should be a number, or 'none'", + value + ))) + } + } + + fn visit_i64(self, v: i64) -> Result + where + E: de::Error, + { + i32::try_from(v) + .map(Some) + .map_err(|_| E::custom("Compression level out of bound".to_owned())) + } + + fn visit_u64(self, v: u64) -> Result + where + E: de::Error, + { + i32::try_from(v) + .map(Some) + .map_err(|_| E::custom("Compression level out of bound".to_owned())) + } + } + + deserializer.deserialize_any(OptionVisitor) +} -- cgit v1.2.3