aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2024-02-15 13:58:15 +0100
committerAlex Auvolat <alex@adnab.me>2024-02-15 14:06:34 +0100
commiteb4a6ce1060a847be0b62c6a10ff3ba956e3f34d (patch)
tree84f6746ec92ca3a5344da22edadcd1ac3413aa29 /src/rpc
parentcf2af186fcc0c8f581a966454b6cd4720d3821f0 (diff)
parent7be3f15e45fcfff10a45302a040c2919a3ba8ccd (diff)
downloadgarage-eb4a6ce1060a847be0b62c6a10ff3ba956e3f34d.tar.gz
garage-eb4a6ce1060a847be0b62c6a10ff3ba956e3f34d.zip
Merge branch 'main' into next-0.10
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/Cargo.toml3
-rw-r--r--src/rpc/consul.rs2
-rw-r--r--src/rpc/kubernetes.rs2
-rw-r--r--src/rpc/layout/manager.rs10
-rw-r--r--src/rpc/rpc_helper.rs24
-rw-r--r--src/rpc/system.rs32
6 files changed, 36 insertions, 37 deletions
diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml
index 1b2867a5..3e7ac635 100644
--- a/src/rpc/Cargo.toml
+++ b/src/rpc/Cargo.toml
@@ -17,6 +17,7 @@ path = "lib.rs"
format_table.workspace = true
garage_db.workspace = true
garage_util.workspace = true
+garage_net.workspace = true
arc-swap.workspace = true
bytes.workspace = true
@@ -49,8 +50,6 @@ tokio.workspace = true
tokio-stream.workspace = true
opentelemetry.workspace = true
-netapp.workspace = true
-
[features]
kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ]
consul-discovery = [ "reqwest", "err-derive" ]
diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs
index 71fd71b0..f088bf3f 100644
--- a/src/rpc/consul.rs
+++ b/src/rpc/consul.rs
@@ -6,7 +6,7 @@ use std::net::{IpAddr, SocketAddr};
use err_derive::Error;
use serde::{Deserialize, Serialize};
-use netapp::NodeID;
+use garage_net::NodeID;
use garage_util::config::ConsulDiscoveryAPI;
use garage_util::config::ConsulDiscoveryConfig;
diff --git a/src/rpc/kubernetes.rs b/src/rpc/kubernetes.rs
index 63c6567d..85254bb5 100644
--- a/src/rpc/kubernetes.rs
+++ b/src/rpc/kubernetes.rs
@@ -10,7 +10,7 @@ use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomRe
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
-use netapp::NodeID;
+use garage_net::NodeID;
use garage_util::config::KubernetesDiscoveryConfig;
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 6747b79d..0b6c7e63 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -4,9 +4,9 @@ use std::time::Duration;
use tokio::sync::Notify;
-use netapp::endpoint::Endpoint;
-use netapp::peering::fullmesh::FullMeshPeeringStrategy;
-use netapp::NodeID;
+use garage_net::endpoint::Endpoint;
+use garage_net::peering::PeeringManager;
+use garage_net::NodeID;
use garage_util::config::Config;
use garage_util::data::*;
@@ -37,7 +37,7 @@ impl LayoutManager {
config: &Config,
node_id: NodeID,
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
- fullmesh: Arc<FullMeshPeeringStrategy>,
+ peering: Arc<PeeringManager>,
replication_mode: ReplicationMode,
) -> Result<Arc<Self>, Error> {
let replication_factor = replication_mode.replication_factor();
@@ -74,7 +74,7 @@ impl LayoutManager {
let rpc_helper = RpcHelper::new(
node_id.into(),
- fullmesh,
+ peering,
layout.clone(),
config.rpc_timeout_msec.map(Duration::from_millis),
);
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index ae3a19c4..977c6ed8 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -14,13 +14,13 @@ use opentelemetry::{
Context,
};
-pub use netapp::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
-pub use netapp::message::{
+pub use garage_net::endpoint::{Endpoint, EndpointHandler, StreamingEndpointHandler};
+pub use garage_net::message::{
IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
PRIO_NORMAL, PRIO_SECONDARY,
};
-use netapp::peering::fullmesh::FullMeshPeeringStrategy;
-pub use netapp::{self, NetApp, NodeID};
+use garage_net::peering::PeeringManager;
+pub use garage_net::{self, NetApp, NodeID};
use garage_util::data::*;
use garage_util::error::Error;
@@ -89,7 +89,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
struct RpcHelperInner {
our_node_id: Uuid,
- fullmesh: Arc<FullMeshPeeringStrategy>,
+ peering: Arc<PeeringManager>,
layout: Arc<RwLock<LayoutHelper>>,
metrics: RpcMetrics,
rpc_timeout: Duration,
@@ -98,7 +98,7 @@ struct RpcHelperInner {
impl RpcHelper {
pub(crate) fn new(
our_node_id: Uuid,
- fullmesh: Arc<FullMeshPeeringStrategy>,
+ peering: Arc<PeeringManager>,
layout: Arc<RwLock<LayoutHelper>>,
rpc_timeout: Option<Duration>,
) -> Self {
@@ -106,7 +106,7 @@ impl RpcHelper {
Self(Arc::new(RpcHelperInner {
our_node_id,
- fullmesh,
+ peering,
layout,
metrics,
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
@@ -193,7 +193,7 @@ impl RpcHelper {
let span_name = format!("RPC [{}] call_many {} nodes", endpoint.path(), to.len());
let span = tracer.start(span_name);
- let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+ let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
let resps = join_all(
to.iter()
@@ -221,7 +221,7 @@ impl RpcHelper {
{
let to = self
.0
- .fullmesh
+ .peering
.get_peer_list()
.iter()
.map(|p| p.id.into())
@@ -310,7 +310,7 @@ impl RpcHelper {
// Build future for each request
// They are not started now: they are added below in a FuturesUnordered
// object that will take care of polling them (see below)
- let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+ let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
let mut requests = request_order.into_iter().map(|to| {
let self2 = self.clone();
let msg = msg.clone();
@@ -441,7 +441,7 @@ impl RpcHelper {
let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum);
// Send one request to each peer of the quorum sets
- let msg = msg.into_req().map_err(netapp::error::Error::from)?;
+ let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
let requests = result_tracker.nodes.keys().map(|peer| {
let self2 = self.clone();
let msg = msg.clone();
@@ -521,7 +521,7 @@ impl RpcHelper {
nodes: impl Iterator<Item = Uuid>,
) -> Vec<Uuid> {
// Retrieve some status variables that we will use to sort requests
- let peer_list = self.0.fullmesh.get_peer_list();
+ let peer_list = self.0.peering.get_peer_list();
let our_zone = layout
.current()
.get_node_zone(&self.0.our_node_id)
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index f22247c3..21156d15 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -15,11 +15,11 @@ use sodiumoxide::crypto::sign::ed25519;
use tokio::select;
use tokio::sync::{watch, Notify};
-use netapp::endpoint::{Endpoint, EndpointHandler};
-use netapp::message::*;
-use netapp::peering::fullmesh::FullMeshPeeringStrategy;
-use netapp::util::parse_and_resolve_peer_addr_async;
-use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
+use garage_net::endpoint::{Endpoint, EndpointHandler};
+use garage_net::message::*;
+use garage_net::peering::PeeringManager;
+use garage_net::util::parse_and_resolve_peer_addr_async;
+use garage_net::{NetApp, NetworkKey, NodeID, NodeKey};
#[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig;
@@ -96,7 +96,7 @@ pub struct System {
node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
pub netapp: Arc<NetApp>,
- fullmesh: Arc<FullMeshPeeringStrategy>,
+ peering: Arc<PeeringManager>,
pub(crate) system_endpoint: Arc<Endpoint<SystemRpc, System>>,
@@ -265,9 +265,9 @@ impl System {
warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication.");
}
- let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
+ let peering = PeeringManager::new(netapp.clone(), vec![], rpc_public_addr);
if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
- fullmesh.set_ping_timeout_millis(ping_timeout);
+ peering.set_ping_timeout_millis(ping_timeout);
}
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
@@ -279,7 +279,7 @@ impl System {
config,
netapp.id,
system_endpoint.clone(),
- fullmesh.clone(),
+ peering.clone(),
replication_mode,
)?;
@@ -315,7 +315,7 @@ impl System {
local_status: ArcSwap::new(Arc::new(local_status)),
node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
- fullmesh,
+ peering: peering.clone(),
system_endpoint,
replication_mode,
replication_factor,
@@ -343,7 +343,7 @@ impl System {
self.netapp
.clone()
.listen(self.rpc_listen_addr, None, must_exit.clone()),
- self.fullmesh.clone().run(must_exit.clone()),
+ self.peering.clone().run(must_exit.clone()),
self.discovery_loop(must_exit.clone()),
self.status_exchange_loop(must_exit.clone()),
);
@@ -369,7 +369,7 @@ impl System {
pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> {
let node_status = self.node_status.read().unwrap();
let known_nodes = self
- .fullmesh
+ .peering
.get_peer_list()
.iter()
.map(|n| KnownNodeInfo {
@@ -623,10 +623,10 @@ impl System {
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
let not_configured = self.cluster_layout().check().is_err();
- let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
+ let no_peers = self.peering.get_peer_list().len() < self.replication_factor;
let expected_n_nodes = self.cluster_layout().all_nodes().len();
let bad_peers = self
- .fullmesh
+ .peering
.get_peer_list()
.iter()
.filter(|p| p.is_up())
@@ -708,7 +708,7 @@ impl System {
// Prepare new peer list to save to file
// It is a vec of tuples (node ID as Uuid, node SocketAddr)
let mut peer_list = self
- .fullmesh
+ .peering
.get_peer_list()
.iter()
.map(|n| (n.id.into(), n.addr))
@@ -916,7 +916,7 @@ async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
fn connect_error_message(
addr: SocketAddr,
pubkey: ed25519::PublicKey,
- e: netapp::error::Error,
+ e: garage_net::error::Error,
) -> String {
format!("Error establishing RPC connection to remote node: {}@{}.\nThis can happen if the remote node is not reachable on the network, but also if the two nodes are not configured with the same rpc_secret.\n{}", hex::encode(pubkey), addr, e)
}