From 0d3e285d133459fd53e28f879a86c0de1a0c36df Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Mar 2024 15:26:08 +0100 Subject: [fix-buffering] implement `block_ram_buffer_max` to avoid excessive RAM usage --- src/block/manager.rs | 17 ++++++++++++- src/block/metrics.rs | 15 ++++++++++++ src/model/k2v/rpc.rs | 6 ++++- src/rpc/rpc_helper.rs | 68 ++++++++++++++++++++++++++++++++++++++++++--------- src/util/config.rs | 11 +++++++++ 5 files changed, 104 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/block/manager.rs b/src/block/manager.rs index 34d854b9..2c7c7aba 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,3 +1,4 @@ +use std::convert::TryInto; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use tokio::sync::{mpsc, Mutex, MutexGuard}; +use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore}; use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, @@ -93,6 +94,7 @@ pub struct BlockManager { pub(crate) system: Arc, pub(crate) endpoint: Arc>, + buffer_kb_semaphore: Arc, pub(crate) metrics: BlockManagerMetrics, @@ -152,11 +154,14 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); + let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024)); + let metrics = BlockManagerMetrics::new( config.compression_level, rc.rc.clone(), resync.queue.clone(), resync.errors.clone(), + buffer_kb_semaphore.clone(), ); let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); @@ -176,6 +181,7 @@ impl BlockManager { resync, system, endpoint, + buffer_kb_semaphore, metrics, scrub_persister, tx_scrub_command: ArcSwapOption::new(None), @@ -361,6 +367,14 @@ impl BlockManager { let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) .await .into_parts(); + + let permit = self + .buffer_kb_semaphore + .clone() + .acquire_many_owned((bytes.len() / 1024).try_into().unwrap()) + .await + .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?; + let put_block_rpc = Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); let put_block_rpc = if let Some(tag) = order_tag { @@ -376,6 +390,7 @@ impl BlockManager { &who[..], put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) + .with_drop_on_completion(permit) .with_quorum(self.replication.write_quorum()), ) .await?; diff --git a/src/block/metrics.rs b/src/block/metrics.rs index 6659df32..c989f940 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -1,3 +1,7 @@ +use std::sync::Arc; + +use tokio::sync::Semaphore; + use opentelemetry::{global, metrics::*}; use garage_db as db; @@ -9,6 +13,7 @@ pub struct BlockManagerMetrics { pub(crate) _rc_size: ValueObserver, pub(crate) _resync_queue_len: ValueObserver, pub(crate) _resync_errored_blocks: ValueObserver, + pub(crate) _buffer_free_kb: ValueObserver, pub(crate) resync_counter: BoundCounter, pub(crate) resync_error_counter: BoundCounter, @@ -31,6 +36,7 @@ impl BlockManagerMetrics { rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree, + buffer_semaphore: Arc, ) -> Self { let meter = global::meter("garage_model/block"); Self { @@ -66,6 +72,15 @@ impl BlockManagerMetrics { .with_description("Number of block hashes whose last resync resulted in an error") .init(), + _buffer_free_kb: meter + .u64_value_observer("block.ram_buffer_free_kb", move |observer| { + observer.observe(buffer_semaphore.available_permits() as u64, &[]) + }) + .with_description( + "Available RAM in KiB to use for buffering data blocks to be written to remote nodes", + ) + .init(), + resync_counter: meter .u64_counter("block.resync_counter") .with_description("Number of calls to resync_block") diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 4ab44c22..af7df341 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -300,7 +300,11 @@ impl K2VRpcHandler { let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout(); let mut requests = nodes .iter() - .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs)) + .map(|node| { + self.system + .rpc + .call(&self.endpoint, *node, msg.clone(), rs.clone()) + }) .collect::>(); // Fetch responses. This procedure stops fetching responses when any of the following diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index c46e577f..a1b7951c 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -33,8 +33,7 @@ use crate::ring::Ring; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); /// Strategy to apply when making RPC -#[derive(Copy, Clone)] -pub struct RequestStrategy { +pub struct RequestStrategy { /// Min number of response to consider the request successful pub rs_quorum: Option, /// Should requests be dropped after enough response are received @@ -43,6 +42,8 @@ pub struct RequestStrategy { pub rs_priority: RequestPriority, /// Custom timeout for this request rs_timeout: Timeout, + /// Data to drop when everything completes + rs_drop_on_complete: T, } #[derive(Copy, Clone)] @@ -52,7 +53,19 @@ enum Timeout { Custom(Duration), } -impl RequestStrategy { +impl Clone for RequestStrategy<()> { + fn clone(&self) -> Self { + RequestStrategy { + rs_quorum: self.rs_quorum, + rs_interrupt_after_quorum: self.rs_interrupt_after_quorum, + rs_priority: self.rs_priority, + rs_timeout: self.rs_timeout, + rs_drop_on_complete: (), + } + } +} + +impl RequestStrategy<()> { /// Create a RequestStrategy with default timeout and not interrupting when quorum reached pub fn with_priority(prio: RequestPriority) -> Self { RequestStrategy { @@ -60,8 +73,22 @@ impl RequestStrategy { rs_interrupt_after_quorum: false, rs_priority: prio, rs_timeout: Timeout::Default, + rs_drop_on_complete: (), + } + } + /// Add an item to be dropped on completion + pub fn with_drop_on_completion(self, drop_on_complete: T) -> RequestStrategy { + RequestStrategy { + rs_quorum: self.rs_quorum, + rs_interrupt_after_quorum: self.rs_interrupt_after_quorum, + rs_priority: self.rs_priority, + rs_timeout: self.rs_timeout, + rs_drop_on_complete: drop_on_complete, } } +} + +impl RequestStrategy { /// Set quorum to be reached for request pub fn with_quorum(mut self, quorum: usize) -> Self { self.rs_quorum = Some(quorum); @@ -83,6 +110,19 @@ impl RequestStrategy { self.rs_timeout = Timeout::Custom(timeout); self } + /// Extract drop_on_complete item + fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) { + ( + RequestStrategy { + rs_quorum: self.rs_quorum, + rs_interrupt_after_quorum: self.rs_interrupt_after_quorum, + rs_priority: self.rs_priority, + rs_timeout: self.rs_timeout, + rs_drop_on_complete: (), + }, + self.rs_drop_on_complete, + ) + } } #[derive(Clone)] @@ -123,7 +163,7 @@ impl RpcHelper { endpoint: &Endpoint, to: Uuid, msg: N, - strat: RequestStrategy, + strat: RequestStrategy<()>, ) -> Result where M: Rpc>, @@ -176,7 +216,7 @@ impl RpcHelper { endpoint: &Endpoint, to: &[Uuid], msg: N, - strat: RequestStrategy, + strat: RequestStrategy<()>, ) -> Result)>, Error> where M: Rpc>, @@ -187,7 +227,7 @@ impl RpcHelper { let resps = join_all( to.iter() - .map(|to| self.call(endpoint, *to, msg.clone(), strat)), + .map(|to| self.call(endpoint, *to, msg.clone(), strat.clone())), ) .await; Ok(to @@ -201,7 +241,7 @@ impl RpcHelper { &self, endpoint: &Endpoint, msg: N, - strat: RequestStrategy, + strat: RequestStrategy<()>, ) -> Result)>, Error> where M: Rpc>, @@ -220,18 +260,19 @@ impl RpcHelper { /// Make a RPC call to multiple servers, returning either a Vec of responses, /// or an error if quorum could not be reached due to too many errors - pub async fn try_call_many( + pub async fn try_call_many( &self, endpoint: &Arc>, to: &[Uuid], msg: N, - strategy: RequestStrategy, + strategy: RequestStrategy, ) -> Result, Error> where M: Rpc> + 'static, N: IntoReq, H: StreamingEndpointHandler + 'static, S: Send + 'static, + T: Send + 'static, { let quorum = strategy.rs_quorum.unwrap_or(to.len()); @@ -260,12 +301,12 @@ impl RpcHelper { .await } - async fn try_call_many_internal( + async fn try_call_many_internal( &self, endpoint: &Arc>, to: &[Uuid], msg: N, - strategy: RequestStrategy, + strategy: RequestStrategy, quorum: usize, ) -> Result, Error> where @@ -273,9 +314,12 @@ impl RpcHelper { N: IntoReq, H: StreamingEndpointHandler + 'static, S: Send + 'static, + T: Send + 'static, { let msg = msg.into_req().map_err(garage_net::error::Error::from)?; + let (strategy, drop_on_complete) = strategy.extract_drop_on_complete(); + // Build future for each request // They are not started now: they are added below in a FuturesUnordered // object that will take care of polling them (see below) @@ -283,6 +327,7 @@ impl RpcHelper { let self2 = self.clone(); let msg = msg.clone(); let endpoint2 = endpoint.clone(); + let strategy = strategy.clone(); (to, async move { self2.call(&endpoint2, to, msg, strategy).await }) @@ -377,6 +422,7 @@ impl RpcHelper { // they have to be put in a proper queue that is persisted to disk. tokio::spawn(async move { resp_stream.collect::>>().await; + drop(drop_on_complete); }); } } diff --git a/src/util/config.rs b/src/util/config.rs index 8ecbdfbb..5372a1ec 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -52,6 +52,14 @@ pub struct Config { )] pub compression_level: Option, + /// Maximum amount of block data to buffer in RAM for sending to + /// remote nodes when these nodes are on slower links + #[serde( + deserialize_with = "deserialize_capacity", + default = "default_block_ram_buffer_max" + )] + pub block_ram_buffer_max: usize, + /// Skip the permission check of secret files. Useful when /// POSIX ACLs (or more complex chmods) are used. #[serde(default)] @@ -255,6 +263,9 @@ fn default_sled_flush_every_ms() -> u64 { fn default_block_size() -> usize { 1048576 } +fn default_block_ram_buffer_max() -> usize { + 256 * 1024 * 1024 +} fn default_compression() -> Option { Some(1) -- cgit v1.2.3