From 8e7e680afe39f48fe15f365c9ef3fee57596e119 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 15:20:00 +0200 Subject: First adaptation to WIP netapp with streaming body --- src/block/manager.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index be53ec6e..408de148 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -8,7 +8,6 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use futures::future::*; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::select; @@ -637,24 +636,24 @@ impl BlockManager { } who.retain(|id| *id != self.system.id); - let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash)); - let who_needs_fut = who.iter().map(|to| { - self.system.rpc.call_arc( + let who_needs_resps = self + .system + .rpc + .call_many( &self.endpoint, - *to, - msg.clone(), + &who, + BlockRpc::NeedBlockQuery(*hash), RequestStrategy::with_priority(PRIO_BACKGROUND) .with_timeout(NEED_BLOCK_QUERY_TIMEOUT), ) - }); - let who_needs_resps = join_all(who_needs_fut).await; + .await?; let mut need_nodes = vec![]; - for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) { + for (node, needed) in who_needs_resps.into_iter() { match needed.err_context("NeedBlockQuery RPC")? { BlockRpc::NeedBlockReply(needed) => { if needed { - need_nodes.push(*node); + need_nodes.push(node); } } m => { -- cgit v1.2.3 From 605a630333c8ee60c55fe011a375c01277bba173 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 18:20:27 +0200 Subject: Use streaming in block manager --- src/block/Cargo.toml | 3 + src/block/block.rs | 37 ++++++-- src/block/manager.rs | 249 +++++++++++++++++++++++++++++++++++++++------------ 3 files changed, 225 insertions(+), 64 deletions(-) (limited to 'src/block') diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 2555a44a..3e6f7bc0 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -27,6 +27,8 @@ bytes = "1.0" hex = "0.4" tracing = "0.1.30" rand = "0.8" + +async-compression = { version = "0.3", features = ["tokio", "zstd"] } zstd = { version = "0.9", default-features = false } rmp-serde = "0.15" @@ -36,3 +38,4 @@ serde_bytes = "0.11" futures = "0.3" futures-util = "0.3" tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } +tokio-util = { version = "0.6", features = ["io"] } diff --git a/src/block/block.rs b/src/block/block.rs index f17bd2c0..935aa900 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -5,13 +5,18 @@ use zstd::stream::{decode_all as zstd_decode, Encoder}; use garage_util::data::*; use garage_util::error::*; +#[derive(Debug, Serialize, Deserialize, Copy, Clone)] +pub enum DataBlockHeader { + Plain, + Compressed, +} + /// A possibly compressed block of data -#[derive(Debug, Serialize, Deserialize)] pub enum DataBlock { /// Uncompressed data - Plain(#[serde(with = "serde_bytes")] Vec), + Plain(Bytes), /// Data compressed with zstd - Compressed(#[serde(with = "serde_bytes")] Vec), + Compressed(Bytes), } impl DataBlock { @@ -31,7 +36,7 @@ impl DataBlock { /// Get the buffer, possibly decompressing it, and verify it's integrity. /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system /// is used instead. 
- pub fn verify_get(self, hash: Hash) -> Result, Error> { + pub fn verify_get(self, hash: Hash) -> Result { match self { DataBlock::Plain(data) => { if blake2sum(&data) == hash { @@ -40,9 +45,9 @@ impl DataBlock { Err(Error::CorruptData(hash)) } } - DataBlock::Compressed(data) => { - zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) - } + DataBlock::Compressed(data) => zstd_decode(&data[..]) + .map_err(|_| Error::CorruptData(hash)) + .map(Bytes::from), } } @@ -66,14 +71,28 @@ impl DataBlock { tokio::task::spawn_blocking(move || { if let Some(level) = level { if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); + return DataBlock::Compressed(data.into()); } } - DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here + DataBlock::Plain(data) }) .await .unwrap() } + + pub fn into_parts(self) -> (DataBlockHeader, Bytes) { + match self { + DataBlock::Plain(data) => (DataBlockHeader::Plain, data), + DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data), + } + } + + pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self { + match h { + DataBlockHeader::Plain => DataBlock::Plain(bytes), + DataBlockHeader::Compressed => DataBlock::Compressed(bytes), + } + } } fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { diff --git a/src/block/manager.rs b/src/block/manager.rs index 408de148..bb01c300 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,5 +1,6 @@ use std::convert::TryInto; use std::path::PathBuf; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -8,8 +9,10 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; +use futures::{Stream, TryStreamExt}; +use futures_util::stream::StreamExt; use tokio::fs; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::select; use tokio::sync::{mpsc, watch, Mutex, Notify}; @@ -18,6 +21,8 @@ use opentelemetry::{ Context, KeyValue, }; +use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; + use garage_db as db; use garage_db::counted_tree_hack::CountedTree; @@ -70,7 +75,7 @@ pub enum BlockRpc { /// block PutBlock { hash: Hash, - data: DataBlock, + header: DataBlockHeader, }, /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), @@ -174,56 +179,146 @@ impl BlockManager { } /// Ask nodes that might have a (possibly compressed) block for it + /// Return it as a stream with a header + async fn rpc_get_raw_block_streaming( + &self, + hash: &Hash, + ) -> Result<(DataBlockHeader, ByteStream), Error> { + let who = self.replication.read_nodes(hash); + + for node in who.iter() { + let node_id = NodeID::from(*node); + let rpc = + self.endpoint + .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); + tokio::select! 
{ + res = rpc => { + let res = match res { + Ok(res) => res, + Err(e) => { + debug!("Node {:?} returned error: {}", node, e); + continue; + } + }; + let (header, stream) = match res.into_parts() { + (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), + _ => { + debug!("Node {:?} returned a malformed response", node); + continue; + } + }; + return Ok((header, stream)); + } + _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => { + debug!("Node {:?} didn't return block in time, trying next.", node); + } + }; + } + + Err(Error::Message(format!( + "Unable to read block {:?}: no node returned a valid block", + hash + ))) + } + + /// Ask nodes that might have a (possibly compressed) block for it + /// Return its entire body async fn rpc_get_raw_block(&self, hash: &Hash) -> Result { let who = self.replication.read_nodes(hash); - let resps = self - .system - .rpc - .try_call_many( - &self.endpoint, - &who[..], - BlockRpc::GetBlock(*hash), - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(1) - .with_timeout(BLOCK_RW_TIMEOUT) - .interrupt_after_quorum(true), - ) - .await?; - for resp in resps { - if let BlockRpc::PutBlock { data, .. } = resp { - return Ok(data); - } + for node in who.iter() { + let node_id = NodeID::from(*node); + let rpc = + self.endpoint + .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); + tokio::select! { + res = rpc => { + let res = match res { + Ok(res) => res, + Err(e) => { + debug!("Node {:?} returned error: {}", node, e); + continue; + } + }; + let (header, stream) = match res.into_parts() { + (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), + _ => { + debug!("Node {:?} returned a malformed response", node); + continue; + } + }; + match read_stream_to_end(stream).await { + Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)), + Err(e) => { + debug!("Error reading stream from node {:?}: {}", node, e); + } + } + } + _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => { + debug!("Node {:?} didn't return block in time, trying next.", node); + } + }; } + Err(Error::Message(format!( - "Unable to read block {:?}: no valid blocks returned", + "Unable to read block {:?}: no node returned a valid block", hash ))) } // ---- Public interface ---- + /// Ask nodes that might have a block for it, + /// return it as a stream + pub async fn rpc_get_block_streaming( + &self, + hash: &Hash, + ) -> Result< + Pin> + Send + Sync + 'static>>, + Error, + > { + let (header, stream) = self.rpc_get_raw_block_streaming(hash).await?; + match header { + DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| { + std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error") + }))), + DataBlockHeader::Compressed => { + // Too many things, I hate it. 
+ let reader = stream_asyncread(stream); + let reader = BufReader::new(reader); + let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader); + Ok(Box::pin(tokio_util::io::ReaderStream::new(reader))) + } + } + } + /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + pub async fn rpc_get_block(&self, hash: &Hash) -> Result { self.rpc_get_raw_block(hash).await?.verify_get(*hash) } /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); - let data = DataBlock::from_buffer(data, self.compression_level).await; + + let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) + .await + .into_parts(); + let put_block_rpc = + Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); + self.system .rpc .try_call_many( &self.endpoint, &who[..], - // TODO: remove to_vec() here - BlockRpc::PutBlock { hash, data }, + put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; + Ok(()) } @@ -308,13 +403,25 @@ impl BlockManager { // ---- Reading and writing blocks locally ---- + async fn handle_put_block( + &self, + hash: Hash, + header: DataBlockHeader, + stream: Option, + ) -> Result<(), Error> { + let stream = stream.ok_or_message("missing stream")?; + let bytes = read_stream_to_end(stream).await?; + let data = DataBlock::from_parts(header, bytes); + self.write_block(&hash, &data).await + } + /// Write a block to disk - async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result { + async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> { let tracer = opentelemetry::global::tracer("garage"); let write_size = data.inner_buffer().len() as u64; - let res = self.mutation_lock[hash.as_slice()[0] as usize] + self.mutation_lock[hash.as_slice()[0] as usize] .lock() .with_context(Context::current_with_span( tracer.start("Acquire mutation_lock"), @@ -329,21 +436,31 @@ impl BlockManager { self.metrics.bytes_written.add(write_size); - Ok(res) + Ok(()) } - /// Read block from disk, verifying it's integrity - pub(crate) async fn read_block(&self, hash: &Hash) -> Result { - let data = self - .read_block_internal(hash) - .bound_record_duration(&self.metrics.block_read_duration) - .await?; + async fn handle_get_block(&self, hash: &Hash) -> Resp { + let block = match self.read_block(hash).await { + Ok(data) => data, + Err(e) => return Resp::new(Err(e)), + }; + + let (header, data) = block.into_parts(); - self.metrics - .bytes_read - .add(data.inner_buffer().len() as u64); + self.metrics.bytes_read.add(data.len() as u64); - Ok(BlockRpc::PutBlock { hash: *hash, data }) + Resp::new(Ok(BlockRpc::PutBlock { + hash: *hash, + header, + })) + .with_stream_from_buffer(data) + } + + /// Read block from disk, verifying it's integrity + pub(crate) async fn read_block(&self, hash: &Hash) -> Result { + self.read_block_internal(hash) + .bound_record_duration(&self.metrics.block_read_duration) + .await } async fn read_block_internal(&self, hash: &Hash) -> Result { @@ -366,9 +483,9 @@ impl BlockManager { drop(f); let data = if compressed { - DataBlock::Compressed(data) + DataBlock::Compressed(data.into()) } else { - DataBlock::Plain(data) + DataBlock::Plain(data.into()) }; if data.verify(*hash).is_err() { @@ -675,7 +792,13 @@ impl BlockManager { .add(1, &[KeyValue::new("to", format!("{:?}", 
node))]); } - let put_block_message = self.read_block(hash).await?; + let block = self.read_block(hash).await?; + let (header, bytes) = block.into_parts(); + let put_block_message = Req::new(BlockRpc::PutBlock { + hash: *hash, + header, + })? + .with_stream_from_buffer(bytes); self.system .rpc .try_call_many( @@ -723,17 +846,19 @@ impl BlockManager { } #[async_trait] -impl EndpointHandler for BlockManager { - async fn handle( - self: &Arc, - message: &BlockRpc, - _from: NodeID, - ) -> Result { - match message { - BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await, - BlockRpc::GetBlock(h) => self.read_block(h).await, - BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), - m => Err(Error::unexpected_rpc_message(m)), +impl StreamingEndpointHandler for BlockManager { + async fn handle(self: &Arc, mut message: Req, _from: NodeID) -> Resp { + match message.msg() { + BlockRpc::PutBlock { hash, header } => Resp::new( + self.handle_put_block(*hash, *header, message.take_stream()) + .await + .map(|_| BlockRpc::Ok), + ), + BlockRpc::GetBlock(h) => self.handle_get_block(h).await, + BlockRpc::NeedBlockQuery(h) => { + Resp::new(self.need_block(h).await.map(BlockRpc::NeedBlockReply)) + } + m => Resp::new(Err(Error::unexpected_rpc_message(m))), } } } @@ -831,7 +956,7 @@ impl BlockManagerLocked { hash: &Hash, data: &DataBlock, mgr: &BlockManager, - ) -> Result { + ) -> Result<(), Error> { let compressed = data.is_compressed(); let data = data.inner_buffer(); @@ -842,8 +967,8 @@ impl BlockManagerLocked { fs::create_dir_all(&directory).await?; let to_delete = match (mgr.is_block_compressed(hash).await, compressed) { - (Ok(true), _) => return Ok(BlockRpc::Ok), - (Ok(false), false) => return Ok(BlockRpc::Ok), + (Ok(true), _) => return Ok(()), + (Ok(false), false) => return Ok(()), (Ok(false), true) => { let path_to_delete = path.clone(); path.set_extension("zst"); @@ -882,7 +1007,7 @@ impl BlockManagerLocked { dir.sync_all().await?; drop(dir); - Ok(BlockRpc::Ok) + Ok(()) } async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { @@ -963,3 +1088,17 @@ impl ErrorCounter { self.last_try + self.delay_msec() } } + +async fn read_stream_to_end(mut stream: ByteStream) -> Result { + let mut parts: Vec = vec![]; + while let Some(part) = stream.next().await { + parts.push(part.ok_or_message("error in stream")?); + } + + Ok(parts + .iter() + .map(|x| &x[..]) + .collect::>() + .concat() + .into()) +} -- cgit v1.2.3 From e935861854deed5d1ca66767fc51d9849201a4dd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 25 Jul 2022 18:19:35 +0200 Subject: Factor out node request order selection logic & use in manager --- src/block/manager.rs | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index bb01c300..80c52510 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -185,6 +185,7 @@ impl BlockManager { hash: &Hash, ) -> Result<(DataBlockHeader, ByteStream), Error> { let who = self.replication.read_nodes(hash); + //let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); @@ -225,6 +226,7 @@ impl BlockManager { /// Return its entire body async fn rpc_get_raw_block(&self, hash: &Hash) -> Result { let who = self.replication.read_nodes(hash); + //let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); -- cgit v1.2.3 From 
70231d68b27054c2185b73b5ceee1c445baaaa2d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 31 Aug 2022 19:44:27 +0200 Subject: Fix bytes_read counter --- src/block/manager.rs | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index 80c52510..b8fe4c74 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -449,8 +449,6 @@ impl BlockManager { let (header, data) = block.into_parts(); - self.metrics.bytes_read.add(data.len() as u64); - Resp::new(Ok(BlockRpc::PutBlock { hash: *hash, header, @@ -460,9 +458,16 @@ impl BlockManager { /// Read block from disk, verifying it's integrity pub(crate) async fn read_block(&self, hash: &Hash) -> Result { - self.read_block_internal(hash) + let data = self + .read_block_internal(hash) .bound_record_duration(&self.metrics.block_read_duration) - .await + .await?; + + self.metrics + .bytes_read + .add(data.inner_buffer().len() as u64); + + Ok(data) } async fn read_block_internal(&self, hash: &Hash) -> Result { -- cgit v1.2.3 From bc977f9a7a7a5bd87ccf5fe96d64b397591f8ba0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 12:58:20 +0200 Subject: Update to Netapp with OrderTag support and exploit OrderTags --- src/block/manager.rs | 55 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 16 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index b8fe4c74..b9f6fc0f 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -33,6 +33,7 @@ use garage_util::metrics::RecordDuration; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; +use garage_rpc::rpc_helper::OrderTag; use garage_rpc::system::System; use garage_rpc::*; @@ -70,7 +71,7 @@ pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); pub enum BlockRpc { Ok, /// Message to ask for a block of data, by hash - GetBlock(Hash), + GetBlock(Hash, Option), /// Message to send a block of data, either because requested, of for first delivery of new /// block PutBlock { @@ -183,15 +184,18 @@ impl BlockManager { async fn rpc_get_raw_block_streaming( &self, hash: &Hash, + order_tag: Option, ) -> Result<(DataBlockHeader, ByteStream), Error> { let who = self.replication.read_nodes(hash); //let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); - let rpc = - self.endpoint - .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); + let rpc = self.endpoint.call_streaming( + &node_id, + BlockRpc::GetBlock(*hash, order_tag), + PRIO_NORMAL, + ); tokio::select! { res = rpc => { let res = match res { @@ -224,15 +228,21 @@ impl BlockManager { /// Ask nodes that might have a (possibly compressed) block for it /// Return its entire body - async fn rpc_get_raw_block(&self, hash: &Hash) -> Result { + async fn rpc_get_raw_block( + &self, + hash: &Hash, + order_tag: Option, + ) -> Result { let who = self.replication.read_nodes(hash); //let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); - let rpc = - self.endpoint - .call_streaming(&node_id, BlockRpc::GetBlock(*hash), PRIO_NORMAL); + let rpc = self.endpoint.call_streaming( + &node_id, + BlockRpc::GetBlock(*hash, order_tag), + PRIO_NORMAL, + ); tokio::select! 
{ res = rpc => { let res = match res { @@ -275,11 +285,12 @@ impl BlockManager { pub async fn rpc_get_block_streaming( &self, hash: &Hash, + order_tag: Option, ) -> Result< Pin> + Send + Sync + 'static>>, Error, > { - let (header, stream) = self.rpc_get_raw_block_streaming(hash).await?; + let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| { std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error") @@ -295,8 +306,14 @@ impl BlockManager { } /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result { - self.rpc_get_raw_block(hash).await?.verify_get(*hash) + pub async fn rpc_get_block( + &self, + hash: &Hash, + order_tag: Option, + ) -> Result { + self.rpc_get_raw_block(hash, order_tag) + .await? + .verify_get(*hash) } /// Send block to nodes that should have it @@ -441,7 +458,7 @@ impl BlockManager { Ok(()) } - async fn handle_get_block(&self, hash: &Hash) -> Resp { + async fn handle_get_block(&self, hash: &Hash, order_tag: Option) -> Resp { let block = match self.read_block(hash).await { Ok(data) => data, Err(e) => return Resp::new(Err(e)), @@ -449,11 +466,17 @@ impl BlockManager { let (header, data) = block.into_parts(); - Resp::new(Ok(BlockRpc::PutBlock { + let resp = Resp::new(Ok(BlockRpc::PutBlock { hash: *hash, header, })) - .with_stream_from_buffer(data) + .with_stream_from_buffer(data); + + if let Some(order_tag) = order_tag { + resp.with_order_tag(order_tag) + } else { + resp + } } /// Read block from disk, verifying it's integrity @@ -841,7 +864,7 @@ impl BlockManager { hash ); - let block_data = self.rpc_get_raw_block(hash).await?; + let block_data = self.rpc_get_raw_block(hash, None).await?; self.metrics.resync_recv_counter.add(1); @@ -861,7 +884,7 @@ impl StreamingEndpointHandler for BlockManager { .await .map(|_| BlockRpc::Ok), ), - BlockRpc::GetBlock(h) => self.handle_get_block(h).await, + BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await, BlockRpc::NeedBlockQuery(h) => { Resp::new(self.need_block(h).await.map(BlockRpc::NeedBlockReply)) } -- cgit v1.2.3 From df094bd8075332bb765b8b44c9b19cf2485e9ca8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 16:30:44 +0200 Subject: Less strict timeouts --- src/block/manager.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index b9f6fc0f..00438648 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -48,10 +48,14 @@ use crate::repair::*; pub const INLINE_THRESHOLD: usize = 3072; // Timeout for RPCs that read and write blocks to remote nodes -const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30); +const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(60); // Timeout for RPCs that ask other nodes whether they need a copy // of a given block before we delete it locally -const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); +// The timeout here is relatively low because we don't want to block +// the entire resync loop when some nodes are not responding. +// Nothing will be deleted if the nodes don't answer the queries, +// we will just retry later. 
+const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(15); // The delay between the time where a resync operation fails // and the time when it is retried, with exponential backoff -- cgit v1.2.3 From 99b532b85bf35b5acf621c229fb991825f3d994c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 16:35:43 +0200 Subject: Apply PRIO_SECONDARY to block data transfers --- src/block/manager.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index 00438648..a9def3b0 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -198,7 +198,7 @@ impl BlockManager { let rpc = self.endpoint.call_streaming( &node_id, BlockRpc::GetBlock(*hash, order_tag), - PRIO_NORMAL, + PRIO_NORMAL | PRIO_SECONDARY, ); tokio::select! { res = rpc => { @@ -245,7 +245,7 @@ impl BlockManager { let rpc = self.endpoint.call_streaming( &node_id, BlockRpc::GetBlock(*hash, order_tag), - PRIO_NORMAL, + PRIO_NORMAL | PRIO_SECONDARY, ); tokio::select! { res = rpc => { @@ -336,7 +336,7 @@ impl BlockManager { &self.endpoint, &who[..], put_block_rpc, - RequestStrategy::with_priority(PRIO_NORMAL) + RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) .with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) -- cgit v1.2.3 From c2cc08852bcbd94bad5c15c39e7145c0496d7241 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 19:31:42 +0200 Subject: Reenable node ordering --- src/block/manager.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index a9def3b0..66a454b0 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use futures::{Stream, TryStreamExt}; +use futures::Stream; use futures_util::stream::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; @@ -191,7 +191,7 @@ impl BlockManager { order_tag: Option, ) -> Result<(DataBlockHeader, ByteStream), Error> { let who = self.replication.read_nodes(hash); - //let who = self.system.rpc.request_order(&who); + let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); @@ -238,7 +238,7 @@ impl BlockManager { order_tag: Option, ) -> Result { let who = self.replication.read_nodes(hash); - //let who = self.system.rpc.request_order(&who); + let who = self.system.rpc.request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); @@ -296,9 +296,7 @@ impl BlockManager { > { let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { - DataBlockHeader::Plain => Ok(Box::pin(stream.map_err(|_| { - std::io::Error::new(std::io::ErrorKind::Other, "netapp stream error") - }))), + DataBlockHeader::Plain => Ok(Box::pin(stream)), DataBlockHeader::Compressed => { // Too many things, I hate it. 
let reader = stream_asyncread(stream); -- cgit v1.2.3 From b823151a0bba7ee6c5f0f96c6b06355572528d94 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 12 Sep 2022 16:57:38 +0200 Subject: improvements in block manager --- src/block/manager.rs | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) (limited to 'src/block') diff --git a/src/block/manager.rs b/src/block/manager.rs index b9cd09e7..ec694fc8 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -11,7 +11,7 @@ use futures::Stream; use futures_util::stream::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{mpsc, Mutex, MutexGuard}; use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, @@ -261,7 +261,7 @@ impl BlockManager { > { let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; match header { - DataBlockHeader::Plain => Ok(Box::pin(stream)), + DataBlockHeader::Plain => Ok(stream), DataBlockHeader::Compressed => { // Too many things, I hate it. let reader = stream_asyncread(stream); @@ -389,11 +389,7 @@ impl BlockManager { let write_size = data.inner_buffer().len() as u64; - self.mutation_lock[hash.as_slice()[0] as usize] - .lock() - .with_context(Context::current_with_span( - tracer.start("Acquire mutation_lock"), - )) + self.lock_mutate(hash) .await .write_block(hash, data, self) .bound_record_duration(&self.metrics.block_write_duration) @@ -470,8 +466,7 @@ impl BlockManager { if data.verify(*hash).is_err() { self.metrics.corruption_counter.add(1); - self.mutation_lock[hash.as_slice()[0] as usize] - .lock() + self.lock_mutate(hash) .await .move_block_to_corrupted(hash, self) .await?; @@ -484,8 +479,7 @@ impl BlockManager { /// Check if this node has a block and whether it needs it pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result { - self.mutation_lock[hash.as_slice()[0] as usize] - .lock() + self.lock_mutate(hash) .await .check_block_status(hash, self) .await @@ -499,8 +493,7 @@ impl BlockManager { /// Delete block if it is not needed anymore pub(crate) async fn delete_if_unneeded(&self, hash: &Hash) -> Result<(), Error> { - self.mutation_lock[hash.as_slice()[0] as usize] - .lock() + self.lock_mutate(hash) .await .delete_if_unneeded(hash, self) .await @@ -532,6 +525,16 @@ impl BlockManager { path.set_extension(""); fs::metadata(&path).await.map(|_| false).map_err(Into::into) } + + async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> { + let tracer = opentelemetry::global::tracer("garage"); + self.mutation_lock[hash.as_slice()[0] as usize] + .lock() + .with_context(Context::current_with_span( + tracer.start("Acquire mutation_lock"), + )) + .await + } } #[async_trait] -- cgit v1.2.3
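As a side note on the DataBlockHeader::Compressed arm used in rpc_get_block_streaming above: the chain of adapters (byte stream -> AsyncRead -> BufReader -> zstd decoder -> stream again) can be sketched in isolation as below. This is an illustrative sketch only, not part of the patches: it substitutes tokio_util::io::StreamReader for netapp's stream_asyncread helper, and the function name decompress_stream is invented for the example.

use bytes::Bytes;
use futures::Stream;
use tokio::io::BufReader;
use tokio_util::io::{ReaderStream, StreamReader};

/// Turn a stream of zstd-compressed chunks into a stream of decompressed chunks,
/// mirroring what rpc_get_block_streaming does for DataBlockHeader::Compressed.
fn decompress_stream(
    compressed: impl Stream<Item = std::io::Result<Bytes>> + Send + 'static,
) -> impl Stream<Item = std::io::Result<Bytes>> {
    // Adapt the chunk stream into an AsyncRead (the patch uses netapp's
    // stream_asyncread for this step).
    let reader = StreamReader::new(compressed);
    // The zstd decoder wants an AsyncBufRead, so wrap the reader in a BufReader.
    let reader = BufReader::new(reader);
    // Decompress zstd frames on the fly as data arrives.
    let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader);
    // Re-expose the decompressed data as a stream of Bytes chunks.
    ReaderStream::new(reader)
}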