diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 5 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 186 | ||||
-rw-r--r-- | src/rpc/system.rs | 7 |
3 files changed, 104 insertions, 94 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 80a1975c..6f3c81ed 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -45,9 +45,8 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] } -netapp = { version = "0.4.4", features = ["telemetry"] } +#netapp = { version = "0.4.4", features = ["telemetry"] } +netapp = { version = "0.5.0", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 34717d3b..ddabd636 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -15,10 +15,13 @@ use opentelemetry::{ Context, }; -pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc}; +pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; +use netapp::message::IntoReq; +pub use netapp::message::{ + Message as Rpc, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, +}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -pub use netapp::proto::*; -pub use netapp::{NetApp, NodeID}; +pub use netapp::{self, NetApp, NodeID}; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -30,10 +33,8 @@ use crate::ring::Ring; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); -// Try to never have more than 200MB of outgoing requests -// buffered at the same time. Other requests are queued until -// space is freed. -const REQUEST_BUFFER_SIZE: usize = 200 * 1024 * 1024; +// Don't allow more than 100 concurrent outgoing RPCs. +const MAX_CONCURRENT_REQUESTS: usize = 100; /// Strategy to apply when making RPC #[derive(Copy, Clone)] @@ -95,7 +96,7 @@ impl RpcHelper { background: Arc<BackgroundRunner>, ring: watch::Receiver<Arc<Ring>>, ) -> Self { - let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE)); + let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS)); let metrics = RpcMetrics::new(sem.clone()); @@ -109,30 +110,17 @@ impl RpcHelper { })) } - pub async fn call<M, H, S>( + pub async fn call<M, N, H, S>( &self, endpoint: &Endpoint<M, H>, to: Uuid, - msg: M, + msg: N, strat: RequestStrategy, ) -> Result<S, Error> where M: Rpc<Response = Result<S, Error>>, - H: EndpointHandler<M>, - { - self.call_arc(endpoint, to, Arc::new(msg), strat).await - } - - pub async fn call_arc<M, H, S>( - &self, - endpoint: &Endpoint<M, H>, - to: Uuid, - msg: Arc<M>, - strat: RequestStrategy, - ) -> Result<S, Error> - where - M: Rpc<Response = Result<S, Error>>, - H: EndpointHandler<M>, + N: IntoReq<M> + Send, + H: StreamingEndpointHandler<M>, { let metric_tags = [ KeyValue::new("rpc_endpoint", endpoint.path().to_string()), @@ -140,11 +128,10 @@ impl RpcHelper { KeyValue::new("to", format!("{:?}", to)), ]; - let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; let permit = self .0 .request_buffer_semaphore - .acquire_many(msg_size) + .acquire() .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags) .await?; @@ -152,7 +139,7 @@ impl RpcHelper { let node_id = to.into(); let rpc_call = endpoint - .call(&node_id, msg, strat.rs_priority) + .call_streaming(&node_id, msg, strat.rs_priority) .record_duration(&self.0.metrics.rpc_duration, &metric_tags); select! { @@ -162,7 +149,7 @@ impl RpcHelper { if res.is_err() { self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags); } - let res = res?; + let res = res?.into_msg(); if res.is_err() { self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags); @@ -178,38 +165,42 @@ impl RpcHelper { } } - pub async fn call_many<M, H, S>( + pub async fn call_many<M, N, H, S>( &self, endpoint: &Endpoint<M, H>, to: &[Uuid], - msg: M, + msg: N, strat: RequestStrategy, - ) -> Vec<(Uuid, Result<S, Error>)> + ) -> Result<Vec<(Uuid, Result<S, Error>)>, Error> where M: Rpc<Response = Result<S, Error>>, - H: EndpointHandler<M>, + N: IntoReq<M>, + H: StreamingEndpointHandler<M>, { - let msg = Arc::new(msg); + let msg = msg.into_req().map_err(netapp::error::Error::from)?; + let resps = join_all( to.iter() - .map(|to| self.call_arc(endpoint, *to, msg.clone(), strat)), + .map(|to| self.call(endpoint, *to, msg.clone(), strat)), ) .await; - to.iter() + Ok(to + .iter() .cloned() .zip(resps.into_iter()) - .collect::<Vec<_>>() + .collect::<Vec<_>>()) } - pub async fn broadcast<M, H, S>( + pub async fn broadcast<M, N, H, S>( &self, endpoint: &Endpoint<M, H>, - msg: M, + msg: N, strat: RequestStrategy, - ) -> Vec<(Uuid, Result<S, Error>)> + ) -> Result<Vec<(Uuid, Result<S, Error>)>, Error> where M: Rpc<Response = Result<S, Error>>, - H: EndpointHandler<M>, + N: IntoReq<M>, + H: StreamingEndpointHandler<M>, { let to = self .0 @@ -223,16 +214,17 @@ impl RpcHelper { /// Make a RPC call to multiple servers, returning either a Vec of responses, /// or an error if quorum could not be reached due to too many errors - pub async fn try_call_many<M, H, S>( + pub async fn try_call_many<M, N, H, S>( &self, endpoint: &Arc<Endpoint<M, H>>, to: &[Uuid], - msg: M, + msg: N, strategy: RequestStrategy, ) -> Result<Vec<S>, Error> where M: Rpc<Response = Result<S, Error>> + 'static, - H: EndpointHandler<M> + 'static, + N: IntoReq<M>, + H: StreamingEndpointHandler<M> + 'static, S: Send + 'static, { let quorum = strategy.rs_quorum.unwrap_or(to.len()); @@ -262,20 +254,21 @@ impl RpcHelper { .await } - async fn try_call_many_internal<M, H, S>( + async fn try_call_many_internal<M, N, H, S>( &self, endpoint: &Arc<Endpoint<M, H>>, to: &[Uuid], - msg: M, + msg: N, strategy: RequestStrategy, quorum: usize, ) -> Result<Vec<S>, Error> where M: Rpc<Response = Result<S, Error>> + 'static, - H: EndpointHandler<M> + 'static, + N: IntoReq<M>, + H: StreamingEndpointHandler<M> + 'static, S: Send + 'static, { - let msg = Arc::new(msg); + let msg = msg.into_req().map_err(netapp::error::Error::from)?; // Build future for each request // They are not started now: they are added below in a FuturesUnordered @@ -285,7 +278,7 @@ impl RpcHelper { let msg = msg.clone(); let endpoint2 = endpoint.clone(); (to, async move { - self2.call_arc(&endpoint2, to, msg, strategy).await + self2.call(&endpoint2, to, msg, strategy).await }) }); @@ -299,47 +292,19 @@ impl RpcHelper { // to reach a quorum, priorizing nodes with the lowest latency. // When there are errors, we start new requests to compensate. - // Retrieve some status variables that we will use to sort requests - let peer_list = self.0.fullmesh.get_peer_list(); - let ring: Arc<Ring> = self.0.ring.borrow().clone(); - let our_zone = match ring.layout.node_role(&self.0.our_node_id) { - Some(pc) => &pc.zone, - None => "", - }; - - // Augment requests with some information used to sort them. - // The tuples are as follows: - // (is another node?, is another zone?, latency, node ID, request future) - // We store all of these tuples in a vec that we can sort. - // By sorting this vec, we priorize ourself, then nodes in the same zone, - // and within a same zone we priorize nodes with the lowest latency. - let mut requests = requests - .map(|(to, fut)| { - let peer_zone = match ring.layout.node_role(&to) { - Some(pc) => &pc.zone, - None => "", - }; - let peer_avg_ping = peer_list - .iter() - .find(|x| x.id.as_ref() == to.as_slice()) - .and_then(|pi| pi.avg_ping) - .unwrap_or_else(|| Duration::from_secs(1)); - ( - to != self.0.our_node_id, - peer_zone != our_zone, - peer_avg_ping, - to, - fut, - ) - }) + // Reorder requests to priorize closeness / low latency + let request_order = self.request_order(to); + let mut ord_requests = vec![(); request_order.len()] + .into_iter() + .map(|_| None) .collect::<Vec<_>>(); - - // Sort requests by (priorize ourself, priorize same zone, priorize low latency) - requests - .sort_by_key(|(diffnode, diffzone, ping, _to, _fut)| (*diffnode, *diffzone, *ping)); + for (to, fut) in requests { + let i = request_order.iter().position(|x| *x == to).unwrap(); + ord_requests[i] = Some((to, fut)); + } // Make an iterator to take requests in their sorted order - let mut requests = requests.into_iter(); + let mut requests = ord_requests.into_iter().map(Option::unwrap); // resp_stream will contain all of the requests that are currently in flight. // (for the moment none, they will be added in the loop below) @@ -350,7 +315,7 @@ impl RpcHelper { // If the current set of requests that are running is not enough to possibly // reach quorum, start some new requests. while successes.len() + resp_stream.len() < quorum { - if let Some((_, _, _, req_to, fut)) = requests.next() { + if let Some((req_to, fut)) = requests.next() { let tracer = opentelemetry::global::tracer("garage"); let span = tracer.start(format!("RPC to {:?}", req_to)); resp_stream.push(tokio::spawn( @@ -420,4 +385,49 @@ impl RpcHelper { Err(Error::Quorum(quorum, successes.len(), to.len(), errors)) } } + + pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> { + // Retrieve some status variables that we will use to sort requests + let peer_list = self.0.fullmesh.get_peer_list(); + let ring: Arc<Ring> = self.0.ring.borrow().clone(); + let our_zone = match ring.layout.node_role(&self.0.our_node_id) { + Some(pc) => &pc.zone, + None => "", + }; + + // Augment requests with some information used to sort them. + // The tuples are as follows: + // (is another node?, is another zone?, latency, node ID, request future) + // We store all of these tuples in a vec that we can sort. + // By sorting this vec, we priorize ourself, then nodes in the same zone, + // and within a same zone we priorize nodes with the lowest latency. + let mut nodes = nodes + .iter() + .map(|to| { + let peer_zone = match ring.layout.node_role(&to) { + Some(pc) => &pc.zone, + None => "", + }; + let peer_avg_ping = peer_list + .iter() + .find(|x| x.id.as_ref() == to.as_slice()) + .and_then(|pi| pi.avg_ping) + .unwrap_or_else(|| Duration::from_secs(1)); + ( + *to != self.0.our_node_id, + peer_zone != our_zone, + peer_avg_ping, + *to, + ) + }) + .collect::<Vec<_>>(); + + // Sort requests by (priorize ourself, priorize same zone, priorize low latency) + nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping)); + + nodes + .into_iter() + .map(|(_, _, _, to)| to) + .collect::<Vec<_>>() + } } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index fbfbbf56..5858660e 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -16,8 +16,8 @@ use tokio::sync::watch; use tokio::sync::Mutex; use netapp::endpoint::{Endpoint, EndpointHandler}; +use netapp::message::*; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -use netapp::proto::*; use netapp::util::parse_and_resolve_peer_addr; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; @@ -541,7 +541,7 @@ impl System { SystemRpc::AdvertiseClusterLayout(layout), RequestStrategy::with_priority(PRIO_HIGH), ) - .await; + .await?; Ok(()) }); self.background.spawn(self.clone().save_cluster_layout()); @@ -556,7 +556,8 @@ impl System { self.update_local_status(); let local_status: NodeStatus = self.local_status.load().as_ref().clone(); - self.rpc + let _ = self + .rpc .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), |