diff options
author | Alex Auvolat <alex@adnab.me> | 2024-02-14 10:04:46 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2024-02-15 12:15:35 +0100 |
commit | 125c662860621f9c834e254d62b29b5d5ace5dd4 (patch) | |
tree | eb32dadec4de88e5ec69dbdf5e73f68f9299201a | |
parent | 5766befb245efba7d9241e2e2c68bcf4ed28d7a4 (diff) | |
download | garage-125c662860621f9c834e254d62b29b5d5ace5dd4.tar.gz garage-125c662860621f9c834e254d62b29b5d5ace5dd4.zip |
[import-netapp] move and rename FullMeshPeeringSrategy to PeeringManagerimport-netapp
-rw-r--r-- | src/net/peering.rs (renamed from src/net/peering/fullmesh.rs) | 9 | ||||
-rw-r--r-- | src/net/peering/mod.rs | 1 | ||||
-rw-r--r-- | src/net/test.rs | 6 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 12 | ||||
-rw-r--r-- | src/rpc/system.rs | 22 |
5 files changed, 25 insertions, 25 deletions
diff --git a/src/net/peering/fullmesh.rs b/src/net/peering.rs index 8e666044..32199cf8 100644 --- a/src/net/peering/fullmesh.rs +++ b/src/net/peering.rs @@ -177,7 +177,7 @@ impl KnownHosts { /// A "Full Mesh" peering strategy is a peering strategy that tries /// to establish and maintain a direct connection with all of the /// known nodes in the network. -pub struct FullMeshPeeringStrategy { +pub struct PeeringManager { netapp: Arc<NetApp>, known_hosts: RwLock<KnownHosts>, public_peer_list: ArcSwap<Vec<PeerInfo>>, @@ -189,7 +189,7 @@ pub struct FullMeshPeeringStrategy { ping_timeout_millis: AtomicU64, } -impl FullMeshPeeringStrategy { +impl PeeringManager { /// Create a new Full Mesh peering strategy. /// The strategy will not be run until `.run()` is called and awaited. /// Once that happens, the peering strategy will try to connect @@ -216,6 +216,7 @@ impl FullMeshPeeringStrategy { ); } + // TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility) let strat = Arc::new(Self { netapp: netapp.clone(), known_hosts: RwLock::new(known_hosts), @@ -588,7 +589,7 @@ impl FullMeshPeeringStrategy { } #[async_trait] -impl EndpointHandler<PingMessage> for FullMeshPeeringStrategy { +impl EndpointHandler<PingMessage> for PeeringManager { async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage { let ping_resp = PingMessage { id: ping.id, @@ -600,7 +601,7 @@ impl EndpointHandler<PingMessage> for FullMeshPeeringStrategy { } #[async_trait] -impl EndpointHandler<PeerListMessage> for FullMeshPeeringStrategy { +impl EndpointHandler<PeerListMessage> for PeeringManager { async fn handle( self: &Arc<Self>, peer_list: &PeerListMessage, diff --git a/src/net/peering/mod.rs b/src/net/peering/mod.rs deleted file mode 100644 index 044b1dfe..00000000 --- a/src/net/peering/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod fullmesh; diff --git a/src/net/test.rs b/src/net/test.rs index d4da6f23..c6259752 100644 --- a/src/net/test.rs +++ b/src/net/test.rs @@ -9,7 +9,7 @@ use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; use crate::netapp::*; -use crate::peering::fullmesh::*; +use crate::peering::*; use crate::NodeID; #[tokio::test(flavor = "current_thread")] @@ -100,10 +100,10 @@ fn run_netapp( ) -> ( tokio::task::JoinHandle<()>, Arc<NetApp>, - Arc<FullMeshPeeringStrategy>, + Arc<PeeringManager>, ) { let netapp = NetApp::new(0u64, netid, sk); - let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers, None); + let peering = PeeringManager::new(netapp.clone(), bootstrap_peers, None); let peering2 = peering.clone(); let netapp2 = netapp.clone(); diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index b5279bcd..c46e577f 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -19,7 +19,7 @@ pub use garage_net::message::{ IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, PRIO_SECONDARY, }; -use garage_net::peering::fullmesh::FullMeshPeeringStrategy; +use garage_net::peering::PeeringManager; pub use garage_net::{self, NetApp, NodeID}; use garage_util::data::*; @@ -90,7 +90,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>); struct RpcHelperInner { our_node_id: Uuid, - fullmesh: Arc<FullMeshPeeringStrategy>, + peering: Arc<PeeringManager>, ring: watch::Receiver<Arc<Ring>>, metrics: RpcMetrics, rpc_timeout: Duration, @@ -99,7 +99,7 @@ struct RpcHelperInner { impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, - fullmesh: Arc<FullMeshPeeringStrategy>, + peering: Arc<PeeringManager>, ring: watch::Receiver<Arc<Ring>>, rpc_timeout: Option<Duration>, ) -> Self { @@ -107,7 +107,7 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, - fullmesh, + peering, ring, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), @@ -210,7 +210,7 @@ impl RpcHelper { { let to = self .0 - .fullmesh + .peering .get_peer_list() .iter() .map(|p| p.id.into()) @@ -391,7 +391,7 @@ impl RpcHelper { pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> { // Retrieve some status variables that we will use to sort requests - let peer_list = self.0.fullmesh.get_peer_list(); + let peer_list = self.0.peering.get_peer_list(); let ring: Arc<Ring> = self.0.ring.borrow().clone(); let our_zone = match ring.layout.node_role(&self.0.our_node_id) { Some(pc) => &pc.zone, diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 4b9a72ef..de44e656 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -18,7 +18,7 @@ use tokio::sync::Mutex; use garage_net::endpoint::{Endpoint, EndpointHandler}; use garage_net::message::*; -use garage_net::peering::fullmesh::FullMeshPeeringStrategy; +use garage_net::peering::PeeringManager; use garage_net::util::parse_and_resolve_peer_addr_async; use garage_net::{NetApp, NetworkKey, NodeID, NodeKey}; @@ -92,7 +92,7 @@ pub struct System { node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>, pub netapp: Arc<NetApp>, - fullmesh: Arc<FullMeshPeeringStrategy>, + peering: Arc<PeeringManager>, pub rpc: RpcHelper, system_endpoint: Arc<Endpoint<SystemRpc, System>>, @@ -326,9 +326,9 @@ impl System { } let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); - let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); + let peering = PeeringManager::new(netapp.clone(), vec![], rpc_public_addr); if let Some(ping_timeout) = config.rpc_ping_timeout_msec { - fullmesh.set_ping_timeout_millis(ping_timeout); + peering.set_ping_timeout_millis(ping_timeout); } let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); @@ -358,10 +358,10 @@ impl System { local_status: ArcSwap::new(Arc::new(local_status)), node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), - fullmesh: fullmesh.clone(), + peering: peering.clone(), rpc: RpcHelper::new( netapp.id.into(), - fullmesh, + peering, ring.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ), @@ -393,7 +393,7 @@ impl System { self.netapp .clone() .listen(self.rpc_listen_addr, None, must_exit.clone()), - self.fullmesh.clone().run(must_exit.clone()), + self.peering.clone().run(must_exit.clone()), self.discovery_loop(must_exit.clone()), self.status_exchange_loop(must_exit.clone()), ); @@ -405,7 +405,7 @@ impl System { pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> { let node_status = self.node_status.read().unwrap(); let known_nodes = self - .fullmesh + .peering .get_peer_list() .iter() .map(|n| KnownNodeInfo { @@ -726,10 +726,10 @@ impl System { async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { while !*stop_signal.borrow() { let not_configured = self.ring.borrow().layout.check().is_err(); - let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; + let no_peers = self.peering.get_peer_list().len() < self.replication_factor; let expected_n_nodes = self.ring.borrow().layout.num_nodes(); let bad_peers = self - .fullmesh + .peering .get_peer_list() .iter() .filter(|p| p.is_up()) @@ -811,7 +811,7 @@ impl System { // Prepare new peer list to save to file // It is a vec of tuples (node ID as Uuid, node SocketAddr) let mut peer_list = self - .fullmesh + .peering .get_peer_list() .iter() .map(|n| (n.id.into(), n.addr)) |