From 07c78959480327af3709dc0e7b6c8f798cef9c92 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 11:47:44 +0100 Subject: [refactor-block] simplify rpc_get_block --- src/block/block.rs | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) (limited to 'src/block/block.rs') diff --git a/src/block/block.rs b/src/block/block.rs index 20f57aa5..34afea5d 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use zstd::stream::{decode_all as zstd_decode, Encoder}; +use zstd::stream::Encoder; use garage_util::data::*; use garage_util::error::*; @@ -43,26 +43,7 @@ impl DataBlock { res } - /// Get the buffer, possibly decompressing it, and verify it's integrity. - /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system - /// is used instead. - pub fn verify_get(self, hash: Hash) -> Result { - match self { - DataBlock::Plain(data) => { - if blake2sum(&data) == hash { - Ok(data) - } else { - Err(Error::CorruptData(hash)) - } - } - DataBlock::Compressed(data) => zstd_decode(&data[..]) - .map_err(|_| Error::CorruptData(hash)) - .map(Bytes::from), - } - } - - /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but - /// does not return the buffer content. + /// Verify data integrity. Does not return the buffer content. pub fn verify(&self, hash: Hash) -> Result<(), Error> { match self { DataBlock::Plain(data) => { -- cgit v1.2.3 From cd1069c1d4219deb7d48758fb9d0a45cc4c1eddb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 12:11:14 +0100 Subject: [refactor-block] refactor DataBlock and DataBlockPath --- src/block/block.rs | 100 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 61 insertions(+), 39 deletions(-) (limited to 'src/block/block.rs') diff --git a/src/block/block.rs b/src/block/block.rs index 34afea5d..0b14bad4 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -13,77 +13,99 @@ pub enum DataBlockHeader { Compressed, } -/// A possibly compressed block of data -pub enum DataBlock { - /// Uncompressed data - Plain(Bytes), - /// Data compressed with zstd - Compressed(Bytes), +#[derive(Debug)] +pub struct DataBlockElem { + header: DataBlockHeader, + elem: T, } -#[derive(Debug)] -pub enum DataBlockPath { - /// Uncompressed data fail - Plain(PathBuf), - /// Compressed data fail - Compressed(PathBuf), +/// A possibly compressed block of data +pub type DataBlock = DataBlockElem; + +/// A path to a possibly compressed block of data +pub type DataBlockPath = DataBlockElem; + +impl DataBlockHeader { + pub fn is_compressed(&self) -> bool { + matches!(self, DataBlockHeader::Compressed) + } } -impl DataBlock { +impl DataBlockElem { + pub fn from_parts(header: DataBlockHeader, elem: T) -> Self { + Self { header, elem } + } + + pub fn plain(elem: T) -> Self { + Self { + header: DataBlockHeader::Plain, + elem, + } + } + + pub fn compressed(elem: T) -> Self { + Self { + header: DataBlockHeader::Compressed, + elem, + } + } + + pub fn into_parts(self) -> (DataBlockHeader, T) { + (self.header, self.elem) + } + + pub fn as_parts_ref(&self) -> (DataBlockHeader, &T) { + (self.header, &self.elem) + } + /// Query whether this block is compressed pub fn is_compressed(&self) -> bool { - matches!(self, DataBlock::Compressed(_)) + self.header.is_compressed() } +} +impl DataBlock { /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] /// instead pub fn inner_buffer(&self) -> &[u8] { - use DataBlock::*; - let (Plain(ref res) | Compressed(ref res)) = self; - res + &self.elem } /// Verify data integrity. Does not return the buffer content. pub fn verify(&self, hash: Hash) -> Result<(), Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(data) == hash { + match self.header { + DataBlockHeader::Plain => { + if blake2sum(&self.elem) == hash { Ok(()) } else { Err(Error::CorruptData(hash)) } } - DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) - .map_err(|_| Error::CorruptData(hash)), + DataBlockHeader::Compressed => { + zstd::stream::copy_decode(&self.elem[..], std::io::sink()) + .map_err(|_| Error::CorruptData(hash)) + } } } pub async fn from_buffer(data: Bytes, level: Option) -> DataBlock { tokio::task::spawn_blocking(move || { if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data.into()); + if let Ok(data_compressed) = zstd_encode(&data[..], level) { + return DataBlock { + header: DataBlockHeader::Compressed, + elem: data_compressed.into(), + }; } } - DataBlock::Plain(data) + DataBlock { + header: DataBlockHeader::Plain, + elem: data.into(), + } }) .await .unwrap() } - - pub fn into_parts(self) -> (DataBlockHeader, Bytes) { - match self { - DataBlock::Plain(data) => (DataBlockHeader::Plain, data), - DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data), - } - } - - pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self { - match h { - DataBlockHeader::Plain => DataBlock::Plain(bytes), - DataBlockHeader::Compressed => DataBlock::Compressed(bytes), - } - } } fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { -- cgit v1.2.3 From e9c42bca347e3a67f8d6bae953bdf0b53ce37d00 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 12:22:29 +0100 Subject: [refactor-block] add DataBlockStream type --- src/block/block.rs | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'src/block/block.rs') diff --git a/src/block/block.rs b/src/block/block.rs index 0b14bad4..3f5c4f94 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -7,6 +7,8 @@ use zstd::stream::Encoder; use garage_util::data::*; use garage_util::error::*; +use garage_net::stream::ByteStream; + #[derive(Debug, Serialize, Deserialize, Copy, Clone)] pub enum DataBlockHeader { Plain, @@ -25,6 +27,9 @@ pub type DataBlock = DataBlockElem; /// A path to a possibly compressed block of data pub type DataBlockPath = DataBlockElem; +/// A stream of possibly compressed block data +pub type DataBlockStream = DataBlockElem; + impl DataBlockHeader { pub fn is_compressed(&self) -> bool { matches!(self, DataBlockHeader::Compressed) -- cgit v1.2.3 From 6ee691e65f2c6f7b337a62cbfacaddb9ba9cd61a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 12:26:35 +0100 Subject: [refactor-block] simplify some more --- src/block/block.rs | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) (limited to 'src/block/block.rs') diff --git a/src/block/block.rs b/src/block/block.rs index 3f5c4f94..504d11f8 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -62,20 +62,9 @@ impl DataBlockElem { pub fn as_parts_ref(&self) -> (DataBlockHeader, &T) { (self.header, &self.elem) } - - /// Query whether this block is compressed - pub fn is_compressed(&self) -> bool { - self.header.is_compressed() - } } impl DataBlock { - /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] - /// instead - pub fn inner_buffer(&self) -> &[u8] { - &self.elem - } - /// Verify data integrity. Does not return the buffer content. pub fn verify(&self, hash: Hash) -> Result<(), Error> { match self.header { @@ -97,16 +86,10 @@ impl DataBlock { tokio::task::spawn_blocking(move || { if let Some(level) = level { if let Ok(data_compressed) = zstd_encode(&data[..], level) { - return DataBlock { - header: DataBlockHeader::Compressed, - elem: data_compressed.into(), - }; + return DataBlock::compressed(data_compressed.into()); } } - DataBlock { - header: DataBlockHeader::Plain, - elem: data.into(), - } + DataBlock::plain(data.into()) }) .await .unwrap() -- cgit v1.2.3