diff options
author | Alex <alex@adnab.me> | 2021-04-14 23:27:33 +0200 |
---|---|---|
committer | Alex <alex@adnab.me> | 2021-04-14 23:27:33 +0200 |
commit | 307858bf0a077e6bb7ce77d9f5ca56c4ae2a5e63 (patch) | |
tree | ed1fa4c1db9bde2ead40ac60c29b681ed21f0de7 /src/model/block.rs | |
parent | 76efb4999d41da71b1c89093ed651a9e162801c6 (diff) | |
parent | d4fd07400008247c475ffe2c66bb2bd57bf7ab5c (diff) | |
download | garage-307858bf0a077e6bb7ce77d9f5ca56c4ae2a5e63.tar.gz garage-307858bf0a077e6bb7ce77d9f5ca56c4ae2a5e63.zip |
Merge pull request 'Compress with zstd' (#44) from trinity-1686a/garage:zstd-block into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/44
Diffstat (limited to 'src/model/block.rs')
-rw-r--r-- | src/model/block.rs | 177 |
1 files changed, 141 insertions, 36 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 5f428fe1..699ff32d 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; +use zstd::stream::{decode_all as zstd_decode, Encoder}; use garage_util::data::*; use garage_util::error::Error; @@ -42,22 +43,37 @@ pub enum Message { GetBlock(Hash), /// Message to send a block of data, either because requested, of for first delivery of new /// block - PutBlock(PutBlockMessage), + PutBlock { + hash: Hash, + data: BlockData, + }, /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), /// Response : whether the node do require that block NeedBlockReply(bool), } -/// Structure used to send a block +/// A possibly compressed block of data #[derive(Debug, Serialize, Deserialize)] -pub struct PutBlockMessage { - /// Hash of the block - pub hash: Hash, +pub enum BlockData { + Plain(#[serde(with = "serde_bytes")] Vec<u8>), + Compressed(#[serde(with = "serde_bytes")] Vec<u8>), +} + +impl BlockData { + pub fn is_compressed(&self) -> bool { + match self { + BlockData::Plain(_) => false, + BlockData::Compressed(_) => true, + } + } - /// Content of the block - #[serde(with = "serde_bytes")] - pub data: Vec<u8>, + pub fn buffer(&self) -> &Vec<u8> { + match self { + BlockData::Plain(b) => b, + BlockData::Compressed(b) => b, + } + } } impl RpcMessage for Message {} @@ -134,7 +150,7 @@ impl BlockManager { async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> { match msg { - Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + Message::PutBlock { hash, data } => self.write_block(&hash, &data).await, Message::GetBlock(h) => self.read_block(h).await, Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), _ => Err(Error::BadRPC(format!("Unexpected RPC message"))), @@ -157,54 +173,96 @@ impl BlockManager { } /// Write a block to disk - async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { + pub async fn write_block(&self, hash: &Hash, data: &BlockData) -> Result<Message, Error> { + let mut path = self.block_dir(hash); + let _lock = self.data_dir_lock.lock().await; - let mut path = self.block_dir(hash); - fs::create_dir_all(&path).await?; + let clean_plain = match self.is_block_compressed(hash).await { + Ok(true) => return Ok(Message::Ok), + Ok(false) if !data.is_compressed() => return Ok(Message::Ok), // we have a plain block, and the provided block is not compressed either + Ok(false) => true, + Err(_) => false, + }; + fs::create_dir_all(&path).await?; path.push(hex::encode(hash)); - if fs::metadata(&path).await.is_ok() { - return Ok(Message::Ok); + + if data.is_compressed() { + path.set_extension("zst"); } - let mut f = fs::File::create(path).await?; - f.write_all(data).await?; + let buffer = data.buffer(); + + let mut f = fs::File::create(path.clone()).await?; + f.write_all(&buffer).await?; + + if clean_plain { + path.set_extension(""); + fs::remove_file(path).await?; + } drop(f); Ok(Message::Ok) } /// Read block from disk, verifying it's integrity - async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { - let path = self.block_path(hash); + pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { + let mut path = self.block_path(hash); - let mut f = match fs::File::open(&path).await { - Ok(f) => f, - Err(e) => { + let mut data = vec![]; + let block = match self.is_block_compressed(hash).await { + Ok(false) => { + let f = fs::File::open(&path).await; + f.map(|f| (f, false)).map_err(Into::into) + } + Ok(true) => { + path.set_extension("zst"); + let f = fs::File::open(&path).await; + f.map(|f| (f, true)).map_err(Into::into) + } + Err(e) => Err(e), + }; + let (mut f, compressed) = match block { + Ok(ok) => ok, + e => { // Not found but maybe we should have had it ?? self.put_to_resync(hash, Duration::from_millis(0))?; - return Err(Into::into(e)); + e? } }; - let mut data = vec![]; f.read_to_end(&mut data).await?; drop(f); - if blake2sum(&data[..]) != *hash { + let sum_ok = if compressed { + zstd_check_checksum(&data[..]) + } else { + blake2sum(&data[..]) == *hash + }; + if !sum_ok { let _lock = self.data_dir_lock.lock().await; warn!( "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash ); let mut path2 = path.clone(); - path2.set_extension(".corrupted"); + path2.set_extension("corrupted"); fs::rename(path, path2).await?; self.put_to_resync(&hash, Duration::from_millis(0))?; return Err(Error::CorruptData(*hash)); } - Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + if compressed { + Ok(Message::PutBlock { + hash: *hash, + data: BlockData::Compressed(data), + }) + } else { + Ok(Message::PutBlock { + hash: *hash, + data: BlockData::Plain(data), + }) + } } /// Check if this node should have a block, but don't actually have it @@ -215,14 +273,23 @@ impl BlockManager { .map(|x| u64_from_be_bytes(x) > 0) .unwrap_or(false); if needed { - let path = self.block_path(hash); - let exists = fs::metadata(&path).await.is_ok(); + let exists = self.is_block_compressed(hash).await.is_ok(); Ok(!exists) } else { Ok(false) } } + async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> { + let mut path = self.block_path(hash); + path.set_extension("zst"); + if fs::metadata(&path).await.is_ok() { + return Ok(true); + } + path.set_extension(""); + fs::metadata(&path).await.map(|_| false).map_err(Into::into) + } + fn block_dir(&self, hash: &Hash) -> PathBuf { let mut path = self.data_dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); @@ -323,7 +390,7 @@ impl BlockManager { let path = self.block_path(hash); - let exists = fs::metadata(&path).await.is_ok(); + let exists = self.is_block_compressed(hash).await.is_ok(); let needed = self .rc .get(hash.as_ref())? @@ -402,15 +469,14 @@ impl BlockManager { // TODO find a way to not do this if they are sending it to us // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay // between the RC being incremented and this part being called. - let block_data = self.rpc_get_block(&hash).await?; - self.write_block(hash, &block_data[..]).await?; + let block = self.rpc_get_raw_block(&hash).await?; + self.write_block(hash, &block).await?; } Ok(()) } - /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { + async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<BlockData, Error> { let who = self.replication.read_nodes(&hash); let resps = self .rpc_client @@ -424,8 +490,9 @@ impl BlockManager { .await?; for resp in resps { - if let Message::PutBlock(msg) = resp { - return Ok(msg.data); + match resp { + Message::PutBlock { data, .. } => return Ok(data), + _ => {} } } Err(Error::Message(format!( @@ -434,13 +501,38 @@ impl BlockManager { ))) } + /// Ask nodes that might have a block for it + pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { + self.rpc_get_raw_block(hash) + .await + .and_then(|data| match data { + BlockData::Plain(data) => Ok(data), + BlockData::Compressed(data) => { + zstd_decode(&data[..]).map_err(|_| Error::CorruptData(*hash)) + } + }) + } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { + let garage = self.garage.load_full().unwrap(); + let compressed = zstd_encode(&data[..], garage.config.compression_level); + let message = if compressed.is_ok() && compressed.as_ref().unwrap().len() < data.len() { + Message::PutBlock { + hash, + data: BlockData::Compressed(compressed.unwrap()), + } + } else { + Message::PutBlock { + hash, + data: BlockData::Plain(data), + } + }; let who = self.replication.write_nodes(&hash); self.rpc_client .try_call_many( &who[..], - Message::PutBlock(PutBlockMessage { hash, data }), + message, RequestStrategy::with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) @@ -537,3 +629,16 @@ fn u64_from_be_bytes<T: AsRef<[u8]>>(bytes: T) -> u64 { x8.copy_from_slice(bytes.as_ref()); u64::from_be_bytes(x8) } + +fn zstd_check_checksum<R: std::io::Read>(source: R) -> bool { + zstd::stream::copy_decode(source, std::io::sink()).is_ok() +} + +fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { + let mut result = Vec::<u8>::new(); + let mut encoder = Encoder::new(&mut result, level)?; + encoder.include_checksum(true)?; + std::io::copy(&mut source, &mut encoder)?; + encoder.finish()?; + Ok(result) +} |