Diffstat (limited to 'src')
 src/block/manager.rs  | 37
 src/block/metrics.rs  | 15
 src/block/resync.rs   |  6
 src/model/k2v/rpc.rs  |  2
 src/net/message.rs    | 18
 src/net/send.rs       | 85
 src/net/server.rs     |  2
 src/rpc/rpc_helper.rs | 73
 src/util/config.rs    | 11
9 files changed, 186 insertions(+), 63 deletions(-)
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 82db2cab..40b177a2 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -1,3 +1,4 @@
+use std::convert::TryInto;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;
@@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
 use tokio::fs;
 use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
-use tokio::sync::{mpsc, Mutex, MutexGuard};
+use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore};
 
 use opentelemetry::{
 	trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -93,6 +94,7 @@ pub struct BlockManager {
 	pub(crate) system: Arc<System>,
 	pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
+	buffer_kb_semaphore: Arc<Semaphore>,
 
 	pub(crate) metrics: BlockManagerMetrics,
 
@@ -152,11 +154,14 @@ impl BlockManager {
 			.netapp
 			.endpoint("garage_block/manager.rs/Rpc".to_string());
 
+		let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024));
+
 		let metrics = BlockManagerMetrics::new(
 			config.compression_level,
 			rc.rc_table.clone(),
 			resync.queue.clone(),
 			resync.errors.clone(),
+			buffer_kb_semaphore.clone(),
 		);
 
 		let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
@@ -176,6 +181,7 @@ impl BlockManager {
 			resync,
 			system,
 			endpoint,
+			buffer_kb_semaphore,
 			metrics,
 			scrub_persister,
 			tx_scrub_command: ArcSwapOption::new(None),
@@ -238,10 +244,16 @@ impl BlockManager {
 	async fn rpc_get_raw_block_streaming(
 		&self,
 		hash: &Hash,
+		priority: RequestPriority,
 		order_tag: Option<OrderTag>,
 	) -> Result<DataBlockStream, Error> {
-		self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
-			.await
+		self.rpc_get_raw_block_internal(
+			hash,
+			priority,
+			order_tag,
+			|stream| async move { Ok(stream) },
+		)
+		.await
 	}
 
 	/// Ask nodes that might have a (possibly compressed) block for it
@@ -249,9 +261,10 @@ impl BlockManager {
 	pub(crate) async fn rpc_get_raw_block(
 		&self,
 		hash: &Hash,
+		priority: RequestPriority,
 		order_tag: Option<OrderTag>,
 	) -> Result<DataBlock, Error> {
-		self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
+		self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move {
 			let (header, stream) = block_stream.into_parts();
 			read_stream_to_end(stream)
 				.await
@@ -264,6 +277,7 @@ impl BlockManager {
 	async fn rpc_get_raw_block_internal<F, Fut, T>(
 		&self,
 		hash: &Hash,
+		priority: RequestPriority,
 		order_tag: Option<OrderTag>,
 		f: F,
 	) -> Result<T, Error>
@@ -281,7 +295,7 @@ impl BlockManager {
 			let rpc = self.endpoint.call_streaming(
 				&node_id,
 				BlockRpc::GetBlock(*hash, order_tag),
-				PRIO_NORMAL | PRIO_SECONDARY,
+				priority,
 			);
 			tokio::select! {
 				res = rpc => {
@@ -333,7 +347,9 @@ impl BlockManager {
 		hash: &Hash,
 		order_tag: Option<OrderTag>,
 	) -> Result<ByteStream, Error> {
-		let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
+		let block_stream = self
+			.rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag)
+			.await?;
 		let (header, stream) = block_stream.into_parts();
 		match header {
 			DataBlockHeader::Plain => Ok(stream),
@@ -361,6 +377,14 @@ impl BlockManager {
 		let (header, bytes) = DataBlock::from_buffer(data, compression_level)
 			.await
 			.into_parts();
+
+		let permit = self
+			.buffer_kb_semaphore
+			.clone()
+			.acquire_many_owned((bytes.len() / 1024).try_into().unwrap())
+			.await
+			.ok_or_message("could not reserve space for buffer of data to send to remote nodes")?;
+
 		let put_block_rpc =
 			Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
 		let put_block_rpc = if let Some(tag) = order_tag {
@@ -376,6 +400,7 @@ impl BlockManager {
 			who.as_ref(),
 			put_block_rpc,
 			RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
+				.with_drop_on_completion(permit)
 				.with_quorum(self.replication.write_quorum()),
 		)
 		.await?;
diff --git a/src/block/metrics.rs b/src/block/metrics.rs
index 8e10afdf..2d41e365 100644
--- a/src/block/metrics.rs
+++ b/src/block/metrics.rs
@@ -1,3 +1,7 @@
+use std::sync::Arc;
+
+use tokio::sync::Semaphore;
+
 use opentelemetry::{global, metrics::*};
 
 use garage_db as db;
@@ -8,6 +12,7 @@ pub struct BlockManagerMetrics {
 	pub(crate) _rc_size: ValueObserver<u64>,
 	pub(crate) _resync_queue_len: ValueObserver<u64>,
 	pub(crate) _resync_errored_blocks: ValueObserver<u64>,
+	pub(crate) _buffer_free_kb: ValueObserver<u64>,
 
 	pub(crate) resync_counter: BoundCounter<u64>,
 	pub(crate) resync_error_counter: BoundCounter<u64>,
@@ -30,6 +35,7 @@ impl BlockManagerMetrics {
 		rc_tree: db::Tree,
 		resync_queue: db::Tree,
 		resync_errors: db::Tree,
+		buffer_semaphore: Arc<Semaphore>,
 	) -> Self {
 		let meter = global::meter("garage_model/block");
 		Self {
@@ -69,6 +75,15 @@ impl BlockManagerMetrics {
 				.with_description("Number of block hashes whose last resync resulted in an error")
 				.init(),
 
+			_buffer_free_kb: meter
+				.u64_value_observer("block.ram_buffer_free_kb", move |observer| {
+					observer.observe(buffer_semaphore.available_permits() as u64, &[])
+				})
+				.with_description(
+					"Available RAM in KiB to use for buffering data blocks to be written to remote nodes",
+				)
+				.init(),
+
 			resync_counter: meter
 				.u64_counter("block.resync_counter")
 				.with_description("Number of calls to resync_block")
diff --git a/src/block/resync.rs b/src/block/resync.rs
index b4108213..ab4604ad 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -436,7 +436,7 @@ impl BlockResyncManager {
 				&manager.endpoint,
 				&need_nodes,
 				put_block_message,
-				RequestStrategy::with_priority(PRIO_BACKGROUND)
+				RequestStrategy::with_priority(PRIO_BACKGROUND | PRIO_SECONDARY)
 					.with_quorum(need_nodes.len()),
 			)
 			.await
@@ -460,7 +460,9 @@ impl BlockResyncManager {
 			hash
 		);
 
-		let block_data = manager.rpc_get_raw_block(hash, None).await;
+		let block_data = manager
+			.rpc_get_raw_block(hash, PRIO_BACKGROUND | PRIO_SECONDARY, None)
+			.await;
 		if matches!(block_data, Err(Error::MissingBlock(_))) {
 			warn!(
 				"Could not fetch needed block {:?}, no node returned valid data. Checking that refcount is correct.",
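The manager.rs hunks above are the core of the new RAM cap: one semaphore permit stands for 1 KiB of send-buffer budget, permits are acquired before a compressed block is buffered for upload, and they are released only once all transfers have completed (see the rpc_helper.rs changes further down). Below is a minimal standalone sketch of this pattern using tokio's `Semaphore`, as the patch does; the names and sizes are illustrative, not Garage's.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // One permit per KiB of buffer budget, mirroring
    // `Semaphore::new(config.block_ram_buffer_max / 1024)` above.
    let budget = Arc::new(Semaphore::new(256 * 1024)); // 256 MiB, counted in KiB

    let payload = vec![0u8; 3 * 1024 * 1024]; // a hypothetical 3 MiB block
    // Take one permit per KiB; this suspends the sender instead of letting
    // buffered blocks grow without bound when remote links are slow.
    let permit = budget
        .clone()
        .acquire_many_owned((payload.len() / 1024) as u32)
        .await
        .expect("semaphore closed");

    // ... hand `payload` to the RPC layer here ...

    drop(permit); // dropping the permit returns the budget to the pool
}
```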
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 95ff2d18..4d7186a7 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -300,7 +300,7 @@ impl K2VRpcHandler {
 			.map(|node| {
 				self.system
 					.rpc_helper()
-					.call(&self.endpoint, *node, msg.clone(), rs)
+					.call(&self.endpoint, *node, msg.clone(), rs.clone())
 			})
 			.collect::<FuturesUnordered<_>>();
 
diff --git a/src/net/message.rs b/src/net/message.rs
index b0d255c6..af98ca12 100644
--- a/src/net/message.rs
+++ b/src/net/message.rs
@@ -28,12 +28,30 @@ use crate::util::*;
 /// The same priority value is given to a request and to its associated response.
 pub type RequestPriority = u8;
 
+// Usage of priority levels in Garage:
+//
+// PRIO_HIGH
+//     for liveness check events such as pings and important
+//     reconfiguration events such as layout changes
+//
+// PRIO_NORMAL
+//     for standard interactive requests to exchange metadata
+//
+// PRIO_NORMAL | PRIO_SECONDARY
+//     for standard interactive requests to exchange block data
+//
+// PRIO_BACKGROUND
+//     for background resync requests to exchange metadata
+// PRIO_BACKGROUND | PRIO_SECONDARY
+//     for background resync requests to exchange block data
+
 /// Priority class: high
 pub const PRIO_HIGH: RequestPriority = 0x20;
 /// Priority class: normal
 pub const PRIO_NORMAL: RequestPriority = 0x40;
 /// Priority class: background
 pub const PRIO_BACKGROUND: RequestPriority = 0x80;
+
 /// Priority: primary among given class
 pub const PRIO_PRIMARY: RequestPriority = 0x00;
 /// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
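The new comment block in message.rs documents how the priority byte is composed: the class occupies the high bits and the `PRIO_PRIMARY`/`PRIO_SECONDARY` flag the low bit, which is what lets this patch deprioritize block data relative to metadata within the same class. A small self-contained illustration, assuming (as the constants' ordering suggests) that numerically smaller values are served first:

```rust
type RequestPriority = u8;

const PRIO_HIGH: RequestPriority = 0x20;
const PRIO_NORMAL: RequestPriority = 0x40;
const PRIO_BACKGROUND: RequestPriority = 0x80;
const PRIO_PRIMARY: RequestPriority = 0x00;
const PRIO_SECONDARY: RequestPriority = 0x01;

fn main() {
    // The class lives in the high bits, the primary/secondary flag in the
    // low bit, so composed values still sort by class first.
    assert_eq!(PRIO_NORMAL | PRIO_SECONDARY, 0x41);
    assert_eq!(PRIO_BACKGROUND | PRIO_SECONDARY, 0x81);

    // High-class traffic outranks normal, which outranks background ...
    assert!(PRIO_HIGH < PRIO_NORMAL && PRIO_NORMAL < PRIO_BACKGROUND);
    // ... and within one class, metadata (primary) outranks block data.
    assert!((PRIO_BACKGROUND | PRIO_PRIMARY) < (PRIO_BACKGROUND | PRIO_SECONDARY));
}
```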
diff --git a/src/net/send.rs b/src/net/send.rs
index 0db0ba77..c60fc6b2 100644
--- a/src/net/send.rs
+++ b/src/net/send.rs
@@ -109,7 +109,7 @@ impl SendQueuePriority {
 			let i = order_vec.iter().take_while(|o2| **o2 < order).count();
 			order_vec.insert(i, order);
 		}
-		self.items.push_front(item);
+		self.items.push_back(item);
 	}
 	fn remove(&mut self, id: RequestID) {
 		if let Some(i) = self.items.iter().position(|x| x.id == id) {
@@ -128,51 +128,56 @@ impl SendQueuePriority {
 		self.items.is_empty()
 	}
 	fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> {
-		for (j, item) in self.items.iter_mut().enumerate() {
-			if let Some(OrderTag(stream, order)) = item.order_tag {
-				if order > *self.order.get(&stream).unwrap().front().unwrap() {
+		// in step 1: poll only streams that have sent 0 bytes, we want to send them in priority
+		// as they most likely represent small requests to be sent first
+		// in step 2: poll all streams
+		for step in 0..2 {
+			for (j, item) in self.items.iter_mut().enumerate() {
+				if let Some(OrderTag(stream, order)) = item.order_tag {
+					if order > *self.order.get(&stream).unwrap().front().unwrap() {
+						continue;
+					}
+				}
+
+				if step == 0 && item.sent > 0 {
 					continue;
 				}
-			}
 
-			let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
-			if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
-				let id = item.id;
-				let eos = item.data.eos();
-
-				let packet = bytes_or_err.map_err(|e| match e {
-					ReadExactError::Stream(err) => err,
-					_ => unreachable!(),
-				});
-
-				let is_err = packet.is_err();
-				let data_frame = DataFrame::from_packet(packet, !eos);
-				item.sent += data_frame.data().len();
-
-				if eos || is_err {
-					// If item had an order tag, remove it from the corresponding ordering list
-					if let Some(OrderTag(stream, order)) = item.order_tag {
-						let order_stream = self.order.get_mut(&stream).unwrap();
-						assert_eq!(order_stream.pop_front(), Some(order));
-						if order_stream.is_empty() {
-							self.order.remove(&stream);
+				let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
+				if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
+					let id = item.id;
+					let eos = item.data.eos();
+
+					let packet = bytes_or_err.map_err(|e| match e {
+						ReadExactError::Stream(err) => err,
+						_ => unreachable!(),
+					});
+
+					let is_err = packet.is_err();
+					let data_frame = DataFrame::from_packet(packet, !eos);
+					item.sent += data_frame.data().len();
+
+					if eos || is_err {
+						// If item had an order tag, remove it from the corresponding ordering list
+						if let Some(OrderTag(stream, order)) = item.order_tag {
+							let order_stream = self.order.get_mut(&stream).unwrap();
+							assert_eq!(order_stream.pop_front(), Some(order));
+							if order_stream.is_empty() {
+								self.order.remove(&stream);
+							}
 						}
+						// Remove item from sending queue
+						self.items.remove(j);
+					} else if step == 0 {
+						// Step 0 means that this stream had not sent any bytes yet.
+						// Now that it has, and it was not an EOS, we know that it is bigger
+						// than one chunk so move it at the end of the queue.
+						let item = self.items.remove(j).unwrap();
+						self.items.push_back(item);
 					}
-					// Remove item from sending queue
-					self.items.remove(j);
-				} else {
-					// Move item later in send queue to implement LAS scheduling
-					// (LAS = Least Attained Service)
-					for k in j..self.items.len() - 1 {
-						if self.items[k].sent >= self.items[k + 1].sent {
-							self.items.swap(k, k + 1);
-						} else {
-							break;
-						}
-					}
-				}
 
-				return Poll::Ready((id, data_frame));
+					return Poll::Ready((id, data_frame));
+				}
 			}
 		}
 	}
diff --git a/src/net/server.rs b/src/net/server.rs
index 55b9e678..36dccb2f 100644
--- a/src/net/server.rs
+++ b/src/net/server.rs
@@ -190,7 +190,7 @@ impl RecvLoop for ServerConn {
 
 		let (prio, resp_enc_result) = match ReqEnc::decode(stream).await {
 			Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await),
-			Err(e) => (PRIO_HIGH, Err(e)),
+			Err(e) => (PRIO_NORMAL, Err(e)),
 		};
 
 		debug!("server: sending response to {}", id);
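The send.rs rewrite replaces the incremental LAS reordering with a two-step scan: step 0 considers only streams that have sent nothing yet (most likely short, latency-sensitive requests), step 1 considers every stream, and a stream that survives its first chunk is pushed to the back of the queue. A toy model of just the selection logic, ignoring the polling and readiness handling the real code performs:

```rust
// Toy model of the two-step scan in SendQueuePriority::poll_next_ready.
// `sent` is the number of bytes a queued stream has already transmitted.
struct Item {
    id: u32,
    sent: usize,
}

fn pick_next(items: &[Item]) -> Option<u32> {
    for step in 0..2 {
        for item in items {
            // Step 0 skips streams that have already sent bytes, so fresh
            // (and most likely small) requests get served first.
            if step == 0 && item.sent > 0 {
                continue;
            }
            return Some(item.id);
        }
    }
    None
}

fn main() {
    let queue = vec![
        Item { id: 1, sent: 16384 }, // a large transfer already in flight
        Item { id: 2, sent: 0 },     // a brand-new request
    ];
    // The new request wins even though it sits behind the large transfer.
    assert_eq!(pick_next(&queue), Some(2));
}
```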
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 05fdcce4..ea3e5e76 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -33,8 +33,7 @@ use crate::metrics::RpcMetrics;
 const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
 
 /// Strategy to apply when making RPC
-#[derive(Copy, Clone)]
-pub struct RequestStrategy {
+pub struct RequestStrategy<T> {
 	/// Min number of response to consider the request successful
 	rs_quorum: Option<usize>,
 	/// Send all requests at once
@@ -43,6 +42,8 @@ pub struct RequestStrategy {
 	rs_priority: RequestPriority,
 	/// Custom timeout for this request
 	rs_timeout: Timeout,
+	/// Data to drop when everything completes
+	rs_drop_on_complete: T,
 }
 
 #[derive(Copy, Clone)]
@@ -52,7 +53,19 @@ enum Timeout {
 	Custom(Duration),
 }
 
-impl RequestStrategy {
+impl Clone for RequestStrategy<()> {
+	fn clone(&self) -> Self {
+		RequestStrategy {
+			rs_quorum: self.rs_quorum,
+			rs_send_all_at_once: self.rs_send_all_at_once,
+			rs_priority: self.rs_priority,
+			rs_timeout: self.rs_timeout,
+			rs_drop_on_complete: (),
+		}
+	}
+}
+
+impl RequestStrategy<()> {
 	/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
 	pub fn with_priority(prio: RequestPriority) -> Self {
 		RequestStrategy {
@@ -60,8 +73,22 @@ impl RequestStrategy {
 			rs_send_all_at_once: None,
 			rs_priority: prio,
 			rs_timeout: Timeout::Default,
+			rs_drop_on_complete: (),
+		}
+	}
+	/// Add an item to be dropped on completion
+	pub fn with_drop_on_completion<T>(self, drop_on_complete: T) -> RequestStrategy<T> {
+		RequestStrategy {
+			rs_quorum: self.rs_quorum,
+			rs_send_all_at_once: self.rs_send_all_at_once,
+			rs_priority: self.rs_priority,
+			rs_timeout: self.rs_timeout,
+			rs_drop_on_complete: drop_on_complete,
 		}
 	}
+}
+
+impl<T> RequestStrategy<T> {
 	/// Set quorum to be reached for request
 	pub fn with_quorum(mut self, quorum: usize) -> Self {
 		self.rs_quorum = Some(quorum);
@@ -82,6 +109,19 @@ impl RequestStrategy {
 		self.rs_timeout = Timeout::Custom(timeout);
 		self
 	}
+	/// Extract drop_on_complete item
+	fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) {
+		(
+			RequestStrategy {
+				rs_quorum: self.rs_quorum,
+				rs_send_all_at_once: self.rs_send_all_at_once,
+				rs_priority: self.rs_priority,
+				rs_timeout: self.rs_timeout,
+				rs_drop_on_complete: (),
+			},
+			self.rs_drop_on_complete,
+		)
+	}
 }
 
 #[derive(Clone)]
@@ -122,7 +162,7 @@ impl RpcHelper {
 		endpoint: &Endpoint<M, H>,
 		to: Uuid,
 		msg: N,
-		strat: RequestStrategy,
+		strat: RequestStrategy<()>,
 	) -> Result<S, Error>
 	where
 		M: Rpc<Response = Result<S, Error>>,
@@ -182,7 +222,7 @@ impl RpcHelper {
 		endpoint: &Endpoint<M, H>,
 		to: &[Uuid],
 		msg: N,
-		strat: RequestStrategy,
+		strat: RequestStrategy<()>,
 	) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
 	where
 		M: Rpc<Response = Result<S, Error>>,
@@ -197,7 +237,7 @@ impl RpcHelper {
 
 		let resps = join_all(
 			to.iter()
-				.map(|to| self.call(endpoint, *to, msg.clone(), strat)),
+				.map(|to| self.call(endpoint, *to, msg.clone(), strat.clone())),
 		)
 		.with_context(Context::current_with_span(span))
 		.await;
@@ -212,7 +252,7 @@ impl RpcHelper {
 		&self,
 		endpoint: &Endpoint<M, H>,
 		msg: N,
-		strat: RequestStrategy,
+		strat: RequestStrategy<()>,
 	) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
 	where
 		M: Rpc<Response = Result<S, Error>>,
@@ -252,7 +292,7 @@ impl RpcHelper {
 		endpoint: &Arc<Endpoint<M, H>>,
 		to: &[Uuid],
 		msg: N,
-		strategy: RequestStrategy,
+		strategy: RequestStrategy<()>,
 	) -> Result<Vec<S>, Error>
 	where
 		M: Rpc<Response = Result<S, Error>> + 'static,
@@ -285,7 +325,7 @@ impl RpcHelper {
 		endpoint: &Arc<Endpoint<M, H>>,
 		to: &[Uuid],
 		msg: N,
-		strategy: RequestStrategy,
+		strategy: RequestStrategy<()>,
 		quorum: usize,
 	) -> Result<Vec<S>, Error>
 	where
@@ -316,6 +356,7 @@ impl RpcHelper {
 			let self2 = self.clone();
 			let msg = msg.clone();
 			let endpoint2 = endpoint.clone();
+			let strategy = strategy.clone();
 			async move { self2.call(&endpoint2, to, msg, strategy).await }
 		});
 
@@ -388,18 +429,19 @@ impl RpcHelper {
 	/// changes, where data has to be written both in the old layout and in the
 	/// new one as long as all nodes have not successfully tranisitionned and
 	/// moved all data to the new layout.
-	pub async fn try_write_many_sets<M, N, H, S>(
+	pub async fn try_write_many_sets<M, N, H, S, T>(
 		&self,
 		endpoint: &Arc<Endpoint<M, H>>,
 		to_sets: &[Vec<Uuid>],
 		msg: N,
-		strategy: RequestStrategy,
+		strategy: RequestStrategy<T>,
 	) -> Result<Vec<S>, Error>
 	where
 		M: Rpc<Response = Result<S, Error>> + 'static,
 		N: IntoReq<M>,
 		H: StreamingEndpointHandler<M> + 'static,
 		S: Send + 'static,
+		T: Send + 'static,
 	{
 		let quorum = strategy
 			.rs_quorum
@@ -423,12 +465,12 @@ impl RpcHelper {
 		.await
 	}
 
-	async fn try_write_many_sets_inner<M, N, H, S>(
+	async fn try_write_many_sets_inner<M, N, H, S, T>(
 		&self,
 		endpoint: &Arc<Endpoint<M, H>>,
 		to_sets: &[Vec<Uuid>],
 		msg: N,
-		strategy: RequestStrategy,
+		strategy: RequestStrategy<T>,
 		quorum: usize,
 	) -> Result<Vec<S>, Error>
 	where
@@ -436,11 +478,14 @@ impl RpcHelper {
 		N: IntoReq<M>,
 		H: StreamingEndpointHandler<M> + 'static,
 		S: Send + 'static,
+		T: Send + 'static,
 	{
 		// Peers may appear in many quorum sets. Here, build a list of peers,
 		// mapping to the index of the quorum sets in which they appear.
 		let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum);
 
+		let (strategy, drop_on_complete) = strategy.extract_drop_on_complete();
+
 		// Send one request to each peer of the quorum sets
 		let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
 		let requests = result_tracker.nodes.keys().map(|peer| {
@@ -448,6 +493,7 @@ impl RpcHelper {
 			let msg = msg.clone();
 			let endpoint2 = endpoint.clone();
 			let to = *peer;
+			let strategy = strategy.clone();
 			async move { (to, self2.call(&endpoint2, to, msg, strategy).await) }
 		});
 		let mut resp_stream = requests.collect::<FuturesUnordered<_>>();
@@ -463,6 +509,7 @@ impl RpcHelper {
 			// Continue all other requets in background
 			tokio::spawn(async move {
 				resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await;
+				drop(drop_on_complete);
 			});
 
 			return Ok(result_tracker.success_values());
diff --git a/src/util/config.rs b/src/util/config.rs
index c5a24f76..028f8c68 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -60,6 +60,14 @@ pub struct Config {
 	)]
 	pub compression_level: Option<i32>,
 
+	/// Maximum amount of block data to buffer in RAM for sending to
+	/// remote nodes when these nodes are on slower links
+	#[serde(
+		deserialize_with = "deserialize_capacity",
+		default = "default_block_ram_buffer_max"
+	)]
+	pub block_ram_buffer_max: usize,
+
 	/// Skip the permission check of secret files. Useful when
 	/// POSIX ACLs (or more complex chmods) are used.
 	#[serde(default)]
@@ -247,6 +255,9 @@ fn default_db_engine() -> String {
 fn default_block_size() -> usize {
 	1048576
 }
+fn default_block_ram_buffer_max() -> usize {
+	256 * 1024 * 1024
+}
 
 fn default_consistency_mode() -> String {
 	"consistent".into()
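The rpc_helper.rs changes above trade `#[derive(Copy, Clone)]` for a hand-written `Clone` on `RequestStrategy<()>` only: a strategy carrying a payload (here, the semaphore permit) must not be duplicated, so the payload is split off once and each per-node request clones the payload-free remainder. A self-contained miniature of that pattern; the struct and method names are shortened from the diff:

```rust
// Minimal standalone model of RequestStrategy<T>: only the payload-free
// form is Clone; attaching a payload produces a value that cannot be
// duplicated and must be explicitly split apart again.
struct Strategy<T> {
    priority: u8,
    drop_on_complete: T,
}

impl Clone for Strategy<()> {
    fn clone(&self) -> Self {
        Strategy { priority: self.priority, drop_on_complete: () }
    }
}

impl Strategy<()> {
    fn with_priority(priority: u8) -> Self {
        Strategy { priority, drop_on_complete: () }
    }
    fn with_drop_on_completion<T>(self, payload: T) -> Strategy<T> {
        Strategy { priority: self.priority, drop_on_complete: payload }
    }
}

impl<T> Strategy<T> {
    // Split the payload off so the remainder can be cloned per node.
    fn extract_drop_on_complete(self) -> (Strategy<()>, T) {
        let payload = self.drop_on_complete;
        (Strategy { priority: self.priority, drop_on_complete: () }, payload)
    }
}

fn main() {
    let strat = Strategy::with_priority(0x41).with_drop_on_completion(vec![1u8, 2, 3]);
    let (per_node, payload) = strat.extract_drop_on_complete();
    let _clone_for_peer = per_node.clone(); // fine: the payload is gone
    assert_eq!(per_node.priority, 0x41);
    drop(payload); // in the patch this is the buffer semaphore permit
}
```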
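Finally, config.rs exposes the cap as a `block_ram_buffer_max` option with a 256 MiB default, parsed by the pre-existing `deserialize_capacity` helper. A hypothetical `garage.toml` excerpt; the suffixed-string form is an assumption based on that helper's name, and the exact accepted syntax is whatever `deserialize_capacity` implements:

```toml
# Hypothetical excerpt: lower the send-buffer cap from its 256 MiB default.
block_ram_buffer_max = "64MiB"
```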