author     Alex Auvolat <alex@adnab.me>    2024-03-27 15:26:08 +0100
committer  Alex Auvolat <alex@adnab.me>    2024-03-27 16:22:40 +0100
commit     0d3e285d133459fd53e28f879a86c0de1a0c36df (patch)
tree       4301ed7bd7ed72e79701c68bebdb4c5ea1db37a5 /src
parent     95eb8808e8ede5439cf6352ce4f9a148fac2f236 (diff)
[fix-buffering] implement `block_ram_buffer_max` to avoid excessive RAM usage
Diffstat (limited to 'src')
-rw-r--r--  src/block/manager.rs  | 17
-rw-r--r--  src/block/metrics.rs  | 15
-rw-r--r--  src/model/k2v/rpc.rs  |  6
-rw-r--r--  src/rpc/rpc_helper.rs | 68
-rw-r--r--  src/util/config.rs    | 11
5 files changed, 104 insertions(+), 13 deletions(-)
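
The idea of the fix, in short: before a serialized data block is handed to the RPC layer for upload to other nodes, the sender reserves one semaphore permit per KiB of buffer, and those permits are only returned once every destination node has answered. A minimal standalone sketch of that pattern (hypothetical names and sizes, not the Garage code itself):

use std::sync::Arc;
use tokio::sync::Semaphore;

// Standalone sketch: one semaphore permit stands for 1 KiB of RAM that may
// be used to buffer block data on its way to remote nodes.
#[tokio::main]
async fn main() {
    // 256 MiB budget expressed in KiB permits, matching the new default.
    let buffer_kb_semaphore = Arc::new(Semaphore::new(256 * 1024));

    // A 1 MiB block about to be sent to other nodes (placeholder data).
    let bytes = vec![0u8; 1024 * 1024];

    // Reserve as many KiB as the buffer occupies; this waits if other
    // uploads already hold most of the budget, instead of letting RAM grow.
    let permit = buffer_kb_semaphore
        .clone()
        .acquire_many_owned((bytes.len() / 1024) as u32)
        .await
        .expect("semaphore closed");

    // ... hand `bytes` to the RPC layer here ...

    // Dropping the owned permit frees the reserved KiB; in the patch this
    // only happens once every destination node has answered the PutBlock RPC.
    drop(permit);
}
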
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 34d854b9..2c7c7aba 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,3 +1,4 @@
+use std::convert::TryInto;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
-use tokio::sync::{mpsc, Mutex, MutexGuard};
+use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -93,6 +94,7 @@ pub struct BlockManager {
pub(crate) system: Arc<System>,
pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
+ buffer_kb_semaphore: Arc<Semaphore>,
pub(crate) metrics: BlockManagerMetrics,
@@ -152,11 +154,14 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
+ let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024));
+
let metrics = BlockManagerMetrics::new(
config.compression_level,
rc.rc.clone(),
resync.queue.clone(),
resync.errors.clone(),
+ buffer_kb_semaphore.clone(),
);
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
@@ -176,6 +181,7 @@ impl BlockManager {
resync,
system,
endpoint,
+ buffer_kb_semaphore,
metrics,
scrub_persister,
tx_scrub_command: ArcSwapOption::new(None),
@@ -361,6 +367,14 @@ impl BlockManager {
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
.await
.into_parts();
+
+ let permit = self
+ .buffer_kb_semaphore
+ .clone()
+ .acquire_many_owned((bytes.len() / 1024).try_into().unwrap())
+ .await
+ .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?;
+
let put_block_rpc =
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
let put_block_rpc = if let Some(tag) = order_tag {
@@ -376,6 +390,7 @@ impl BlockManager {
&who[..],
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
+ .with_drop_on_completion(permit)
.with_quorum(self.replication.write_quorum()),
)
.await?;
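
A note on the `try_into().unwrap()` above: `acquire_many_owned` is called on an `Arc<Semaphore>` and takes its permit count as a `u32`, while `bytes.len() / 1024` is a `usize`. A small sketch of that conversion (assumed buffer sizes); the cast can only fail for buffers over u32::MAX KiB, roughly 4 TiB, far larger than any single block buffer in practice:

use std::convert::TryInto;

// Sketch of the permit-count arithmetic: one permit per whole KiB of buffer.
fn permits_for(buffer_len: usize) -> u32 {
    (buffer_len / 1024).try_into().unwrap()
}

fn main() {
    assert_eq!(permits_for(1024 * 1024), 1024); // a 1 MiB compressed block
    assert_eq!(permits_for(500), 0);            // sub-KiB buffers reserve nothing
}
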
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index 6659df32..c989f940 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -1,3 +1,7 @@
+use std::sync::Arc;
+
+use tokio::sync::Semaphore;
+
use opentelemetry::{global, metrics::*};
use garage_db as db;
@@ -9,6 +13,7 @@ pub struct BlockManagerMetrics {
pub(crate) _rc_size: ValueObserver<u64>,
pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
+ pub(crate) _buffer_free_kb: ValueObserver<u64>,
pub(crate) resync_counter: BoundCounter<u64>,
pub(crate) resync_error_counter: BoundCounter<u64>,
@@ -31,6 +36,7 @@ impl BlockManagerMetrics {
rc_tree: db::Tree,
resync_queue: CountedTree,
resync_errors: CountedTree,
+ buffer_semaphore: Arc<Semaphore>,
) -> Self {
let meter = global::meter("garage_model/block");
Self {
@@ -66,6 +72,15 @@ impl BlockManagerMetrics {
.with_description("Number of block hashes whose last resync resulted in an error")
.init(),
+ _buffer_free_kb: meter
+ .u64_value_observer("block.ram_buffer_free_kb", move |observer| {
+ observer.observe(buffer_semaphore.available_permits() as u64, &[])
+ })
+ .with_description(
+ "Available RAM in KiB to use for buffering data blocks to be written to remote nodes",
+ )
+ .init(),
+
resync_counter: meter
.u64_counter("block.resync_counter")
.with_description("Number of calls to resync_block")
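
The new `block.ram_buffer_free_kb` gauge simply reports `available_permits()` on the shared semaphore, i.e. how many KiB of the `block_ram_buffer_max` budget are not currently reserved by in-flight uploads. A sketch of what the observer would read at different points (hypothetical numbers):

use std::sync::Arc;
use tokio::sync::Semaphore;

// Sketch of the gauge semantics: available_permits() is the number of KiB
// of the buffer budget not currently reserved.
#[tokio::main]
async fn main() {
    let sem = Arc::new(Semaphore::new(256 * 1024)); // 256 MiB budget in KiB
    assert_eq!(sem.available_permits(), 262_144);   // gauge: everything free

    // Simulate a 4 MiB block currently being uploaded to remote nodes.
    let permit = sem.clone().acquire_many_owned(4 * 1024).await.unwrap();
    assert_eq!(sem.available_permits(), 258_048);   // gauge: 252 MiB still free

    drop(permit);
    assert_eq!(sem.available_permits(), 262_144);   // gauge: back to the full budget
}
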
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 4ab44c22..af7df341 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -300,7 +300,11 @@ impl K2VRpcHandler {
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
let mut requests = nodes
.iter()
- .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+ .map(|node| {
+ self.system
+ .rpc
+ .call(&self.endpoint, *node, msg.clone(), rs.clone())
+ })
.collect::<FuturesUnordered<_>>();
// Fetch responses. This procedure stops fetching responses when any of the following
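
This k2v change is a consequence of `RequestStrategy` losing its `Copy` derive: now that the strategy can carry an owned drop-on-complete payload, only the payload-free `RequestStrategy<()>` implements `Clone`, so call sites that reuse one strategy across several requests must clone it explicitly. A minimal sketch of that pattern (assumed names, mirroring the manual Clone impl below):

struct Strategy<T> {
    drop_on_complete: T,
}

impl Clone for Strategy<()> {
    fn clone(&self) -> Self {
        Strategy { drop_on_complete: () }
    }
}

fn main() {
    let plain = Strategy { drop_on_complete: () };
    let _reused = plain.clone(); // fine: there is no payload to duplicate

    let with_payload = Strategy { drop_on_complete: String::from("permit") };
    // with_payload.clone(); // would not compile: Clone is only implemented for ()
    drop(with_payload);
}
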
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index c46e577f..a1b7951c 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -33,8 +33,7 @@ use crate::ring::Ring;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
/// Strategy to apply when making RPC
-#[derive(Copy, Clone)]
-pub struct RequestStrategy {
+pub struct RequestStrategy<T> {
/// Min number of response to consider the request successful
pub rs_quorum: Option<usize>,
/// Should requests be dropped after enough response are received
@@ -43,6 +42,8 @@ pub struct RequestStrategy {
pub rs_priority: RequestPriority,
/// Custom timeout for this request
rs_timeout: Timeout,
+ /// Data to drop when everything completes
+ rs_drop_on_complete: T,
}
#[derive(Copy, Clone)]
@@ -52,7 +53,19 @@ enum Timeout {
Custom(Duration),
}
-impl RequestStrategy {
+impl Clone for RequestStrategy<()> {
+ fn clone(&self) -> Self {
+ RequestStrategy {
+ rs_quorum: self.rs_quorum,
+ rs_interrupt_after_quorum: self.rs_interrupt_after_quorum,
+ rs_priority: self.rs_priority,
+ rs_timeout: self.rs_timeout,
+ rs_drop_on_complete: (),
+ }
+ }
+}
+
+impl RequestStrategy<()> {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_priority(prio: RequestPriority) -> Self {
RequestStrategy {
@@ -60,8 +73,22 @@ impl RequestStrategy {
rs_interrupt_after_quorum: false,
rs_priority: prio,
rs_timeout: Timeout::Default,
+ rs_drop_on_complete: (),
+ }
+ }
+ /// Add an item to be dropped on completion
+ pub fn with_drop_on_completion<T>(self, drop_on_complete: T) -> RequestStrategy<T> {
+ RequestStrategy {
+ rs_quorum: self.rs_quorum,
+ rs_interrupt_after_quorum: self.rs_interrupt_after_quorum,
+ rs_priority: self.rs_priority,
+ rs_timeout: self.rs_timeout,
+ rs_drop_on_complete: drop_on_complete,
}
}
+}
+
+impl<T> RequestStrategy<T> {
/// Set quorum to be reached for request
pub fn with_quorum(mut self, quorum: usize) -> Self {
self.rs_quorum = Some(quorum);
@@ -83,6 +110,19 @@ impl RequestStrategy {
self.rs_timeout = Timeout::Custom(timeout);
self
}
+ /// Extract drop_on_complete item
+ fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) {
+ (
+ RequestStrategy {
+ rs_quorum: self.rs_quorum,
+ rs_interrupt_after_quorum: self.rs_interrupt_after_quorum,
+ rs_priority: self.rs_priority,
+ rs_timeout: self.rs_timeout,
+ rs_drop_on_complete: (),
+ },
+ self.rs_drop_on_complete,
+ )
+ }
}
#[derive(Clone)]
@@ -123,7 +163,7 @@ impl RpcHelper {
endpoint: &Endpoint<M, H>,
to: Uuid,
msg: N,
- strat: RequestStrategy,
+ strat: RequestStrategy<()>,
) -> Result<S, Error>
where
M: Rpc<Response = Result<S, Error>>,
@@ -176,7 +216,7 @@ impl RpcHelper {
endpoint: &Endpoint<M, H>,
to: &[Uuid],
msg: N,
- strat: RequestStrategy,
+ strat: RequestStrategy<()>,
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where
M: Rpc<Response = Result<S, Error>>,
@@ -187,7 +227,7 @@ impl RpcHelper {
let resps = join_all(
to.iter()
- .map(|to| self.call(endpoint, *to, msg.clone(), strat)),
+ .map(|to| self.call(endpoint, *to, msg.clone(), strat.clone())),
)
.await;
Ok(to
@@ -201,7 +241,7 @@ impl RpcHelper {
&self,
endpoint: &Endpoint<M, H>,
msg: N,
- strat: RequestStrategy,
+ strat: RequestStrategy<()>,
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where
M: Rpc<Response = Result<S, Error>>,
@@ -220,18 +260,19 @@ impl RpcHelper {
/// Make a RPC call to multiple servers, returning either a Vec of responses,
/// or an error if quorum could not be reached due to too many errors
- pub async fn try_call_many<M, N, H, S>(
+ pub async fn try_call_many<M, N, H, S, T>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
msg: N,
- strategy: RequestStrategy,
+ strategy: RequestStrategy<T>,
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
N: IntoReq<M>,
H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
+ T: Send + 'static,
{
let quorum = strategy.rs_quorum.unwrap_or(to.len());
@@ -260,12 +301,12 @@ impl RpcHelper {
.await
}
- async fn try_call_many_internal<M, N, H, S>(
+ async fn try_call_many_internal<M, N, H, S, T>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
msg: N,
- strategy: RequestStrategy,
+ strategy: RequestStrategy<T>,
quorum: usize,
) -> Result<Vec<S>, Error>
where
@@ -273,9 +314,12 @@ impl RpcHelper {
N: IntoReq<M>,
H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
+ T: Send + 'static,
{
let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
+ let (strategy, drop_on_complete) = strategy.extract_drop_on_complete();
+
// Build future for each request
// They are not started now: they are added below in a FuturesUnordered
// object that will take care of polling them (see below)
@@ -283,6 +327,7 @@ impl RpcHelper {
let self2 = self.clone();
let msg = msg.clone();
let endpoint2 = endpoint.clone();
+ let strategy = strategy.clone();
(to, async move {
self2.call(&endpoint2, to, msg, strategy).await
})
@@ -377,6 +422,7 @@ impl RpcHelper {
// they have to be put in a proper queue that is persisted to disk.
tokio::spawn(async move {
resp_stream.collect::<Vec<Result<_, _>>>().await;
+ drop(drop_on_complete);
});
}
}
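
The key part of the rpc_helper change is the tail of `try_call_many_internal`: the caller gets its result as soon as quorum is reached, but the remaining response futures are moved into a spawned task together with `drop_on_complete`, so the semaphore permit, and with it the RAM budget, is only released once the slowest nodes have been dealt with. A runnable sketch of that ownership hand-off (hypothetical sizes and timings):

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;

// Sketch: the permit moved into a background task keeps the budget reserved
// until the slowest request has been drained, not just until quorum.
#[tokio::main]
async fn main() {
    let sem = Arc::new(Semaphore::new(1024));
    let permit = sem.clone().acquire_many_owned(512).await.unwrap();

    // Stand-in for the spawned task that collects the remaining responses.
    let tail = tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await; // a slow remote node
        drop(permit); // the budget is only released here
    });

    assert_eq!(sem.available_permits(), 512); // still reserved while the tail runs
    tail.await.unwrap();
    assert_eq!(sem.available_permits(), 1024); // fully released after completion
}
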
diff --git a/src/util/config.rs b/src/util/config.rs
index 8ecbdfbb..5372a1ec 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -52,6 +52,14 @@ pub struct Config {
)]
pub compression_level: Option<i32>,
+ /// Maximum amount of block data to buffer in RAM for sending to
+ /// remote nodes when these nodes are on slower links
+ #[serde(
+ deserialize_with = "deserialize_capacity",
+ default = "default_block_ram_buffer_max"
+ )]
+ pub block_ram_buffer_max: usize,
+
/// Skip the permission check of secret files. Useful when
/// POSIX ACLs (or more complex chmods) are used.
#[serde(default)]
@@ -255,6 +263,9 @@ fn default_sled_flush_every_ms() -> u64 {
fn default_block_size() -> usize {
1048576
}
+fn default_block_ram_buffer_max() -> usize {
+ 256 * 1024 * 1024
+}
fn default_compression() -> Option<i32> {
Some(1)