From 0d3e285d133459fd53e28f879a86c0de1a0c36df Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Mar 2024 15:26:08 +0100 Subject: [fix-buffering] implement `block_ram_buffer_max` to avoid excessive RAM usage --- src/block/manager.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) (limited to 'src/block/manager.rs') diff --git a/src/block/manager.rs b/src/block/manager.rs index 34d854b9..2c7c7aba 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,3 +1,4 @@ +use std::convert::TryInto; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use tokio::sync::{mpsc, Mutex, MutexGuard}; +use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore}; use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, @@ -93,6 +94,7 @@ pub struct BlockManager { pub(crate) system: Arc, pub(crate) endpoint: Arc>, + buffer_kb_semaphore: Arc, pub(crate) metrics: BlockManagerMetrics, @@ -152,11 +154,14 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); + let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024)); + let metrics = BlockManagerMetrics::new( config.compression_level, rc.rc.clone(), resync.queue.clone(), resync.errors.clone(), + buffer_kb_semaphore.clone(), ); let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); @@ -176,6 +181,7 @@ impl BlockManager { resync, system, endpoint, + buffer_kb_semaphore, metrics, scrub_persister, tx_scrub_command: ArcSwapOption::new(None), @@ -361,6 +367,14 @@ impl BlockManager { let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) .await .into_parts(); + + let permit = self + .buffer_kb_semaphore + .clone() + .acquire_many_owned((bytes.len() / 1024).try_into().unwrap()) + .await + .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?; + let put_block_rpc = Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); let put_block_rpc = if let Some(tag) = order_tag { @@ -376,6 +390,7 @@ impl BlockManager { &who[..], put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) + .with_drop_on_completion(permit) .with_quorum(self.replication.write_quorum()), ) .await?; -- cgit v1.2.3 From 85f580cbde4913fe8382316ff3c27b8443c61dd7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Mar 2024 16:00:46 +0100 Subject: [fix-buffering] change request sending strategy and fix priorities remove LAS, priorize new requests but otherwise just do standard queuing --- src/block/manager.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) (limited to 'src/block/manager.rs') diff --git a/src/block/manager.rs b/src/block/manager.rs index 2c7c7aba..62829a24 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -238,10 +238,16 @@ impl BlockManager { async fn rpc_get_raw_block_streaming( &self, hash: &Hash, + priority: RequestPriority, order_tag: Option, ) -> Result { - self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) }) - .await + self.rpc_get_raw_block_internal( + hash, + priority, + order_tag, + |stream| async move { Ok(stream) }, + ) + .await } /// Ask nodes that might have a (possibly compressed) block for it @@ -249,9 +255,10 @@ impl BlockManager { pub(crate) async fn rpc_get_raw_block( &self, hash: &Hash, + priority: RequestPriority, order_tag: Option, ) -> Result { - self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move { + self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move { let (header, stream) = block_stream.into_parts(); read_stream_to_end(stream) .await @@ -264,6 +271,7 @@ impl BlockManager { async fn rpc_get_raw_block_internal( &self, hash: &Hash, + priority: RequestPriority, order_tag: Option, f: F, ) -> Result @@ -279,7 +287,7 @@ impl BlockManager { let rpc = self.endpoint.call_streaming( &node_id, BlockRpc::GetBlock(*hash, order_tag), - PRIO_NORMAL | PRIO_SECONDARY, + priority, ); tokio::select! { res = rpc => { @@ -331,7 +339,9 @@ impl BlockManager { hash: &Hash, order_tag: Option, ) -> Result { - let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?; + let block_stream = self + .rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag) + .await?; let (header, stream) = block_stream.into_parts(); match header { DataBlockHeader::Plain => Ok(stream), -- cgit v1.2.3