aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs123
1 files changed, 45 insertions, 78 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 5283886c..4f3d0978 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,5 +1,4 @@
use std::path::PathBuf;
-use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -9,8 +8,6 @@ use bytes::Bytes;
use rand::prelude::*;
use serde::{Deserialize, Serialize};
-use futures::Stream;
-use futures_util::stream::StreamExt;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::{mpsc, Mutex, MutexGuard};
@@ -20,7 +17,7 @@ use opentelemetry::{
Context,
};
-use garage_net::stream::{stream_asyncread, ByteStream};
+use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};
use garage_db as db;
@@ -53,9 +50,6 @@ pub const INLINE_THRESHOLD: usize = 3072;
// to delete the block locally.
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
-pub type BlockStream =
- Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>;
-
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
pub enum BlockRpc {
@@ -235,11 +229,9 @@ impl BlockManager {
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
- ) -> Result<(DataBlockHeader, ByteStream), Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
- Ok((header, stream))
- })
- .await
+ ) -> Result<DataBlockStream, Error> {
+ self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
+ .await
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -249,10 +241,12 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+ self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
+ let (header, stream) = block_stream.into_parts();
read_stream_to_end(stream)
.await
- .map(|data| DataBlock::from_parts(header, data))
+ .err_context("error in block data stream")
+ .map(|data| DataBlock::from_parts(header, data.into_bytes()))
})
.await
}
@@ -264,7 +258,7 @@ impl BlockManager {
f: F,
) -> Result<T, Error>
where
- F: Fn(DataBlockHeader, ByteStream) -> Fut,
+ F: Fn(DataBlockStream) -> Fut,
Fut: futures::Future<Output = Result<T, Error>>,
{
let who = self
@@ -288,8 +282,8 @@ impl BlockManager {
continue;
}
};
- let (header, stream) = match res.into_parts() {
- (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
+ let block_stream = match res.into_parts() {
+ (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => DataBlockStream::from_parts(header, stream),
(Ok(_), _) => {
debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
continue;
@@ -299,7 +293,7 @@ impl BlockManager {
continue;
}
};
- match f(header, stream).await {
+ match f(block_stream).await {
Ok(ret) => return Ok(ret),
Err(e) => {
debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
@@ -323,14 +317,14 @@ impl BlockManager {
// ---- Public interface ----
- /// Ask nodes that might have a block for it,
- /// return it as a stream
+ /// Ask nodes that might have a block for it, return it as a stream
pub async fn rpc_get_block_streaming(
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
- ) -> Result<BlockStream, Error> {
- let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+ ) -> Result<ByteStream, Error> {
+ let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+ let (header, stream) = block_stream.into_parts();
match header {
DataBlockHeader::Plain => Ok(stream),
DataBlockHeader::Compressed => {
@@ -343,15 +337,14 @@ impl BlockManager {
}
}
- /// Ask nodes that might have a block for it
+ /// Ask nodes that might have a block for it, return it as one big Bytes
pub async fn rpc_get_block(
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<Bytes, Error> {
- self.rpc_get_raw_block(hash, order_tag)
- .await?
- .verify_get(*hash)
+ let stream = self.rpc_get_block_streaming(hash, order_tag).await?;
+ Ok(read_stream_to_end(stream).await?.into_bytes())
}
/// Send block to nodes that should have it
@@ -484,7 +477,7 @@ impl BlockManager {
stream: Option<ByteStream>,
) -> Result<(), Error> {
let stream = stream.ok_or_message("missing stream")?;
- let bytes = read_stream_to_end(stream).await?;
+ let bytes = read_stream_to_end(stream).await?.into_bytes();
let data = DataBlock::from_parts(header, bytes);
self.write_block(&hash, &data).await
}
@@ -555,10 +548,7 @@ impl BlockManager {
hash: &Hash,
block_path: &DataBlockPath,
) -> Result<DataBlock, Error> {
- let (path, compressed) = match block_path {
- DataBlockPath::Plain(p) => (p, false),
- DataBlockPath::Compressed(p) => (p, true),
- };
+ let (header, path) = block_path.as_parts_ref();
let mut f = fs::File::open(&path).await?;
let mut data = vec![];
@@ -566,11 +556,7 @@ impl BlockManager {
self.metrics.bytes_read.add(data.len() as u64);
drop(f);
- let data = if compressed {
- DataBlock::Compressed(data.into())
- } else {
- DataBlock::Plain(data.into())
- };
+ let data = DataBlock::from_parts(header, data.into());
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
@@ -623,20 +609,20 @@ impl BlockManager {
// first and then a compressed one (as compression may have been
// previously enabled).
if fs::metadata(&path).await.is_ok() {
- return Some(DataBlockPath::Plain(path));
+ return Some(DataBlockPath::plain(path));
}
path.set_extension("zst");
if fs::metadata(&path).await.is_ok() {
- return Some(DataBlockPath::Compressed(path));
+ return Some(DataBlockPath::compressed(path));
}
} else {
path.set_extension("zst");
if fs::metadata(&path).await.is_ok() {
- return Some(DataBlockPath::Compressed(path));
+ return Some(DataBlockPath::compressed(path));
}
path.set_extension("");
if fs::metadata(&path).await.is_ok() {
- return Some(DataBlockPath::Plain(path));
+ return Some(DataBlockPath::plain(path));
}
}
}
@@ -706,8 +692,8 @@ impl BlockManagerLocked {
mgr: &BlockManager,
existing_path: Option<DataBlockPath>,
) -> Result<(), Error> {
- let compressed = data.is_compressed();
- let data = data.inner_buffer();
+ let (header, data) = data.as_parts_ref();
+ let compressed = header.is_compressed();
let directory = mgr.data_layout.load().primary_block_dir(hash);
@@ -717,24 +703,25 @@ impl BlockManagerLocked {
tgt_path.set_extension("zst");
}
- let to_delete = match (existing_path, compressed) {
+ let existing_info = existing_path.map(|x| x.into_parts());
+ let to_delete = match (existing_info, compressed) {
// If the block is stored in the wrong directory,
// write it again at the correct path and delete the old path
- (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
- (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
+ (Some((DataBlockHeader::Plain, p)), false) if p != tgt_path => Some(p),
+ (Some((DataBlockHeader::Compressed, p)), true) if p != tgt_path => Some(p),
// If the block is already stored not compressed but we have a compressed
// copy, write the compressed copy and delete the uncompressed one
- (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
+ (Some((DataBlockHeader::Plain, plain_path)), true) => Some(plain_path),
// If the block is already stored compressed,
// keep the stored copy, we have nothing to do
- (Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
+ (Some((DataBlockHeader::Compressed, _)), _) => return Ok(()),
// If the block is already stored not compressed,
// and we don't have a compressed copy either,
// keep the stored copy, we have nothing to do
- (Some(DataBlockPath::Plain(_)), false) => return Ok(()),
+ (Some((DataBlockHeader::Plain, _)), false) => return Ok(()),
// If the block isn't stored already, just store what is given to us
(None, _) => None,
@@ -786,18 +773,14 @@ impl BlockManagerLocked {
}
async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
- let (path, path2) = match block_path {
- DataBlockPath::Plain(p) => {
- let mut p2 = p.clone();
- p2.set_extension("corrupted");
- (p, p2)
- }
- DataBlockPath::Compressed(p) => {
- let mut p2 = p.clone();
- p2.set_extension("zst.corrupted");
- (p, p2)
- }
- };
+ let (header, path) = block_path.as_parts_ref();
+
+ let mut path2 = path.clone();
+ if header.is_compressed() {
+ path2.set_extension("zst.corrupted");
+ } else {
+ path2.set_extension("corrupted");
+ }
fs::rename(path, path2).await?;
Ok(())
@@ -807,9 +790,7 @@ impl BlockManagerLocked {
let rc = mgr.rc.get_block_rc(hash)?;
if rc.is_deletable() {
while let Some(path) = mgr.find_block(hash).await {
- let path = match path {
- DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p,
- };
+ let (_header, path) = path.as_parts_ref();
fs::remove_file(path).await?;
mgr.metrics.delete_counter.add(1);
}
@@ -826,24 +807,10 @@ impl BlockManagerLocked {
let data = mgr.read_block_from(hash, &wrong_path).await?;
self.write_block_inner(hash, &data, mgr, Some(wrong_path))
.await?;
- Ok(data.inner_buffer().len())
+ Ok(data.as_parts_ref().1.len())
}
}
-async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
- let mut parts: Vec<Bytes> = vec![];
- while let Some(part) = stream.next().await {
- parts.push(part.ok_or_message("error in stream")?);
- }
-
- Ok(parts
- .iter()
- .map(|x| &x[..])
- .collect::<Vec<_>>()
- .concat()
- .into())
-}
-
struct DeleteOnDrop(Option<PathBuf>);
impl DeleteOnDrop {