diff options
author | Alex <alex@adnab.me> | 2024-02-23 15:49:43 +0000 |
---|---|---|
committer | Alex <alex@adnab.me> | 2024-02-23 15:49:43 +0000 |
commit | 61758ce0f91b930542bd2ee3c72735000cc12e75 (patch) | |
tree | 0a8c9c8d57abfa084188873dab2987ecfe93c0d7 /src/block/block.rs | |
parent | 74d0c47f21ae2f9998a7dcbca3a27e3cc51e70b6 (diff) | |
parent | 6ee691e65f2c6f7b337a62cbfacaddb9ba9cd61a (diff) | |
download | garage-61758ce0f91b930542bd2ee3c72735000cc12e75.tar.gz garage-61758ce0f91b930542bd2ee3c72735000cc12e75.zip |
Merge pull request 'some refactoring on data read/write path' (#729) from refactor-block into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/729
Diffstat (limited to 'src/block/block.rs')
-rw-r--r-- | src/block/block.rs | 115 |
1 files changed, 53 insertions, 62 deletions
diff --git a/src/block/block.rs b/src/block/block.rs index 20f57aa5..504d11f8 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -2,107 +2,98 @@ use std::path::PathBuf; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use zstd::stream::{decode_all as zstd_decode, Encoder}; +use zstd::stream::Encoder; use garage_util::data::*; use garage_util::error::*; +use garage_net::stream::ByteStream; + #[derive(Debug, Serialize, Deserialize, Copy, Clone)] pub enum DataBlockHeader { Plain, Compressed, } -/// A possibly compressed block of data -pub enum DataBlock { - /// Uncompressed data - Plain(Bytes), - /// Data compressed with zstd - Compressed(Bytes), -} - #[derive(Debug)] -pub enum DataBlockPath { - /// Uncompressed data fail - Plain(PathBuf), - /// Compressed data fail - Compressed(PathBuf), +pub struct DataBlockElem<T> { + header: DataBlockHeader, + elem: T, } -impl DataBlock { - /// Query whether this block is compressed +/// A possibly compressed block of data +pub type DataBlock = DataBlockElem<Bytes>; + +/// A path to a possibly compressed block of data +pub type DataBlockPath = DataBlockElem<PathBuf>; + +/// A stream of possibly compressed block data +pub type DataBlockStream = DataBlockElem<ByteStream>; + +impl DataBlockHeader { pub fn is_compressed(&self) -> bool { - matches!(self, DataBlock::Compressed(_)) + matches!(self, DataBlockHeader::Compressed) } +} - /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] - /// instead - pub fn inner_buffer(&self) -> &[u8] { - use DataBlock::*; - let (Plain(ref res) | Compressed(ref res)) = self; - res +impl<T> DataBlockElem<T> { + pub fn from_parts(header: DataBlockHeader, elem: T) -> Self { + Self { header, elem } } - /// Get the buffer, possibly decompressing it, and verify it's integrity. - /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system - /// is used instead. - pub fn verify_get(self, hash: Hash) -> Result<Bytes, Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(&data) == hash { - Ok(data) - } else { - Err(Error::CorruptData(hash)) - } - } - DataBlock::Compressed(data) => zstd_decode(&data[..]) - .map_err(|_| Error::CorruptData(hash)) - .map(Bytes::from), + pub fn plain(elem: T) -> Self { + Self { + header: DataBlockHeader::Plain, + elem, + } + } + + pub fn compressed(elem: T) -> Self { + Self { + header: DataBlockHeader::Compressed, + elem, } } - /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but - /// does not return the buffer content. + pub fn into_parts(self) -> (DataBlockHeader, T) { + (self.header, self.elem) + } + + pub fn as_parts_ref(&self) -> (DataBlockHeader, &T) { + (self.header, &self.elem) + } +} + +impl DataBlock { + /// Verify data integrity. Does not return the buffer content. pub fn verify(&self, hash: Hash) -> Result<(), Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(data) == hash { + match self.header { + DataBlockHeader::Plain => { + if blake2sum(&self.elem) == hash { Ok(()) } else { Err(Error::CorruptData(hash)) } } - DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) - .map_err(|_| Error::CorruptData(hash)), + DataBlockHeader::Compressed => { + zstd::stream::copy_decode(&self.elem[..], std::io::sink()) + .map_err(|_| Error::CorruptData(hash)) + } } } pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock { tokio::task::spawn_blocking(move || { if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data.into()); + if let Ok(data_compressed) = zstd_encode(&data[..], level) { + return DataBlock::compressed(data_compressed.into()); } } - DataBlock::Plain(data) + DataBlock::plain(data.into()) }) .await .unwrap() } - - pub fn into_parts(self) -> (DataBlockHeader, Bytes) { - match self { - DataBlock::Plain(data) => (DataBlockHeader::Plain, data), - DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data), - } - } - - pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self { - match h { - DataBlockHeader::Plain => DataBlock::Plain(bytes), - DataBlockHeader::Compressed => DataBlock::Compressed(bytes), - } - } } fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { |