From 93552b9275a6a83653353e27c0ee0c64c7a23d59 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 11:33:38 +0100 Subject: [refactor-block] Remove redundant BlockStream type --- src/api/s3/get.rs | 12 ++++++------ src/block/manager.rs | 7 +------ 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 53f0a345..efb8d4ab 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -13,7 +13,7 @@ use http::header::{ use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; -use garage_block::manager::BlockStream; +use garage_net::stream::ByteStream; use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; @@ -286,7 +286,7 @@ pub async fn handle_get( Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let (tx, rx) = mpsc::channel::(2); + let (tx, rx) = mpsc::channel::(2); let order_stream = OrderTag::stream(); let first_block_hash = *first_block_hash; @@ -494,7 +494,7 @@ fn body_from_blocks_range( } let order_stream = OrderTag::stream(); - let (tx, rx) = mpsc::channel::(2); + let (tx, rx) = mpsc::channel::(2); tokio::spawn(async move { match async { @@ -542,7 +542,7 @@ fn body_from_blocks_range( }) .filter_map(futures::future::ready); - let block_stream: BlockStream = Box::pin(block_stream); + let block_stream: ByteStream = Box::pin(block_stream); tx.send(Box::pin(block_stream)) .await .ok_or_message("channel closed")?; @@ -562,7 +562,7 @@ fn body_from_blocks_range( response_body_from_block_stream(rx) } -fn response_body_from_block_stream(rx: mpsc::Receiver) -> ResBody { +fn response_body_from_block_stream(rx: mpsc::Receiver) -> ResBody { let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) .flatten() .map(|x| { @@ -572,7 +572,7 @@ fn response_body_from_block_stream(rx: mpsc::Receiver) -> ResBody { ResBody::new(http_body_util::StreamBody::new(body_stream)) } -fn error_stream_item(e: E) -> BlockStream { +fn error_stream_item(e: E) -> ByteStream { let err = std::io::Error::new( std::io::ErrorKind::Other, format!("Error while getting object data: {}", e), diff --git a/src/block/manager.rs b/src/block/manager.rs index 848d9141..96ea2c96 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,5 +1,4 @@ use std::path::PathBuf; -use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -9,7 +8,6 @@ use bytes::Bytes; use rand::prelude::*; use serde::{Deserialize, Serialize}; -use futures::Stream; use futures_util::stream::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; @@ -53,9 +51,6 @@ pub const INLINE_THRESHOLD: usize = 3072; // to delete the block locally. pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); -pub type BlockStream = - Pin> + Send + Sync + 'static>>; - /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum BlockRpc { @@ -327,7 +322,7 @@ impl BlockManager { &self, hash: &Hash, order_tag: Option, - ) -> Result { + ) -> Result { let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { DataBlockHeader::Plain => Ok(stream), -- cgit v1.2.3 From 9b41f4ff2016b099ee706747828f5828f8c96b1c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 11:46:57 +0100 Subject: [refactor-block] move read_stream_to_end to garage_net --- src/block/manager.rs | 22 ++++------------------ src/net/bytes_buf.rs | 13 +++++++++++++ src/net/stream.rs | 11 +++++++++++ 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 96ea2c96..54290888 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -8,7 +8,6 @@ use bytes::Bytes; use rand::prelude::*; use serde::{Deserialize, Serialize}; -use futures_util::stream::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::sync::{mpsc, Mutex, MutexGuard}; @@ -18,7 +17,7 @@ use opentelemetry::{ Context, }; -use garage_net::stream::{stream_asyncread, ByteStream}; +use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream}; use garage_db as db; @@ -247,7 +246,8 @@ impl BlockManager { self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { read_stream_to_end(stream) .await - .map(|data| DataBlock::from_parts(header, data)) + .err_context("error in block data stream") + .map(|data| DataBlock::from_parts(header, data.into_bytes())) }) .await } @@ -477,7 +477,7 @@ impl BlockManager { stream: Option, ) -> Result<(), Error> { let stream = stream.ok_or_message("missing stream")?; - let bytes = read_stream_to_end(stream).await?; + let bytes = read_stream_to_end(stream).await?.into_bytes(); let data = DataBlock::from_parts(header, bytes); self.write_block(&hash, &data).await } @@ -823,20 +823,6 @@ impl BlockManagerLocked { } } -async fn read_stream_to_end(mut stream: ByteStream) -> Result { - let mut parts: Vec = vec![]; - while let Some(part) = stream.next().await { - parts.push(part.ok_or_message("error in stream")?); - } - - Ok(parts - .iter() - .map(|x| &x[..]) - .collect::>() - .concat() - .into()) -} - struct DeleteOnDrop(Option); impl DeleteOnDrop { diff --git a/src/net/bytes_buf.rs b/src/net/bytes_buf.rs index 3929a860..1d928ffb 100644 --- a/src/net/bytes_buf.rs +++ b/src/net/bytes_buf.rs @@ -3,6 +3,8 @@ use std::collections::VecDeque; use bytes::BytesMut; +use crate::stream::ByteStream; + pub use bytes::Bytes; /// A circular buffer of bytes, internally represented as a list of Bytes @@ -119,6 +121,17 @@ impl BytesBuf { pub fn into_slices(self) -> VecDeque { self.buf } + + /// Return the entire buffer concatenated into a single big Bytes + pub fn into_bytes(mut self) -> Bytes { + self.take_all() + } + + /// Return the content as a stream of individual chunks + pub fn into_stream(self) -> ByteStream { + use futures::stream::StreamExt; + Box::pin(futures::stream::iter(self.buf).map(|x| Ok(x))) + } } impl Default for BytesBuf { diff --git a/src/net/stream.rs b/src/net/stream.rs index 88c3fed4..3ac6896d 100644 --- a/src/net/stream.rs +++ b/src/net/stream.rs @@ -200,3 +200,14 @@ pub fn asyncread_stream(reader: R) -> Byte pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static { tokio_util::io::StreamReader::new(stream) } + +/// Reads all of the content of a `ByteStream` into a BytesBuf +/// that contains everything +pub async fn read_stream_to_end(mut stream: ByteStream) -> Result { + let mut buf = BytesBuf::new(); + while let Some(part) = stream.next().await { + buf.extend(part?); + } + + Ok(buf) +} -- cgit v1.2.3 From 07c78959480327af3709dc0e7b6c8f798cef9c92 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 11:47:44 +0100 Subject: [refactor-block] simplify rpc_get_block --- src/block/block.rs | 23 ++--------------------- src/block/manager.rs | 5 ++--- 2 files changed, 4 insertions(+), 24 deletions(-) diff --git a/src/block/block.rs b/src/block/block.rs index 20f57aa5..34afea5d 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use zstd::stream::{decode_all as zstd_decode, Encoder}; +use zstd::stream::Encoder; use garage_util::data::*; use garage_util::error::*; @@ -43,26 +43,7 @@ impl DataBlock { res } - /// Get the buffer, possibly decompressing it, and verify it's integrity. - /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system - /// is used instead. - pub fn verify_get(self, hash: Hash) -> Result { - match self { - DataBlock::Plain(data) => { - if blake2sum(&data) == hash { - Ok(data) - } else { - Err(Error::CorruptData(hash)) - } - } - DataBlock::Compressed(data) => zstd_decode(&data[..]) - .map_err(|_| Error::CorruptData(hash)) - .map(Bytes::from), - } - } - - /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but - /// does not return the buffer content. + /// Verify data integrity. Does not return the buffer content. pub fn verify(&self, hash: Hash) -> Result<(), Error> { match self { DataBlock::Plain(data) => { diff --git a/src/block/manager.rs b/src/block/manager.rs index 54290888..64e2ea27 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -342,9 +342,8 @@ impl BlockManager { hash: &Hash, order_tag: Option, ) -> Result { - self.rpc_get_raw_block(hash, order_tag) - .await? - .verify_get(*hash) + let stream = self.rpc_get_block_streaming(hash, order_tag).await?; + Ok(read_stream_to_end(stream).await?.into_bytes()) } /// Send block to nodes that should have it -- cgit v1.2.3 From cd1069c1d4219deb7d48758fb9d0a45cc4c1eddb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 12:11:14 +0100 Subject: [refactor-block] refactor DataBlock and DataBlockPath --- src/block/block.rs | 100 +++++++++++++++++++++++++++++++-------------------- src/block/manager.rs | 56 ++++++++++++----------------- src/block/repair.rs | 4 +-- 3 files changed, 85 insertions(+), 75 deletions(-) diff --git a/src/block/block.rs b/src/block/block.rs index 34afea5d..0b14bad4 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -13,77 +13,99 @@ pub enum DataBlockHeader { Compressed, } -/// A possibly compressed block of data -pub enum DataBlock { - /// Uncompressed data - Plain(Bytes), - /// Data compressed with zstd - Compressed(Bytes), +#[derive(Debug)] +pub struct DataBlockElem { + header: DataBlockHeader, + elem: T, } -#[derive(Debug)] -pub enum DataBlockPath { - /// Uncompressed data fail - Plain(PathBuf), - /// Compressed data fail - Compressed(PathBuf), +/// A possibly compressed block of data +pub type DataBlock = DataBlockElem; + +/// A path to a possibly compressed block of data +pub type DataBlockPath = DataBlockElem; + +impl DataBlockHeader { + pub fn is_compressed(&self) -> bool { + matches!(self, DataBlockHeader::Compressed) + } } -impl DataBlock { +impl DataBlockElem { + pub fn from_parts(header: DataBlockHeader, elem: T) -> Self { + Self { header, elem } + } + + pub fn plain(elem: T) -> Self { + Self { + header: DataBlockHeader::Plain, + elem, + } + } + + pub fn compressed(elem: T) -> Self { + Self { + header: DataBlockHeader::Compressed, + elem, + } + } + + pub fn into_parts(self) -> (DataBlockHeader, T) { + (self.header, self.elem) + } + + pub fn as_parts_ref(&self) -> (DataBlockHeader, &T) { + (self.header, &self.elem) + } + /// Query whether this block is compressed pub fn is_compressed(&self) -> bool { - matches!(self, DataBlock::Compressed(_)) + self.header.is_compressed() } +} +impl DataBlock { /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] /// instead pub fn inner_buffer(&self) -> &[u8] { - use DataBlock::*; - let (Plain(ref res) | Compressed(ref res)) = self; - res + &self.elem } /// Verify data integrity. Does not return the buffer content. pub fn verify(&self, hash: Hash) -> Result<(), Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(data) == hash { + match self.header { + DataBlockHeader::Plain => { + if blake2sum(&self.elem) == hash { Ok(()) } else { Err(Error::CorruptData(hash)) } } - DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) - .map_err(|_| Error::CorruptData(hash)), + DataBlockHeader::Compressed => { + zstd::stream::copy_decode(&self.elem[..], std::io::sink()) + .map_err(|_| Error::CorruptData(hash)) + } } } pub async fn from_buffer(data: Bytes, level: Option) -> DataBlock { tokio::task::spawn_blocking(move || { if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data.into()); + if let Ok(data_compressed) = zstd_encode(&data[..], level) { + return DataBlock { + header: DataBlockHeader::Compressed, + elem: data_compressed.into(), + }; } } - DataBlock::Plain(data) + DataBlock { + header: DataBlockHeader::Plain, + elem: data.into(), + } }) .await .unwrap() } - - pub fn into_parts(self) -> (DataBlockHeader, Bytes) { - match self { - DataBlock::Plain(data) => (DataBlockHeader::Plain, data), - DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data), - } - } - - pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self { - match h { - DataBlockHeader::Plain => DataBlock::Plain(bytes), - DataBlockHeader::Compressed => DataBlock::Compressed(bytes), - } - } } fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { diff --git a/src/block/manager.rs b/src/block/manager.rs index 64e2ea27..6303248a 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -547,10 +547,7 @@ impl BlockManager { hash: &Hash, block_path: &DataBlockPath, ) -> Result { - let (path, compressed) = match block_path { - DataBlockPath::Plain(p) => (p, false), - DataBlockPath::Compressed(p) => (p, true), - }; + let (header, path) = block_path.as_parts_ref(); let mut f = fs::File::open(&path).await?; let mut data = vec![]; @@ -558,11 +555,7 @@ impl BlockManager { self.metrics.bytes_read.add(data.len() as u64); drop(f); - let data = if compressed { - DataBlock::Compressed(data.into()) - } else { - DataBlock::Plain(data.into()) - }; + let data = DataBlock::from_parts(header, data.into()); if data.verify(*hash).is_err() { self.metrics.corruption_counter.add(1); @@ -615,20 +608,20 @@ impl BlockManager { // first and then a compressed one (as compression may have been // previously enabled). if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Plain(path)); + return Some(DataBlockPath::plain(path)); } path.set_extension("zst"); if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Compressed(path)); + return Some(DataBlockPath::compressed(path)); } } else { path.set_extension("zst"); if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Compressed(path)); + return Some(DataBlockPath::compressed(path)); } path.set_extension(""); if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Plain(path)); + return Some(DataBlockPath::plain(path)); } } } @@ -709,24 +702,25 @@ impl BlockManagerLocked { tgt_path.set_extension("zst"); } - let to_delete = match (existing_path, compressed) { + let existing_info = existing_path.map(|x| x.into_parts()); + let to_delete = match (existing_info, compressed) { // If the block is stored in the wrong directory, // write it again at the correct path and delete the old path - (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p), - (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p), + (Some((DataBlockHeader::Plain, p)), false) if p != tgt_path => Some(p), + (Some((DataBlockHeader::Compressed, p)), true) if p != tgt_path => Some(p), // If the block is already stored not compressed but we have a compressed // copy, write the compressed copy and delete the uncompressed one - (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path), + (Some((DataBlockHeader::Plain, plain_path)), true) => Some(plain_path), // If the block is already stored compressed, // keep the stored copy, we have nothing to do - (Some(DataBlockPath::Compressed(_)), _) => return Ok(()), + (Some((DataBlockHeader::Compressed, _)), _) => return Ok(()), // If the block is already stored not compressed, // and we don't have a compressed copy either, // keep the stored copy, we have nothing to do - (Some(DataBlockPath::Plain(_)), false) => return Ok(()), + (Some((DataBlockHeader::Plain, _)), false) => return Ok(()), // If the block isn't stored already, just store what is given to us (None, _) => None, @@ -778,18 +772,14 @@ impl BlockManagerLocked { } async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> { - let (path, path2) = match block_path { - DataBlockPath::Plain(p) => { - let mut p2 = p.clone(); - p2.set_extension("corrupted"); - (p, p2) - } - DataBlockPath::Compressed(p) => { - let mut p2 = p.clone(); - p2.set_extension("zst.corrupted"); - (p, p2) - } - }; + let (header, path) = block_path.as_parts_ref(); + + let mut path2 = path.clone(); + if header.is_compressed() { + path2.set_extension("zst.corrupted"); + } else { + path2.set_extension("corrupted"); + } fs::rename(path, path2).await?; Ok(()) @@ -799,9 +789,7 @@ impl BlockManagerLocked { let rc = mgr.rc.get_block_rc(hash)?; if rc.is_deletable() { while let Some(path) = mgr.find_block(hash).await { - let path = match path { - DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p, - }; + let (_header, path) = path.as_parts_ref(); fs::remove_file(path).await?; mgr.metrics.delete_counter.add(1); } diff --git a/src/block/repair.rs b/src/block/repair.rs index 77ee0d14..2c8acbc9 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -584,8 +584,8 @@ impl Worker for RebalanceWorker { let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); if path.ancestors().all(|x| x != prim_loc) { let block_path = match path.extension() { - None => DataBlockPath::Plain(path.clone()), - Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()), + None => DataBlockPath::plain(path.clone()), + Some(x) if x.to_str() == Some("zst") => DataBlockPath::compressed(path.clone()), _ => { warn!("not rebalancing file: {}", path.to_string_lossy()); return Ok(WorkerState::Busy); -- cgit v1.2.3 From e9c42bca347e3a67f8d6bae953bdf0b53ce37d00 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 12:22:29 +0100 Subject: [refactor-block] add DataBlockStream type --- src/block/block.rs | 5 +++++ src/block/manager.rs | 27 +++++++++++++-------------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/block/block.rs b/src/block/block.rs index 0b14bad4..3f5c4f94 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -7,6 +7,8 @@ use zstd::stream::Encoder; use garage_util::data::*; use garage_util::error::*; +use garage_net::stream::ByteStream; + #[derive(Debug, Serialize, Deserialize, Copy, Clone)] pub enum DataBlockHeader { Plain, @@ -25,6 +27,9 @@ pub type DataBlock = DataBlockElem; /// A path to a possibly compressed block of data pub type DataBlockPath = DataBlockElem; +/// A stream of possibly compressed block data +pub type DataBlockStream = DataBlockElem; + impl DataBlockHeader { pub fn is_compressed(&self) -> bool { matches!(self, DataBlockHeader::Compressed) diff --git a/src/block/manager.rs b/src/block/manager.rs index 6303248a..6773dfd1 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -229,11 +229,9 @@ impl BlockManager { &self, hash: &Hash, order_tag: Option, - ) -> Result<(DataBlockHeader, ByteStream), Error> { - self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { - Ok((header, stream)) - }) - .await + ) -> Result { + self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) }) + .await } /// Ask nodes that might have a (possibly compressed) block for it @@ -243,7 +241,8 @@ impl BlockManager { hash: &Hash, order_tag: Option, ) -> Result { - self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { + self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move { + let (header, stream) = block_stream.into_parts(); read_stream_to_end(stream) .await .err_context("error in block data stream") @@ -259,7 +258,7 @@ impl BlockManager { f: F, ) -> Result where - F: Fn(DataBlockHeader, ByteStream) -> Fut, + F: Fn(DataBlockStream) -> Fut, Fut: futures::Future>, { let who = self.replication.read_nodes(hash); @@ -281,8 +280,8 @@ impl BlockManager { continue; } }; - let (header, stream) = match res.into_parts() { - (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), + let block_stream = match res.into_parts() { + (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => DataBlockStream::from_parts(header, stream), (Ok(_), _) => { debug!("Get block {:?}: node {:?} returned a malformed response", hash, node); continue; @@ -292,7 +291,7 @@ impl BlockManager { continue; } }; - match f(header, stream).await { + match f(block_stream).await { Ok(ret) => return Ok(ret), Err(e) => { debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e); @@ -316,14 +315,14 @@ impl BlockManager { // ---- Public interface ---- - /// Ask nodes that might have a block for it, - /// return it as a stream + /// Ask nodes that might have a block for it, return it as a stream pub async fn rpc_get_block_streaming( &self, hash: &Hash, order_tag: Option, ) -> Result { - let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; + let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?; + let (header, stream) = block_stream.into_parts(); match header { DataBlockHeader::Plain => Ok(stream), DataBlockHeader::Compressed => { @@ -336,7 +335,7 @@ impl BlockManager { } } - /// Ask nodes that might have a block for it + /// Ask nodes that might have a block for it, return it as one big Bytes pub async fn rpc_get_block( &self, hash: &Hash, -- cgit v1.2.3 From 6ee691e65f2c6f7b337a62cbfacaddb9ba9cd61a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 12:26:35 +0100 Subject: [refactor-block] simplify some more --- src/block/block.rs | 21 ++------------------- src/block/manager.rs | 6 +++--- 2 files changed, 5 insertions(+), 22 deletions(-) diff --git a/src/block/block.rs b/src/block/block.rs index 3f5c4f94..504d11f8 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -62,20 +62,9 @@ impl DataBlockElem { pub fn as_parts_ref(&self) -> (DataBlockHeader, &T) { (self.header, &self.elem) } - - /// Query whether this block is compressed - pub fn is_compressed(&self) -> bool { - self.header.is_compressed() - } } impl DataBlock { - /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] - /// instead - pub fn inner_buffer(&self) -> &[u8] { - &self.elem - } - /// Verify data integrity. Does not return the buffer content. pub fn verify(&self, hash: Hash) -> Result<(), Error> { match self.header { @@ -97,16 +86,10 @@ impl DataBlock { tokio::task::spawn_blocking(move || { if let Some(level) = level { if let Ok(data_compressed) = zstd_encode(&data[..], level) { - return DataBlock { - header: DataBlockHeader::Compressed, - elem: data_compressed.into(), - }; + return DataBlock::compressed(data_compressed.into()); } } - DataBlock { - header: DataBlockHeader::Plain, - elem: data.into(), - } + DataBlock::plain(data.into()) }) .await .unwrap() diff --git a/src/block/manager.rs b/src/block/manager.rs index 6773dfd1..817866f6 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -690,8 +690,8 @@ impl BlockManagerLocked { mgr: &BlockManager, existing_path: Option, ) -> Result<(), Error> { - let compressed = data.is_compressed(); - let data = data.inner_buffer(); + let (header, data) = data.as_parts_ref(); + let compressed = header.is_compressed(); let directory = mgr.data_layout.load().primary_block_dir(hash); @@ -805,7 +805,7 @@ impl BlockManagerLocked { let data = mgr.read_block_from(hash, &wrong_path).await?; self.write_block_inner(hash, &data, mgr, Some(wrong_path)) .await?; - Ok(data.inner_buffer().len()) + Ok(data.as_parts_ref().1.len()) } } -- cgit v1.2.3