aboutsummaryrefslogtreecommitdiff
path: root/src/block/block.rs
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2024-02-23 15:49:43 +0000
committerAlex <alex@adnab.me>2024-02-23 15:49:43 +0000
commit61758ce0f91b930542bd2ee3c72735000cc12e75 (patch)
tree0a8c9c8d57abfa084188873dab2987ecfe93c0d7 /src/block/block.rs
parent74d0c47f21ae2f9998a7dcbca3a27e3cc51e70b6 (diff)
parent6ee691e65f2c6f7b337a62cbfacaddb9ba9cd61a (diff)
downloadgarage-61758ce0f91b930542bd2ee3c72735000cc12e75.tar.gz
garage-61758ce0f91b930542bd2ee3c72735000cc12e75.zip
Merge pull request 'some refactoring on data read/write path' (#729) from refactor-block into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/729
Diffstat (limited to 'src/block/block.rs')
-rw-r--r--src/block/block.rs115
1 files changed, 53 insertions, 62 deletions
diff --git a/src/block/block.rs b/src/block/block.rs
index 20f57aa5..504d11f8 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -2,107 +2,98 @@ use std::path::PathBuf;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
-use zstd::stream::{decode_all as zstd_decode, Encoder};
+use zstd::stream::Encoder;
use garage_util::data::*;
use garage_util::error::*;
+use garage_net::stream::ByteStream;
+
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub enum DataBlockHeader {
Plain,
Compressed,
}
-/// A possibly compressed block of data
-pub enum DataBlock {
- /// Uncompressed data
- Plain(Bytes),
- /// Data compressed with zstd
- Compressed(Bytes),
-}
-
#[derive(Debug)]
-pub enum DataBlockPath {
- /// Uncompressed data fail
- Plain(PathBuf),
- /// Compressed data fail
- Compressed(PathBuf),
+pub struct DataBlockElem<T> {
+ header: DataBlockHeader,
+ elem: T,
}
-impl DataBlock {
- /// Query whether this block is compressed
+/// A possibly compressed block of data
+pub type DataBlock = DataBlockElem<Bytes>;
+
+/// A path to a possibly compressed block of data
+pub type DataBlockPath = DataBlockElem<PathBuf>;
+
+/// A stream of possibly compressed block data
+pub type DataBlockStream = DataBlockElem<ByteStream>;
+
+impl DataBlockHeader {
pub fn is_compressed(&self) -> bool {
- matches!(self, DataBlock::Compressed(_))
+ matches!(self, DataBlockHeader::Compressed)
}
+}
- /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
- /// instead
- pub fn inner_buffer(&self) -> &[u8] {
- use DataBlock::*;
- let (Plain(ref res) | Compressed(ref res)) = self;
- res
+impl<T> DataBlockElem<T> {
+ pub fn from_parts(header: DataBlockHeader, elem: T) -> Self {
+ Self { header, elem }
}
- /// Get the buffer, possibly decompressing it, and verify it's integrity.
- /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
- /// is used instead.
- pub fn verify_get(self, hash: Hash) -> Result<Bytes, Error> {
- match self {
- DataBlock::Plain(data) => {
- if blake2sum(&data) == hash {
- Ok(data)
- } else {
- Err(Error::CorruptData(hash))
- }
- }
- DataBlock::Compressed(data) => zstd_decode(&data[..])
- .map_err(|_| Error::CorruptData(hash))
- .map(Bytes::from),
+ pub fn plain(elem: T) -> Self {
+ Self {
+ header: DataBlockHeader::Plain,
+ elem,
+ }
+ }
+
+ pub fn compressed(elem: T) -> Self {
+ Self {
+ header: DataBlockHeader::Compressed,
+ elem,
}
}
- /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
- /// does not return the buffer content.
+ pub fn into_parts(self) -> (DataBlockHeader, T) {
+ (self.header, self.elem)
+ }
+
+ pub fn as_parts_ref(&self) -> (DataBlockHeader, &T) {
+ (self.header, &self.elem)
+ }
+}
+
+impl DataBlock {
+ /// Verify data integrity. Does not return the buffer content.
pub fn verify(&self, hash: Hash) -> Result<(), Error> {
- match self {
- DataBlock::Plain(data) => {
- if blake2sum(data) == hash {
+ match self.header {
+ DataBlockHeader::Plain => {
+ if blake2sum(&self.elem) == hash {
Ok(())
} else {
Err(Error::CorruptData(hash))
}
}
- DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink())
- .map_err(|_| Error::CorruptData(hash)),
+ DataBlockHeader::Compressed => {
+ zstd::stream::copy_decode(&self.elem[..], std::io::sink())
+ .map_err(|_| Error::CorruptData(hash))
+ }
}
}
pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock {
tokio::task::spawn_blocking(move || {
if let Some(level) = level {
- if let Ok(data) = zstd_encode(&data[..], level) {
- return DataBlock::Compressed(data.into());
+ if let Ok(data_compressed) = zstd_encode(&data[..], level) {
+ return DataBlock::compressed(data_compressed.into());
}
}
- DataBlock::Plain(data)
+ DataBlock::plain(data.into())
})
.await
.unwrap()
}
-
- pub fn into_parts(self) -> (DataBlockHeader, Bytes) {
- match self {
- DataBlock::Plain(data) => (DataBlockHeader::Plain, data),
- DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data),
- }
- }
-
- pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self {
- match h {
- DataBlockHeader::Plain => DataBlock::Plain(bytes),
- DataBlockHeader::Compressed => DataBlock::Compressed(bytes),
- }
- }
}
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {