diff options
Diffstat (limited to 'src/block')
-rw-r--r-- | src/block/manager.rs | 17 | ||||
-rw-r--r-- | src/block/metrics.rs | 15 |
2 files changed, 31 insertions, 1 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 34d854b9..2c7c7aba 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,3 +1,4 @@ +use std::convert::TryInto; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use tokio::sync::{mpsc, Mutex, MutexGuard}; +use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore}; use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, @@ -93,6 +94,7 @@ pub struct BlockManager { pub(crate) system: Arc<System>, pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>, + buffer_kb_semaphore: Arc<Semaphore>, pub(crate) metrics: BlockManagerMetrics, @@ -152,11 +154,14 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); + let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024)); + let metrics = BlockManagerMetrics::new( config.compression_level, rc.rc.clone(), resync.queue.clone(), resync.errors.clone(), + buffer_kb_semaphore.clone(), ); let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); @@ -176,6 +181,7 @@ impl BlockManager { resync, system, endpoint, + buffer_kb_semaphore, metrics, scrub_persister, tx_scrub_command: ArcSwapOption::new(None), @@ -361,6 +367,14 @@ impl BlockManager { let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) .await .into_parts(); + + let permit = self + .buffer_kb_semaphore + .clone() + .acquire_many_owned((bytes.len() / 1024).try_into().unwrap()) + .await + .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?; + let put_block_rpc = Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); let put_block_rpc = if let Some(tag) = order_tag { @@ -376,6 +390,7 @@ impl BlockManager { &who[..], put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) + .with_drop_on_completion(permit) .with_quorum(self.replication.write_quorum()), ) .await?; diff --git a/src/block/metrics.rs b/src/block/metrics.rs index 6659df32..c989f940 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -1,3 +1,7 @@ +use std::sync::Arc; + +use tokio::sync::Semaphore; + use opentelemetry::{global, metrics::*}; use garage_db as db; @@ -9,6 +13,7 @@ pub struct BlockManagerMetrics { pub(crate) _rc_size: ValueObserver<u64>, pub(crate) _resync_queue_len: ValueObserver<u64>, pub(crate) _resync_errored_blocks: ValueObserver<u64>, + pub(crate) _buffer_free_kb: ValueObserver<u64>, pub(crate) resync_counter: BoundCounter<u64>, pub(crate) resync_error_counter: BoundCounter<u64>, @@ -31,6 +36,7 @@ impl BlockManagerMetrics { rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree, + buffer_semaphore: Arc<Semaphore>, ) -> Self { let meter = global::meter("garage_model/block"); Self { @@ -66,6 +72,15 @@ impl BlockManagerMetrics { .with_description("Number of block hashes whose last resync resulted in an error") .init(), + _buffer_free_kb: meter + .u64_value_observer("block.ram_buffer_free_kb", move |observer| { + observer.observe(buffer_semaphore.available_permits() as u64, &[]) + }) + .with_description( + "Available RAM in KiB to use for buffering data blocks to be written to remote nodes", + ) + .init(), + resync_counter: meter .u64_counter("block.resync_counter") .with_description("Number of calls to resync_block") |