aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs292
1 files changed, 226 insertions, 66 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index b5199b62..ec694fc8 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,4 +1,5 @@
use std::path::PathBuf;
+use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -6,21 +7,26 @@ use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
+use futures::Stream;
+use futures_util::stream::StreamExt;
use tokio::fs;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
-use tokio::sync::{mpsc, Mutex};
+use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
+use tokio::sync::{mpsc, Mutex, MutexGuard};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
Context,
};
+use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
+
use garage_db as db;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
+use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -36,7 +42,7 @@ use crate::resync::*;
pub const INLINE_THRESHOLD: usize = 3072;
// Timeout for RPCs that read and write blocks to remote nodes
-pub(crate) const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
+pub(crate) const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(60);
// The delay between the moment when the reference counter
// drops to zero, and the moment where we allow ourselves
@@ -48,12 +54,12 @@ pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
pub enum BlockRpc {
Ok,
/// Message to ask for a block of data, by hash
- GetBlock(Hash),
+ GetBlock(Hash, Option<OrderTag>),
/// Message to send a block of data, either because requested, of for first delivery of new
/// block
PutBlock {
hash: Hash,
- data: DataBlock,
+ header: DataBlockHeader,
},
/// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash),
@@ -143,56 +149,162 @@ impl BlockManager {
}
/// Ask nodes that might have a (possibly compressed) block for it
- pub(crate) async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
+ /// Return it as a stream with a header
+ async fn rpc_get_raw_block_streaming(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ ) -> Result<(DataBlockHeader, ByteStream), Error> {
let who = self.replication.read_nodes(hash);
- let resps = self
- .system
- .rpc
- .try_call_many(
- &self.endpoint,
- &who[..],
- BlockRpc::GetBlock(*hash),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(1)
- .with_timeout(BLOCK_RW_TIMEOUT)
- .interrupt_after_quorum(true),
- )
- .await?;
+ let who = self.system.rpc.request_order(&who);
+
+ for node in who.iter() {
+ let node_id = NodeID::from(*node);
+ let rpc = self.endpoint.call_streaming(
+ &node_id,
+ BlockRpc::GetBlock(*hash, order_tag),
+ PRIO_NORMAL | PRIO_SECONDARY,
+ );
+ tokio::select! {
+ res = rpc => {
+ let res = match res {
+ Ok(res) => res,
+ Err(e) => {
+ debug!("Node {:?} returned error: {}", node, e);
+ continue;
+ }
+ };
+ let (header, stream) = match res.into_parts() {
+ (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
+ _ => {
+ debug!("Node {:?} returned a malformed response", node);
+ continue;
+ }
+ };
+ return Ok((header, stream));
+ }
+ _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
+ debug!("Node {:?} didn't return block in time, trying next.", node);
+ }
+ };
+ }
- for resp in resps {
- if let BlockRpc::PutBlock { data, .. } = resp {
- return Ok(data);
- }
+ Err(Error::Message(format!(
+ "Unable to read block {:?}: no node returned a valid block",
+ hash
+ )))
+ }
+
+ /// Ask nodes that might have a (possibly compressed) block for it
+ /// Return its entire body
+ pub(crate) async fn rpc_get_raw_block(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ ) -> Result<DataBlock, Error> {
+ let who = self.replication.read_nodes(hash);
+ let who = self.system.rpc.request_order(&who);
+
+ for node in who.iter() {
+ let node_id = NodeID::from(*node);
+ let rpc = self.endpoint.call_streaming(
+ &node_id,
+ BlockRpc::GetBlock(*hash, order_tag),
+ PRIO_NORMAL | PRIO_SECONDARY,
+ );
+ tokio::select! {
+ res = rpc => {
+ let res = match res {
+ Ok(res) => res,
+ Err(e) => {
+ debug!("Node {:?} returned error: {}", node, e);
+ continue;
+ }
+ };
+ let (header, stream) = match res.into_parts() {
+ (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
+ _ => {
+ debug!("Node {:?} returned a malformed response", node);
+ continue;
+ }
+ };
+ match read_stream_to_end(stream).await {
+ Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)),
+ Err(e) => {
+ debug!("Error reading stream from node {:?}: {}", node, e);
+ }
+ }
+ }
+ _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
+ debug!("Node {:?} didn't return block in time, trying next.", node);
+ }
+ };
}
+
Err(Error::Message(format!(
- "Unable to read block {:?}: no valid blocks returned",
+ "Unable to read block {:?}: no node returned a valid block",
hash
)))
}
// ---- Public interface ----
+ /// Ask nodes that might have a block for it,
+ /// return it as a stream
+ pub async fn rpc_get_block_streaming(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ ) -> Result<
+ Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>,
+ Error,
+ > {
+ let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+ match header {
+ DataBlockHeader::Plain => Ok(stream),
+ DataBlockHeader::Compressed => {
+ // Too many things, I hate it.
+ let reader = stream_asyncread(stream);
+ let reader = BufReader::new(reader);
+ let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader);
+ Ok(Box::pin(tokio_util::io::ReaderStream::new(reader)))
+ }
+ }
+ }
+
/// Ask nodes that might have a block for it
- pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
- self.rpc_get_raw_block(hash).await?.verify_get(*hash)
+ pub async fn rpc_get_block(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ ) -> Result<Bytes, Error> {
+ self.rpc_get_raw_block(hash, order_tag)
+ .await?
+ .verify_get(*hash)
}
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
- let data = DataBlock::from_buffer(data, self.compression_level).await;
+
+ let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
+ .await
+ .into_parts();
+ let put_block_rpc =
+ Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
+
self.system
.rpc
.try_call_many(
&self.endpoint,
&who[..],
- // TODO: remove to_vec() here
- BlockRpc::PutBlock { hash, data },
- RequestStrategy::with_priority(PRIO_NORMAL)
+ put_block_rpc,
+ RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_quorum(self.replication.write_quorum())
.with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
+
Ok(())
}
@@ -259,21 +371,25 @@ impl BlockManager {
// ---- Reading and writing blocks locally ----
- /// Write a block to disk
- pub(crate) async fn write_block(
+ async fn handle_put_block(
&self,
- hash: &Hash,
- data: &DataBlock,
- ) -> Result<BlockRpc, Error> {
+ hash: Hash,
+ header: DataBlockHeader,
+ stream: Option<ByteStream>,
+ ) -> Result<(), Error> {
+ let stream = stream.ok_or_message("missing stream")?;
+ let bytes = read_stream_to_end(stream).await?;
+ let data = DataBlock::from_parts(header, bytes);
+ self.write_block(&hash, &data).await
+ }
+
+ /// Write a block to disk
+ pub(crate) async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage");
let write_size = data.inner_buffer().len() as u64;
- let res = self.mutation_lock[hash.as_slice()[0] as usize]
- .lock()
- .with_context(Context::current_with_span(
- tracer.start("Acquire mutation_lock"),
- ))
+ self.lock_mutate(hash)
.await
.write_block(hash, data, self)
.bound_record_duration(&self.metrics.block_write_duration)
@@ -284,11 +400,32 @@ impl BlockManager {
self.metrics.bytes_written.add(write_size);
- Ok(res)
+ Ok(())
+ }
+
+ async fn handle_get_block(&self, hash: &Hash, order_tag: Option<OrderTag>) -> Resp<BlockRpc> {
+ let block = match self.read_block(hash).await {
+ Ok(data) => data,
+ Err(e) => return Resp::new(Err(e)),
+ };
+
+ let (header, data) = block.into_parts();
+
+ let resp = Resp::new(Ok(BlockRpc::PutBlock {
+ hash: *hash,
+ header,
+ }))
+ .with_stream_from_buffer(data);
+
+ if let Some(order_tag) = order_tag {
+ resp.with_order_tag(order_tag)
+ } else {
+ resp
+ }
}
/// Read block from disk, verifying it's integrity
- pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
+ pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
@@ -298,7 +435,7 @@ impl BlockManager {
.bytes_read
.add(data.inner_buffer().len() as u64);
- Ok(BlockRpc::PutBlock { hash: *hash, data })
+ Ok(data)
}
async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
@@ -321,16 +458,15 @@ impl BlockManager {
drop(f);
let data = if compressed {
- DataBlock::Compressed(data)
+ DataBlock::Compressed(data.into())
} else {
- DataBlock::Plain(data)
+ DataBlock::Plain(data.into())
};
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
- self.mutation_lock[hash.as_slice()[0] as usize]
- .lock()
+ self.lock_mutate(hash)
.await
.move_block_to_corrupted(hash, self)
.await?;
@@ -343,8 +479,7 @@ impl BlockManager {
/// Check if this node has a block and whether it needs it
pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> {
- self.mutation_lock[hash.as_slice()[0] as usize]
- .lock()
+ self.lock_mutate(hash)
.await
.check_block_status(hash, self)
.await
@@ -358,8 +493,7 @@ impl BlockManager {
/// Delete block if it is not needed anymore
pub(crate) async fn delete_if_unneeded(&self, hash: &Hash) -> Result<(), Error> {
- self.mutation_lock[hash.as_slice()[0] as usize]
- .lock()
+ self.lock_mutate(hash)
.await
.delete_if_unneeded(hash, self)
.await
@@ -391,20 +525,32 @@ impl BlockManager {
path.set_extension("");
fs::metadata(&path).await.map(|_| false).map_err(Into::into)
}
+
+ async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
+ let tracer = opentelemetry::global::tracer("garage");
+ self.mutation_lock[hash.as_slice()[0] as usize]
+ .lock()
+ .with_context(Context::current_with_span(
+ tracer.start("Acquire mutation_lock"),
+ ))
+ .await
+ }
}
#[async_trait]
-impl EndpointHandler<BlockRpc> for BlockManager {
- async fn handle(
- self: &Arc<Self>,
- message: &BlockRpc,
- _from: NodeID,
- ) -> Result<BlockRpc, Error> {
- match message {
- BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await,
- BlockRpc::GetBlock(h) => self.read_block(h).await,
- BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
- m => Err(Error::unexpected_rpc_message(m)),
+impl StreamingEndpointHandler<BlockRpc> for BlockManager {
+ async fn handle(self: &Arc<Self>, mut message: Req<BlockRpc>, _from: NodeID) -> Resp<BlockRpc> {
+ match message.msg() {
+ BlockRpc::PutBlock { hash, header } => Resp::new(
+ self.handle_put_block(*hash, *header, message.take_stream())
+ .await
+ .map(|_| BlockRpc::Ok),
+ ),
+ BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await,
+ BlockRpc::NeedBlockQuery(h) => {
+ Resp::new(self.need_block(h).await.map(BlockRpc::NeedBlockReply))
+ }
+ m => Resp::new(Err(Error::unexpected_rpc_message(m))),
}
}
}
@@ -431,7 +577,7 @@ impl BlockManagerLocked {
hash: &Hash,
data: &DataBlock,
mgr: &BlockManager,
- ) -> Result<BlockRpc, Error> {
+ ) -> Result<(), Error> {
let compressed = data.is_compressed();
let data = data.inner_buffer();
@@ -442,8 +588,8 @@ impl BlockManagerLocked {
fs::create_dir_all(&directory).await?;
let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
- (Ok(true), _) => return Ok(BlockRpc::Ok),
- (Ok(false), false) => return Ok(BlockRpc::Ok),
+ (Ok(true), _) => return Ok(()),
+ (Ok(false), false) => return Ok(()),
(Ok(false), true) => {
let path_to_delete = path.clone();
path.set_extension("zst");
@@ -482,7 +628,7 @@ impl BlockManagerLocked {
dir.sync_all().await?;
drop(dir);
- Ok(BlockRpc::Ok)
+ Ok(())
}
async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
@@ -516,3 +662,17 @@ impl BlockManagerLocked {
Ok(())
}
}
+
+async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
+ let mut parts: Vec<Bytes> = vec![];
+ while let Some(part) = stream.next().await {
+ parts.push(part.ok_or_message("error in stream")?);
+ }
+
+ Ok(parts
+ .iter()
+ .map(|x| &x[..])
+ .collect::<Vec<_>>()
+ .concat()
+ .into())
+}