From def78c5e6f5da37a0d17b5652c525fbeccbc2e86 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 9 May 2022 12:08:47 +0200 Subject: Update netapp to 0.4.4, fix #300 --- src/rpc/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index efaacf2e..46d0dc1e 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -48,7 +48,7 @@ opentelemetry = "0.17" #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } #netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] } -netapp = { version = "0.4.2", features = ["telemetry"] } +netapp = { version = "0.4.4", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } -- cgit v1.2.3 From 5768bf362262f78376af14517c4921941986192e Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 10 May 2022 13:16:57 +0200 Subject: First implementation of K2V (#293) **Specification:** View spec at [this URL](https://git.deuxfleurs.fr/Deuxfleurs/garage/src/branch/k2v/doc/drafts/k2v-spec.md) - [x] Specify the structure of K2V triples - [x] Specify the DVVS format used for causality detection - [x] Specify the K2V index (just a counter of number of values per partition key) - [x] Specify single-item endpoints: ReadItem, InsertItem, DeleteItem - [x] Specify index endpoint: ReadIndex - [x] Specify multi-item endpoints: InsertBatch, ReadBatch, DeleteBatch - [x] Move to JSON objects instead of tuples - [x] Specify endpoints for polling for updates on single values (PollItem) **Implementation:** - [x] Table for K2V items, causal contexts - [x] Indexing mechanism and table for K2V index - [x] Make API handlers a bit more generic - [x] K2V API endpoint - [x] K2V API router - [x] ReadItem - [x] InsertItem - [x] DeleteItem - [x] PollItem - [x] ReadIndex - [x] InsertBatch - [x] ReadBatch - [x] DeleteBatch **Testing:** - [x] Just a simple Python script that does some requests to check visually that things are going right (does not contain parsing of results or assertions on returned values) - [x] Actual tests: - [x] Adapt testing framework - [x] Simple test with InsertItem + ReadItem - [x] Test with several Insert/Read/DeleteItem + ReadIndex - [x] Test all combinations of return formats for ReadItem - [x] Test with ReadBatch, InsertBatch, DeleteBatch - [x] Test with PollItem - [x] Test error codes - [ ] Fix most broken stuff - [x] test PollItem broken randomly - [x] when invalid causality tokens are given, errors should be 4xx not 5xx **Improvements:** - [x] Descending range queries - [x] Specify - [x] Implement - [x] Add test - [x] Batch updates to index counter - [x] Put K2V behind `k2v` feature flag Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/293 Co-authored-by: Alex Co-committed-by: Alex --- src/rpc/Cargo.toml | 1 + 1 file changed, 1 insertion(+) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 46d0dc1e..bed7f44a 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -52,5 +52,6 @@ netapp = { version = "0.4.4", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } + [features] kubernetes-discovery = [ "kube", "k8s-openapi", "openssl", "schemars" ] -- cgit v1.2.3 From 382e74c798263d042b1c6ca3788c866a8c69c4f4 Mon Sep 17 00:00:00 2001 From: Alex Date: Tue, 24 May 2022 12:16:39 +0200 Subject: First version of admin API (#298) **Spec:** - [x] Start writing - [x] Specify 
all layout endpoints - [x] Specify all endpoints for operations on keys - [x] Specify all endpoints for operations on key/bucket permissions - [x] Specify all endpoints for operations on buckets - [x] Specify all endpoints for operations on bucket aliases View rendered spec at **Code:** - [x] Refactor code for admin api to use common api code that was created for K2V **General endpoints:** - [x] Metrics - [x] GetClusterStatus - [x] ConnectClusterNodes - [x] GetClusterLayout - [x] UpdateClusterLayout - [x] ApplyClusterLayout - [x] RevertClusterLayout **Key-related endpoints:** - [x] ListKeys - [x] CreateKey - [x] ImportKey - [x] GetKeyInfo - [x] UpdateKey - [x] DeleteKey **Bucket-related endpoints:** - [x] ListBuckets - [x] CreateBucket - [x] GetBucketInfo - [x] DeleteBucket - [x] PutBucketWebsite - [x] DeleteBucketWebsite **Operations on key/bucket permissions:** - [x] BucketAllowKey - [x] BucketDenyKey **Operations on bucket aliases:** - [x] GlobalAliasBucket - [x] GlobalUnaliasBucket - [x] LocalAliasBucket - [x] LocalUnaliasBucket **And also:** - [x] Separate error type for the admin API (this PR includes a quite big refactoring of error handling) - [x] Add management of website access - [ ] Check that nothing is missing wrt what can be done using the CLI - [ ] Improve formatting of the spec - [x] Make sure everyone is cool with the API design Fix #231 Fix #295 Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/298 Co-authored-by: Alex Co-committed-by: Alex --- src/rpc/Cargo.toml | 2 +- src/rpc/layout.rs | 56 +++++++++++++++++++++++ src/rpc/system.rs | 132 ++++++++++++++++++++++++++++++++++------------------- 3 files changed, 141 insertions(+), 49 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index bed7f44a..73328993 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -15,11 +15,11 @@ path = "lib.rs" [dependencies] garage_util = { version = "0.7.0", path = "../util" } -garage_admin = { version = "0.7.0", path = "../admin" } arc-swap = "1.0" bytes = "1.0" gethostname = "0.2" +git-version = "0.3.4" hex = "0.4" tracing = "0.1.30" rand = "0.8" diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index b9c02c21..f517f36f 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize}; use garage_util::crdt::{AutoCrdt, Crdt, LwwMap}; use garage_util::data::*; +use garage_util::error::*; use crate::ring::*; @@ -100,6 +101,61 @@ impl ClusterLayout { } } + pub fn apply_staged_changes(mut self, version: Option) -> Result { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + self.roles.merge(&self.staging); + self.roles.retain(|(_, _, v)| v.0.is_some()); + + if !self.calculate_partition_assignation() { + return Err(Error::Message("Could not calculate new assignation of partitions to nodes. 
This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter).".into())); + } + + self.staging.clear(); + self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + + self.version += 1; + + Ok(self) + } + + pub fn revert_staged_changes(mut self, version: Option) -> Result { + match version { + None => { + let error = r#" +Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. +To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. + "#; + return Err(Error::Message(error.into())); + } + Some(v) => { + if v != self.version + 1 { + return Err(Error::Message("Invalid new layout version".into())); + } + } + } + + self.staging.clear(); + self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]); + + self.version += 1; + + Ok(self) + } + /// Returns a list of IDs of nodes that currently have /// a role in the cluster pub fn node_ids(&self) -> &[Uuid] { diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 68d94ea5..1d7c3ea4 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -312,6 +312,84 @@ impl System { ); } + // ---- Administrative operations (directly available and + // also available through RPC) ---- + + pub fn garage_version(&self) -> &'static str { + option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( + prefix = "git:", + cargo_prefix = "cargo:", + fallback = "unknown" + )) + } + + pub fn get_known_nodes(&self) -> Vec { + let node_status = self.node_status.read().unwrap(); + let known_nodes = self + .fullmesh + .get_peer_list() + .iter() + .map(|n| KnownNodeInfo { + id: n.id.into(), + addr: n.addr, + is_up: n.is_up(), + last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()), + status: node_status + .get(&n.id.into()) + .cloned() + .map(|(_, st)| st) + .unwrap_or(NodeStatus { + hostname: "?".to_string(), + replication_factor: 0, + cluster_layout_version: 0, + cluster_layout_staging_hash: Hash::from([0u8; 32]), + }), + }) + .collect::>(); + known_nodes + } + + pub fn get_cluster_layout(&self) -> ClusterLayout { + self.ring.borrow().layout.clone() + } + + pub async fn update_cluster_layout( + self: &Arc, + layout: &ClusterLayout, + ) -> Result<(), Error> { + self.handle_advertise_cluster_layout(layout).await?; + Ok(()) + } + + pub async fn connect(&self, node: &str) -> Result<(), Error> { + let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { + Error::Message(format!( + "Unable to parse or resolve node specification: {}", + node + )) + })?; + let mut errors = vec![]; + for ip in addrs.iter() { + match self + .netapp + .clone() + .try_connect(*ip, pubkey) + .await + .err_context(CONNECT_ERROR_MESSAGE) + { + Ok(()) => return Ok(()), + Err(e) => { + errors.push((*ip, e)); + } + } + } + if errors.len() == 1 { + Err(Error::Message(errors[0].1.to_string())) + } else { + Err(Error::Message(format!("{:?}", errors))) + } + } + // ---- INTERNALS ---- async fn advertise_to_consul(self: Arc) -> Result<(), Error> { @@ -384,32 +462,11 @@ impl System { self.local_status.swap(Arc::new(new_si)); } + // --- RPC HANDLERS --- + async fn handle_connect(&self, node: &str) -> Result { - let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { - Error::Message(format!( - "Unable to parse or resolve node specification: {}", - node - )) - })?; - let mut errors = vec![]; - for ip in addrs.iter() { - match 
self - .netapp - .clone() - .try_connect(*ip, pubkey) - .await - .err_context(CONNECT_ERROR_MESSAGE) - { - Ok(()) => return Ok(SystemRpc::Ok), - Err(e) => { - errors.push((*ip, e)); - } - } - } - return Err(Error::Message(format!( - "Could not connect to specified peers. Errors: {:?}", - errors - ))); + self.connect(node).await?; + Ok(SystemRpc::Ok) } fn handle_pull_cluster_layout(&self) -> SystemRpc { @@ -418,28 +475,7 @@ impl System { } fn handle_get_known_nodes(&self) -> SystemRpc { - let node_status = self.node_status.read().unwrap(); - let known_nodes = self - .fullmesh - .get_peer_list() - .iter() - .map(|n| KnownNodeInfo { - id: n.id.into(), - addr: n.addr, - is_up: n.is_up(), - last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()), - status: node_status - .get(&n.id.into()) - .cloned() - .map(|(_, st)| st) - .unwrap_or(NodeStatus { - hostname: "?".to_string(), - replication_factor: 0, - cluster_layout_version: 0, - cluster_layout_staging_hash: Hash::from([0u8; 32]), - }), - }) - .collect::>(); + let known_nodes = self.get_known_nodes(); SystemRpc::ReturnKnownNodes(known_nodes) } @@ -476,7 +512,7 @@ impl System { } async fn handle_advertise_cluster_layout( - self: Arc, + self: &Arc, adv: &ClusterLayout, ) -> Result { let update_ring = self.update_ring.lock().await; -- cgit v1.2.3 From 4f38cadf6e2963a652ed28327d1c2ccfa2ebb2b7 Mon Sep 17 00:00:00 2001 From: Alex Date: Fri, 8 Jul 2022 13:30:26 +0200 Subject: Background task manager (#332) - [x] New background worker trait - [x] Adapt all current workers to use new API - [x] Command to list currently running workers, and whether they are active, idle, or dead - [x] Error reporting - Optimizations - [x] Merkle updater: several items per iteration - [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other things going on - scrub: - [x] have only one worker with a channel to start/pause/cancel - [x] automatic scrub - [x] ability to view and change tranquility from CLI - [x] persistence of a few info - [ ] Testing Co-authored-by: Alex Auvolat Reviewed-on: https://git.deuxfleurs.fr/Deuxfleurs/garage/pulls/332 Co-authored-by: Alex Co-committed-by: Alex --- src/rpc/system.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'src/rpc') diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 1d7c3ea4..f9f2970b 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -104,6 +104,9 @@ pub struct System { /// The job runner of this node pub background: Arc, + + /// Path to metadata directory + pub metadata_dir: PathBuf, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -295,6 +298,7 @@ impl System { ring, update_ring: Mutex::new(update_ring), background, + metadata_dir: config.metadata_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); sys -- cgit v1.2.3 From 8e7e680afe39f48fe15f365c9ef3fee57596e119 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 15:20:00 +0200 Subject: First adaptation to WIP netapp with streaming body --- src/rpc/Cargo.toml | 5 ++-- src/rpc/rpc_helper.rs | 71 ++++++++++++++++++++++----------------------------- src/rpc/system.rs | 7 ++--- 3 files changed, 36 insertions(+), 47 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 
73328993..5d5151cd 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -46,9 +46,8 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" -#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } -#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] } -netapp = { version = "0.4.4", features = ["telemetry"] } +#netapp = { version = "0.4.4", features = ["telemetry"] } +netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 34717d3b..079cdc70 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -15,9 +15,9 @@ use opentelemetry::{ Context, }; -pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc}; +pub use netapp::endpoint::{Endpoint, EndpointHandler}; +pub use netapp::message::{Message as Rpc, *}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -pub use netapp::proto::*; pub use netapp::{NetApp, NodeID}; use garage_util::background::BackgroundRunner; @@ -30,10 +30,8 @@ use crate::ring::Ring; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); -// Try to never have more than 200MB of outgoing requests -// buffered at the same time. Other requests are queued until -// space is freed. -const REQUEST_BUFFER_SIZE: usize = 200 * 1024 * 1024; +// Don't allow more than 100 concurrent outgoing RPCs. +const MAX_CONCURRENT_REQUESTS: usize = 100; /// Strategy to apply when making RPC #[derive(Copy, Clone)] @@ -95,7 +93,7 @@ impl RpcHelper { background: Arc, ring: watch::Receiver>, ) -> Self { - let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE)); + let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS)); let metrics = RpcMetrics::new(sem.clone()); @@ -109,29 +107,16 @@ impl RpcHelper { })) } - pub async fn call( + pub async fn call( &self, endpoint: &Endpoint, to: Uuid, - msg: M, - strat: RequestStrategy, - ) -> Result - where - M: Rpc>, - H: EndpointHandler, - { - self.call_arc(endpoint, to, Arc::new(msg), strat).await - } - - pub async fn call_arc( - &self, - endpoint: &Endpoint, - to: Uuid, - msg: Arc, + msg: N, strat: RequestStrategy, ) -> Result where M: Rpc>, + N: IntoReq + Send, H: EndpointHandler, { let metric_tags = [ @@ -140,11 +125,10 @@ impl RpcHelper { KeyValue::new("to", format!("{:?}", to)), ]; - let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; let permit = self .0 .request_buffer_semaphore - .acquire_many(msg_size) + .acquire() .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags) .await?; @@ -152,7 +136,7 @@ impl RpcHelper { let node_id = to.into(); let rpc_call = endpoint - .call(&node_id, msg, strat.rs_priority) + .call_streaming(&node_id, msg, strat.rs_priority) .record_duration(&self.0.metrics.rpc_duration, &metric_tags); select! 
{ @@ -162,7 +146,7 @@ impl RpcHelper { if res.is_err() { self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags); } - let res = res?; + let res = res?.into_msg(); if res.is_err() { self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags); @@ -178,37 +162,41 @@ impl RpcHelper { } } - pub async fn call_many( + pub async fn call_many( &self, endpoint: &Endpoint, to: &[Uuid], - msg: M, + msg: N, strat: RequestStrategy, - ) -> Vec<(Uuid, Result)> + ) -> Result)>, Error> where M: Rpc>, + N: IntoReq, H: EndpointHandler, { - let msg = Arc::new(msg); + let msg = msg.into_req().map_err(netapp::error::Error::from)?; + let resps = join_all( to.iter() - .map(|to| self.call_arc(endpoint, *to, msg.clone(), strat)), + .map(|to| self.call(endpoint, *to, msg.clone(), strat)), ) .await; - to.iter() + Ok(to + .iter() .cloned() .zip(resps.into_iter()) - .collect::>() + .collect::>()) } - pub async fn broadcast( + pub async fn broadcast( &self, endpoint: &Endpoint, - msg: M, + msg: N, strat: RequestStrategy, - ) -> Vec<(Uuid, Result)> + ) -> Result)>, Error> where M: Rpc>, + N: IntoReq, H: EndpointHandler, { let to = self @@ -262,20 +250,21 @@ impl RpcHelper { .await } - async fn try_call_many_internal( + async fn try_call_many_internal( &self, endpoint: &Arc>, to: &[Uuid], - msg: M, + msg: N, strategy: RequestStrategy, quorum: usize, ) -> Result, Error> where M: Rpc> + 'static, + N: IntoReq, H: EndpointHandler + 'static, S: Send + 'static, { - let msg = Arc::new(msg); + let msg = msg.into_req().map_err(netapp::error::Error::from)?; // Build future for each request // They are not started now: they are added below in a FuturesUnordered @@ -285,7 +274,7 @@ impl RpcHelper { let msg = msg.clone(); let endpoint2 = endpoint.clone(); (to, async move { - self2.call_arc(&endpoint2, to, msg, strategy).await + self2.call(&endpoint2, to, msg, strategy).await }) }); diff --git a/src/rpc/system.rs b/src/rpc/system.rs index f9f2970b..04ef2f69 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -16,8 +16,8 @@ use tokio::sync::watch; use tokio::sync::Mutex; use netapp::endpoint::{Endpoint, EndpointHandler}; +use netapp::message::*; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -use netapp::proto::*; use netapp::util::parse_and_resolve_peer_addr; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; @@ -544,7 +544,7 @@ impl System { SystemRpc::AdvertiseClusterLayout(layout), RequestStrategy::with_priority(PRIO_HIGH), ) - .await; + .await?; Ok(()) }); self.background.spawn(self.clone().save_cluster_layout()); @@ -559,7 +559,8 @@ impl System { self.update_local_status(); let local_status: NodeStatus = self.local_status.load().as_ref().clone(); - self.rpc + let _ = self + .rpc .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), -- cgit v1.2.3 From a35d4da721db3550a2833d8576d4283bc999e8df Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 16:45:45 +0200 Subject: update netapp to 0.5 --- src/rpc/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 5d5151cd..5986b7bf 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -47,7 +47,7 @@ tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" #netapp = { version = "0.4.4", features = ["telemetry"] } -netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } +netapp = { version = "0.5.0", git = "https://git.deuxfleurs.fr/lx/netapp", branch 
= "stream-body", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } -- cgit v1.2.3 From 605a630333c8ee60c55fe011a375c01277bba173 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 18:20:27 +0200 Subject: Use streaming in block manager --- src/rpc/rpc_helper.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 079cdc70..6e098446 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -15,10 +15,13 @@ use opentelemetry::{ Context, }; -pub use netapp::endpoint::{Endpoint, EndpointHandler}; -pub use netapp::message::{Message as Rpc, *}; +pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; +use netapp::message::IntoReq; +pub use netapp::message::{ + Message as Rpc, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, +}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -pub use netapp::{NetApp, NodeID}; +pub use netapp::{self, NetApp, NodeID}; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -117,7 +120,7 @@ impl RpcHelper { where M: Rpc>, N: IntoReq + Send, - H: EndpointHandler, + H: StreamingEndpointHandler, { let metric_tags = [ KeyValue::new("rpc_endpoint", endpoint.path().to_string()), @@ -172,7 +175,7 @@ impl RpcHelper { where M: Rpc>, N: IntoReq, - H: EndpointHandler, + H: StreamingEndpointHandler, { let msg = msg.into_req().map_err(netapp::error::Error::from)?; @@ -197,7 +200,7 @@ impl RpcHelper { where M: Rpc>, N: IntoReq, - H: EndpointHandler, + H: StreamingEndpointHandler, { let to = self .0 @@ -211,16 +214,17 @@ impl RpcHelper { /// Make a RPC call to multiple servers, returning either a Vec of responses, /// or an error if quorum could not be reached due to too many errors - pub async fn try_call_many( + pub async fn try_call_many( &self, endpoint: &Arc>, to: &[Uuid], - msg: M, + msg: N, strategy: RequestStrategy, ) -> Result, Error> where M: Rpc> + 'static, - H: EndpointHandler + 'static, + N: IntoReq, + H: StreamingEndpointHandler + 'static, S: Send + 'static, { let quorum = strategy.rs_quorum.unwrap_or(to.len()); @@ -261,7 +265,7 @@ impl RpcHelper { where M: Rpc> + 'static, N: IntoReq, - H: EndpointHandler + 'static, + H: StreamingEndpointHandler + 'static, S: Send + 'static, { let msg = msg.into_req().map_err(netapp::error::Error::from)?; -- cgit v1.2.3 From e935861854deed5d1ca66767fc51d9849201a4dd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 25 Jul 2022 18:19:35 +0200 Subject: Factor out node request order selection logic & use in manager --- src/rpc/rpc_helper.rs | 95 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 56 insertions(+), 39 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 6e098446..ddabd636 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -292,47 +292,19 @@ impl RpcHelper { // to reach a quorum, priorizing nodes with the lowest latency. // When there are errors, we start new requests to compensate. - // Retrieve some status variables that we will use to sort requests - let peer_list = self.0.fullmesh.get_peer_list(); - let ring: Arc = self.0.ring.borrow().clone(); - let our_zone = match ring.layout.node_role(&self.0.our_node_id) { - Some(pc) => &pc.zone, - None => "", - }; - - // Augment requests with some information used to sort them. 
- // The tuples are as follows: - // (is another node?, is another zone?, latency, node ID, request future) - // We store all of these tuples in a vec that we can sort. - // By sorting this vec, we priorize ourself, then nodes in the same zone, - // and within a same zone we priorize nodes with the lowest latency. - let mut requests = requests - .map(|(to, fut)| { - let peer_zone = match ring.layout.node_role(&to) { - Some(pc) => &pc.zone, - None => "", - }; - let peer_avg_ping = peer_list - .iter() - .find(|x| x.id.as_ref() == to.as_slice()) - .and_then(|pi| pi.avg_ping) - .unwrap_or_else(|| Duration::from_secs(1)); - ( - to != self.0.our_node_id, - peer_zone != our_zone, - peer_avg_ping, - to, - fut, - ) - }) + // Reorder requests to priorize closeness / low latency + let request_order = self.request_order(to); + let mut ord_requests = vec![(); request_order.len()] + .into_iter() + .map(|_| None) .collect::>(); - - // Sort requests by (priorize ourself, priorize same zone, priorize low latency) - requests - .sort_by_key(|(diffnode, diffzone, ping, _to, _fut)| (*diffnode, *diffzone, *ping)); + for (to, fut) in requests { + let i = request_order.iter().position(|x| *x == to).unwrap(); + ord_requests[i] = Some((to, fut)); + } // Make an iterator to take requests in their sorted order - let mut requests = requests.into_iter(); + let mut requests = ord_requests.into_iter().map(Option::unwrap); // resp_stream will contain all of the requests that are currently in flight. // (for the moment none, they will be added in the loop below) @@ -343,7 +315,7 @@ impl RpcHelper { // If the current set of requests that are running is not enough to possibly // reach quorum, start some new requests. while successes.len() + resp_stream.len() < quorum { - if let Some((_, _, _, req_to, fut)) = requests.next() { + if let Some((req_to, fut)) = requests.next() { let tracer = opentelemetry::global::tracer("garage"); let span = tracer.start(format!("RPC to {:?}", req_to)); resp_stream.push(tokio::spawn( @@ -413,4 +385,49 @@ impl RpcHelper { Err(Error::Quorum(quorum, successes.len(), to.len(), errors)) } } + + pub fn request_order(&self, nodes: &[Uuid]) -> Vec { + // Retrieve some status variables that we will use to sort requests + let peer_list = self.0.fullmesh.get_peer_list(); + let ring: Arc = self.0.ring.borrow().clone(); + let our_zone = match ring.layout.node_role(&self.0.our_node_id) { + Some(pc) => &pc.zone, + None => "", + }; + + // Augment requests with some information used to sort them. + // The tuples are as follows: + // (is another node?, is another zone?, latency, node ID, request future) + // We store all of these tuples in a vec that we can sort. + // By sorting this vec, we priorize ourself, then nodes in the same zone, + // and within a same zone we priorize nodes with the lowest latency. 
+ let mut nodes = nodes + .iter() + .map(|to| { + let peer_zone = match ring.layout.node_role(&to) { + Some(pc) => &pc.zone, + None => "", + }; + let peer_avg_ping = peer_list + .iter() + .find(|x| x.id.as_ref() == to.as_slice()) + .and_then(|pi| pi.avg_ping) + .unwrap_or_else(|| Duration::from_secs(1)); + ( + *to != self.0.our_node_id, + peer_zone != our_zone, + peer_avg_ping, + *to, + ) + }) + .collect::>(); + + // Sort requests by (priorize ourself, priorize same zone, priorize low latency) + nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping)); + + nodes + .into_iter() + .map(|(_, _, _, to)| to) + .collect::>() + } } -- cgit v1.2.3 From 2c7bae935ac68acab831fe86e5330d3c9a84a953 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 10 Aug 2022 12:18:44 +0200 Subject: Configure structopt to report the right version By default, structopt reports the value provided by the env var CARGO_PKG_VERSION, fed by Cargo when reading Cargo.toml. However, for Garage we use a versioning based on git, so we often report a version that is behind the real version. In this commit, we create garage_util::version::garage() that reports the right version and configure all structopt subcommands to call this function instead of using the env var. --- src/rpc/Cargo.toml | 1 - src/rpc/system.rs | 7 ++----- 2 files changed, 2 insertions(+), 6 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 73328993..80a1975c 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -19,7 +19,6 @@ garage_util = { version = "0.7.0", path = "../util" } arc-swap = "1.0" bytes = "1.0" gethostname = "0.2" -git-version = "0.3.4" hex = "0.4" tracing = "0.1.30" rand = "0.8" diff --git a/src/rpc/system.rs b/src/rpc/system.rs index f9f2970b..fbfbbf56 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -27,6 +27,7 @@ use garage_util::data::*; use garage_util::error::*; use garage_util::persister::Persister; use garage_util::time::*; +use garage_util::version; use crate::consul::*; #[cfg(feature = "kubernetes-discovery")] @@ -320,11 +321,7 @@ impl System { // ---- Administrative operations (directly available and // also available through RPC) ---- pub fn garage_version(&self) -> &'static str { - option_env!("GIT_VERSION").unwrap_or(git_version::git_version!( - prefix = "git:", - cargo_prefix = "cargo:", - fallback = "unknown" - )) + version::garage() } pub fn get_known_nodes(&self) -> Vec { -- cgit v1.2.3 From 322dafc761295df45c081183c5fc059a750a3249 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 29 Aug 2022 17:32:45 +0200 Subject: Try to fix clippy --- src/rpc/rpc_helper.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/rpc') diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index ddabd636..216fffd4 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -404,7 +404,7 @@ impl RpcHelper { let mut nodes = nodes .iter() .map(|to| { - let peer_zone = match ring.layout.node_role(&to) { + let peer_zone = match ring.layout.node_role(to) { Some(pc) => &pc.zone, None => "", }; -- cgit v1.2.3 From bc977f9a7a7a5bd87ccf5fe96d64b397591f8ba0 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 12:58:20 +0200 Subject: Update to Netapp with OrderTag support and exploit OrderTags --- src/rpc/rpc_helper.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/rpc') diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 216fffd4..6c79c502 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -18,7 +18,7 @@ use opentelemetry::{
pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; use netapp::message::IntoReq; pub use netapp::message::{ - Message as Rpc, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, + Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, }; use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; -- cgit v1.2.3 From df094bd8075332bb765b8b44c9b19cf2485e9ca8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 16:30:44 +0200 Subject: Less strict timeouts --- src/rpc/rpc_helper.rs | 2 +- src/rpc/system.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 6c79c502..e9575261 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -31,7 +31,7 @@ use garage_util::metrics::RecordDuration; use crate::metrics::RpcMetrics; use crate::ring::Ring; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); // Don't allow more than 100 concurrent outgoing RPCs. const MAX_CONCURRENT_REQUESTS: usize = 100; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 5858660e..d7ef2140 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -38,7 +38,7 @@ use crate::rpc_helper::*; const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); -const PING_TIMEOUT: Duration = Duration::from_secs(2); +const SYSTEM_RPC_TIMEOUT: Duration = Duration::from_secs(15); /// Version tag used for version check upon Netapp connection pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650007; // garage 0x0007 @@ -561,7 +561,7 @@ impl System { .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), - RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT), + RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT), ) .await; @@ -685,7 +685,7 @@ impl System { &self.system_endpoint, peer, SystemRpc::PullClusterLayout, - RequestStrategy::with_priority(PRIO_HIGH).with_timeout(PING_TIMEOUT), + RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT), ) .await; if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { -- cgit v1.2.3 From 99b532b85bf35b5acf621c229fb991825f3d994c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 16:35:43 +0200 Subject: Apply PRIO_SECONDARY to block data transfers --- src/rpc/rpc_helper.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/rpc') diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index e9575261..aa204c5e 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -18,7 +18,7 @@ use opentelemetry::{ pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; use netapp::message::IntoReq; pub use netapp::message::{ - Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, + Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, PRIO_SECONDARY }; use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; -- cgit v1.2.3 From 1ef87ac4cb676113e86fc16a9eb27546d9a737bd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 2 Sep 2022 13:38:29 +0200 Subject: cargo fmt --- src/rpc/rpc_helper.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/rpc') diff 
--git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index aa204c5e..19abb4c5 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -18,7 +18,8 @@ use opentelemetry::{ pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; use netapp::message::IntoReq; pub use netapp::message::{ - Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, PRIO_SECONDARY + Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, + PRIO_SECONDARY, }; use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::{self, NetApp, NodeID}; -- cgit v1.2.3 From 6226f5ceca7828d096890c3dbc5b9fbc3f7c4b14 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 2 Sep 2022 14:33:12 +0200 Subject: Update to netapp 0.4.5 - fixed ping --- src/rpc/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 80a1975c..5757fe8d 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -47,7 +47,7 @@ opentelemetry = "0.17" #netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } #netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] } -netapp = { version = "0.4.4", features = ["telemetry"] } +netapp = { version = "0.4.5", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } -- cgit v1.2.3 From a6e40b75eabf0d6a863a91ae17f7d0ae20582d9e Mon Sep 17 00:00:00 2001 From: Jakub Jirutka Date: Sat, 3 Sep 2022 18:37:24 +0200 Subject: Add feature "system-libs" to enable linking against system libraries If this feature is enabled, libsodium-sys and zstd-sys will link dynamically against system-provided libraries instead of building and statically linking the bundled (possibly outdated and vulnerable) copies of them. This feature is intended mainly for Linux package maintainers.
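For context, such a feature only has an effect if every crate in the workspace forwards it down to the crates that actually bind the C libraries, and packagers opt in at build time. A sketch of that forwarding at the workspace root, where the exact crate list is illustrative and only `sodiumoxide/use-pkg-config` is taken from this patch:

```toml
# Hypothetical top-level Cargo.toml: forward "system-libs" to subcrates.
[features]
system-libs = [ "garage_rpc/system-libs", "garage_model/system-libs" ]
```

A package build would then run something like `cargo build --release --features system-libs`, making `libsodium-sys` locate the system libsodium through pkg-config instead of compiling the bundled copy.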
--- src/rpc/Cargo.toml | 1 + 1 file changed, 1 insertion(+) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 80a1975c..309e3fc2 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -54,3 +54,4 @@ hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } [features] kubernetes-discovery = [ "kube", "k8s-openapi", "openssl", "schemars" ] +system-libs = [ "sodiumoxide/use-pkg-config" ] -- cgit v1.2.3 From 48ffaaadfc790142ed9556f5227913fa8c32d2ed Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 6 Sep 2022 16:47:56 +0200 Subject: Bump versions to 0.8.0 (compatibility is broken already) --- src/rpc/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 309e3fc2..21841a02 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_rpc" -version = "0.7.0" +version = "0.8.0" authors = ["Alex Auvolat "] edition = "2018" license = "AGPL-3.0" @@ -14,7 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -garage_util = { version = "0.7.0", path = "../util" } +garage_util = { version = "0.8.0", path = "../util" } arc-swap = "1.0" bytes = "1.0" -- cgit v1.2.3 From db61f41030678c5756c844c8aa41a210c658769e Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 7 Sep 2022 11:59:56 +0200 Subject: Move GIT_VERSION injection later in build chain to reduce build times --- src/rpc/system.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/system.rs b/src/rpc/system.rs index fbfbbf56..d621f59f 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -27,7 +27,6 @@ use garage_util::data::*; use garage_util::error::*; use garage_util::persister::Persister; use garage_util::time::*; -use garage_util::version; use crate::consul::*; #[cfg(feature = "kubernetes-discovery")] @@ -40,8 +39,10 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); -/// Version tag used for version check upon Netapp connection -pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650007; // garage 0x0007 +/// Version tag used for version check upon Netapp connection. +/// Cluster nodes with different version tags are deemed +/// incompatible and will refuse to connect. 
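(The tag itself is simply the ASCII bytes of `garage` followed by a 16-bit protocol number, so the bump from 0x0007 to 0x0008 below is what keeps incompatible 0.7 and 0.8 nodes apart. A quick standalone check, as a hypothetical snippet that is not part of the patch:)

```rust
fn main() {
    let tag: u64 = 0x6761726167650008;
    let bytes = tag.to_be_bytes();
    // The first six bytes spell "garage"; the last two carry the version.
    assert_eq!(&bytes[..6], &b"garage"[..]);
    assert_eq!(u16::from_be_bytes([bytes[6], bytes[7]]), 0x0008);
}
```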
+pub const GARAGE_VERSION_TAG: u64 = 0x6761726167650008; // garage 0x0008 /// RPC endpoint used for calls related to membership pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; @@ -320,10 +321,6 @@ impl System { // ---- Administrative operations (directly available and // also available through RPC) ---- - pub fn garage_version(&self) -> &'static str { - version::garage() - } - pub fn get_known_nodes(&self) -> Vec { let node_status = self.node_status.read().unwrap(); let known_nodes = self -- cgit v1.2.3 From 28a4af73ca8c0f82314157939fc98c46f338e84a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 13:11:44 +0200 Subject: Use netapp 0.5 published from crates.io --- src/rpc/Cargo.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index d7136401..079cfe34 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -45,8 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" -#netapp = { version = "0.4.5", features = ["telemetry"] } -netapp = { version = "0.5.0", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] } +netapp = { version = "0.5.0", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } -- cgit v1.2.3 From 44733474bb6c9021c92b59e5c349b4b7ef71409a Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 16:01:55 +0200 Subject: Remove/change println! in server code (fix #358) --- src/rpc/kubernetes.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/rpc') diff --git a/src/rpc/kubernetes.rs b/src/rpc/kubernetes.rs index 939a0eed..197245aa 100644 --- a/src/rpc/kubernetes.rs +++ b/src/rpc/kubernetes.rs @@ -56,7 +56,7 @@ pub async fn get_kubernetes_nodes( let mut ret = Vec::with_capacity(nodes.items.len()); for node in nodes { - println!("Found Pod: {:?}", node.metadata.name); + info!("Found Pod: {:?}", node.metadata.name); let pubkey = &node .metadata -- cgit v1.2.3 From ab722cb40f5aacf661a280b7eb025acd3aefc1bb Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 13 Sep 2022 16:22:23 +0200 Subject: Add checks on replication_factor of layouts we use (fix #363, fix #364) --- src/rpc/system.rs | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/system.rs b/src/rpc/system.rs index c0e70c61..228b66a4 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -198,7 +198,7 @@ impl System { background: Arc, replication_factor: usize, config: &Config, - ) -> Arc { + ) -> Result, Error> { let node_key = gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); info!( @@ -206,11 +206,21 @@ impl System { hex::encode(&node_key.public_key()[..8]) ); - let persist_cluster_layout = Persister::new(&config.metadata_dir, "cluster_layout"); + let persist_cluster_layout: Persister = + Persister::new(&config.metadata_dir, "cluster_layout"); let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); let cluster_layout = match persist_cluster_layout.load() { - Ok(x) => x, + Ok(x) => { + if x.replication_factor != replication_factor { + return Err(Error::Message(format!( + "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). 
The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", + x.replication_factor, + replication_factor + ))); + } + x + } Err(e) => { info!( "No valid previous cluster layout stored ({}), starting fresh.", @@ -303,7 +313,7 @@ impl System { metadata_dir: config.metadata_dir.clone(), }); sys.system_endpoint.set_handler(sys.clone()); - sys + Ok(sys) } /// Perform bootstraping, starting the ping loop @@ -485,7 +495,7 @@ impl System { let local_info = self.local_status.load(); if local_info.replication_factor < info.replication_factor { - error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and might lead to bugs", + error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and will lead to data corruption. Shutting down for safety.", info.replication_factor, local_info.replication_factor); std::process::exit(1); @@ -513,6 +523,16 @@ impl System { self: &Arc, adv: &ClusterLayout, ) -> Result { + if adv.replication_factor != self.replication_factor { + let msg = format!( + "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", + adv.replication_factor, + self.replication_factor + ); + error!("{}", msg); + return Err(Error::Message(msg)); + } + let update_ring = self.update_ring.lock().await; let mut layout: ClusterLayout = self.ring.borrow().layout.clone(); -- cgit v1.2.3 From e46dc2a8ef8a12e49aed3883b34b538b5f65ca31 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Sep 2022 16:09:38 +0200 Subject: Allow for hostnames in bootstrap_peers and rpc_public_addr (fix #353) --- src/rpc/Cargo.toml | 2 +- src/rpc/system.rs | 73 ++++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 58 insertions(+), 17 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 079cfe34..e51f1f73 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -45,7 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" -netapp = { version = "0.5.0", features = ["telemetry"] } +netapp = { version = "0.5.1", features = ["telemetry"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 228b66a4..2c6136a8 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -18,7 +18,7 @@ use tokio::sync::Mutex; use netapp::endpoint::{Endpoint, EndpointHandler}; use netapp::message::*; use netapp::peering::fullmesh::FullMeshPeeringStrategy; -use netapp::util::parse_and_resolve_peer_addr; +use netapp::util::parse_and_resolve_peer_addr_async; use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; use garage_util::background::BackgroundRunner; @@ -92,7 +92,7 @@ pub struct System { rpc_listen_addr: SocketAddr, rpc_public_addr: Option, - bootstrap_peers: Vec<(NodeID, SocketAddr)>, + bootstrap_peers: Vec, consul_discovery: Option, #[cfg(feature = "kubernetes-discovery")] @@ -242,8 +242,29 @@ impl System { let ring = Ring::new(cluster_layout, replication_factor); let (update_ring, ring) = watch::channel(Arc::new(ring)); - let rpc_public_addr = match config.rpc_public_addr { - Some(a) => Some(a), + let rpc_public_addr = match &config.rpc_public_addr { + Some(a_str) => { + use std::net::ToSocketAddrs; 
+ match a_str.to_socket_addrs() { + Err(e) => { + error!( + "Cannot resolve rpc_public_addr {} from config file: {}.", + a_str, e + ); + None + } + Ok(a) => { + let a = a.collect::>(); + if a.is_empty() { + error!("rpc_public_addr {} resolve to no known IP address", a_str); + } + if a.len() > 1 { + warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a); + } + a.into_iter().next() + } + } + } None => { let addr = get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port())); @@ -253,13 +274,12 @@ impl System { addr } }; + if rpc_public_addr.is_none() { + warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication."); + } let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); - let fullmesh = FullMeshPeeringStrategy::new( - netapp.clone(), - config.bootstrap_peers.clone(), - rpc_public_addr, - ); + let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); @@ -370,12 +390,14 @@ impl System { } pub async fn connect(&self, node: &str) -> Result<(), Error> { - let (pubkey, addrs) = parse_and_resolve_peer_addr(node).ok_or_else(|| { - Error::Message(format!( - "Unable to parse or resolve node specification: {}", - node - )) - })?; + let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node) + .await + .ok_or_else(|| { + Error::Message(format!( + "Unable to parse or resolve node specification: {}", + node + )) + })?; let mut errors = vec![]; for ip in addrs.iter() { match self @@ -604,7 +626,7 @@ impl System { if not_configured || no_peers || bad_peers { info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers); - let mut ping_list = self.bootstrap_peers.clone(); + let mut ping_list = resolve_peers(&self.bootstrap_peers).await; // Add peer list from list stored on disk if let Ok(peers) = self.persist_peer_list.load_async().await { @@ -735,6 +757,25 @@ fn get_default_ip() -> Option { .map(|a| a.ip()) } +async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { + let mut ret = vec![]; + + for peer in peers.iter() { + match parse_and_resolve_peer_addr_async(peer).await { + Some((pubkey, addrs)) => { + for ip in addrs { + ret.push((pubkey, ip)); + } + } + None => { + warn!("Unable to parse and/or resolve peer hostname {}", peer); + } + } + } + + ret +} + struct ConsulDiscoveryParam { consul_host: String, service_name: String, -- cgit v1.2.3 From 56592e18538b379ccaaa7b7c1990a599ac83b191 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 19 Sep 2022 20:12:19 +0200 Subject: RPC performance changes - configurable ping timeout - single, much higher, configurable RPC timeout - no more concurrency semaphore --- src/rpc/Cargo.toml | 2 +- src/rpc/metrics.rs | 19 +---------------- src/rpc/rpc_helper.rs | 57 +++++++++++++++++++++++++-------------------------- src/rpc/system.rs | 16 +++++++++++---- 4 files changed, 42 insertions(+), 52 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index e51f1f73..d61acea4 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -45,7 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi tokio-stream = { version = "0.1", features = ["net"] } opentelemetry = "0.17" -netapp = { version = "0.5.1", features = ["telemetry"] } +netapp = { version = "0.5.2", features = ["telemetry"] } hyper = { version = 
"0.14", features = ["client", "http1", "runtime", "tcp"] } diff --git a/src/rpc/metrics.rs b/src/rpc/metrics.rs index c900518c..61f8fa79 100644 --- a/src/rpc/metrics.rs +++ b/src/rpc/metrics.rs @@ -1,31 +1,18 @@ -use std::sync::Arc; - use opentelemetry::{global, metrics::*}; -use tokio::sync::Semaphore; /// TableMetrics reference all counter used for metrics pub struct RpcMetrics { - pub(crate) _rpc_available_permits: ValueObserver, - pub(crate) rpc_counter: Counter, pub(crate) rpc_timeout_counter: Counter, pub(crate) rpc_netapp_error_counter: Counter, pub(crate) rpc_garage_error_counter: Counter, pub(crate) rpc_duration: ValueRecorder, - pub(crate) rpc_queueing_time: ValueRecorder, } impl RpcMetrics { - pub fn new(sem: Arc) -> Self { + pub fn new() -> Self { let meter = global::meter("garage_rpc"); RpcMetrics { - _rpc_available_permits: meter - .u64_value_observer("rpc.available_permits", move |observer| { - observer.observe(sem.available_permits() as u64, &[]) - }) - .with_description("Number of available RPC permits") - .init(), - rpc_counter: meter .u64_counter("rpc.request_counter") .with_description("Number of RPC requests emitted") @@ -46,10 +33,6 @@ impl RpcMetrics { .f64_value_recorder("rpc.duration") .with_description("Duration of RPCs") .init(), - rpc_queueing_time: meter - .f64_value_recorder("rpc.queueing_time") - .with_description("Time RPC requests were queued for before being sent") - .init(), } } } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 19abb4c5..857ed620 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -7,7 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; use futures_util::future::FutureExt; use tokio::select; -use tokio::sync::{watch, Semaphore}; +use tokio::sync::watch; use opentelemetry::KeyValue; use opentelemetry::{ @@ -32,32 +32,30 @@ use garage_util::metrics::RecordDuration; use crate::metrics::RpcMetrics; use crate::ring::Ring; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); - -// Don't allow more than 100 concurrent outgoing RPCs. 
-const MAX_CONCURRENT_REQUESTS: usize = 100; +// Default RPC timeout = 5 minutes +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); /// Strategy to apply when making RPC #[derive(Copy, Clone)] pub struct RequestStrategy { - /// Max time to wait for reponse - pub rs_timeout: Duration, /// Min number of response to consider the request successful pub rs_quorum: Option, /// Should requests be dropped after enough response are received pub rs_interrupt_after_quorum: bool, /// Request priority pub rs_priority: RequestPriority, + /// Deactivate timeout for this request + pub rs_no_timeout: bool, } impl RequestStrategy { /// Create a RequestStrategy with default timeout and not interrupting when quorum reached pub fn with_priority(prio: RequestPriority) -> Self { RequestStrategy { - rs_timeout: DEFAULT_TIMEOUT, rs_quorum: None, rs_interrupt_after_quorum: false, rs_priority: prio, + rs_no_timeout: false, } } /// Set quorum to be reached for request @@ -65,17 +63,17 @@ impl RequestStrategy { self.rs_quorum = Some(quorum); self } - /// Set timeout of the strategy - pub fn with_timeout(mut self, timeout: Duration) -> Self { - self.rs_timeout = timeout; - self - } /// Set if requests can be dropped after quorum has been reached /// In general true for read requests, and false for write pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { self.rs_interrupt_after_quorum = interrupt; self } + /// Deactivate timeout for this request + pub fn without_timeout(mut self) -> Self { + self.rs_no_timeout = true; + self + } } #[derive(Clone)] @@ -86,8 +84,8 @@ struct RpcHelperInner { fullmesh: Arc, background: Arc, ring: watch::Receiver>, - request_buffer_semaphore: Arc, metrics: RpcMetrics, + rpc_timeout: Duration, } impl RpcHelper { @@ -96,21 +94,24 @@ impl RpcHelper { fullmesh: Arc, background: Arc, ring: watch::Receiver>, + rpc_timeout: Option, ) -> Self { - let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS)); - - let metrics = RpcMetrics::new(sem.clone()); + let metrics = RpcMetrics::new(); Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, background, ring, - request_buffer_semaphore: sem, metrics, + rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), })) } + pub fn rpc_timeout(&self) -> Duration { + self.0.rpc_timeout + } + pub async fn call( &self, endpoint: &Endpoint, @@ -129,13 +130,6 @@ impl RpcHelper { KeyValue::new("to", format!("{:?}", to)), ]; - let permit = self - .0 - .request_buffer_semaphore - .acquire() - .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags) - .await?; - self.0.metrics.rpc_counter.add(1, &metric_tags); let node_id = to.into(); @@ -143,10 +137,16 @@ impl RpcHelper { .call_streaming(&node_id, msg, strat.rs_priority) .record_duration(&self.0.metrics.rpc_duration, &metric_tags); + let timeout = async { + if strat.rs_no_timeout { + futures::future::pending().await + } else { + tokio::time::sleep(self.0.rpc_timeout).await + } + }; + select! { res = rpc_call => { - drop(permit); - if res.is_err() { self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags); } @@ -158,8 +158,7 @@ impl RpcHelper { Ok(res?) 
} - _ = tokio::time::sleep(strat.rs_timeout) => { - drop(permit); + () = timeout => { self.0.metrics.rpc_timeout_counter.add(1, &metric_tags); Err(Error::Timeout) } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 2c6136a8..f8121193 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -37,7 +37,6 @@ use crate::rpc_helper::*; const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); -const SYSTEM_RPC_TIMEOUT: Duration = Duration::from_secs(15); /// Version tag used for version check upon Netapp connection. /// Cluster nodes with different version tags are deemed @@ -280,6 +279,9 @@ impl System { let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); + if let Some(ping_timeout) = config.rpc_ping_timeout_msec { + fullmesh.set_ping_timeout_millis(ping_timeout); + } let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); @@ -317,7 +319,13 @@ impl System { node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), fullmesh: fullmesh.clone(), - rpc: RpcHelper::new(netapp.id.into(), fullmesh, background.clone(), ring.clone()), + rpc: RpcHelper::new( + netapp.id.into(), + fullmesh, + background.clone(), + ring.clone(), + config.rpc_timeout_msec.map(Duration::from_millis), + ), system_endpoint, replication_factor, rpc_listen_addr: config.rpc_bind_addr, @@ -600,7 +608,7 @@ impl System { .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), - RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_HIGH), ) .await; @@ -724,7 +732,7 @@ impl System { &self.system_endpoint, peer, SystemRpc::PullClusterLayout, - RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT), + RequestStrategy::with_priority(PRIO_HIGH), ) .await; if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { -- cgit v1.2.3 From ded444f6c96f8ab991e762f65760b42e4d64246c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 20 Sep 2022 16:01:41 +0200 Subject: Ability to have custom timeouts in request strategy (not used) --- src/rpc/rpc_helper.rs | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) (limited to 'src/rpc') diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 857ed620..949aced6 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -44,8 +44,15 @@ pub struct RequestStrategy { pub rs_interrupt_after_quorum: bool, /// Request priority pub rs_priority: RequestPriority, - /// Deactivate timeout for this request - pub rs_no_timeout: bool, + /// Custom timeout for this request + rs_timeout: Timeout, +} + +#[derive(Copy, Clone)] +enum Timeout { + None, + Default, + Custom(Duration), } impl RequestStrategy { @@ -55,7 +62,7 @@ impl RequestStrategy { rs_quorum: None, rs_interrupt_after_quorum: false, rs_priority: prio, - rs_no_timeout: false, + rs_timeout: Timeout::Default, } } /// Set quorum to be reached for request @@ -71,7 +78,12 @@ impl RequestStrategy { } /// Deactivate timeout for this request pub fn without_timeout(mut self) -> Self { - self.rs_no_timeout = true; + self.rs_timeout = Timeout::None; + self + } + /// Set custom timeout for this request + pub fn with_custom_timeout(mut self, timeout: Duration) -> Self { + self.rs_timeout = Timeout::Custom(timeout); self } } @@ -138,10 +150,10 @@ impl RpcHelper { .record_duration(&self.0.metrics.rpc_duration, &metric_tags); let 
timeout = async { - if strat.rs_no_timeout { - futures::future::pending().await - } else { - tokio::time::sleep(self.0.rpc_timeout).await + match strat.rs_timeout { + Timeout::None => futures::future::pending().await, + Timeout::Default => tokio::time::sleep(self.0.rpc_timeout).await, + Timeout::Custom(t) => tokio::time::sleep(t).await, } }; @@ -412,7 +412,7 @@ impl RpcHelper { .iter() .find(|x| x.id.as_ref() == to.as_slice()) .and_then(|pi| pi.avg_ping) - .unwrap_or_else(|| Duration::from_secs(1)); + .unwrap_or_else(|| Duration::from_secs(10)); ( *to != self.0.our_node_id, peer_zone != our_zone, -- cgit v1.2.3 From ad917ffd3f76316e48b89ff17e2f8a600a269481 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 29 Sep 2022 15:53:54 +0200 Subject: Fix instant subtractions that might have panicked --- src/rpc/system.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'src/rpc') diff --git a/src/rpc/system.rs b/src/rpc/system.rs index f8121193..9e0bfa11 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -369,7 +369,9 @@ impl System { id: n.id.into(), addr: n.addr, is_up: n.is_up(), - last_seen_secs_ago: n.last_seen.map(|t| (Instant::now() - t).as_secs()), + last_seen_secs_ago: n + .last_seen + .map(|t| (Instant::now().saturating_duration_since(t)).as_secs()), status: node_status .get(&n.id.into()) .cloned() -- cgit v1.2.3
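To close, a note on why that last change matters: subtracting one `Instant` from another underflows when the right-hand side lies in the future, which can happen in practice because `last_seen` instants are recorded on other threads and monotonic clocks are not perfectly synchronized everywhere. A standalone illustration of the saturating variant, with a deliberately contrived future-dated instant:

```rust
use std::time::{Duration, Instant};

fn main() {
    let now = Instant::now();
    // Contrived "future" timestamp standing in for a last_seen value
    // recorded slightly ahead of our own Instant::now().
    let last_seen = now + Duration::from_millis(5);

    // A plain `now - last_seen` underflows here, which panicked on the
    // toolchains this commit targets; the saturating variant clamps the
    // result to zero instead.
    let ago = now.saturating_duration_since(last_seen);
    assert_eq!(ago, Duration::ZERO);
    println!("last seen {} seconds ago", ago.as_secs());
}
```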