diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/Cargo.toml | 1 | ||||
-rw-r--r-- | src/model/block.rs | 201 |
2 files changed, 166 insertions, 36 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 7979a79a..1d695192 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -23,6 +23,7 @@ arc-swap = "1.0" hex = "0.4" log = "0.4" rand = "0.8" +zstd = { version = "0.9", default-features = false } sled = "0.34" diff --git a/src/model/block.rs b/src/model/block.rs index 8b1919bb..6df8e265 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; +use zstd::stream::{decode_all as zstd_decode, Encoder}; use garage_util::data::*; use garage_util::error::*; @@ -55,22 +56,81 @@ pub enum BlockRpc { GetBlock(Hash), /// Message to send a block of data, either because requested, of for first delivery of new /// block - PutBlock(PutBlockMessage), + PutBlock { + hash: Hash, + data: DataBlock, + }, /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), /// Response : whether the node do require that block NeedBlockReply(bool), } -/// Structure used to send a block +/// A possibly compressed block of data #[derive(Debug, Serialize, Deserialize)] -pub struct PutBlockMessage { - /// Hash of the block - pub hash: Hash, +pub enum DataBlock { + /// Uncompressed data + Plain(#[serde(with = "serde_bytes")] Vec<u8>), + /// Data compressed with zstd + Compressed(#[serde(with = "serde_bytes")] Vec<u8>), +} + +impl DataBlock { + /// Query whether this block is compressed + pub fn is_compressed(&self) -> bool { + matches!(self, DataBlock::Compressed(_)) + } + + /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] + /// instead + pub fn inner_buffer(&self) -> &[u8] { + use DataBlock::*; + let (Plain(ref res) | Compressed(ref res)) = self; + res + } - /// Content of the block - #[serde(with = "serde_bytes")] - pub data: Vec<u8>, + /// Get the buffer, possibly decompressing it, and verify it's integrity. + /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system + /// is used instead. + pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> { + match self { + DataBlock::Plain(data) => { + if blake2sum(&data) == hash { + Ok(data) + } else { + Err(Error::CorruptData(hash)) + } + } + DataBlock::Compressed(data) => { + zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) + } + } + } + + /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but + /// does not return the buffer content. + pub fn verify(&self, hash: Hash) -> Result<(), Error> { + match self { + DataBlock::Plain(data) => { + if blake2sum(data) == hash { + Ok(()) + } else { + Err(Error::CorruptData(hash)) + } + } + DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) + .map_err(|_| Error::CorruptData(hash)), + } + } + + pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock { + if let Some(level) = level { + if let Ok(data) = zstd_encode(&data[..], level) { + return DataBlock::Compressed(data); + } + } + DataBlock::Plain(data) + } } impl Rpc for BlockRpc { @@ -138,10 +198,8 @@ impl BlockManager { block_manager } - // ---- Public interface ---- - - /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { + /// Ask nodes that might have a (possibly compressed) block for it + async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> { let who = self.replication.read_nodes(hash); let resps = self .system @@ -158,8 +216,8 @@ impl BlockManager { .await?; for resp in resps { - if let BlockRpc::PutBlock(msg) = resp { - return Ok(msg.data); + if let BlockRpc::PutBlock { data, .. } = resp { + return Ok(data); } } Err(Error::Message(format!( @@ -168,15 +226,30 @@ impl BlockManager { ))) } + // ---- Public interface ---- + + /// Ask nodes that might have a block for it + pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { + self.rpc_get_raw_block(hash).await?.verify_get(*hash) + } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); + let compression_level = self + .garage + .load() + .as_ref() + .unwrap() + .config + .compression_level; + let data = DataBlock::from_buffer(data, compression_level); self.system .rpc .try_call_many( &self.endpoint, &who[..], - BlockRpc::PutBlock(PutBlockMessage { hash, data }), + BlockRpc::PutBlock { hash, data }, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), @@ -306,7 +379,7 @@ impl BlockManager { // ---- Reading and writing blocks locally ---- /// Write a block to disk - async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<BlockRpc, Error> { + async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> { self.mutation_lock .lock() .await @@ -316,21 +389,31 @@ impl BlockManager { /// Read block from disk, verifying it's integrity async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { - let path = self.block_path(hash); - - let mut f = match fs::File::open(&path).await { - Ok(f) => f, + let mut path = self.block_path(hash); + let compressed = match self.is_block_compressed(hash).await { + Ok(c) => c, Err(e) => { // Not found but maybe we should have had it ?? self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?; return Err(Into::into(e)); } }; + if compressed { + path.set_extension("zst"); + } + let mut f = fs::File::open(&path).await?; + let mut data = vec![]; f.read_to_end(&mut data).await?; drop(f); - if blake2sum(&data[..]) != *hash { + let data = if compressed { + DataBlock::Compressed(data) + } else { + DataBlock::Plain(data) + }; + + if data.verify(*hash).is_err() { self.mutation_lock .lock() .await @@ -340,7 +423,7 @@ impl BlockManager { return Err(Error::CorruptData(*hash)); } - Ok(BlockRpc::PutBlock(PutBlockMessage { hash: *hash, data })) + Ok(BlockRpc::PutBlock { hash: *hash, data }) } /// Check if this node should have a block, but don't actually have it @@ -362,13 +445,25 @@ impl BlockManager { path } - /// Utility: give the full path where a block should be found + /// Utility: give the full path where a block should be found, minus extension if block is + /// compressed fn block_path(&self, hash: &Hash) -> PathBuf { let mut path = self.block_dir(hash); path.push(hex::encode(hash.as_ref())); path } + /// Utility: check if block is stored compressed. Error if block is not stored + async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> { + let mut path = self.block_path(hash); + path.set_extension("zst"); + if fs::metadata(&path).await.is_ok() { + return Ok(true); + } + path.set_extension(""); + fs::metadata(&path).await.map(|_| false).map_err(Into::into) + } + // ---- Resync loop ---- pub fn spawn_background_worker(self: Arc<Self>) { @@ -550,8 +645,8 @@ impl BlockManager { hash ); - let block_data = self.rpc_get_block(hash).await?; - self.write_block(hash, &block_data[..]).await?; + let block_data = self.rpc_get_raw_block(hash).await?; + self.write_block(hash, &block_data).await?; } Ok(()) @@ -602,6 +697,7 @@ impl BlockManager { }; let ent_type = data_dir_ent.file_type().await?; + let name = name.strip_suffix(".zst").unwrap_or(&name); if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { state = self .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit) @@ -631,7 +727,7 @@ impl EndpointHandler<BlockRpc> for BlockManager { _from: NodeID, ) -> Result<BlockRpc, Error> { match message { - BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await, BlockRpc::GetBlock(h) => self.read_block(h).await, BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), @@ -650,9 +746,7 @@ impl BlockManagerLocked { hash: &Hash, mgr: &BlockManager, ) -> Result<BlockStatus, Error> { - let path = mgr.block_path(hash); - - let exists = fs::metadata(&path).await.is_ok(); + let exists = mgr.is_block_compressed(hash).await.is_ok(); let needed = mgr.get_block_rc(hash)?; Ok(BlockStatus { exists, needed }) @@ -661,16 +755,31 @@ impl BlockManagerLocked { async fn write_block( &self, hash: &Hash, - data: &[u8], + data: &DataBlock, mgr: &BlockManager, ) -> Result<BlockRpc, Error> { + let compressed = data.is_compressed(); + let data = data.inner_buffer(); + let mut path = mgr.block_dir(hash); fs::create_dir_all(&path).await?; path.push(hex::encode(hash)); - if fs::metadata(&path).await.is_ok() { - return Ok(BlockRpc::Ok); - } + let to_delete = match (mgr.is_block_compressed(hash).await, compressed) { + (Ok(true), _) => return Ok(BlockRpc::Ok), + (Ok(false), false) => return Ok(BlockRpc::Ok), + (Ok(false), true) => { + let path_to_delete = path.clone(); + path.set_extension("zst"); + Some(path_to_delete) + } + (Err(_), compressed) => { + if compressed { + path.set_extension("zst"); + } + None + } + }; let mut path2 = path.clone(); path2.set_extension("tmp"); @@ -679,6 +788,9 @@ impl BlockManagerLocked { drop(f); fs::rename(path2, path).await?; + if let Some(to_delete) = to_delete { + fs::remove_file(to_delete).await?; + } Ok(BlockRpc::Ok) } @@ -688,9 +800,14 @@ impl BlockManagerLocked { "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash ); - let path = mgr.block_path(hash); + let mut path = mgr.block_path(hash); let mut path2 = path.clone(); - path2.set_extension("corrupted"); + if mgr.is_block_compressed(hash).await? { + path.set_extension("zst"); + path2.set_extension("zst.corrupted"); + } else { + path2.set_extension("corrupted"); + } fs::rename(path, path2).await?; Ok(()) } @@ -699,7 +816,10 @@ impl BlockManagerLocked { let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?; if exists && needed.is_deletable() { - let path = mgr.block_path(hash); + let mut path = mgr.block_path(hash); + if mgr.is_block_compressed(hash).await? { + path.set_extension("zst"); + } fs::remove_file(path).await?; } Ok(()) @@ -806,3 +926,12 @@ impl RcEntry { !self.is_deletable() } } + +fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { + let mut result = Vec::<u8>::new(); + let mut encoder = Encoder::new(&mut result, level)?; + encoder.include_checksum(true)?; + std::io::copy(&mut source, &mut encoder)?; + encoder.finish()?; + Ok(result) +} |