aboutsummaryrefslogtreecommitdiff
path: root/src/block/manager.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/block/manager.rs')
-rw-r--r--src/block/manager.rs37
1 files changed, 31 insertions, 6 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 82db2cab..40b177a2 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,3 +1,4 @@
+use std::convert::TryInto;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
-use tokio::sync::{mpsc, Mutex, MutexGuard};
+use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -93,6 +94,7 @@ pub struct BlockManager {
pub(crate) system: Arc<System>,
pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
+ buffer_kb_semaphore: Arc<Semaphore>,
pub(crate) metrics: BlockManagerMetrics,
@@ -152,11 +154,14 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
+ let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024));
+
let metrics = BlockManagerMetrics::new(
config.compression_level,
rc.rc_table.clone(),
resync.queue.clone(),
resync.errors.clone(),
+ buffer_kb_semaphore.clone(),
);
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
@@ -176,6 +181,7 @@ impl BlockManager {
resync,
system,
endpoint,
+ buffer_kb_semaphore,
metrics,
scrub_persister,
tx_scrub_command: ArcSwapOption::new(None),
@@ -238,10 +244,16 @@ impl BlockManager {
async fn rpc_get_raw_block_streaming(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
) -> Result<DataBlockStream, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
- .await
+ self.rpc_get_raw_block_internal(
+ hash,
+ priority,
+ order_tag,
+ |stream| async move { Ok(stream) },
+ )
+ .await
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -249,9 +261,10 @@ impl BlockManager {
pub(crate) async fn rpc_get_raw_block(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
+ self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move {
let (header, stream) = block_stream.into_parts();
read_stream_to_end(stream)
.await
@@ -264,6 +277,7 @@ impl BlockManager {
async fn rpc_get_raw_block_internal<F, Fut, T>(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
f: F,
) -> Result<T, Error>
@@ -281,7 +295,7 @@ impl BlockManager {
let rpc = self.endpoint.call_streaming(
&node_id,
BlockRpc::GetBlock(*hash, order_tag),
- PRIO_NORMAL | PRIO_SECONDARY,
+ priority,
);
tokio::select! {
res = rpc => {
@@ -333,7 +347,9 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<ByteStream, Error> {
- let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+ let block_stream = self
+ .rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag)
+ .await?;
let (header, stream) = block_stream.into_parts();
match header {
DataBlockHeader::Plain => Ok(stream),
@@ -361,6 +377,14 @@ impl BlockManager {
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
.await
.into_parts();
+
+ let permit = self
+ .buffer_kb_semaphore
+ .clone()
+ .acquire_many_owned((bytes.len() / 1024).try_into().unwrap())
+ .await
+ .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?;
+
let put_block_rpc =
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
let put_block_rpc = if let Some(tag) = order_tag {
@@ -376,6 +400,7 @@ impl BlockManager {
who.as_ref(),
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
+ .with_drop_on_completion(permit)
.with_quorum(self.replication.write_quorum()),
)
.await?;