diff options
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r-- | src/block/manager.rs | 37 |
1 files changed, 31 insertions, 6 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs index 82db2cab..40b177a2 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,3 +1,4 @@ +use std::convert::TryInto; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use tokio::sync::{mpsc, Mutex, MutexGuard}; +use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore}; use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, @@ -93,6 +94,7 @@ pub struct BlockManager { pub(crate) system: Arc<System>, pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>, + buffer_kb_semaphore: Arc<Semaphore>, pub(crate) metrics: BlockManagerMetrics, @@ -152,11 +154,14 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); + let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024)); + let metrics = BlockManagerMetrics::new( config.compression_level, rc.rc_table.clone(), resync.queue.clone(), resync.errors.clone(), + buffer_kb_semaphore.clone(), ); let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); @@ -176,6 +181,7 @@ impl BlockManager { resync, system, endpoint, + buffer_kb_semaphore, metrics, scrub_persister, tx_scrub_command: ArcSwapOption::new(None), @@ -238,10 +244,16 @@ impl BlockManager { async fn rpc_get_raw_block_streaming( &self, hash: &Hash, + priority: RequestPriority, order_tag: Option<OrderTag>, ) -> Result<DataBlockStream, Error> { - self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) }) - .await + self.rpc_get_raw_block_internal( + hash, + priority, + order_tag, + |stream| async move { Ok(stream) }, + ) + .await } /// Ask nodes that might have a (possibly compressed) block for it @@ -249,9 +261,10 @@ impl BlockManager { pub(crate) async fn rpc_get_raw_block( &self, hash: &Hash, + priority: RequestPriority, order_tag: Option<OrderTag>, ) -> Result<DataBlock, Error> { - self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move { + self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move { let (header, stream) = block_stream.into_parts(); read_stream_to_end(stream) .await @@ -264,6 +277,7 @@ impl BlockManager { async fn rpc_get_raw_block_internal<F, Fut, T>( &self, hash: &Hash, + priority: RequestPriority, order_tag: Option<OrderTag>, f: F, ) -> Result<T, Error> @@ -281,7 +295,7 @@ impl BlockManager { let rpc = self.endpoint.call_streaming( &node_id, BlockRpc::GetBlock(*hash, order_tag), - PRIO_NORMAL | PRIO_SECONDARY, + priority, ); tokio::select! { res = rpc => { @@ -333,7 +347,9 @@ impl BlockManager { hash: &Hash, order_tag: Option<OrderTag>, ) -> Result<ByteStream, Error> { - let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?; + let block_stream = self + .rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag) + .await?; let (header, stream) = block_stream.into_parts(); match header { DataBlockHeader::Plain => Ok(stream), @@ -361,6 +377,14 @@ impl BlockManager { let (header, bytes) = DataBlock::from_buffer(data, compression_level) .await .into_parts(); + + let permit = self + .buffer_kb_semaphore + .clone() + .acquire_many_owned((bytes.len() / 1024).try_into().unwrap()) + .await + .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?; + let put_block_rpc = Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); let put_block_rpc = if let Some(tag) = order_tag { @@ -376,6 +400,7 @@ impl BlockManager { who.as_ref(), put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) + .with_drop_on_completion(permit) .with_quorum(self.replication.write_quorum()), ) .await?; |