diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/Cargo.toml | 3 | ||||
-rw-r--r-- | src/rpc/consul.rs | 2 | ||||
-rw-r--r-- | src/rpc/kubernetes.rs | 2 | ||||
-rw-r--r-- | src/rpc/layout/manager.rs | 10 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 24 | ||||
-rw-r--r-- | src/rpc/system.rs | 32 |
6 files changed, 36 insertions, 37 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 1b2867a5..3e7ac635 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -17,6 +17,7 @@ path = "lib.rs" format_table.workspace = true garage_db.workspace = true garage_util.workspace = true +garage_net.workspace = true arc-swap.workspace = true bytes.workspace = true @@ -49,8 +50,6 @@ tokio.workspace = true tokio-stream.workspace = true opentelemetry.workspace = true -netapp.workspace = true - [features] kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ] consul-discovery = [ "reqwest", "err-derive" ] diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index 71fd71b0..f088bf3f 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -6,7 +6,7 @@ use std::net::{IpAddr, SocketAddr}; use err_derive::Error; use serde::{Deserialize, Serialize}; -use netapp::NodeID; +use garage_net::NodeID; use garage_util::config::ConsulDiscoveryAPI; use garage_util::config::ConsulDiscoveryConfig; diff --git a/src/rpc/kubernetes.rs b/src/rpc/kubernetes.rs index 63c6567d..85254bb5 100644 --- a/src/rpc/kubernetes.rs +++ b/src/rpc/kubernetes.rs @@ -10,7 +10,7 @@ use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomRe use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use netapp::NodeID; +use garage_net::NodeID; use garage_util::config::KubernetesDiscoveryConfig; diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 6747b79d..0b6c7e63 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -4,9 +4,9 @@ use std::time::Duration; use tokio::sync::Notify; -use netapp::endpoint::Endpoint; -use netapp::peering::fullmesh::FullMeshPeeringStrategy; -use netapp::NodeID; +use garage_net::endpoint::Endpoint; +use garage_net::peering::PeeringManager; +use garage_net::NodeID; use garage_util::config::Config; use garage_util::data::*; @@ -37,7 +37,7 @@ impl LayoutManager { config: &Config, node_id: NodeID, system_endpoint: Arc<Endpoint<SystemRpc, System>>, - fullmesh: Arc<FullMeshPeeringStrategy>, + peering: Arc<PeeringManager>, replication_mode: ReplicationMode, ) -> Result<Arc<Self>, Error> { let replication_factor = replication_mode.replication_factor(); @@ -74,7 +74,7 @@ impl LayoutManager { let rpc_helper = RpcHelper::new( node_id.into(), - fullmesh, + peering, layout.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ); diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index ae3a19c4..977c6ed8 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -14,13 +14,13 @@ use opentelemetry::{ Context, }; -pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; -pub use netapp::message::{ +pub use garage_net::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler}; +pub use garage_net::message::{ IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, PRIO_SECONDARY, }; -use netapp::peering::fullmesh::FullMeshPeeringStrategy; -pub use netapp::{self, NetApp, NodeID}; +use garage_net::peering::PeeringManager; +pub use garage_net::{self, NetApp, NodeID}; use garage_util::data::*; use garage_util::error::Error; @@ -89,7 +89,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>); struct RpcHelperInner { our_node_id: Uuid, - fullmesh: Arc<FullMeshPeeringStrategy>, + peering: Arc<PeeringManager>, layout: Arc<RwLock<LayoutHelper>>, metrics: RpcMetrics, rpc_timeout: Duration, @@ -98,7 +98,7 @@ struct RpcHelperInner { impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, - fullmesh: Arc<FullMeshPeeringStrategy>, + peering: Arc<PeeringManager>, layout: Arc<RwLock<LayoutHelper>>, rpc_timeout: Option<Duration>, ) -> Self { @@ -106,7 +106,7 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, - fullmesh, + peering, layout, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), @@ -193,7 +193,7 @@ impl RpcHelper { let span_name = format!("RPC [{}] call_many {} nodes", endpoint.path(), to.len()); let span = tracer.start(span_name); - let msg = msg.into_req().map_err(netapp::error::Error::from)?; + let msg = msg.into_req().map_err(garage_net::error::Error::from)?; let resps = join_all( to.iter() @@ -221,7 +221,7 @@ impl RpcHelper { { let to = self .0 - .fullmesh + .peering .get_peer_list() .iter() .map(|p| p.id.into()) @@ -310,7 +310,7 @@ impl RpcHelper { // Build future for each request // They are not started now: they are added below in a FuturesUnordered // object that will take care of polling them (see below) - let msg = msg.into_req().map_err(netapp::error::Error::from)?; + let msg = msg.into_req().map_err(garage_net::error::Error::from)?; let mut requests = request_order.into_iter().map(|to| { let self2 = self.clone(); let msg = msg.clone(); @@ -441,7 +441,7 @@ impl RpcHelper { let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum); // Send one request to each peer of the quorum sets - let msg = msg.into_req().map_err(netapp::error::Error::from)?; + let msg = msg.into_req().map_err(garage_net::error::Error::from)?; let requests = result_tracker.nodes.keys().map(|peer| { let self2 = self.clone(); let msg = msg.clone(); @@ -521,7 +521,7 @@ impl RpcHelper { nodes: impl Iterator<Item = Uuid>, ) -> Vec<Uuid> { // Retrieve some status variables that we will use to sort requests - let peer_list = self.0.fullmesh.get_peer_list(); + let peer_list = self.0.peering.get_peer_list(); let our_zone = layout .current() .get_node_zone(&self.0.our_node_id) diff --git a/src/rpc/system.rs b/src/rpc/system.rs index f22247c3..21156d15 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -15,11 +15,11 @@ use sodiumoxide::crypto::sign::ed25519; use tokio::select; use tokio::sync::{watch, Notify}; -use netapp::endpoint::{Endpoint, EndpointHandler}; -use netapp::message::*; -use netapp::peering::fullmesh::FullMeshPeeringStrategy; -use netapp::util::parse_and_resolve_peer_addr_async; -use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; +use garage_net::endpoint::{Endpoint, EndpointHandler}; +use garage_net::message::*; +use garage_net::peering::PeeringManager; +use garage_net::util::parse_and_resolve_peer_addr_async; +use garage_net::{NetApp, NetworkKey, NodeID, NodeKey}; #[cfg(feature = "kubernetes-discovery")] use garage_util::config::KubernetesDiscoveryConfig; @@ -96,7 +96,7 @@ pub struct System { node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>, pub netapp: Arc<NetApp>, - fullmesh: Arc<FullMeshPeeringStrategy>, + peering: Arc<PeeringManager>, pub(crate) system_endpoint: Arc<Endpoint<SystemRpc, System>>, @@ -265,9 +265,9 @@ impl System { warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication."); } - let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); + let peering = PeeringManager::new(netapp.clone(), vec![], rpc_public_addr); if let Some(ping_timeout) = config.rpc_ping_timeout_msec { - fullmesh.set_ping_timeout_millis(ping_timeout); + peering.set_ping_timeout_millis(ping_timeout); } let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); @@ -279,7 +279,7 @@ impl System { config, netapp.id, system_endpoint.clone(), - fullmesh.clone(), + peering.clone(), replication_mode, )?; @@ -315,7 +315,7 @@ impl System { local_status: ArcSwap::new(Arc::new(local_status)), node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), - fullmesh, + peering: peering.clone(), system_endpoint, replication_mode, replication_factor, @@ -343,7 +343,7 @@ impl System { self.netapp .clone() .listen(self.rpc_listen_addr, None, must_exit.clone()), - self.fullmesh.clone().run(must_exit.clone()), + self.peering.clone().run(must_exit.clone()), self.discovery_loop(must_exit.clone()), self.status_exchange_loop(must_exit.clone()), ); @@ -369,7 +369,7 @@ impl System { pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> { let node_status = self.node_status.read().unwrap(); let known_nodes = self - .fullmesh + .peering .get_peer_list() .iter() .map(|n| KnownNodeInfo { @@ -623,10 +623,10 @@ impl System { async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { while !*stop_signal.borrow() { let not_configured = self.cluster_layout().check().is_err(); - let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; + let no_peers = self.peering.get_peer_list().len() < self.replication_factor; let expected_n_nodes = self.cluster_layout().all_nodes().len(); let bad_peers = self - .fullmesh + .peering .get_peer_list() .iter() .filter(|p| p.is_up()) @@ -708,7 +708,7 @@ impl System { // Prepare new peer list to save to file // It is a vec of tuples (node ID as Uuid, node SocketAddr) let mut peer_list = self - .fullmesh + .peering .get_peer_list() .iter() .map(|n| (n.id.into(), n.addr)) @@ -916,7 +916,7 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { fn connect_error_message( addr: SocketAddr, pubkey: ed25519::PublicKey, - e: netapp::error::Error, + e: garage_net::error::Error, ) -> String { format!("Error establishing RPC connection to remote node: {}@{}.\nThis can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret.\n{}", hex::encode(pubkey), addr, e) } |