From 605a630333c8ee60c55fe011a375c01277bba173 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 18:20:27 +0200 Subject: Use streaming in block manager --- src/block/block.rs | 37 ++++++++++++++++++++++++++++--------- 1 file changed, 28 insertions(+), 9 deletions(-) (limited to 'src/block/block.rs') diff --git a/src/block/block.rs b/src/block/block.rs index f17bd2c0..935aa900 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -5,13 +5,18 @@ use zstd::stream::{decode_all as zstd_decode, Encoder}; use garage_util::data::*; use garage_util::error::*; +#[derive(Debug, Serialize, Deserialize, Copy, Clone)] +pub enum DataBlockHeader { + Plain, + Compressed, +} + /// A possibly compressed block of data -#[derive(Debug, Serialize, Deserialize)] pub enum DataBlock { /// Uncompressed data - Plain(#[serde(with = "serde_bytes")] Vec), + Plain(Bytes), /// Data compressed with zstd - Compressed(#[serde(with = "serde_bytes")] Vec), + Compressed(Bytes), } impl DataBlock { @@ -31,7 +36,7 @@ impl DataBlock { /// Get the buffer, possibly decompressing it, and verify it's integrity. /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system /// is used instead. - pub fn verify_get(self, hash: Hash) -> Result, Error> { + pub fn verify_get(self, hash: Hash) -> Result { match self { DataBlock::Plain(data) => { if blake2sum(&data) == hash { @@ -40,9 +45,9 @@ impl DataBlock { Err(Error::CorruptData(hash)) } } - DataBlock::Compressed(data) => { - zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) - } + DataBlock::Compressed(data) => zstd_decode(&data[..]) + .map_err(|_| Error::CorruptData(hash)) + .map(Bytes::from), } } @@ -66,14 +71,28 @@ impl DataBlock { tokio::task::spawn_blocking(move || { if let Some(level) = level { if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); + return DataBlock::Compressed(data.into()); } } - DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here + DataBlock::Plain(data) }) .await .unwrap() } + + pub fn into_parts(self) -> (DataBlockHeader, Bytes) { + match self { + DataBlock::Plain(data) => (DataBlockHeader::Plain, data), + DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data), + } + } + + pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self { + match h { + DataBlockHeader::Plain => DataBlock::Plain(bytes), + DataBlockHeader::Compressed => DataBlock::Compressed(bytes), + } + } } fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { -- cgit v1.2.3