Diffstat (limited to 'src/block')
 src/block/manager.rs | 37 +++++++++++++++++++++++++++++++------
 src/block/metrics.rs | 15 +++++++++++++++
 src/block/resync.rs  |  6 ++++--
 3 files changed, 50 insertions(+), 8 deletions(-)
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 82db2cab..40b177a2 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,3 +1,4 @@
+use std::convert::TryInto;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
-use tokio::sync::{mpsc, Mutex, MutexGuard};
+use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -93,6 +94,7 @@ pub struct BlockManager {
pub(crate) system: Arc<System>,
pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
+ buffer_kb_semaphore: Arc<Semaphore>,
pub(crate) metrics: BlockManagerMetrics,
@@ -152,11 +154,14 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
+ let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024));
+
let metrics = BlockManagerMetrics::new(
config.compression_level,
rc.rc_table.clone(),
resync.queue.clone(),
resync.errors.clone(),
+ buffer_kb_semaphore.clone(),
);
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
@@ -176,6 +181,7 @@ impl BlockManager {
resync,
system,
endpoint,
+ buffer_kb_semaphore,
metrics,
scrub_persister,
tx_scrub_command: ArcSwapOption::new(None),
@@ -238,10 +244,16 @@ impl BlockManager {
async fn rpc_get_raw_block_streaming(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
) -> Result<DataBlockStream, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
- .await
+ self.rpc_get_raw_block_internal(
+ hash,
+ priority,
+ order_tag,
+ |stream| async move { Ok(stream) },
+ )
+ .await
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -249,9 +261,10 @@ impl BlockManager {
pub(crate) async fn rpc_get_raw_block(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
+ self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move {
let (header, stream) = block_stream.into_parts();
read_stream_to_end(stream)
.await
@@ -264,6 +277,7 @@ impl BlockManager {
async fn rpc_get_raw_block_internal<F, Fut, T>(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
f: F,
) -> Result<T, Error>
@@ -281,7 +295,7 @@ impl BlockManager {
let rpc = self.endpoint.call_streaming(
&node_id,
BlockRpc::GetBlock(*hash, order_tag),
- PRIO_NORMAL | PRIO_SECONDARY,
+ priority,
);
tokio::select! {
res = rpc => {
@@ -333,7 +347,9 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<ByteStream, Error> {
- let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+ let block_stream = self
+ .rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag)
+ .await?;
let (header, stream) = block_stream.into_parts();
match header {
DataBlockHeader::Plain => Ok(stream),
@@ -361,6 +377,14 @@ impl BlockManager {
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
.await
.into_parts();
+
+ let permit = self
+ .buffer_kb_semaphore
+ .clone()
+ .acquire_many_owned((bytes.len() / 1024).try_into().unwrap())
+ .await
+ .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?;
+
let put_block_rpc =
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
let put_block_rpc = if let Some(tag) = order_tag {
@@ -376,6 +400,7 @@ impl BlockManager {
who.as_ref(),
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
+ .with_drop_on_completion(permit)
.with_quorum(self.replication.write_quorum()),
)
.await?;
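
Taken together, the manager.rs changes cap the RAM spent on blocks buffered for transmission to remote nodes: before sending, one semaphore permit per KiB of buffer is reserved, and the reservation is held for the whole RPC via with_drop_on_completion(permit). A minimal standalone sketch of the pattern, using tokio's Semaphore as the patch does (the function and variable names here are illustrative, not Garage's API):

use std::sync::Arc;
use tokio::sync::Semaphore;

/// One permit == 1 KiB of RAM budget for in-flight send buffers.
async fn send_with_buffer_cap(
    buffer_kb_semaphore: Arc<Semaphore>,
    bytes: Vec<u8>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Reserve permits proportional to the buffer size; this waits
    // whenever the configured RAM budget is exhausted, applying
    // backpressure instead of buffering without bound.
    let permit = buffer_kb_semaphore
        .acquire_many_owned((bytes.len() / 1024).try_into()?)
        .await?;

    // ... send `bytes` to the remote nodes here ...

    // Dropping the permit returns the KiB budget to the pool.
    drop(permit);
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // 256 MiB budget as 1-KiB permits (mirrors `block_ram_buffer_max / 1024`).
    let sem = Arc::new(Semaphore::new(256 * 1024));
    send_with_buffer_cap(sem, vec![0u8; 1 << 20]).await
}

In the patch itself the permit is not dropped inline: it is attached to the RequestStrategy via with_drop_on_completion, so the budget stays reserved until the put-block RPC completes rather than only until the buffer is handed off.
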
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index 8e10afdf..2d41e365 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -1,3 +1,7 @@
+use std::sync::Arc;
+
+use tokio::sync::Semaphore;
+
use opentelemetry::{global, metrics::*};
use garage_db as db;
@@ -8,6 +12,7 @@ pub struct BlockManagerMetrics {
pub(crate) _rc_size: ValueObserver<u64>,
pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
+ pub(crate) _buffer_free_kb: ValueObserver<u64>,
pub(crate) resync_counter: BoundCounter<u64>,
pub(crate) resync_error_counter: BoundCounter<u64>,
@@ -30,6 +35,7 @@ impl BlockManagerMetrics {
rc_tree: db::Tree,
resync_queue: db::Tree,
resync_errors: db::Tree,
+ buffer_semaphore: Arc<Semaphore>,
) -> Self {
let meter = global::meter("garage_model/block");
Self {
@@ -69,6 +75,15 @@ impl BlockManagerMetrics {
.with_description("Number of block hashes whose last resync resulted in an error")
.init(),
+ _buffer_free_kb: meter
+ .u64_value_observer("block.ram_buffer_free_kb", move |observer| {
+ observer.observe(buffer_semaphore.available_permits() as u64, &[])
+ })
+ .with_description(
+ "Available RAM in KiB to use for buffering data blocks to be written to remote nodes",
+ )
+ .init(),
+
resync_counter: meter
.u64_counter("block.resync_counter")
.with_description("Number of calls to resync_block")
diff --git a/src/block/resync.rs b/src/block/resync.rs
index b4108213..ab4604ad 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -436,7 +436,7 @@ impl BlockResyncManager {
&manager.endpoint,
&need_nodes,
put_block_message,
- RequestStrategy::with_priority(PRIO_BACKGROUND)
+ RequestStrategy::with_priority(PRIO_BACKGROUND | PRIO_SECONDARY)
.with_quorum(need_nodes.len()),
)
.await
@@ -460,7 +460,9 @@ impl BlockResyncManager {
hash
);
- let block_data = manager.rpc_get_raw_block(hash, None).await;
+ let block_data = manager
+ .rpc_get_raw_block(hash, PRIO_BACKGROUND | PRIO_SECONDARY, None)
+ .await;
if matches!(block_data, Err(Error::MissingBlock(_))) {
warn!(
"Could not fetch needed block {:?}, no node returned valid data. Checking that refcount is correct.",