aboutsummaryrefslogtreecommitdiff
path: root/src/block
diff options
context:
space:
mode:
Diffstat (limited to 'src/block')
-rw-r--r--src/block/manager.rs17
-rw-r--r--src/block/metrics.rs15
2 files changed, 31 insertions, 1 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 34d854b9..2c7c7aba 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,3 +1,4 @@
+use std::convert::TryInto;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
-use tokio::sync::{mpsc, Mutex, MutexGuard};
+use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -93,6 +94,7 @@ pub struct BlockManager {
pub(crate) system: Arc<System>,
pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
+ buffer_kb_semaphore: Arc<Semaphore>,
pub(crate) metrics: BlockManagerMetrics,
@@ -152,11 +154,14 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
+ let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024));
+
let metrics = BlockManagerMetrics::new(
config.compression_level,
rc.rc.clone(),
resync.queue.clone(),
resync.errors.clone(),
+ buffer_kb_semaphore.clone(),
);
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
@@ -176,6 +181,7 @@ impl BlockManager {
resync,
system,
endpoint,
+ buffer_kb_semaphore,
metrics,
scrub_persister,
tx_scrub_command: ArcSwapOption::new(None),
@@ -361,6 +367,14 @@ impl BlockManager {
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
.await
.into_parts();
+
+ let permit = self
+ .buffer_kb_semaphore
+ .clone()
+ .acquire_many_owned((bytes.len() / 1024).try_into().unwrap())
+ .await
+ .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?;
+
let put_block_rpc =
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
let put_block_rpc = if let Some(tag) = order_tag {
@@ -376,6 +390,7 @@ impl BlockManager {
&who[..],
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
+ .with_drop_on_completion(permit)
.with_quorum(self.replication.write_quorum()),
)
.await?;
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index 6659df32..c989f940 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -1,3 +1,7 @@
+use std::sync::Arc;
+
+use tokio::sync::Semaphore;
+
use opentelemetry::{global, metrics::*};
use garage_db as db;
@@ -9,6 +13,7 @@ pub struct BlockManagerMetrics {
pub(crate) _rc_size: ValueObserver<u64>,
pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
+ pub(crate) _buffer_free_kb: ValueObserver<u64>,
pub(crate) resync_counter: BoundCounter<u64>,
pub(crate) resync_error_counter: BoundCounter<u64>,
@@ -31,6 +36,7 @@ impl BlockManagerMetrics {
rc_tree: db::Tree,
resync_queue: CountedTree,
resync_errors: CountedTree,
+ buffer_semaphore: Arc<Semaphore>,
) -> Self {
let meter = global::meter("garage_model/block");
Self {
@@ -66,6 +72,15 @@ impl BlockManagerMetrics {
.with_description("Number of block hashes whose last resync resulted in an error")
.init(),
+ _buffer_free_kb: meter
+ .u64_value_observer("block.ram_buffer_free_kb", move |observer| {
+ observer.observe(buffer_semaphore.available_permits() as u64, &[])
+ })
+ .with_description(
+ "Available RAM in KiB to use for buffering data blocks to be written to remote nodes",
+ )
+ .init(),
+
resync_counter: meter
.u64_counter("block.resync_counter")
.with_description("Number of calls to resync_block")