author     Alex Auvolat <alex@adnab.me>  2024-03-28 15:01:05 +0100
committer  Alex Auvolat <alex@adnab.me>  2024-03-28 15:01:05 +0100
commit   8bfc16ba7d5e0c2806aa32e0257fbdc21cb93860 (patch)
tree     49d6c32376708147e90ba64ea32cea7835e751c1 /src
parent   25c196f34d958f4f61d50c89a1c5d40b96d7cd24 (diff)
parent   ecf641d88c264f7278d13a6d988288feb24a5dfe (diff)
Merge branch 'main' into next-0.10
Diffstat (limited to 'src')
-rw-r--r--  src/block/manager.rs   37
-rw-r--r--  src/block/metrics.rs   15
-rw-r--r--  src/block/resync.rs     6
-rw-r--r--  src/model/k2v/rpc.rs    2
-rw-r--r--  src/net/message.rs     18
-rw-r--r--  src/net/send.rs        85
-rw-r--r--  src/net/server.rs       2
-rw-r--r--  src/rpc/rpc_helper.rs  73
-rw-r--r--  src/util/config.rs     11
9 files changed, 186 insertions(+), 63 deletions(-)
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 82db2cab..40b177a2 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,3 +1,4 @@
+use std::convert::TryInto;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
-use tokio::sync::{mpsc, Mutex, MutexGuard};
+use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -93,6 +94,7 @@ pub struct BlockManager {
pub(crate) system: Arc<System>,
pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
+ buffer_kb_semaphore: Arc<Semaphore>,
pub(crate) metrics: BlockManagerMetrics,
@@ -152,11 +154,14 @@ impl BlockManager {
.netapp
.endpoint("garage_block/manager.rs/Rpc".to_string());
+ let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024));
+
let metrics = BlockManagerMetrics::new(
config.compression_level,
rc.rc_table.clone(),
resync.queue.clone(),
resync.errors.clone(),
+ buffer_kb_semaphore.clone(),
);
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
@@ -176,6 +181,7 @@ impl BlockManager {
resync,
system,
endpoint,
+ buffer_kb_semaphore,
metrics,
scrub_persister,
tx_scrub_command: ArcSwapOption::new(None),
@@ -238,10 +244,16 @@ impl BlockManager {
async fn rpc_get_raw_block_streaming(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
) -> Result<DataBlockStream, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
- .await
+ self.rpc_get_raw_block_internal(
+ hash,
+ priority,
+ order_tag,
+ |stream| async move { Ok(stream) },
+ )
+ .await
}
/// Ask nodes that might have a (possibly compressed) block for it
@@ -249,9 +261,10 @@ impl BlockManager {
pub(crate) async fn rpc_get_raw_block(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
- self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
+ self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move {
let (header, stream) = block_stream.into_parts();
read_stream_to_end(stream)
.await
@@ -264,6 +277,7 @@ impl BlockManager {
async fn rpc_get_raw_block_internal<F, Fut, T>(
&self,
hash: &Hash,
+ priority: RequestPriority,
order_tag: Option<OrderTag>,
f: F,
) -> Result<T, Error>
@@ -281,7 +295,7 @@ impl BlockManager {
let rpc = self.endpoint.call_streaming(
&node_id,
BlockRpc::GetBlock(*hash, order_tag),
- PRIO_NORMAL | PRIO_SECONDARY,
+ priority,
);
tokio::select! {
res = rpc => {
@@ -333,7 +347,9 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<ByteStream, Error> {
- let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+ let block_stream = self
+ .rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag)
+ .await?;
let (header, stream) = block_stream.into_parts();
match header {
DataBlockHeader::Plain => Ok(stream),
@@ -361,6 +377,14 @@ impl BlockManager {
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
.await
.into_parts();
+
+ let permit = self
+ .buffer_kb_semaphore
+ .clone()
+ .acquire_many_owned((bytes.len() / 1024).try_into().unwrap())
+ .await
+ .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?;
+
let put_block_rpc =
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
let put_block_rpc = if let Some(tag) = order_tag {
@@ -376,6 +400,7 @@ impl BlockManager {
who.as_ref(),
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
+ .with_drop_on_completion(permit)
.with_quorum(self.replication.write_quorum()),
)
.await?;
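
Side note: the core of the manager.rs change is a byte budget on block data buffered for upload to other nodes, enforced with a tokio semaphore counted in KiB. Below is a minimal standalone sketch of that pattern, not the actual BlockManager code: the payload and budget values are illustrative, and it assumes tokio with the rt and macros features.

use std::convert::TryInto;
use std::sync::Arc;

use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // One permit per KiB of the configured budget (256 MiB is the default
    // added in src/util/config.rs below).
    let block_ram_buffer_max: usize = 256 * 1024 * 1024;
    let buffer_kb_semaphore = Arc::new(Semaphore::new(block_ram_buffer_max / 1024));

    // A block to send to remote nodes (illustrative size).
    let bytes = vec![0u8; 3 * 1024 * 1024];
    let permits: u32 = (bytes.len() / 1024).try_into().unwrap();

    // Waits here if too much block data is already buffered for slow peers.
    let permit = buffer_kb_semaphore
        .clone()
        .acquire_many_owned(permits)
        .await
        .expect("semaphore closed");

    // ... hand `bytes` to the RPC layer; in the hunk above the permit is
    // attached to the request via .with_drop_on_completion(permit) and
    // released only once the transfers have completed.
    drop(permit);
}

Because the permit is an OwnedSemaphorePermit, it can be moved into the request strategy and dropped from a background task, which is what the with_drop_on_completion plumbing in rpc_helper.rs below provides.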
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index 8e10afdf..2d41e365 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -1,3 +1,7 @@
+use std::sync::Arc;
+
+use tokio::sync::Semaphore;
+
use opentelemetry::{global, metrics::*};
use garage_db as db;
@@ -8,6 +12,7 @@ pub struct BlockManagerMetrics {
pub(crate) _rc_size: ValueObserver<u64>,
pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
+ pub(crate) _buffer_free_kb: ValueObserver<u64>,
pub(crate) resync_counter: BoundCounter<u64>,
pub(crate) resync_error_counter: BoundCounter<u64>,
@@ -30,6 +35,7 @@ impl BlockManagerMetrics {
rc_tree: db::Tree,
resync_queue: db::Tree,
resync_errors: db::Tree,
+ buffer_semaphore: Arc<Semaphore>,
) -> Self {
let meter = global::meter("garage_model/block");
Self {
@@ -69,6 +75,15 @@ impl BlockManagerMetrics {
.with_description("Number of block hashes whose last resync resulted in an error")
.init(),
+ _buffer_free_kb: meter
+ .u64_value_observer("block.ram_buffer_free_kb", move |observer| {
+ observer.observe(buffer_semaphore.available_permits() as u64, &[])
+ })
+ .with_description(
+ "Available RAM in KiB to use for buffering data blocks to be written to remote nodes",
+ )
+ .init(),
+
resync_counter: meter
.u64_counter("block.resync_counter")
.with_description("Number of calls to resync_block")
diff --git a/src/block/resync.rs b/src/block/resync.rs
index b4108213..ab4604ad 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -436,7 +436,7 @@ impl BlockResyncManager {
&manager.endpoint,
&need_nodes,
put_block_message,
- RequestStrategy::with_priority(PRIO_BACKGROUND)
+ RequestStrategy::with_priority(PRIO_BACKGROUND | PRIO_SECONDARY)
.with_quorum(need_nodes.len()),
)
.await
@@ -460,7 +460,9 @@ impl BlockResyncManager {
hash
);
- let block_data = manager.rpc_get_raw_block(hash, None).await;
+ let block_data = manager
+ .rpc_get_raw_block(hash, PRIO_BACKGROUND | PRIO_SECONDARY, None)
+ .await;
if matches!(block_data, Err(Error::MissingBlock(_))) {
warn!(
"Could not fetch needed block {:?}, no node returned valid data. Checking that refcount is correct.",
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 95ff2d18..4d7186a7 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -300,7 +300,7 @@ impl K2VRpcHandler {
.map(|node| {
self.system
.rpc_helper()
- .call(&self.endpoint, *node, msg.clone(), rs)
+ .call(&self.endpoint, *node, msg.clone(), rs.clone())
})
.collect::<FuturesUnordered<_>>();
diff --git a/src/net/message.rs b/src/net/message.rs
index b0d255c6..af98ca12 100644
--- a/src/net/message.rs
+++ b/src/net/message.rs
@@ -28,12 +28,30 @@ use crate::util::*;
/// The same priority value is given to a request and to its associated response.
pub type RequestPriority = u8;
+// Usage of priority levels in Garage:
+//
+// PRIO_HIGH
+// for liveness check events such as pings and important
+// reconfiguration events such as layout changes
+//
+// PRIO_NORMAL
+// for standard interactive requests to exchange metadata
+//
+// PRIO_NORMAL | PRIO_SECONDARY
+// for standard interactive requests to exchange block data
+//
+// PRIO_BACKGROUND
+// for background resync requests to exchange metadata
+// PRIO_BACKGROUND | PRIO_SECONDARY
+// for background resync requests to exchange block data
+
/// Priority class: high
pub const PRIO_HIGH: RequestPriority = 0x20;
/// Priority class: normal
pub const PRIO_NORMAL: RequestPriority = 0x40;
/// Priority class: background
pub const PRIO_BACKGROUND: RequestPriority = 0x80;
+
/// Priority: primary among given class
pub const PRIO_PRIMARY: RequestPriority = 0x00;
/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
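
A short sketch of how these constants compose into a single priority byte (values copied from the constants above; the assumption, consistent with how the send queue orders frames, is that a lower byte value is scheduled first):

type RequestPriority = u8;

const PRIO_HIGH: RequestPriority = 0x20;
const PRIO_NORMAL: RequestPriority = 0x40;
const PRIO_BACKGROUND: RequestPriority = 0x80;
const PRIO_PRIMARY: RequestPriority = 0x00;
const PRIO_SECONDARY: RequestPriority = 0x01;

fn main() {
    // Interactive block transfer: normal class, secondary within the class.
    let p = PRIO_NORMAL | PRIO_SECONDARY;
    assert_eq!(p, 0x41);

    // Background block transfers sort after every interactive request...
    assert!((PRIO_BACKGROUND | PRIO_SECONDARY) > p);
    // ...and pings / layout changes sort before everything else.
    assert!((PRIO_HIGH | PRIO_PRIMARY) < p);
}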
diff --git a/src/net/send.rs b/src/net/send.rs
index 0db0ba77..c60fc6b2 100644
--- a/src/net/send.rs
+++ b/src/net/send.rs
@@ -109,7 +109,7 @@ impl SendQueuePriority {
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
order_vec.insert(i, order);
}
- self.items.push_front(item);
+ self.items.push_back(item);
}
fn remove(&mut self, id: RequestID) {
if let Some(i) = self.items.iter().position(|x| x.id == id) {
@@ -128,51 +128,56 @@ impl SendQueuePriority {
self.items.is_empty()
}
fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> {
- for (j, item) in self.items.iter_mut().enumerate() {
- if let Some(OrderTag(stream, order)) = item.order_tag {
- if order > *self.order.get(&stream).unwrap().front().unwrap() {
+ // in step 0: poll only streams that have sent 0 bytes; we want to send them first,
+ // as they most likely represent small requests that can go out in full right away
+ // in step 1: poll all streams
+ for step in 0..2 {
+ for (j, item) in self.items.iter_mut().enumerate() {
+ if let Some(OrderTag(stream, order)) = item.order_tag {
+ if order > *self.order.get(&stream).unwrap().front().unwrap() {
+ continue;
+ }
+ }
+
+ if step == 0 && item.sent > 0 {
continue;
}
- }
- let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
- if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
- let id = item.id;
- let eos = item.data.eos();
-
- let packet = bytes_or_err.map_err(|e| match e {
- ReadExactError::Stream(err) => err,
- _ => unreachable!(),
- });
-
- let is_err = packet.is_err();
- let data_frame = DataFrame::from_packet(packet, !eos);
- item.sent += data_frame.data().len();
-
- if eos || is_err {
- // If item had an order tag, remove it from the corresponding ordering list
- if let Some(OrderTag(stream, order)) = item.order_tag {
- let order_stream = self.order.get_mut(&stream).unwrap();
- assert_eq!(order_stream.pop_front(), Some(order));
- if order_stream.is_empty() {
- self.order.remove(&stream);
+ let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
+ if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
+ let id = item.id;
+ let eos = item.data.eos();
+
+ let packet = bytes_or_err.map_err(|e| match e {
+ ReadExactError::Stream(err) => err,
+ _ => unreachable!(),
+ });
+
+ let is_err = packet.is_err();
+ let data_frame = DataFrame::from_packet(packet, !eos);
+ item.sent += data_frame.data().len();
+
+ if eos || is_err {
+ // If item had an order tag, remove it from the corresponding ordering list
+ if let Some(OrderTag(stream, order)) = item.order_tag {
+ let order_stream = self.order.get_mut(&stream).unwrap();
+ assert_eq!(order_stream.pop_front(), Some(order));
+ if order_stream.is_empty() {
+ self.order.remove(&stream);
+ }
}
+ // Remove item from sending queue
+ self.items.remove(j);
+ } else if step == 0 {
+ // Step 0 means that this stream had not sent any bytes yet.
+ // Now that it has, and it was not an EOS, we know that it is bigger
+ // than one chunk, so move it to the end of the queue.
+ let item = self.items.remove(j).unwrap();
+ self.items.push_back(item);
}
- // Remove item from sending queue
- self.items.remove(j);
- } else {
- // Move item later in send queue to implement LAS scheduling
- // (LAS = Least Attained Service)
- for k in j..self.items.len() - 1 {
- if self.items[k].sent >= self.items[k + 1].sent {
- self.items.swap(k, k + 1);
- } else {
- break;
- }
- }
- }
- return Poll::Ready((id, data_frame));
+ return Poll::Ready((id, data_frame));
+ }
}
}
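
The scheduling change in send.rs can be modelled outside the async machinery. The following is a simplified, hypothetical model (plain byte counters instead of ByteStream, an illustrative chunk size) showing the intended effect: a request that fits in a single chunk is flushed before large transfers get another turn, and large transfers then round-robin among themselves, replacing the previous least-attained-service ordering.

use std::collections::VecDeque;

const MAX_CHUNK_LENGTH: usize = 0x4000; // illustrative value

struct Item {
    id: u32,
    remaining: usize,
    sent: usize,
}

fn poll_next(items: &mut VecDeque<Item>) -> Option<u32> {
    for step in 0..2 {
        for j in 0..items.len() {
            // Step 0: consider only streams that have not sent anything yet.
            if step == 0 && items[j].sent > 0 {
                continue;
            }
            let chunk = items[j].remaining.min(MAX_CHUNK_LENGTH);
            items[j].sent += chunk;
            items[j].remaining -= chunk;
            let id = items[j].id;
            if items[j].remaining == 0 {
                // Finished: remove from the queue.
                items.remove(j);
            } else if step == 0 {
                // Larger than one chunk: requeue at the back, behind newcomers.
                let item = items.remove(j).unwrap();
                items.push_back(item);
            }
            return Some(id);
        }
    }
    None
}

fn main() {
    let mut q = VecDeque::new();
    q.push_back(Item { id: 1, remaining: 3 * MAX_CHUNK_LENGTH, sent: 0 });
    q.push_back(Item { id: 2, remaining: MAX_CHUNK_LENGTH / 2, sent: 0 });
    let order: Vec<u32> = std::iter::from_fn(|| poll_next(&mut q)).collect();
    // The small request (id 2) completes right after the big one's first chunk.
    assert_eq!(order, vec![1, 2, 1, 1]);
}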
diff --git a/src/net/server.rs b/src/net/server.rs
index 55b9e678..36dccb2f 100644
--- a/src/net/server.rs
+++ b/src/net/server.rs
@@ -190,7 +190,7 @@ impl RecvLoop for ServerConn {
let (prio, resp_enc_result) = match ReqEnc::decode(stream).await {
Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await),
- Err(e) => (PRIO_HIGH, Err(e)),
+ Err(e) => (PRIO_NORMAL, Err(e)),
};
debug!("server: sending response to {}", id);
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 05fdcce4..ea3e5e76 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -33,8 +33,7 @@ use crate::metrics::RpcMetrics;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
/// Strategy to apply when making RPC
-#[derive(Copy, Clone)]
-pub struct RequestStrategy {
+pub struct RequestStrategy<T> {
/// Min number of response to consider the request successful
rs_quorum: Option<usize>,
/// Send all requests at once
@@ -43,6 +42,8 @@ pub struct RequestStrategy {
rs_priority: RequestPriority,
/// Custom timeout for this request
rs_timeout: Timeout,
+ /// Data to drop when everything completes
+ rs_drop_on_complete: T,
}
#[derive(Copy, Clone)]
@@ -52,7 +53,19 @@ enum Timeout {
Custom(Duration),
}
-impl RequestStrategy {
+impl Clone for RequestStrategy<()> {
+ fn clone(&self) -> Self {
+ RequestStrategy {
+ rs_quorum: self.rs_quorum,
+ rs_send_all_at_once: self.rs_send_all_at_once,
+ rs_priority: self.rs_priority,
+ rs_timeout: self.rs_timeout,
+ rs_drop_on_complete: (),
+ }
+ }
+}
+
+impl RequestStrategy<()> {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_priority(prio: RequestPriority) -> Self {
RequestStrategy {
@@ -60,8 +73,22 @@ impl RequestStrategy {
rs_send_all_at_once: None,
rs_priority: prio,
rs_timeout: Timeout::Default,
+ rs_drop_on_complete: (),
+ }
+ }
+ /// Add an item to be dropped on completion
+ pub fn with_drop_on_completion<T>(self, drop_on_complete: T) -> RequestStrategy<T> {
+ RequestStrategy {
+ rs_quorum: self.rs_quorum,
+ rs_send_all_at_once: self.rs_send_all_at_once,
+ rs_priority: self.rs_priority,
+ rs_timeout: self.rs_timeout,
+ rs_drop_on_complete: drop_on_complete,
}
}
+}
+
+impl<T> RequestStrategy<T> {
/// Set quorum to be reached for request
pub fn with_quorum(mut self, quorum: usize) -> Self {
self.rs_quorum = Some(quorum);
@@ -82,6 +109,19 @@ impl RequestStrategy {
self.rs_timeout = Timeout::Custom(timeout);
self
}
+ /// Extract drop_on_complete item
+ fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) {
+ (
+ RequestStrategy {
+ rs_quorum: self.rs_quorum,
+ rs_send_all_at_once: self.rs_send_all_at_once,
+ rs_priority: self.rs_priority,
+ rs_timeout: self.rs_timeout,
+ rs_drop_on_complete: (),
+ },
+ self.rs_drop_on_complete,
+ )
+ }
}
#[derive(Clone)]
@@ -122,7 +162,7 @@ impl RpcHelper {
endpoint: &Endpoint<M, H>,
to: Uuid,
msg: N,
- strat: RequestStrategy,
+ strat: RequestStrategy<()>,
) -> Result<S, Error>
where
M: Rpc<Response = Result<S, Error>>,
@@ -182,7 +222,7 @@ impl RpcHelper {
endpoint: &Endpoint<M, H>,
to: &[Uuid],
msg: N,
- strat: RequestStrategy,
+ strat: RequestStrategy<()>,
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where
M: Rpc<Response = Result<S, Error>>,
@@ -197,7 +237,7 @@ impl RpcHelper {
let resps = join_all(
to.iter()
- .map(|to| self.call(endpoint, *to, msg.clone(), strat)),
+ .map(|to| self.call(endpoint, *to, msg.clone(), strat.clone())),
)
.with_context(Context::current_with_span(span))
.await;
@@ -212,7 +252,7 @@ impl RpcHelper {
&self,
endpoint: &Endpoint<M, H>,
msg: N,
- strat: RequestStrategy,
+ strat: RequestStrategy<()>,
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where
M: Rpc<Response = Result<S, Error>>,
@@ -252,7 +292,7 @@ impl RpcHelper {
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
msg: N,
- strategy: RequestStrategy,
+ strategy: RequestStrategy<()>,
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
@@ -285,7 +325,7 @@ impl RpcHelper {
endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid],
msg: N,
- strategy: RequestStrategy,
+ strategy: RequestStrategy<()>,
quorum: usize,
) -> Result<Vec<S>, Error>
where
@@ -316,6 +356,7 @@ impl RpcHelper {
let self2 = self.clone();
let msg = msg.clone();
let endpoint2 = endpoint.clone();
+ let strategy = strategy.clone();
async move { self2.call(&endpoint2, to, msg, strategy).await }
});
@@ -388,18 +429,19 @@ impl RpcHelper {
/// changes, where data has to be written both in the old layout and in the
/// new one as long as all nodes have not successfully transitioned and
/// moved all data to the new layout.
- pub async fn try_write_many_sets<M, N, H, S>(
+ pub async fn try_write_many_sets<M, N, H, S, T>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to_sets: &[Vec<Uuid>],
msg: N,
- strategy: RequestStrategy,
+ strategy: RequestStrategy<T>,
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
N: IntoReq<M>,
H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
+ T: Send + 'static,
{
let quorum = strategy
.rs_quorum
@@ -423,12 +465,12 @@ impl RpcHelper {
.await
}
- async fn try_write_many_sets_inner<M, N, H, S>(
+ async fn try_write_many_sets_inner<M, N, H, S, T>(
&self,
endpoint: &Arc<Endpoint<M, H>>,
to_sets: &[Vec<Uuid>],
msg: N,
- strategy: RequestStrategy,
+ strategy: RequestStrategy<T>,
quorum: usize,
) -> Result<Vec<S>, Error>
where
@@ -436,11 +478,14 @@ impl RpcHelper {
N: IntoReq<M>,
H: StreamingEndpointHandler<M> + 'static,
S: Send + 'static,
+ T: Send + 'static,
{
// Peers may appear in many quorum sets. Here, build a list of peers,
// mapping to the index of the quorum sets in which they appear.
let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum);
+ let (strategy, drop_on_complete) = strategy.extract_drop_on_complete();
+
// Send one request to each peer of the quorum sets
let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
let requests = result_tracker.nodes.keys().map(|peer| {
@@ -448,6 +493,7 @@ impl RpcHelper {
let msg = msg.clone();
let endpoint2 = endpoint.clone();
let to = *peer;
+ let strategy = strategy.clone();
async move { (to, self2.call(&endpoint2, to, msg, strategy).await) }
});
let mut resp_stream = requests.collect::<FuturesUnordered<_>>();
@@ -463,6 +509,7 @@ impl RpcHelper {
// Continue all other requests in background
tokio::spawn(async move {
resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await;
+ drop(drop_on_complete);
});
return Ok(result_tracker.success_values());
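
For reference, the drop-on-completion plumbing added above boils down to the following sketch (hypothetical names, not the real RequestStrategy API): the strategy is Clone only in its unit form, so the payload is split off once, kept alive by the background task, and dropped when every request has finished.

struct Strategy<T> {
    priority: u8,
    drop_on_complete: T,
}

impl Clone for Strategy<()> {
    fn clone(&self) -> Self {
        Strategy { priority: self.priority, drop_on_complete: () }
    }
}

impl Strategy<()> {
    fn with_priority(priority: u8) -> Self {
        Strategy { priority, drop_on_complete: () }
    }
    fn with_drop_on_completion<T>(self, item: T) -> Strategy<T> {
        Strategy { priority: self.priority, drop_on_complete: item }
    }
}

impl<T> Strategy<T> {
    // Split into a cheaply clonable part (sent with every individual RPC)
    // and the payload, which outlives all of them.
    fn extract_drop_on_complete(self) -> (Strategy<()>, T) {
        (
            Strategy { priority: self.priority, drop_on_complete: () },
            self.drop_on_complete,
        )
    }
}

fn main() {
    let strat = Strategy::with_priority(0x41).with_drop_on_completion("permit".to_string());
    let (per_rpc, payload) = strat.extract_drop_on_complete();
    let _per_peer_copy = per_rpc.clone();
    assert_eq!(per_rpc.priority, 0x41);
    // In the real code, `payload` is an OwnedSemaphorePermit dropped after
    // the last background response has been collected.
    drop(payload);
}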
diff --git a/src/util/config.rs b/src/util/config.rs
index c5a24f76..028f8c68 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -60,6 +60,14 @@ pub struct Config {
)]
pub compression_level: Option<i32>,
+ /// Maximum amount of block data to buffer in RAM for sending to
+ /// remote nodes when these nodes are on slower links
+ #[serde(
+ deserialize_with = "deserialize_capacity",
+ default = "default_block_ram_buffer_max"
+ )]
+ pub block_ram_buffer_max: usize,
+
/// Skip the permission check of secret files. Useful when
/// POSIX ACLs (or more complex chmods) are used.
#[serde(default)]
@@ -247,6 +255,9 @@ fn default_db_engine() -> String {
fn default_block_size() -> usize {
1048576
}
+fn default_block_ram_buffer_max() -> usize {
+ 256 * 1024 * 1024
+}
fn default_consistency_mode() -> String {
"consistent".into()