aboutsummaryrefslogtreecommitdiff
path: root/src/block/block.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2022-07-22 18:20:27 +0200
committerAlex Auvolat <alex@adnab.me>2022-07-29 12:25:02 +0200
commit605a630333c8ee60c55fe011a375c01277bba173 (patch)
tree1ae21efe1070806ddc9af7b85cda718e64e105c8 /src/block/block.rs
parenta35d4da721db3550a2833d8576d4283bc999e8df (diff)
downloadgarage-605a630333c8ee60c55fe011a375c01277bba173.tar.gz
garage-605a630333c8ee60c55fe011a375c01277bba173.zip
Use streaming in block manager
Diffstat (limited to 'src/block/block.rs')
-rw-r--r--src/block/block.rs37
1 files changed, 28 insertions, 9 deletions
diff --git a/src/block/block.rs b/src/block/block.rs
index f17bd2c0..935aa900 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -5,13 +5,18 @@ use zstd::stream::{decode_all as zstd_decode, Encoder};
use garage_util::data::*;
use garage_util::error::*;
+#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
+pub enum DataBlockHeader {
+ Plain,
+ Compressed,
+}
+
/// A possibly compressed block of data
-#[derive(Debug, Serialize, Deserialize)]
pub enum DataBlock {
/// Uncompressed data
- Plain(#[serde(with = "serde_bytes")] Vec<u8>),
+ Plain(Bytes),
/// Data compressed with zstd
- Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
+ Compressed(Bytes),
}
impl DataBlock {
@@ -31,7 +36,7 @@ impl DataBlock {
/// Get the buffer, possibly decompressing it, and verify it's integrity.
/// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
/// is used instead.
- pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> {
+ pub fn verify_get(self, hash: Hash) -> Result<Bytes, Error> {
match self {
DataBlock::Plain(data) => {
if blake2sum(&data) == hash {
@@ -40,9 +45,9 @@ impl DataBlock {
Err(Error::CorruptData(hash))
}
}
- DataBlock::Compressed(data) => {
- zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash))
- }
+ DataBlock::Compressed(data) => zstd_decode(&data[..])
+ .map_err(|_| Error::CorruptData(hash))
+ .map(Bytes::from),
}
}
@@ -66,14 +71,28 @@ impl DataBlock {
tokio::task::spawn_blocking(move || {
if let Some(level) = level {
if let Ok(data) = zstd_encode(&data[..], level) {
- return DataBlock::Compressed(data);
+ return DataBlock::Compressed(data.into());
}
}
- DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here
+ DataBlock::Plain(data)
})
.await
.unwrap()
}
+
+ pub fn into_parts(self) -> (DataBlockHeader, Bytes) {
+ match self {
+ DataBlock::Plain(data) => (DataBlockHeader::Plain, data),
+ DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data),
+ }
+ }
+
+ pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self {
+ match h {
+ DataBlockHeader::Plain => DataBlock::Plain(bytes),
+ DataBlockHeader::Compressed => DataBlock::Compressed(bytes),
+ }
+ }
}
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {