Diffstat (limited to 'src')
-rw-r--r--  src/api/s3/copy.rs        29
-rw-r--r--  src/api/s3/get.rs         43
-rw-r--r--  src/api/s3/put.rs         43
-rw-r--r--  src/block/Cargo.toml       4
-rw-r--r--  src/block/block.rs        37
-rw-r--r--  src/block/manager.rs     267
-rw-r--r--  src/block/resync.rs       35
-rw-r--r--  src/garage/Cargo.toml      5
-rw-r--r--  src/garage/admin.rs        4
-rw-r--r--  src/garage/cli/cmd.rs      6
-rw-r--r--  src/garage/cli/layout.rs   6
-rw-r--r--  src/model/Cargo.toml       5
-rw-r--r--  src/rpc/Cargo.toml         5
-rw-r--r--  src/rpc/rpc_helper.rs    189
-rw-r--r--  src/rpc/system.rs         13
-rw-r--r--  src/table/gc.rs            3
-rw-r--r--  src/table/schema.rs        2
-rw-r--r--  src/table/sync.rs          3
-rw-r--r--  src/table/table.rs         2
-rw-r--r--  src/util/Cargo.toml        5
20 files changed, 452 insertions, 254 deletions
diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs
index 4415a037..a1a8c9a4 100644
--- a/src/api/s3/copy.rs
+++ b/src/api/s3/copy.rs
@@ -5,9 +5,12 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt, TryFutureExt};
use md5::{Digest as Md5Digest, Md5};
+use bytes::Bytes;
use hyper::{Body, Request, Response};
use serde::Serialize;
+use garage_rpc::netapp::bytes_buf::BytesBuf;
+use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::data::*;
use garage_util::time::*;
@@ -305,13 +308,18 @@ pub async fn handle_upload_part_copy(
// if and only if the block returned is a block that already existed
// in the Garage data store (thus we don't need to save it again).
let garage2 = garage.clone();
+ let order_stream = OrderTag::stream();
let source_blocks = stream::iter(blocks_to_copy)
- .flat_map(|(block_hash, range_to_copy)| {
+ .enumerate()
+ .flat_map(|(i, (block_hash, range_to_copy))| {
let garage3 = garage2.clone();
stream::once(async move {
- let data = garage3.block_manager.rpc_get_block(&block_hash).await?;
+ let data = garage3
+ .block_manager
+ .rpc_get_block(&block_hash, Some(order_stream.order(i as u64)))
+ .await?;
match range_to_copy {
- Some(r) => Ok((data[r].to_vec(), None)),
+ Some(r) => Ok((data.slice(r), None)),
None => Ok((data, Some(block_hash))),
}
})
@@ -365,10 +373,7 @@ pub async fn handle_upload_part_copy(
// we need to insert that data as a new block.
async move {
if must_upload {
- garage2
- .block_manager
- .rpc_put_block(final_hash, data.into())
- .await
+ garage2.block_manager.rpc_put_block(final_hash, data).await
} else {
Ok(())
}
@@ -556,13 +561,13 @@ impl CopyPreconditionHeaders {
}
}
-type BlockStreamItemOk = (Vec<u8>, Option<Hash>);
+type BlockStreamItemOk = (Bytes, Option<Hash>);
type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;
struct Defragmenter<S: Stream<Item = BlockStreamItem>> {
block_size: usize,
block_stream: Pin<Box<stream::Peekable<S>>>,
- buffer: Vec<u8>,
+ buffer: BytesBuf,
hash: Option<Hash>,
}
@@ -571,7 +576,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
Self {
block_size,
block_stream,
- buffer: vec![],
+ buffer: BytesBuf::new(),
hash: None,
}
}
@@ -589,7 +594,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
if self.buffer.is_empty() {
let (next_block, next_block_hash) = self.block_stream.next().await.unwrap()?;
- self.buffer = next_block;
+ self.buffer.extend(next_block);
self.hash = next_block_hash;
} else if self.buffer.len() + peeked_next_block.len() > self.block_size {
break;
@@ -600,7 +605,7 @@ impl<S: Stream<Item = BlockStreamItem>> Defragmenter<S> {
}
}
- Ok((std::mem::take(&mut self.buffer), self.hash.take()))
+ Ok((self.buffer.take_all(), self.hash.take()))
}
}
diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs
index 7fa1a177..dd95f6e7 100644
--- a/src/api/s3/get.rs
+++ b/src/api/s3/get.rs
@@ -10,6 +10,7 @@ use http::header::{
use hyper::body::Bytes;
use hyper::{Body, Request, Response, StatusCode};
+use garage_rpc::rpc_helper::OrderTag;
use garage_table::EmptyKey;
use garage_util::data::*;
@@ -242,10 +243,15 @@ pub async fn handle_get(
Ok(resp_builder.body(body)?)
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
- let read_first_block = garage.block_manager.rpc_get_block(first_block_hash);
+ let order_stream = OrderTag::stream();
+
+ let read_first_block = garage
+ .block_manager
+ .rpc_get_block_streaming(first_block_hash, Some(order_stream.order(0)));
let get_next_blocks = garage.version_table.get(&last_v.uuid, &EmptyKey);
- let (first_block, version) = futures::try_join!(read_first_block, get_next_blocks)?;
+ let (first_block_stream, version) =
+ futures::try_join!(read_first_block, get_next_blocks)?;
let version = version.ok_or(Error::NoSuchKey)?;
let mut blocks = version
@@ -254,24 +260,33 @@ pub async fn handle_get(
.iter()
.map(|(_, vb)| (vb.hash, None))
.collect::<Vec<_>>();
- blocks[0].1 = Some(first_block);
+ blocks[0].1 = Some(first_block_stream);
let body_stream = futures::stream::iter(blocks)
- .map(move |(hash, data_opt)| {
+ .enumerate()
+ .map(move |(i, (hash, stream_opt))| {
let garage = garage.clone();
async move {
- if let Some(data) = data_opt {
- Ok(Bytes::from(data))
+ if let Some(stream) = stream_opt {
+ stream
} else {
garage
.block_manager
- .rpc_get_block(&hash)
+ .rpc_get_block_streaming(&hash, Some(order_stream.order(i as u64)))
.await
- .map(Bytes::from)
+ .unwrap_or_else(|e| {
+ Box::pin(futures::stream::once(async move {
+ Err(std::io::Error::new(
+ std::io::ErrorKind::Other,
+ format!("Could not get block {}: {}", i, e),
+ ))
+ }))
+ })
}
}
})
- .buffered(2);
+ .buffered(2)
+ .flatten();
let body = hyper::body::Body::wrap_stream(body_stream);
Ok(resp_builder.body(body)?)
@@ -434,12 +449,16 @@ fn body_from_blocks_range(
true_offset += b.size;
}
+ let order_stream = OrderTag::stream();
let body_stream = futures::stream::iter(blocks)
- .map(move |(block, true_offset)| {
+ .enumerate()
+ .map(move |(i, (block, true_offset))| {
let garage = garage.clone();
async move {
- let data = garage.block_manager.rpc_get_block(&block.hash).await?;
- let data = Bytes::from(data);
+ let data = garage
+ .block_manager
+ .rpc_get_block(&block.hash, Some(order_stream.order(i as u64)))
+ .await?;
let start_in_block = if true_offset > begin {
0
} else {
diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs
index dc0530df..97b8e4e3 100644
--- a/src/api/s3/put.rs
+++ b/src/api/s3/put.rs
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
+use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use futures::prelude::*;
@@ -13,6 +13,7 @@ use opentelemetry::{
Context,
};
+use garage_rpc::netapp::bytes_buf::BytesBuf;
use garage_table::*;
use garage_util::async_hash::*;
use garage_util::data::*;
@@ -108,7 +109,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
size,
etag: data_md5sum_hex.clone(),
},
- first_block,
+ first_block.to_vec(),
)),
};
@@ -136,7 +137,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage.version_table.insert(&version).await?;
// Transfer data and verify checksum
- let first_block = Bytes::from(first_block);
let first_block_hash = async_blake2sum(first_block.clone()).await;
let tx_result = (|| async {
@@ -318,7 +318,6 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
chunker.next(),
)?;
if let Some(block) = next_block {
- let block = Bytes::from(block);
let (_, _, block_hash) = futures::future::join3(
md5hasher.update(block.clone()),
sha256hasher.update(block.clone()),
@@ -387,8 +386,7 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
stream: S,
read_all: bool,
block_size: usize,
- buf: VecDeque<Bytes>,
- buf_len: usize,
+ buf: BytesBuf,
}
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
@@ -397,45 +395,25 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
stream,
read_all: false,
block_size,
- buf: VecDeque::with_capacity(8),
- buf_len: 0,
+ buf: BytesBuf::new(),
}
}
- async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
- while !self.read_all && self.buf_len < self.block_size {
+ async fn next(&mut self) -> Result<Option<Bytes>, Error> {
+ while !self.read_all && self.buf.len() < self.block_size {
if let Some(block) = self.stream.next().await {
let bytes = block?;
trace!("Body next: {} bytes", bytes.len());
- self.buf_len += bytes.len();
- self.buf.push_back(bytes);
+ self.buf.extend(bytes);
} else {
self.read_all = true;
}
}
- if self.buf_len == 0 {
+ if self.buf.is_empty() {
Ok(None)
} else {
- let mut slices = Vec::with_capacity(self.buf.len());
- let mut taken = 0;
- while self.buf_len > 0 && taken < self.block_size {
- let front = self.buf.pop_front().unwrap();
- if taken + front.len() <= self.block_size {
- taken += front.len();
- self.buf_len -= front.len();
- slices.push(front);
- } else {
- let front_take = self.block_size - taken;
- slices.push(front.slice(..front_take));
- self.buf.push_front(front.slice(front_take..));
- self.buf_len -= front_take;
- break;
- }
- }
- Ok(Some(
- slices.iter().map(|x| &x[..]).collect::<Vec<_>>().concat(),
- ))
+ Ok(Some(self.buf.take_max(self.block_size)))
}
}
}
@@ -545,7 +523,6 @@ pub async fn handle_put_part(
// Copy block to store
let version = Version::new(version_uuid, bucket_id, key, false);
- let first_block = Bytes::from(first_block);
let first_block_hash = async_blake2sum(first_block.clone()).await;
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 8cf5a01c..cd409001 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -27,6 +27,8 @@ bytes = "1.0"
hex = "0.4"
tracing = "0.1.30"
rand = "0.8"
+
+async-compression = { version = "0.3", features = ["tokio", "zstd"] }
zstd = { version = "0.9", default-features = false }
rmp-serde = "0.15"
@@ -36,7 +38,7 @@ serde_bytes = "0.11"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-
+tokio-util = { version = "0.6", features = ["io"] }
[features]
system-libs = [ "zstd/pkg-config" ]
diff --git a/src/block/block.rs b/src/block/block.rs
index f17bd2c0..935aa900 100644
--- a/src/block/block.rs
+++ b/src/block/block.rs
@@ -5,13 +5,18 @@ use zstd::stream::{decode_all as zstd_decode, Encoder};
use garage_util::data::*;
use garage_util::error::*;
+#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
+pub enum DataBlockHeader {
+ Plain,
+ Compressed,
+}
+
/// A possibly compressed block of data
-#[derive(Debug, Serialize, Deserialize)]
pub enum DataBlock {
/// Uncompressed data
- Plain(#[serde(with = "serde_bytes")] Vec<u8>),
+ Plain(Bytes),
/// Data compressed with zstd
- Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
+ Compressed(Bytes),
}
impl DataBlock {
@@ -31,7 +36,7 @@ impl DataBlock {
/// Get the buffer, possibly decompressing it, and verify it's integrity.
/// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
/// is used instead.
- pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> {
+ pub fn verify_get(self, hash: Hash) -> Result<Bytes, Error> {
match self {
DataBlock::Plain(data) => {
if blake2sum(&data) == hash {
@@ -40,9 +45,9 @@ impl DataBlock {
Err(Error::CorruptData(hash))
}
}
- DataBlock::Compressed(data) => {
- zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash))
- }
+ DataBlock::Compressed(data) => zstd_decode(&data[..])
+ .map_err(|_| Error::CorruptData(hash))
+ .map(Bytes::from),
}
}
@@ -66,14 +71,28 @@ impl DataBlock {
tokio::task::spawn_blocking(move || {
if let Some(level) = level {
if let Ok(data) = zstd_encode(&data[..], level) {
- return DataBlock::Compressed(data);
+ return DataBlock::Compressed(data.into());
}
}
- DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here
+ DataBlock::Plain(data)
})
.await
.unwrap()
}
+
+ pub fn into_parts(self) -> (DataBlockHeader, Bytes) {
+ match self {
+ DataBlock::Plain(data) => (DataBlockHeader::Plain, data),
+ DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data),
+ }
+ }
+
+ pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self {
+ match h {
+ DataBlockHeader::Plain => DataBlock::Plain(bytes),
+ DataBlockHeader::Compressed => DataBlock::Compressed(bytes),
+ }
+ }
}
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
diff --git a/src/block/manager.rs b/src/block/manager.rs
index b5199b62..b9cd09e7 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,4 +1,5 @@
use std::path::PathBuf;
+use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -6,8 +7,10 @@ use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
+use futures::Stream;
+use futures_util::stream::StreamExt;
use tokio::fs;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::{mpsc, Mutex};
use opentelemetry::{
@@ -15,12 +18,15 @@ use opentelemetry::{
Context,
};
+use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
+
use garage_db as db;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
+use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System;
use garage_rpc::*;
@@ -36,7 +42,7 @@ use crate::resync::*;
pub const INLINE_THRESHOLD: usize = 3072;
// Timeout for RPCs that read and write blocks to remote nodes
-pub(crate) const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
+pub(crate) const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(60);
// The delay between the moment when the reference counter
// drops to zero, and the moment where we allow ourselves
@@ -48,12 +54,12 @@ pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
pub enum BlockRpc {
Ok,
/// Message to ask for a block of data, by hash
- GetBlock(Hash),
+ GetBlock(Hash, Option<OrderTag>),
/// Message to send a block of data, either because requested, of for first delivery of new
/// block
PutBlock {
hash: Hash,
- data: DataBlock,
+ header: DataBlockHeader,
},
/// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash),
@@ -143,56 +149,162 @@ impl BlockManager {
}
/// Ask nodes that might have a (possibly compressed) block for it
- pub(crate) async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
+ /// Return it as a stream with a header
+ async fn rpc_get_raw_block_streaming(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ ) -> Result<(DataBlockHeader, ByteStream), Error> {
let who = self.replication.read_nodes(hash);
- let resps = self
- .system
- .rpc
- .try_call_many(
- &self.endpoint,
- &who[..],
- BlockRpc::GetBlock(*hash),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(1)
- .with_timeout(BLOCK_RW_TIMEOUT)
- .interrupt_after_quorum(true),
- )
- .await?;
+ let who = self.system.rpc.request_order(&who);
+
+ for node in who.iter() {
+ let node_id = NodeID::from(*node);
+ let rpc = self.endpoint.call_streaming(
+ &node_id,
+ BlockRpc::GetBlock(*hash, order_tag),
+ PRIO_NORMAL | PRIO_SECONDARY,
+ );
+ tokio::select! {
+ res = rpc => {
+ let res = match res {
+ Ok(res) => res,
+ Err(e) => {
+ debug!("Node {:?} returned error: {}", node, e);
+ continue;
+ }
+ };
+ let (header, stream) = match res.into_parts() {
+ (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
+ _ => {
+ debug!("Node {:?} returned a malformed response", node);
+ continue;
+ }
+ };
+ return Ok((header, stream));
+ }
+ _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
+ debug!("Node {:?} didn't return block in time, trying next.", node);
+ }
+ };
+ }
- for resp in resps {
- if let BlockRpc::PutBlock { data, .. } = resp {
- return Ok(data);
- }
+ Err(Error::Message(format!(
+ "Unable to read block {:?}: no node returned a valid block",
+ hash
+ )))
+ }
+
+ /// Ask nodes that might have a (possibly compressed) block for it
+ /// Return its entire body
+ pub(crate) async fn rpc_get_raw_block(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ ) -> Result<DataBlock, Error> {
+ let who = self.replication.read_nodes(hash);
+ let who = self.system.rpc.request_order(&who);
+
+ for node in who.iter() {
+ let node_id = NodeID::from(*node);
+ let rpc = self.endpoint.call_streaming(
+ &node_id,
+ BlockRpc::GetBlock(*hash, order_tag),
+ PRIO_NORMAL | PRIO_SECONDARY,
+ );
+ tokio::select! {
+ res = rpc => {
+ let res = match res {
+ Ok(res) => res,
+ Err(e) => {
+ debug!("Node {:?} returned error: {}", node, e);
+ continue;
+ }
+ };
+ let (header, stream) = match res.into_parts() {
+ (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
+ _ => {
+ debug!("Node {:?} returned a malformed response", node);
+ continue;
+ }
+ };
+ match read_stream_to_end(stream).await {
+ Ok(bytes) => return Ok(DataBlock::from_parts(header, bytes)),
+ Err(e) => {
+ debug!("Error reading stream from node {:?}: {}", node, e);
+ }
+ }
+ }
+ _ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
+ debug!("Node {:?} didn't return block in time, trying next.", node);
+ }
+ };
}
+
Err(Error::Message(format!(
- "Unable to read block {:?}: no valid blocks returned",
+ "Unable to read block {:?}: no node returned a valid block",
hash
)))
}
// ---- Public interface ----
+ /// Ask nodes that might have a block for it,
+ /// return it as a stream
+ pub async fn rpc_get_block_streaming(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ ) -> Result<
+ Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>,
+ Error,
+ > {
+ let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+ match header {
+ DataBlockHeader::Plain => Ok(Box::pin(stream)),
+ DataBlockHeader::Compressed => {
+ // Too many things, I hate it.
+ let reader = stream_asyncread(stream);
+ let reader = BufReader::new(reader);
+ let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader);
+ Ok(Box::pin(tokio_util::io::ReaderStream::new(reader)))
+ }
+ }
+ }
+
/// Ask nodes that might have a block for it
- pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
- self.rpc_get_raw_block(hash).await?.verify_get(*hash)
+ pub async fn rpc_get_block(
+ &self,
+ hash: &Hash,
+ order_tag: Option<OrderTag>,
+ ) -> Result<Bytes, Error> {
+ self.rpc_get_raw_block(hash, order_tag)
+ .await?
+ .verify_get(*hash)
}
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
- let data = DataBlock::from_buffer(data, self.compression_level).await;
+
+ let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
+ .await
+ .into_parts();
+ let put_block_rpc =
+ Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
+
self.system
.rpc
.try_call_many(
&self.endpoint,
&who[..],
- // TODO: remove to_vec() here
- BlockRpc::PutBlock { hash, data },
- RequestStrategy::with_priority(PRIO_NORMAL)
+ put_block_rpc,
+ RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_quorum(self.replication.write_quorum())
.with_timeout(BLOCK_RW_TIMEOUT),
)
.await?;
+
Ok(())
}
@@ -259,17 +371,25 @@ impl BlockManager {
// ---- Reading and writing blocks locally ----
- /// Write a block to disk
- pub(crate) async fn write_block(
+ async fn handle_put_block(
&self,
- hash: &Hash,
- data: &DataBlock,
- ) -> Result<BlockRpc, Error> {
+ hash: Hash,
+ header: DataBlockHeader,
+ stream: Option<ByteStream>,
+ ) -> Result<(), Error> {
+ let stream = stream.ok_or_message("missing stream")?;
+ let bytes = read_stream_to_end(stream).await?;
+ let data = DataBlock::from_parts(header, bytes);
+ self.write_block(&hash, &data).await
+ }
+
+ /// Write a block to disk
+ pub(crate) async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage");
let write_size = data.inner_buffer().len() as u64;
- let res = self.mutation_lock[hash.as_slice()[0] as usize]
+ self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
.with_context(Context::current_with_span(
tracer.start("Acquire mutation_lock"),
@@ -284,11 +404,32 @@ impl BlockManager {
self.metrics.bytes_written.add(write_size);
- Ok(res)
+ Ok(())
+ }
+
+ async fn handle_get_block(&self, hash: &Hash, order_tag: Option<OrderTag>) -> Resp<BlockRpc> {
+ let block = match self.read_block(hash).await {
+ Ok(data) => data,
+ Err(e) => return Resp::new(Err(e)),
+ };
+
+ let (header, data) = block.into_parts();
+
+ let resp = Resp::new(Ok(BlockRpc::PutBlock {
+ hash: *hash,
+ header,
+ }))
+ .with_stream_from_buffer(data);
+
+ if let Some(order_tag) = order_tag {
+ resp.with_order_tag(order_tag)
+ } else {
+ resp
+ }
}
/// Read block from disk, verifying it's integrity
- pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
+ pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
@@ -298,7 +439,7 @@ impl BlockManager {
.bytes_read
.add(data.inner_buffer().len() as u64);
- Ok(BlockRpc::PutBlock { hash: *hash, data })
+ Ok(data)
}
async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
@@ -321,9 +462,9 @@ impl BlockManager {
drop(f);
let data = if compressed {
- DataBlock::Compressed(data)
+ DataBlock::Compressed(data.into())
} else {
- DataBlock::Plain(data)
+ DataBlock::Plain(data.into())
};
if data.verify(*hash).is_err() {
@@ -394,17 +535,19 @@ impl BlockManager {
}
#[async_trait]
-impl EndpointHandler<BlockRpc> for BlockManager {
- async fn handle(
- self: &Arc<Self>,
- message: &BlockRpc,
- _from: NodeID,
- ) -> Result<BlockRpc, Error> {
- match message {
- BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await,
- BlockRpc::GetBlock(h) => self.read_block(h).await,
- BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
- m => Err(Error::unexpected_rpc_message(m)),
+impl StreamingEndpointHandler<BlockRpc> for BlockManager {
+ async fn handle(self: &Arc<Self>, mut message: Req<BlockRpc>, _from: NodeID) -> Resp<BlockRpc> {
+ match message.msg() {
+ BlockRpc::PutBlock { hash, header } => Resp::new(
+ self.handle_put_block(*hash, *header, message.take_stream())
+ .await
+ .map(|_| BlockRpc::Ok),
+ ),
+ BlockRpc::GetBlock(h, order_tag) => self.handle_get_block(h, *order_tag).await,
+ BlockRpc::NeedBlockQuery(h) => {
+ Resp::new(self.need_block(h).await.map(BlockRpc::NeedBlockReply))
+ }
+ m => Resp::new(Err(Error::unexpected_rpc_message(m))),
}
}
}
@@ -431,7 +574,7 @@ impl BlockManagerLocked {
hash: &Hash,
data: &DataBlock,
mgr: &BlockManager,
- ) -> Result<BlockRpc, Error> {
+ ) -> Result<(), Error> {
let compressed = data.is_compressed();
let data = data.inner_buffer();
@@ -442,8 +585,8 @@ impl BlockManagerLocked {
fs::create_dir_all(&directory).await?;
let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
- (Ok(true), _) => return Ok(BlockRpc::Ok),
- (Ok(false), false) => return Ok(BlockRpc::Ok),
+ (Ok(true), _) => return Ok(()),
+ (Ok(false), false) => return Ok(()),
(Ok(false), true) => {
let path_to_delete = path.clone();
path.set_extension("zst");
@@ -482,7 +625,7 @@ impl BlockManagerLocked {
dir.sync_all().await?;
drop(dir);
- Ok(BlockRpc::Ok)
+ Ok(())
}
async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
@@ -516,3 +659,17 @@ impl BlockManagerLocked {
Ok(())
}
}
+
+async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
+ let mut parts: Vec<Bytes> = vec![];
+ while let Some(part) = stream.next().await {
+ parts.push(part.ok_or_message("error in stream")?);
+ }
+
+ Ok(parts
+ .iter()
+ .map(|x| &x[..])
+ .collect::<Vec<_>>()
+ .concat()
+ .into())
+}
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 39e4d50f..bde3e98c 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -7,7 +7,6 @@ use arc_swap::ArcSwap;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
-use futures::future::*;
use tokio::select;
use tokio::sync::{watch, Notify};
@@ -36,7 +35,11 @@ use crate::manager::*;
// Timeout for RPCs that ask other nodes whether they need a copy
// of a given block before we delete it locally
-pub(crate) const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
+// The timeout here is relatively low because we don't want to block
+// the entire resync loop when some nodes are not responding.
+// Nothing will be deleted if the nodes don't answer the queries,
+// we will just retry later.
+const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(15);
// The delay between the time where a resync operation fails
// and the time when it is retried, with exponential backoff
@@ -336,24 +339,24 @@ impl BlockResyncManager {
}
who.retain(|id| *id != manager.system.id);
- let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash));
- let who_needs_fut = who.iter().map(|to| {
- manager.system.rpc.call_arc(
+ let who_needs_resps = manager
+ .system
+ .rpc
+ .call_many(
&manager.endpoint,
- *to,
- msg.clone(),
+ &who,
+ BlockRpc::NeedBlockQuery(*hash),
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
)
- });
- let who_needs_resps = join_all(who_needs_fut).await;
+ .await?;
let mut need_nodes = vec![];
- for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) {
+ for (node, needed) in who_needs_resps {
match needed.err_context("NeedBlockQuery RPC")? {
BlockRpc::NeedBlockReply(needed) => {
if needed {
- need_nodes.push(*node);
+ need_nodes.push(node);
}
}
m => {
@@ -376,7 +379,13 @@ impl BlockResyncManager {
.add(1, &[KeyValue::new("to", format!("{:?}", node))]);
}
- let put_block_message = manager.read_block(hash).await?;
+ let block = manager.read_block(hash).await?;
+ let (header, bytes) = block.into_parts();
+ let put_block_message = Req::new(BlockRpc::PutBlock {
+ hash: *hash,
+ header,
+ })?
+ .with_stream_from_buffer(bytes);
manager
.system
.rpc
@@ -409,7 +418,7 @@ impl BlockResyncManager {
hash
);
- let block_data = manager.rpc_get_raw_block(hash).await?;
+ let block_data = manager.rpc_get_raw_block(hash, None).await?;
manager.metrics.resync_recv_counter.add(1);
diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml
index f30b88ac..dcb3b78e 100644
--- a/src/garage/Cargo.toml
+++ b/src/garage/Cargo.toml
@@ -50,9 +50,8 @@ futures = "0.3"
futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-#netapp = { version = "0.3.2", git = "https://git.deuxfleurs.fr/lx/netapp" }
-#netapp = { version = "0.4", path = "../../../netapp" }
-netapp = "0.4"
+#netapp = "0.4"
+netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] }
opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
opentelemetry-prometheus = { version = "0.10", optional = true }
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index b4d2d1a1..802a8261 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -683,7 +683,7 @@ impl AdminRpcHandler {
.endpoint
.call(
&node,
- &AdminRpc::LaunchRepair(opt_to_send.clone()),
+ AdminRpc::LaunchRepair(opt_to_send.clone()),
PRIO_NORMAL,
)
.await;
@@ -723,7 +723,7 @@ impl AdminRpcHandler {
let node_id = (*node).into();
match self
.endpoint
- .call(&node_id, &AdminRpc::Stats(opt), PRIO_NORMAL)
+ .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
.await?
{
Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(),
diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs
index 1aa2c2ff..c8b96489 100644
--- a/src/garage/cli/cmd.rs
+++ b/src/garage/cli/cmd.rs
@@ -47,7 +47,7 @@ pub async fn cli_command_dispatch(
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
@@ -149,7 +149,7 @@ pub async fn cmd_connect(
args: ConnectNodeOpt,
) -> Result<(), Error> {
match rpc_cli
- .call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL)
.await??
{
SystemRpc::Ok => {
@@ -165,7 +165,7 @@ pub async fn cmd_admin(
rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), HelperError> {
- match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? {
+ match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index db0af57c..3884bb92 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -36,7 +36,7 @@ pub async fn cmd_assign_role(
args: AssignRoleOpt,
) -> Result<(), Error> {
let status = match rpc_cli
- .call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
@@ -245,7 +245,7 @@ pub async fn fetch_layout(
rpc_host: NodeID,
) -> Result<ClusterLayout, Error> {
match rpc_cli
- .call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL)
+ .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.await??
{
SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
@@ -261,7 +261,7 @@ pub async fn send_layout(
rpc_cli
.call(
&rpc_host,
- &SystemRpc::AdvertiseClusterLayout(layout),
+ SystemRpc::AdvertiseClusterLayout(layout),
PRIO_NORMAL,
)
.await??;
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml
index bbcfe89c..d6e2adfe 100644
--- a/src/model/Cargo.toml
+++ b/src/model/Cargo.toml
@@ -39,9 +39,8 @@ futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
opentelemetry = "0.17"
-#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-#netapp = { version = "0.4", path = "../../../netapp" }
-netapp = "0.4"
+#netapp = "0.4"
+netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] }
[features]
k2v = [ "garage_util/k2v" ]
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 7263d9cc..d7136401 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -45,9 +45,8 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
-#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] }
-netapp = { version = "0.4.5", features = ["telemetry"] }
+#netapp = { version = "0.4.5", features = ["telemetry"] }
+netapp = { version = "0.5.0", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 34717d3b..19abb4c5 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -15,10 +15,14 @@ use opentelemetry::{
Context,
};
-pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc};
+pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
+use netapp::message::IntoReq;
+pub use netapp::message::{
+ Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL,
+ PRIO_SECONDARY,
+};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
-pub use netapp::proto::*;
-pub use netapp::{NetApp, NodeID};
+pub use netapp::{self, NetApp, NodeID};
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -28,12 +32,10 @@ use garage_util::metrics::RecordDuration;
use crate::metrics::RpcMetrics;
use crate::ring::Ring;
-const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
+const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
-// Try to never have more than 200MB of outgoing requests
-// buffered at the same time. Other requests are queued until
-// space is freed.
-const REQUEST_BUFFER_SIZE: usize = 200 * 1024 * 1024;
+// Don't allow more than 100 concurrent outgoing RPCs.
+const MAX_CONCURRENT_REQUESTS: usize = 100;
/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
@@ -95,7 +97,7 @@ impl RpcHelper {
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
) -> Self {
- let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE));
+ let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
let metrics = RpcMetrics::new(sem.clone());
@@ -109,30 +111,17 @@ impl RpcHelper {
}))
}
- pub async fn call<M, H, S>(
+ pub async fn call<M, N, H, S>(
&self,
endpoint: &Endpoint<M, H>,
to: Uuid,
- msg: M,
+ msg: N,
strat: RequestStrategy,
) -> Result<S, Error>
where
M: Rpc<Response = Result<S, Error>>,
- H: EndpointHandler<M>,
- {
- self.call_arc(endpoint, to, Arc::new(msg), strat).await
- }
-
- pub async fn call_arc<M, H, S>(
- &self,
- endpoint: &Endpoint<M, H>,
- to: Uuid,
- msg: Arc<M>,
- strat: RequestStrategy,
- ) -> Result<S, Error>
- where
- M: Rpc<Response = Result<S, Error>>,
- H: EndpointHandler<M>,
+ N: IntoReq<M> + Send,
+ H: StreamingEndpointHandler<M>,
{
let metric_tags = [
KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
@@ -140,11 +129,10 @@ impl RpcHelper {
KeyValue::new("to", format!("{:?}", to)),
];
- let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
let permit = self
.0
.request_buffer_semaphore
- .acquire_many(msg_size)
+ .acquire()
.record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
.await?;
@@ -152,7 +140,7 @@ impl RpcHelper {
let node_id = to.into();
let rpc_call = endpoint
- .call(&node_id, msg, strat.rs_priority)
+ .call_streaming(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
select! {
@@ -162,7 +150,7 @@ impl RpcHelper {
if res.is_err() {
self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
}
- let res = res?;
+ let res = res?.into_msg();
if res.is_err() {
self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags);
@@ -178,38 +166,42 @@ impl RpcHelper {
}
}
- pub async fn call_many<M, H, S>(
+ pub async fn call_many<M, N, H, S>(
&self,
endpoint: &Endpoint<M, H>,
to: &[Uuid],
- msg: M,
+ msg: N,
strat: RequestStrategy,
- ) -> Vec<(Uuid, Result<S, Error>)>
+ ) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where
M: Rpc<Response = Result<S, Error>>,
- H: EndpointHandler<M>,
+ N: IntoReq<M>,
+ H: StreamingEndpointHandler<M>,
{
- let msg = Arc::new(msg);
+ let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+
let resps = join_all(
to.iter()
- .map(|to| self.call_arc(endpoint, *to, msg.clone(), strat)),
+ .map(|to| self.call(endpoint, *to, msg.clone(), strat)),
)
.await;
- to.iter()
+ Ok(to
+ .iter()
.cloned()
.zip(resps.into_iter())
- .collect::<Vec<_>>()
+ .collect::<Vec<_>>())
}
- pub async fn broadcast<M, H, S>(
+ pub async fn broadcast<M, N, H, S>(
&self,
endpoint: &Endpoint<M, H>,
- msg: M,
+ msg: N,
strat: RequestStrategy,
- ) -> Vec<(Uuid, Result<S, Error>)>
+ ) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where
M: Rpc<Response = Result<S, Error>>,
- H: EndpointHandler<M>,
+ N: IntoReq<M>,
+ H: StreamingEndpointHandler<M>,
{
let to = self
.0
@@ -223,16 +215,17 @@ impl RpcHelper {
/// Make a RPC call to multiple servers, returning either a Vec of responses,
/// or an error if quorum could not be reached due to too many errors
- pub async fn try_call_many<M, H, S>(
+ pub async fn try_call_many<M, N, H, S>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
- msg: M,
+ msg: N,
strategy: RequestStrategy,
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
- H: EndpointHandler<M> + 'static,
+ N: IntoReq<M>,
+ H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
{
let quorum = strategy.rs_quorum.unwrap_or(to.len());
@@ -262,20 +255,21 @@ impl RpcHelper {
.await
}
- async fn try_call_many_internal<M, H, S>(
+ async fn try_call_many_internal<M, N, H, S>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
- msg: M,
+ msg: N,
strategy: RequestStrategy,
quorum: usize,
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
- H: EndpointHandler<M> + 'static,
+ N: IntoReq<M>,
+ H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
{
- let msg = Arc::new(msg);
+ let msg = msg.into_req().map_err(netapp::error::Error::from)?;
// Build future for each request
// They are not started now: they are added below in a FuturesUnordered
@@ -285,7 +279,7 @@ impl RpcHelper {
let msg = msg.clone();
let endpoint2 = endpoint.clone();
(to, async move {
- self2.call_arc(&endpoint2, to, msg, strategy).await
+ self2.call(&endpoint2, to, msg, strategy).await
})
});
@@ -299,47 +293,19 @@ impl RpcHelper {
// to reach a quorum, priorizing nodes with the lowest latency.
// When there are errors, we start new requests to compensate.
- // Retrieve some status variables that we will use to sort requests
- let peer_list = self.0.fullmesh.get_peer_list();
- let ring: Arc<Ring> = self.0.ring.borrow().clone();
- let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
- Some(pc) => &pc.zone,
- None => "",
- };
-
- // Augment requests with some information used to sort them.
- // The tuples are as follows:
- // (is another node?, is another zone?, latency, node ID, request future)
- // We store all of these tuples in a vec that we can sort.
- // By sorting this vec, we priorize ourself, then nodes in the same zone,
- // and within a same zone we priorize nodes with the lowest latency.
- let mut requests = requests
- .map(|(to, fut)| {
- let peer_zone = match ring.layout.node_role(&to) {
- Some(pc) => &pc.zone,
- None => "",
- };
- let peer_avg_ping = peer_list
- .iter()
- .find(|x| x.id.as_ref() == to.as_slice())
- .and_then(|pi| pi.avg_ping)
- .unwrap_or_else(|| Duration::from_secs(1));
- (
- to != self.0.our_node_id,
- peer_zone != our_zone,
- peer_avg_ping,
- to,
- fut,
- )
- })
+ // Reorder requests to priorize closeness / low latency
+ let request_order = self.request_order(to);
+ let mut ord_requests = vec![(); request_order.len()]
+ .into_iter()
+ .map(|_| None)
.collect::<Vec<_>>();
-
- // Sort requests by (priorize ourself, priorize same zone, priorize low latency)
- requests
- .sort_by_key(|(diffnode, diffzone, ping, _to, _fut)| (*diffnode, *diffzone, *ping));
+ for (to, fut) in requests {
+ let i = request_order.iter().position(|x| *x == to).unwrap();
+ ord_requests[i] = Some((to, fut));
+ }
// Make an iterator to take requests in their sorted order
- let mut requests = requests.into_iter();
+ let mut requests = ord_requests.into_iter().map(Option::unwrap);
// resp_stream will contain all of the requests that are currently in flight.
// (for the moment none, they will be added in the loop below)
@@ -350,7 +316,7 @@ impl RpcHelper {
// If the current set of requests that are running is not enough to possibly
// reach quorum, start some new requests.
while successes.len() + resp_stream.len() < quorum {
- if let Some((_, _, _, req_to, fut)) = requests.next() {
+ if let Some((req_to, fut)) = requests.next() {
let tracer = opentelemetry::global::tracer("garage");
let span = tracer.start(format!("RPC to {:?}", req_to));
resp_stream.push(tokio::spawn(
@@ -420,4 +386,49 @@ impl RpcHelper {
Err(Error::Quorum(quorum, successes.len(), to.len(), errors))
}
}
+
+ pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
+ // Retrieve some status variables that we will use to sort requests
+ let peer_list = self.0.fullmesh.get_peer_list();
+ let ring: Arc<Ring> = self.0.ring.borrow().clone();
+ let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
+ Some(pc) => &pc.zone,
+ None => "",
+ };
+
+ // Augment requests with some information used to sort them.
+ // The tuples are as follows:
+ // (is another node?, is another zone?, latency, node ID, request future)
+ // We store all of these tuples in a vec that we can sort.
+ // By sorting this vec, we priorize ourself, then nodes in the same zone,
+ // and within a same zone we priorize nodes with the lowest latency.
+ let mut nodes = nodes
+ .iter()
+ .map(|to| {
+ let peer_zone = match ring.layout.node_role(to) {
+ Some(pc) => &pc.zone,
+ None => "",
+ };
+ let peer_avg_ping = peer_list
+ .iter()
+ .find(|x| x.id.as_ref() == to.as_slice())
+ .and_then(|pi| pi.avg_ping)
+ .unwrap_or_else(|| Duration::from_secs(1));
+ (
+ *to != self.0.our_node_id,
+ peer_zone != our_zone,
+ peer_avg_ping,
+ *to,
+ )
+ })
+ .collect::<Vec<_>>();
+
+ // Sort requests by (priorize ourself, priorize same zone, priorize low latency)
+ nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping));
+
+ nodes
+ .into_iter()
+ .map(|(_, _, _, to)| to)
+ .collect::<Vec<_>>()
+ }
}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index d621f59f..c0e70c61 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -16,8 +16,8 @@ use tokio::sync::watch;
use tokio::sync::Mutex;
use netapp::endpoint::{Endpoint, EndpointHandler};
+use netapp::message::*;
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
-use netapp::proto::*;
use netapp::util::parse_and_resolve_peer_addr;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
@@ -37,7 +37,7 @@ use crate::rpc_helper::*;
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
-const PING_TIMEOUT: Duration = Duration::from_secs(2);
+const SYSTEM_RPC_TIMEOUT: Duration = Duration::from_secs(15);
/// Version tag used for version check upon Netapp connection.
/// Cluster nodes with different version tags are deemed
@@ -538,7 +538,7 @@ impl System {
SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
- .await;
+ .await?;
Ok(())
});
self.background.spawn(self.clone().save_cluster_layout());
@@ -553,11 +553,12 @@ impl System {
self.update_local_status();
let local_status: NodeStatus = self.local_status.load().as_ref().clone();
- self.rpc
+ let _ = self
+ .rpc
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
- RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT),
)
.await;
@@ -681,7 +682,7 @@ impl System {
&self.system_endpoint,
peer,
SystemRpc::PullClusterLayout,
- RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT),
+ RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT),
)
.await;
if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 12218d97..6cae9701 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -25,7 +25,8 @@ use crate::replication::*;
use crate::schema::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
-const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
+// Same timeout as NEED_BLOCK_QUERY_TIMEOUT in block manager
+const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(15);
// GC delay for table entries: 1 day (24 hours)
// (the delay before the entry is added in the GC todo list
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 74f57798..f37e98d8 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -60,7 +60,7 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
}
/// Trait for the schema used in a table
-pub trait TableSchema: Send + Sync {
+pub trait TableSchema: Send + Sync + 'static {
/// The name of the table in the database
const TABLE_NAME: &'static str;
diff --git a/src/table/sync.rs b/src/table/sync.rs
index b3756a5e..62b88a58 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -24,7 +24,8 @@ use crate::merkle::*;
use crate::replication::*;
use crate::*;
-const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(30);
+// Sync RPC can contain a lot of data, so have a 1min timeout
+const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(60);
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
diff --git a/src/table/table.rs b/src/table/table.rs
index 3c211728..51f3837f 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -31,7 +31,7 @@ use crate::schema::*;
use crate::sync::*;
use crate::util::*;
-pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
+pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(30);
pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
pub system: Arc<System>,
diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml
index d5c194e8..c648e13b 100644
--- a/src/util/Cargo.toml
+++ b/src/util/Cargo.toml
@@ -39,9 +39,8 @@ toml = "0.5"
futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
-#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" }
-#netapp = { version = "0.4", path = "../../../netapp" }
-netapp = "0.4"
+#netapp = "0.4"
+netapp = { version = "0.5", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] }
http = "0.2"
hyper = "0.14"