diff options
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 17 |
1 files changed, 16 insertions, 1 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 34d854b9..2c7c7aba 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,3 +1,4 @@ +use std::convert::TryInto; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use tokio::sync::{mpsc, Mutex, MutexGuard}; +use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore}; use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, @@ -93,6 +94,7 @@ pub struct BlockManager { pub(crate) system: Arc<System>, pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>, + buffer_kb_semaphore: Arc<Semaphore>, pub(crate) metrics: BlockManagerMetrics, @@ -152,11 +154,14 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); + let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024)); + let metrics = BlockManagerMetrics::new( config.compression_level, rc.rc.clone(), resync.queue.clone(), resync.errors.clone(), + buffer_kb_semaphore.clone(), ); let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); @@ -176,6 +181,7 @@ impl BlockManager { resync, system, endpoint, + buffer_kb_semaphore, metrics, scrub_persister, tx_scrub_command: ArcSwapOption::new(None), @@ -361,6 +367,14 @@ impl BlockManager { let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) .await .into_parts(); + + let permit = self + .buffer_kb_semaphore + .clone() + .acquire_many_owned((bytes.len() / 1024).try_into().unwrap()) + .await + .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?; + let put_block_rpc = Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); let put_block_rpc = if let Some(tag) = order_tag { @@ -376,6 +390,7 @@ impl BlockManager { &who[..], put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) + .with_drop_on_completion(permit) .with_quorum(self.replication.write_quorum()), ) .await?; |