aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex <alex@adnab.me>2024-03-28 12:40:27 +0000
committerAlex <alex@adnab.me>2024-03-28 12:40:27 +0000
commitecf641d88c264f7278d13a6d988288feb24a5dfe (patch)
tree5cd60dfa4f0d6d32a66d2e32d7912c9e289067c8 /src
parent75cd14926d8dec8c36289197822df78391686c6a (diff)
parent85f580cbde4913fe8382316ff3c27b8443c61dd7 (diff)
downloadgarage-ecf641d88c264f7278d13a6d988288feb24a5dfe.tar.gz
garage-ecf641d88c264f7278d13a6d988288feb24a5dfe.zip
Merge pull request 'Fix unbounded buffering when one node has slower network' (#792) from fix-buffering into main
Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/792
Diffstat (limited to 'src')
-rw-r--r--src/block/manager.rs37
-rw-r--r--src/block/metrics.rs15
-rw-r--r--src/block/resync.rs6
-rw-r--r--src/model/k2v/rpc.rs6
-rw-r--r--src/net/message.rs18
-rw-r--r--src/net/send.rs85
-rw-r--r--src/net/server.rs2
-rw-r--r--src/rpc/rpc_helper.rs68
-rw-r--r--src/util/config.rs11
9 files changed, 187 insertions, 61 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 34d854b9..62829a24 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,3 +1,4 @@
+use std::convert::TryInto;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
-use tokio::sync::{mpsc, Mutex, MutexGuard};
+use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -93,6 +94,7 @@ pub struct BlockManager {
pub(crate) system: Arc<System>,
pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
+ buffer_kb_semaphore: Arc<Semaphore>,
pub(crate) metrics: BlockManagerMetrics,
@@ -152,11 +154,14 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
+ let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024));
+
let metrics = BlockManagerMetrics::new(
config.compression_level,
rc.rc.clone(),
resync.queue.clone(),
resync.errors.clone(),
+ buffer_kb_semaphore.clone(),
);
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
@@ -176,6 +181,7 @@ impl BlockManager {
resync,
system,
endpoint,
+ buffer_kb_semaphore,
metrics,
scrub_persister,
tx_scrub_command: ArcSwapOption::new(None),
@@ -232,10 +238,16 @@ impl BlockManager {
async fn rpc_get_raw_block_streaming(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
) -> Result<DataBlockStream, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
- .await
+ self.rpc_get_raw_block_internal(
+ hash,
+ priority,
+ order_tag,
+ |stream| async move { Ok(stream) },
+ )
+ .await
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -243,9 +255,10 @@ impl BlockManager {
pub(crate) async fn rpc_get_raw_block(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
+ self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move {
let (header, stream) = block_stream.into_parts();
read_stream_to_end(stream)
.await
@@ -258,6 +271,7 @@ impl BlockManager {
async fn rpc_get_raw_block_internal<F, Fut, T>(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
f: F,
) -> Result<T, Error>
@@ -273,7 +287,7 @@ impl BlockManager {
let rpc = self.endpoint.call_streaming(
&node_id,
BlockRpc::GetBlock(*hash, order_tag),
- PRIO_NORMAL | PRIO_SECONDARY,
+ priority,
);
tokio::select! {
res = rpc => {
@@ -325,7 +339,9 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<ByteStream, Error> {
- let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+ let block_stream = self
+ .rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag)
+ .await?;
let (header, stream) = block_stream.into_parts();
match header {
DataBlockHeader::Plain => Ok(stream),
@@ -361,6 +377,14 @@ impl BlockManager {
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
.await
.into_parts();
+
+ let permit = self
+ .buffer_kb_semaphore
+ .clone()
+ .acquire_many_owned((bytes.len() / 1024).try_into().unwrap())
+ .await
+ .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?;
+
let put_block_rpc =
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
let put_block_rpc = if let Some(tag) = order_tag {
@@ -376,6 +400,7 @@ impl BlockManager {
&who[..],
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
+ .with_drop_on_completion(permit)
.with_quorum(self.replication.write_quorum()),
)
.await?;
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index 6659df32..c989f940 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -1,3 +1,7 @@
+use std::sync::Arc;
+
+use tokio::sync::Semaphore;
+
use opentelemetry::{global, metrics::*};
use garage_db as db;
@@ -9,6 +13,7 @@ pub struct BlockManagerMetrics {
pub(crate) _rc_size: ValueObserver<u64>,
pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
+ pub(crate) _buffer_free_kb: ValueObserver<u64>,
pub(crate) resync_counter: BoundCounter<u64>,
pub(crate) resync_error_counter: BoundCounter<u64>,
@@ -31,6 +36,7 @@ impl BlockManagerMetrics {
rc_tree: db::Tree,
resync_queue: CountedTree,
resync_errors: CountedTree,
+ buffer_semaphore: Arc<Semaphore>,
) -> Self {
let meter = global::meter("garage_model/block");
Self {
@@ -66,6 +72,15 @@ impl BlockManagerMetrics {
.with_description("Number of block hashes whose last resync resulted in an error")
.init(),
+ _buffer_free_kb: meter
+ .u64_value_observer("block.ram_buffer_free_kb", move |observer| {
+ observer.observe(buffer_semaphore.available_permits() as u64, &[])
+ })
+ .with_description(
+ "Available RAM in KiB to use for buffering data blocks to be written to remote nodes",
+ )
+ .init(),
+
resync_counter: meter
.u64_counter("block.resync_counter")
.with_description("Number of calls to resync_block")
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 9c1da4a7..7221b093 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -436,7 +436,7 @@ impl BlockResyncManager {
&manager.endpoint,
&need_nodes[..],
put_block_message,
- RequestStrategy::with_priority(PRIO_BACKGROUND)
+ RequestStrategy::with_priority(PRIO_BACKGROUND | PRIO_SECONDARY)
.with_quorum(need_nodes.len()),
)
.await
@@ -460,7 +460,9 @@ impl BlockResyncManager {
hash
);
- let block_data = manager.rpc_get_raw_block(hash, None).await?;
+ let block_data = manager
+ .rpc_get_raw_block(hash, PRIO_BACKGROUND | PRIO_SECONDARY, None)
+ .await?;
manager.metrics.resync_recv_counter.add(1);
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 4ab44c22..af7df341 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -300,7 +300,11 @@ impl K2VRpcHandler {
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
let mut requests = nodes
.iter()
- .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+ .map(|node| {
+ self.system
+ .rpc
+ .call(&self.endpoint, *node, msg.clone(), rs.clone())
+ })
.collect::<FuturesUnordered<_>>();
// Fetch responses. This procedure stops fetching responses when any of the following
diff --git a/src/net/message.rs b/src/net/message.rs
index b0d255c6..af98ca12 100644
--- a/src/net/message.rs
+++ b/src/net/message.rs
@@ -28,12 +28,30 @@ use crate::util::*;
/// The same priority value is given to a request and to its associated response.
pub type RequestPriority = u8;
+// Usage of priority levels in Garage:
+//
+// PRIO_HIGH
+// for liveness check events such as pings and important
+// reconfiguration events such as layout changes
+//
+// PRIO_NORMAL
+// for standard interactive requests to exchange metadata
+//
+// PRIO_NORMAL | PRIO_SECONDARY
+// for standard interactive requests to exchange block data
+//
+// PRIO_BACKGROUND
+// for background resync requests to exchange metadata
+// PRIO_BACKGROUND | PRIO_SECONDARY
+// for background resync requests to exchange block data
+
/// Priority class: high
pub const PRIO_HIGH: RequestPriority = 0x20;
/// Priority class: normal
pub const PRIO_NORMAL: RequestPriority = 0x40;
/// Priority class: background
pub const PRIO_BACKGROUND: RequestPriority = 0x80;
+
/// Priority: primary among given class
pub const PRIO_PRIMARY: RequestPriority = 0x00;
/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
diff --git a/src/net/send.rs b/src/net/send.rs
index 0db0ba77..c60fc6b2 100644
--- a/src/net/send.rs
+++ b/src/net/send.rs
@@ -109,7 +109,7 @@ impl SendQueuePriority {
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
order_vec.insert(i, order);
}
- self.items.push_front(item);
+ self.items.push_back(item);
}
fn remove(&mut self, id: RequestID) {
if let Some(i) = self.items.iter().position(|x| x.id == id) {
@@ -128,51 +128,56 @@ impl SendQueuePriority {
self.items.is_empty()
}
fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> {
- for (j, item) in self.items.iter_mut().enumerate() {
- if let Some(OrderTag(stream, order)) = item.order_tag {
- if order > *self.order.get(&stream).unwrap().front().unwrap() {
+ // in step 1: poll only streams that have sent 0 bytes, we want to send them in priority
+ // as they most likely represent small requests to be sent first
+ // in step 2: poll all streams
+ for step in 0..2 {
+ for (j, item) in self.items.iter_mut().enumerate() {
+ if let Some(OrderTag(stream, order)) = item.order_tag {
+ if order > *self.order.get(&stream).unwrap().front().unwrap() {
+ continue;
+ }
+ }
+
+ if step == 0 && item.sent > 0 {
continue;
}
- }
- let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
- if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
- let id = item.id;
- let eos = item.data.eos();
-
- let packet = bytes_or_err.map_err(|e| match e {
- ReadExactError::Stream(err) => err,
- _ => unreachable!(),
- });
-
- let is_err = packet.is_err();
- let data_frame = DataFrame::from_packet(packet, !eos);
- item.sent += data_frame.data().len();
-
- if eos || is_err {
- // If item had an order tag, remove it from the corresponding ordering list
- if let Some(OrderTag(stream, order)) = item.order_tag {
- let order_stream = self.order.get_mut(&stream).unwrap();
- assert_eq!(order_stream.pop_front(), Some(order));
- if order_stream.is_empty() {
- self.order.remove(&stream);
+ let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
+ if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
+ let id = item.id;
+ let eos = item.data.eos();
+
+ let packet = bytes_or_err.map_err(|e| match e {
+ ReadExactError::Stream(err) => err,
+ _ => unreachable!(),
+ });
+
+ let is_err = packet.is_err();
+ let data_frame = DataFrame::from_packet(packet, !eos);
+ item.sent += data_frame.data().len();
+
+ if eos || is_err {
+ // If item had an order tag, remove it from the corresponding ordering list
+ if let Some(OrderTag(stream, order)) = item.order_tag {
+ let order_stream = self.order.get_mut(&stream).unwrap();
+ assert_eq!(order_stream.pop_front(), Some(order));
+ if order_stream.is_empty() {
+ self.order.remove(&stream);
+ }
}
+ // Remove item from sending queue
+ self.items.remove(j);
+ } else if step == 0 {
+ // Step 0 means that this stream had not sent any bytes yet.
+ // Now that it has, and it was not an EOS, we know that it is bigger
+ // than one chunk so move it at the end of the queue.
+ let item = self.items.remove(j).unwrap();
+ self.items.push_back(item);
}
- // Remove item from sending queue
- self.items.remove(j);
- } else {
- // Move item later in send queue to implement LAS scheduling
- // (LAS = Least Attained Service)
- for k in j..self.items.len() - 1 {
- if self.items[k].sent >= self.items[k + 1].sent {
- self.items.swap(k, k + 1);
- } else {
- break;
- }
- }
- }
- return Poll::Ready((id, data_frame));
+ return Poll::Ready((id, data_frame));
+ }
}
}
diff --git a/src/net/server.rs b/src/net/server.rs
index 55b9e678..36dccb2f 100644
--- a/src/net/server.rs
+++ b/src/net/server.rs
@@ -190,7 +190,7 @@ impl RecvLoop for ServerConn {
let (prio, resp_enc_result) = match ReqEnc::decode(stream).await {
Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await),
- Err(e) => (PRIO_HIGH, Err(e)),
+ Err(e) => (PRIO_NORMAL, Err(e)),
};
debug!("server: sending response to {}", id);
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index c46e577f..a1b7951c 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -33,8 +33,7 @@ use crate::ring::Ring;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
/// Strategy to apply when making RPC
-#[derive(Copy, Clone)]
-pub struct RequestStrategy {
+pub struct RequestStrategy<T> {
/// Min number of response to consider the request successful
pub rs_quorum: Option<usize>,
/// Should requests be dropped after enough response are received
@@ -43,6 +42,8 @@ pub struct RequestStrategy {
pub rs_priority: RequestPriority,
/// Custom timeout for this request
rs_timeout: Timeout,
+ /// Data to drop when everything completes
+ rs_drop_on_complete: T,
}
#[derive(Copy, Clone)]
@@ -52,7 +53,19 @@ enum Timeout {
Custom(Duration),
}
-impl RequestStrategy {
+impl Clone for RequestStrategy<()> {
+ fn clone(&self) -> Self {
+ RequestStrategy {
+ rs_quorum: self.rs_quorum,
+ rs_interrupt_after_quorum: self.rs_interrupt_after_quorum,
+ rs_priority: self.rs_priority,
+ rs_timeout: self.rs_timeout,
+ rs_drop_on_complete: (),
+ }
+ }
+}
+
+impl RequestStrategy<()> {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_priority(prio: RequestPriority) -> Self {
RequestStrategy {
@@ -60,8 +73,22 @@ impl RequestStrategy {
rs_interrupt_after_quorum: false,
rs_priority: prio,
rs_timeout: Timeout::Default,
+ rs_drop_on_complete: (),
+ }
+ }
+ /// Add an item to be dropped on completion
+ pub fn with_drop_on_completion<T>(self, drop_on_complete: T) -> RequestStrategy<T> {
+ RequestStrategy {
+ rs_quorum: self.rs_quorum,
+ rs_interrupt_after_quorum: self.rs_interrupt_after_quorum,
+ rs_priority: self.rs_priority,
+ rs_timeout: self.rs_timeout,
+ rs_drop_on_complete: drop_on_complete,
}
}
+}
+
+impl<T> RequestStrategy<T> {
/// Set quorum to be reached for request
pub fn with_quorum(mut self, quorum: usize) -> Self {
self.rs_quorum = Some(quorum);
@@ -83,6 +110,19 @@ impl RequestStrategy {
self.rs_timeout = Timeout::Custom(timeout);
self
}
+ /// Extract drop_on_complete item
+ fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) {
+ (
+ RequestStrategy {
+ rs_quorum: self.rs_quorum,
+ rs_interrupt_after_quorum: self.rs_interrupt_after_quorum,
+ rs_priority: self.rs_priority,
+ rs_timeout: self.rs_timeout,
+ rs_drop_on_complete: (),
+ },
+ self.rs_drop_on_complete,
+ )
+ }
}
#[derive(Clone)]
@@ -123,7 +163,7 @@ impl RpcHelper {
endpoint: &Endpoint<M, H>,
to: Uuid,
msg: N,
- strat: RequestStrategy,
+ strat: RequestStrategy<()>,
) -> Result<S, Error>
where
M: Rpc<Response = Result<S, Error>>,
@@ -176,7 +216,7 @@ impl RpcHelper {
endpoint: &Endpoint<M, H>,
to: &[Uuid],
msg: N,
- strat: RequestStrategy,
+ strat: RequestStrategy<()>,
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where
M: Rpc<Response = Result<S, Error>>,
@@ -187,7 +227,7 @@ impl RpcHelper {
let resps = join_all(
to.iter()
- .map(|to| self.call(endpoint, *to, msg.clone(), strat)),
+ .map(|to| self.call(endpoint, *to, msg.clone(), strat.clone())),
)
.await;
Ok(to
@@ -201,7 +241,7 @@ impl RpcHelper {
&self,
endpoint: &Endpoint<M, H>,
msg: N,
- strat: RequestStrategy,
+ strat: RequestStrategy<()>,
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where
M: Rpc<Response = Result<S, Error>>,
@@ -220,18 +260,19 @@ impl RpcHelper {
/// Make a RPC call to multiple servers, returning either a Vec of responses,
/// or an error if quorum could not be reached due to too many errors
- pub async fn try_call_many<M, N, H, S>(
+ pub async fn try_call_many<M, N, H, S, T>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
msg: N,
- strategy: RequestStrategy,
+ strategy: RequestStrategy<T>,
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
N: IntoReq<M>,
H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
+ T: Send + 'static,
{
let quorum = strategy.rs_quorum.unwrap_or(to.len());
@@ -260,12 +301,12 @@ impl RpcHelper {
.await
}
- async fn try_call_many_internal<M, N, H, S>(
+ async fn try_call_many_internal<M, N, H, S, T>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
msg: N,
- strategy: RequestStrategy,
+ strategy: RequestStrategy<T>,
quorum: usize,
) -> Result<Vec<S>, Error>
where
@@ -273,9 +314,12 @@ impl RpcHelper {
N: IntoReq<M>,
H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
+ T: Send + 'static,
{
let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
+ let (strategy, drop_on_complete) = strategy.extract_drop_on_complete();
+
// Build future for each request
// They are not started now: they are added below in a FuturesUnordered
// object that will take care of polling them (see below)
@@ -283,6 +327,7 @@ impl RpcHelper {
let self2 = self.clone();
let msg = msg.clone();
let endpoint2 = endpoint.clone();
+ let strategy = strategy.clone();
(to, async move {
self2.call(&endpoint2, to, msg, strategy).await
})
@@ -377,6 +422,7 @@ impl RpcHelper {
// they have to be put in a proper queue that is persisted to disk.
tokio::spawn(async move {
resp_stream.collect::<Vec<Result<_, _>>>().await;
+ drop(drop_on_complete);
});
}
}
diff --git a/src/util/config.rs b/src/util/config.rs
index 8ecbdfbb..5372a1ec 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -52,6 +52,14 @@ pub struct Config {
)]
pub compression_level: Option<i32>,
+ /// Maximum amount of block data to buffer in RAM for sending to
+ /// remote nodes when these nodes are on slower links
+ #[serde(
+ deserialize_with = "deserialize_capacity",
+ default = "default_block_ram_buffer_max"
+ )]
+ pub block_ram_buffer_max: usize,
+
/// Skip the permission check of secret files. Useful when
/// POSIX ACLs (or more complex chmods) are used.
#[serde(default)]
@@ -255,6 +263,9 @@ fn default_sled_flush_every_ms() -> u64 {
fn default_block_size() -> usize {
1048576
}
+fn default_block_ram_buffer_max() -> usize {
+ 256 * 1024 * 1024
+}
fn default_compression() -> Option<i32> {
Some(1)