aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs17
1 files changed, 16 insertions, 1 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 34d854b9..2c7c7aba 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,3 +1,4 @@
+use std::convert::TryInto;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
-use tokio::sync::{mpsc, Mutex, MutexGuard};
+use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -93,6 +94,7 @@ pub struct BlockManager {
pub(crate) system: Arc<System>,
pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
+ buffer_kb_semaphore: Arc<Semaphore>,
pub(crate) metrics: BlockManagerMetrics,
@@ -152,11 +154,14 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
+ let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024));
+
let metrics = BlockManagerMetrics::new(
config.compression_level,
rc.rc.clone(),
resync.queue.clone(),
resync.errors.clone(),
+ buffer_kb_semaphore.clone(),
);
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
@@ -176,6 +181,7 @@ impl BlockManager {
resync,
system,
endpoint,
+ buffer_kb_semaphore,
metrics,
scrub_persister,
tx_scrub_command: ArcSwapOption::new(None),
@@ -361,6 +367,14 @@ impl BlockManager {
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
.await
.into_parts();
+
+ let permit = self
+ .buffer_kb_semaphore
+ .clone()
+ .acquire_many_owned((bytes.len() / 1024).try_into().unwrap())
+ .await
+ .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?;
+
let put_block_rpc =
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
let put_block_rpc = if let Some(tag) = order_tag {
@@ -376,6 +390,7 @@ impl BlockManager {
&who[..],
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
+ .with_drop_on_completion(permit)
.with_quorum(self.replication.write_quorum()),
)
.await?;