diff options
Diffstat (limited to 'src/model/block.rs')
-rw-r--r-- | src/model/block.rs | 117 |
1 files changed, 93 insertions, 24 deletions
diff --git a/src/model/block.rs b/src/model/block.rs index 5f428fe1..272bd884 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; +use zstd::stream::{decode_all as zstd_decode, encode_all as zstd_encode}; use garage_util::data::*; use garage_util::error::Error; @@ -43,6 +44,7 @@ pub enum Message { /// Message to send a block of data, either because requested, of for first delivery of new /// block PutBlock(PutBlockMessage), + PutCompressedBlock(PutBlockMessage), /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), /// Response : whether the node do require that block @@ -134,7 +136,8 @@ impl BlockManager { async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> { match msg { - Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + Message::PutBlock(m) => self.write_block(&m.hash, &m.data, false).await, + Message::PutCompressedBlock(m) => self.write_block(&m.hash, &m.data, true).await, Message::GetBlock(h) => self.read_block(h).await, Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), _ => Err(Error::BadRPC(format!("Unexpected RPC message"))), @@ -157,15 +160,23 @@ impl BlockManager { } /// Write a block to disk - async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { + pub async fn write_block( + &self, + hash: &Hash, + data: &[u8], + compressed: bool, + ) -> Result<Message, Error> { + if self.is_block_compressed(hash).await.is_ok() { + return Ok(Message::Ok); + } let _lock = self.data_dir_lock.lock().await; let mut path = self.block_dir(hash); fs::create_dir_all(&path).await?; path.push(hex::encode(hash)); - if fs::metadata(&path).await.is_ok() { - return Ok(Message::Ok); + if compressed { + path.set_extension("zst"); } let mut f = fs::File::create(path).await?; @@ -176,35 +187,61 @@ impl BlockManager { } /// Read block from disk, verifying it's integrity - async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { - let path = self.block_path(hash); + pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> { + let mut path = self.block_path(hash); - let mut f = match fs::File::open(&path).await { - Ok(f) => f, - Err(e) => { + let mut data = vec![]; + let block = match self.is_block_compressed(hash).await { + Ok(false) => { + let f = fs::File::open(&path).await; + f.map(|f| (f, false)).map_err(Into::into) + } + Ok(true) => { + path.set_extension("zst"); + let f = fs::File::open(&path).await; + f.map(|f| (f, true)).map_err(Into::into) + } + Err(e) => Err(e), + }; + let (mut f, compressed) = match block { + Ok(ok) => ok, + e => { // Not found but maybe we should have had it ?? self.put_to_resync(hash, Duration::from_millis(0))?; - return Err(Into::into(e)); + e? } }; - let mut data = vec![]; f.read_to_end(&mut data).await?; drop(f); - if blake2sum(&data[..]) != *hash { + let sum = if compressed { + zstd_decode(&data[..]) + .ok() + .map(|decompressed| blake2sum(&decompressed[..])) + } else { + Some(blake2sum(&data[..])) + }; + if sum.is_none() || sum.unwrap() != *hash { let _lock = self.data_dir_lock.lock().await; warn!( "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash ); let mut path2 = path.clone(); - path2.set_extension(".corrupted"); + path2.set_extension("corrupted"); fs::rename(path, path2).await?; self.put_to_resync(&hash, Duration::from_millis(0))?; return Err(Error::CorruptData(*hash)); } - Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + if compressed { + Ok(Message::PutCompressedBlock(PutBlockMessage { + hash: *hash, + data, + })) + } else { + Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + } } /// Check if this node should have a block, but don't actually have it @@ -215,14 +252,22 @@ impl BlockManager { .map(|x| u64_from_be_bytes(x) > 0) .unwrap_or(false); if needed { - let path = self.block_path(hash); - let exists = fs::metadata(&path).await.is_ok(); + let exists = self.is_block_compressed(hash).await.is_ok(); Ok(!exists) } else { Ok(false) } } + async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> { + let mut path = self.block_path(hash); + if fs::metadata(&path).await.is_ok() { + return Ok(false); + } + path.set_extension("zst"); + fs::metadata(&path).await.map(|_| true).map_err(Into::into) + } + fn block_dir(&self, hash: &Hash) -> PathBuf { let mut path = self.data_dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); @@ -323,7 +368,7 @@ impl BlockManager { let path = self.block_path(hash); - let exists = fs::metadata(&path).await.is_ok(); + let exists = self.is_block_compressed(hash).await.is_ok(); let needed = self .rc .get(hash.as_ref())? @@ -402,15 +447,14 @@ impl BlockManager { // TODO find a way to not do this if they are sending it to us // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay // between the RC being incremented and this part being called. - let block_data = self.rpc_get_block(&hash).await?; - self.write_block(hash, &block_data[..]).await?; + let (block_data, compressed) = self.rpc_get_raw_block(&hash).await?; + self.write_block(hash, &block_data[..], compressed).await?; } Ok(()) } - /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { + async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<(Vec<u8>, bool), Error> { let who = self.replication.read_nodes(&hash); let resps = self .rpc_client @@ -424,8 +468,10 @@ impl BlockManager { .await?; for resp in resps { - if let Message::PutBlock(msg) = resp { - return Ok(msg.data); + match resp { + Message::PutBlock(msg) => return Ok((msg.data, false)), + Message::PutCompressedBlock(msg) => return Ok((msg.data, true)), + _ => {} } } Err(Error::Message(format!( @@ -434,13 +480,36 @@ impl BlockManager { ))) } + /// Ask nodes that might have a block for it + pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { + self.rpc_get_raw_block(hash) + .await + .and_then(|(data, compressed)| { + if compressed { + zstd_decode(&data[..]).map_err(|_| Error::CorruptData(*hash)) + } else { + Ok(data) + } + }) + } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { + let garage = self.garage.load_full().unwrap(); + let compressed = zstd_encode(&data[..], garage.config.compression_level); + let message = if compressed.is_ok() && compressed.as_ref().unwrap().len() < data.len() { + Message::PutCompressedBlock(PutBlockMessage { + hash, + data: compressed.unwrap(), + }) + } else { + Message::PutBlock(PutBlockMessage { hash, data }) + }; let who = self.replication.write_nodes(&hash); self.rpc_client .try_call_many( &who[..], - Message::PutBlock(PutBlockMessage { hash, data }), + message, RequestStrategy::with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) |