diff options
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/Cargo.toml | 3 | ||||
-rw-r--r-- | src/block/block.rs | 37 | ||||
-rw-r--r-- | src/block/manager.rs | 272 |
3 files changed, 237 insertions, 75 deletions
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 2555a44a..3e6f7bc0 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -27,6 +27,8 @@ bytes = "1.0" hex = "0.4" tracing = "0.1.30" rand = "0.8" + +async-compression = { version = "0.3", features = ["tokio", "zstd"] } zstd = { version = "0.9", default-features = false } rmp-serde = "0.15" @@ -36,3 +38,4 @@ serde_bytes = "0.11" futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } +tokio-util = { version = "0.6", features = ["io"] } diff --git a/src/block/block.rs b/src/block/block.rs index f17bd2c0..935aa900 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -5,13 +5,18 @@ use zstd::stream::{decode_all as zstd_decode, Encoder}; use garage_util::data::*; use garage_util::error::*; +#[derive(Debug, Serialize, Deserialize, Copy, Clone)] +pub enum DataBlockHeader { + Plain, + Compressed, +} + /// A possibly compressed block of data -#[derive(Debug, Serialize, Deserialize)] pub enum DataBlock { /// Uncompressed data - Plain(#[serde(with = "serde_bytes")] Vec<u8>), + Plain(Bytes), /// Data compressed with zstd - Compressed(#[serde(with = "serde_bytes")] Vec<u8>), + Compressed(Bytes), } impl DataBlock { @@ -31,7 +36,7 @@ impl DataBlock { /// Get the buffer, possibly decompressing it, and verify it's integrity. /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system /// is used instead. - pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> { + pub fn verify_get(self, hash: Hash) -> Result<Bytes, Error> { match self { DataBlock::Plain(data) => { if blake2sum(&data) == hash { @@ -40,9 +45,9 @@ impl DataBlock { Err(Error::CorruptData(hash)) } } - DataBlock::Compressed(data) => { - zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) - } + DataBlock::Compressed(data) => zstd_decode(&data[..]) + .map_err(|_| Error::CorruptData(hash)) + .map(Bytes::from), } } @@ -66,14 +71,28 @@ impl DataBlock { tokio::task::spawn_blocking(move || { if let Some(level) = level { if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); + return DataBlock::Compressed(data.into()); } } - DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here + DataBlock::Plain(data) }) .await .unwrap() } + + pub fn into_parts(self) -> (DataBlockHeader, Bytes) { + match self { + DataBlock::Plain(data) => (DataBlockHeader::Plain, data), + DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data), + } + } + + pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self { + match h { + DataBlockHeader::Plain => DataBlock::Plain(bytes), + DataBlockHeader::Compressed => DataBlock::Compressed(bytes), + } + } } fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { diff --git a/src/block/manager.rs b/src/block/manager.rs index be53ec6e..80c52510 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,5 +1,6 @@ use std::convert::TryInto; use std::path::PathBuf; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -8,9 +9,10 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use futures::future::*; +use futures::{Stream, TryStreamExt}; +use futures_util::stream::StreamExt; use tokio::fs; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::select; use tokio::sync::{mpsc, watch, Mutex, Notify}; @@ -19,6 +21,8 @@ use opentelemetry::{ Context, KeyValue, }; +use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; + use garage_db as db; use garage_db::counted_tree_hack::CountedTree; @@ -71,7 +75,7 @@ pub enum BlockRpc { /// block PutBlock { hash: Hash, - data: DataBlock, + header: DataBlockHeader, }, /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), @@ -175,56 +179,148 @@ impl BlockManager { } /// Ask nodes that might have a (possibly compressed) block for it - async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> { + /// Return it as a stream with a header + async fn rpc_get_raw_block_streaming( + &self, + hash: &Hash, + ) -> Result<(DataBlockHeader, ByteStream), Error> { let who = self.replication.read_nodes(hash); - let resps = self - .system - .rpc - .try_call_many( - &self.endpoint, - &who[..], - BlockRpc::GetBlock(*hash), - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(1) - .with_timeout(BLOCK_RW_TIMEOUT) - .interrupt_after_quorum(true), - ) - .await?; + //let who = self.system.rpc.request_order(&who); + + for node in who.iter() { + let node_id = NodeID::from(*node); + let rpc = + self.endpoint + .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); + tokio::select! { + res = rpc => { + let res = match res { + Ok(res) => res, + Err(e) => { + debug!("Node {:?} returned error: {}", node, e); + continue; + } + }; + let (header, stream) = match res.into_parts() { + (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), + _ => { + debug!("Node {:?} returned a malformed response", node); + continue; + } + }; + return Ok((header, stream)); + } + _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => { + debug!("Node {:?} didn't return block in time, trying next.", node); + } + }; + } - for resp in resps { - if let BlockRpc::PutBlock { data, .. } = resp { - return Ok(data); - } + Err(Error::Message(format!( + "Unable to read block {:?}: no node returned a valid block", + hash + ))) + } + + /// Ask nodes that might have a (possibly compressed) block for it + /// Return its entire body + async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> { + let who = self.replication.read_nodes(hash); + //let who = self.system.rpc.request_order(&who); + + for node in who.iter() { + let node_id = NodeID::from(*node); + let rpc = + self.endpoint + .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); + tokio::select! { + res = rpc => { + let res = match res { + Ok(res) => res, + Err(e) => { + debug!("Node {:?} returned error: {}", node, e); + continue; + } + }; + let (header, stream) = match res.into_parts() { + (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), + _ => { + debug!("Node {:?} returned a malformed response", node); + continue; + } + }; + match read_stream_to_end(stream).await { + Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)), + Err(e) => { + debug!("Error reading stream from node {:?}: {}", node, e); + } + } + } + _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => { + debug!("Node {:?} didn't return block in time, trying next.", node); + } + }; } + Err(Error::Message(format!( - "Unable to read block {:?}: no valid blocks returned", + "Unable to read block {:?}: no node returned a valid block", hash ))) } // ---- Public interface ---- + /// Ask nodes that might have a block for it, + /// return it as a stream + pub async fn rpc_get_block_streaming( + &self, + hash: &Hash, + ) -> Result< + Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>, + Error, + > { + let (header, stream) = self.rpc_get_raw_block_streaming(hash).await?; + match header { + DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| { + std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error") + }))), + DataBlockHeader::Compressed => { + // Too many things, I hate it. + let reader = stream_asyncread(stream); + let reader = BufReader::new(reader); + let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader); + Ok(Box::pin(tokio_util::io::ReaderStream::new(reader))) + } + } + } + /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> { + pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Bytes, Error> { self.rpc_get_raw_block(hash).await?.verify_get(*hash) } /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); - let data = DataBlock::from_buffer(data, self.compression_level).await; + + let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) + .await + .into_parts(); + let put_block_rpc = + Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); + self.system .rpc .try_call_many( &self.endpoint, &who[..], - // TODO: remove to_vec() here - BlockRpc::PutBlock { hash, data }, + put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; + Ok(()) } @@ -309,13 +405,25 @@ impl BlockManager { // ---- Reading and writing blocks locally ---- + async fn handle_put_block( + &self, + hash: Hash, + header: DataBlockHeader, + stream: Option<ByteStream>, + ) -> Result<(), Error> { + let stream = stream.ok_or_message("missing stream")?; + let bytes = read_stream_to_end(stream).await?; + let data = DataBlock::from_parts(header, bytes); + self.write_block(&hash, &data).await + } + /// Write a block to disk - async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> { + async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> { let tracer = opentelemetry::global::tracer("garage"); let write_size = data.inner_buffer().len() as u64; - let res = self.mutation_lock[hash.as_slice()[0] as usize] + self.mutation_lock[hash.as_slice()[0] as usize] .lock() .with_context(Context::current_with_span( tracer.start("Acquire mutation_lock"), @@ -330,21 +438,31 @@ impl BlockManager { self.metrics.bytes_written.add(write_size); - Ok(res) + Ok(()) } - /// Read block from disk, verifying it's integrity - pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { - let data = self - .read_block_internal(hash) - .bound_record_duration(&self.metrics.block_read_duration) - .await?; + async fn handle_get_block(&self, hash: &Hash) -> Resp<BlockRpc> { + let block = match self.read_block(hash).await { + Ok(data) => data, + Err(e) => return Resp::new(Err(e)), + }; + + let (header, data) = block.into_parts(); - self.metrics - .bytes_read - .add(data.inner_buffer().len() as u64); + self.metrics.bytes_read.add(data.len() as u64); - Ok(BlockRpc::PutBlock { hash: *hash, data }) + Resp::new(Ok(BlockRpc::PutBlock { + hash: *hash, + header, + })) + .with_stream_from_buffer(data) + } + + /// Read block from disk, verifying it's integrity + pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> { + self.read_block_internal(hash) + .bound_record_duration(&self.metrics.block_read_duration) + .await } async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> { @@ -367,9 +485,9 @@ impl BlockManager { drop(f); let data = if compressed { - DataBlock::Compressed(data) + DataBlock::Compressed(data.into()) } else { - DataBlock::Plain(data) + DataBlock::Plain(data.into()) }; if data.verify(*hash).is_err() { @@ -637,24 +755,24 @@ impl BlockManager { } who.retain(|id| *id != self.system.id); - let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash)); - let who_needs_fut = who.iter().map(|to| { - self.system.rpc.call_arc( + let who_needs_resps = self + .system + .rpc + .call_many( &self.endpoint, - *to, - msg.clone(), + &who, + BlockRpc::NeedBlockQuery(*hash), RequestStrategy::with_priority(PRIO_BACKGROUND) .with_timeout(NEED_BLOCK_QUERY_TIMEOUT), ) - }); - let who_needs_resps = join_all(who_needs_fut).await; + .await?; let mut need_nodes = vec![]; - for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) { + for (node, needed) in who_needs_resps.into_iter() { match needed.err_context("NeedBlockQuery RPC")? { BlockRpc::NeedBlockReply(needed) => { if needed { - need_nodes.push(*node); + need_nodes.push(node); } } m => { @@ -676,7 +794,13 @@ impl BlockManager { .add(1, &[KeyValue::new("to", format!("{:?}", node))]); } - let put_block_message = self.read_block(hash).await?; + let block = self.read_block(hash).await?; + let (header, bytes) = block.into_parts(); + let put_block_message = Req::new(BlockRpc::PutBlock { + hash: *hash, + header, + })? + .with_stream_from_buffer(bytes); self.system .rpc .try_call_many( @@ -724,17 +848,19 @@ impl BlockManager { } #[async_trait] -impl EndpointHandler<BlockRpc> for BlockManager { - async fn handle( - self: &Arc<Self>, - message: &BlockRpc, - _from: NodeID, - ) -> Result<BlockRpc, Error> { - match message { - BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await, - BlockRpc::GetBlock(h) => self.read_block(h).await, - BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), - m => Err(Error::unexpected_rpc_message(m)), +impl StreamingEndpointHandler<BlockRpc> for BlockManager { + async fn handle(self: &Arc<Self>, mut message: Req<BlockRpc>, _from: NodeID) -> Resp<BlockRpc> { + match message.msg() { + BlockRpc::PutBlock { hash, header } => Resp::new( + self.handle_put_block(*hash, *header, message.take_stream()) + .await + .map(|_| BlockRpc::Ok), + ), + BlockRpc::GetBlock(h) => self.handle_get_block(h).await, + BlockRpc::NeedBlockQuery(h) => { + Resp::new(self.need_block(h).await.map(BlockRpc::NeedBlockReply)) + } + m => Resp::new(Err(Error::unexpected_rpc_message(m))), } } } @@ -832,7 +958,7 @@ impl BlockManagerLocked { hash: &Hash, data: &DataBlock, mgr: &BlockManager, - ) -> Result<BlockRpc, Error> { + ) -> Result<(), Error> { let compressed = data.is_compressed(); let data = data.inner_buffer(); @@ -843,8 +969,8 @@ impl BlockManagerLocked { fs::create_dir_all(&directory).await?; let to_delete = match (mgr.is_block_compressed(hash).await, compressed) { - (Ok(true), _) => return Ok(BlockRpc::Ok), - (Ok(false), false) => return Ok(BlockRpc::Ok), + (Ok(true), _) => return Ok(()), + (Ok(false), false) => return Ok(()), (Ok(false), true) => { let path_to_delete = path.clone(); path.set_extension("zst"); @@ -883,7 +1009,7 @@ impl BlockManagerLocked { dir.sync_all().await?; drop(dir); - Ok(BlockRpc::Ok) + Ok(()) } async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { @@ -964,3 +1090,17 @@ impl ErrorCounter { self.last_try + self.delay_msec() } } + +async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> { + let mut parts: Vec<Bytes> = vec![]; + while let Some(part) = stream.next().await { + parts.push(part.ok_or_message("error in stream")?); + } + + Ok(parts + .iter() + .map(|x| &x[..]) + .collect::<Vec<_>>() + .concat() + .into()) +} |