diff options
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 123 |
1 files changed, 45 insertions, 78 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 5283886c..4f3d0978 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,5 +1,4 @@ use std::path::PathBuf; -use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -9,8 +8,6 @@ use bytes::Bytes; use rand::prelude::*; use serde::{Deserialize, Serialize}; -use futures::Stream; -use futures_util::stream::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::sync::{mpsc, Mutex, MutexGuard}; @@ -20,7 +17,7 @@ use opentelemetry::{ Context, }; -use garage_net::stream::{stream_asyncread, ByteStream}; +use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream}; use garage_db as db; @@ -53,9 +50,6 @@ pub const INLINE_THRESHOLD: usize = 3072; // to delete the block locally. pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); -pub type BlockStream = - Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>; - /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum BlockRpc { @@ -235,11 +229,9 @@ impl BlockManager { &self, hash: &Hash, order_tag: Option<OrderTag>, - ) -> Result<(DataBlockHeader, ByteStream), Error> { - self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { - Ok((header, stream)) - }) - .await + ) -> Result<DataBlockStream, Error> { + self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) }) + .await } /// Ask nodes that might have a (possibly compressed) block for it @@ -249,10 +241,12 @@ impl BlockManager { hash: &Hash, order_tag: Option<OrderTag>, ) -> Result<DataBlock, Error> { - self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { + self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move { + let (header, stream) = block_stream.into_parts(); read_stream_to_end(stream) .await - .map(|data| DataBlock::from_parts(header, data)) + .err_context("error in block data stream") + .map(|data| DataBlock::from_parts(header, data.into_bytes())) }) .await } @@ -264,7 +258,7 @@ impl BlockManager { f: F, ) -> Result<T, Error> where - F: Fn(DataBlockHeader, ByteStream) -> Fut, + F: Fn(DataBlockStream) -> Fut, Fut: futures::Future<Output = Result<T, Error>>, { let who = self @@ -288,8 +282,8 @@ impl BlockManager { continue; } }; - let (header, stream) = match res.into_parts() { - (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), + let block_stream = match res.into_parts() { + (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => DataBlockStream::from_parts(header, stream), (Ok(_), _) => { debug!("Get block {:?}: node {:?} returned a malformed response", hash, node); continue; @@ -299,7 +293,7 @@ impl BlockManager { continue; } }; - match f(header, stream).await { + match f(block_stream).await { Ok(ret) => return Ok(ret), Err(e) => { debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e); @@ -323,14 +317,14 @@ impl BlockManager { // ---- Public interface ---- - /// Ask nodes that might have a block for it, - /// return it as a stream + /// Ask nodes that might have a block for it, return it as a stream pub async fn rpc_get_block_streaming( &self, hash: &Hash, order_tag: Option<OrderTag>, - ) -> Result<BlockStream, Error> { - let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; + ) -> Result<ByteStream, Error> { + let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?; + let (header, stream) = block_stream.into_parts(); match header { DataBlockHeader::Plain => Ok(stream), DataBlockHeader::Compressed => { @@ -343,15 +337,14 @@ impl BlockManager { } } - /// Ask nodes that might have a block for it + /// Ask nodes that might have a block for it, return it as one big Bytes pub async fn rpc_get_block( &self, hash: &Hash, order_tag: Option<OrderTag>, ) -> Result<Bytes, Error> { - self.rpc_get_raw_block(hash, order_tag) - .await? - .verify_get(*hash) + let stream = self.rpc_get_block_streaming(hash, order_tag).await?; + Ok(read_stream_to_end(stream).await?.into_bytes()) } /// Send block to nodes that should have it @@ -484,7 +477,7 @@ impl BlockManager { stream: Option<ByteStream>, ) -> Result<(), Error> { let stream = stream.ok_or_message("missing stream")?; - let bytes = read_stream_to_end(stream).await?; + let bytes = read_stream_to_end(stream).await?.into_bytes(); let data = DataBlock::from_parts(header, bytes); self.write_block(&hash, &data).await } @@ -555,10 +548,7 @@ impl BlockManager { hash: &Hash, block_path: &DataBlockPath, ) -> Result<DataBlock, Error> { - let (path, compressed) = match block_path { - DataBlockPath::Plain(p) => (p, false), - DataBlockPath::Compressed(p) => (p, true), - }; + let (header, path) = block_path.as_parts_ref(); let mut f = fs::File::open(&path).await?; let mut data = vec![]; @@ -566,11 +556,7 @@ impl BlockManager { self.metrics.bytes_read.add(data.len() as u64); drop(f); - let data = if compressed { - DataBlock::Compressed(data.into()) - } else { - DataBlock::Plain(data.into()) - }; + let data = DataBlock::from_parts(header, data.into()); if data.verify(*hash).is_err() { self.metrics.corruption_counter.add(1); @@ -623,20 +609,20 @@ impl BlockManager { // first and then a compressed one (as compression may have been // previously enabled). if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Plain(path)); + return Some(DataBlockPath::plain(path)); } path.set_extension("zst"); if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Compressed(path)); + return Some(DataBlockPath::compressed(path)); } } else { path.set_extension("zst"); if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Compressed(path)); + return Some(DataBlockPath::compressed(path)); } path.set_extension(""); if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Plain(path)); + return Some(DataBlockPath::plain(path)); } } } @@ -706,8 +692,8 @@ impl BlockManagerLocked { mgr: &BlockManager, existing_path: Option<DataBlockPath>, ) -> Result<(), Error> { - let compressed = data.is_compressed(); - let data = data.inner_buffer(); + let (header, data) = data.as_parts_ref(); + let compressed = header.is_compressed(); let directory = mgr.data_layout.load().primary_block_dir(hash); @@ -717,24 +703,25 @@ impl BlockManagerLocked { tgt_path.set_extension("zst"); } - let to_delete = match (existing_path, compressed) { + let existing_info = existing_path.map(|x| x.into_parts()); + let to_delete = match (existing_info, compressed) { // If the block is stored in the wrong directory, // write it again at the correct path and delete the old path - (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p), - (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p), + (Some((DataBlockHeader::Plain, p)), false) if p != tgt_path => Some(p), + (Some((DataBlockHeader::Compressed, p)), true) if p != tgt_path => Some(p), // If the block is already stored not compressed but we have a compressed // copy, write the compressed copy and delete the uncompressed one - (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path), + (Some((DataBlockHeader::Plain, plain_path)), true) => Some(plain_path), // If the block is already stored compressed, // keep the stored copy, we have nothing to do - (Some(DataBlockPath::Compressed(_)), _) => return Ok(()), + (Some((DataBlockHeader::Compressed, _)), _) => return Ok(()), // If the block is already stored not compressed, // and we don't have a compressed copy either, // keep the stored copy, we have nothing to do - (Some(DataBlockPath::Plain(_)), false) => return Ok(()), + (Some((DataBlockHeader::Plain, _)), false) => return Ok(()), // If the block isn't stored already, just store what is given to us (None, _) => None, @@ -786,18 +773,14 @@ impl BlockManagerLocked { } async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> { - let (path, path2) = match block_path { - DataBlockPath::Plain(p) => { - let mut p2 = p.clone(); - p2.set_extension("corrupted"); - (p, p2) - } - DataBlockPath::Compressed(p) => { - let mut p2 = p.clone(); - p2.set_extension("zst.corrupted"); - (p, p2) - } - }; + let (header, path) = block_path.as_parts_ref(); + + let mut path2 = path.clone(); + if header.is_compressed() { + path2.set_extension("zst.corrupted"); + } else { + path2.set_extension("corrupted"); + } fs::rename(path, path2).await?; Ok(()) @@ -807,9 +790,7 @@ impl BlockManagerLocked { let rc = mgr.rc.get_block_rc(hash)?; if rc.is_deletable() { while let Some(path) = mgr.find_block(hash).await { - let path = match path { - DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p, - }; + let (_header, path) = path.as_parts_ref(); fs::remove_file(path).await?; mgr.metrics.delete_counter.add(1); } @@ -826,24 +807,10 @@ impl BlockManagerLocked { let data = mgr.read_block_from(hash, &wrong_path).await?; self.write_block_inner(hash, &data, mgr, Some(wrong_path)) .await?; - Ok(data.inner_buffer().len()) + Ok(data.as_parts_ref().1.len()) } } -async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> { - let mut parts: Vec<Bytes> = vec![]; - while let Some(part) = stream.next().await { - parts.push(part.ok_or_message("error in stream")?); - } - - Ok(parts - .iter() - .map(|x| &x[..]) - .collect::<Vec<_>>() - .concat() - .into()) -} - struct DeleteOnDrop(Option<PathBuf>); impl DeleteOnDrop { |