diff options
author | Alex Auvolat <alex@adnab.me> | 2020-04-18 19:21:34 +0200 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2020-04-18 19:21:34 +0200 |
commit | f41583e1b731574b4bb13a20d4b3fd9fe3a899f5 (patch) | |
tree | a2c1d32284fa0dc30fdf5408afad8255d50e51f6 /src/membership.rs | |
parent | 3f40ef149f6dd4d61ceb326b5691e186aec178c3 (diff) | |
download | garage-f41583e1b731574b4bb13a20d4b3fd9fe3a899f5.tar.gz garage-f41583e1b731574b4bb13a20d4b3fd9fe3a899f5.zip |
Massive RPC refactoring
Diffstat (limited to 'src/membership.rs')
-rw-r--r-- | src/membership.rs | 84 |
1 files changed, 75 insertions, 9 deletions
diff --git a/src/membership.rs b/src/membership.rs index 6d758c59..499637fb 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -10,6 +10,7 @@ use std::time::Duration; use futures::future::join_all; use futures::select; use futures_util::future::*; +use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use tokio::prelude::*; use tokio::sync::watch; @@ -20,17 +21,31 @@ use crate::data::*; use crate::error::Error; use crate::proto::*; use crate::rpc_client::*; +use crate::rpc_server::*; use crate::server::Config; const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_TIMEOUT: Duration = Duration::from_secs(2); const MAX_FAILED_PINGS: usize = 3; +#[derive(Debug, Serialize, Deserialize)] +pub enum Message { + Ok, + Ping(PingMessage), + PullStatus, + PullConfig, + AdvertiseNodesUp(Vec<AdvertisedNode>), + AdvertiseConfig(NetworkConfig), +} + +impl RpcMessage for Message {} + pub struct System { pub config: Config, pub id: UUID, - pub rpc_client: RpcClient, + pub rpc_http_client: Arc<RpcHttpClient>, + rpc_client: Arc<RpcClient<Message>>, pub status: watch::Receiver<Arc<Status>>, pub ring: watch::Receiver<Arc<Ring>>, @@ -199,7 +214,12 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> { } impl System { - pub fn new(config: Config, id: UUID, background: Arc<BackgroundRunner>) -> Self { + pub fn new( + config: Config, + id: UUID, + background: Arc<BackgroundRunner>, + rpc_server: &mut RpcServer, + ) -> Arc<Self> { let net_config = match read_network_config(&config.metadata_dir) { Ok(x) => x, Err(e) => { @@ -228,17 +248,54 @@ impl System { ring.rebuild_ring(); let (update_ring, ring) = watch::channel(Arc::new(ring)); - let rpc_client = RpcClient::new(&config.rpc_tls).expect("Could not create RPC client"); + let rpc_http_client = + Arc::new(RpcHttpClient::new(&config.rpc_tls).expect("Could not create RPC client")); + + let rpc_path = "_membership"; + let rpc_client = RpcClient::new( + RpcAddrClient::<Message>::new(rpc_http_client.clone(), rpc_path.into()), + background.clone(), + status.clone(), + ); - System { + let sys = Arc::new(System { config, id, + rpc_http_client, rpc_client, status, ring, update_lock: Mutex::new((update_status, update_ring)), background, - } + }); + sys.clone().register_handler(rpc_server, rpc_path.into()); + sys + } + + fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) { + rpc_server.add_handler::<Message, _, _>(path, move |msg, addr| { + let self2 = self.clone(); + async move { + match msg { + Message::Ping(ping) => self2.handle_ping(&addr, &ping).await, + + Message::PullStatus => self2.handle_pull_status(), + Message::PullConfig => self2.handle_pull_config(), + Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await, + Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await, + + _ => Err(Error::Message(format!("Unexpected RPC message"))), + } + } + }); + } + + pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> { + RpcClient::new( + RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()), + self.background.clone(), + self.status.clone(), + ) } async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { @@ -272,7 +329,7 @@ impl System { .filter(|x| **x != self.id) .cloned() .collect::<Vec<_>>(); - rpc_call_many(self.clone(), &to[..], msg, timeout).await; + self.rpc_client.call_many(&to[..], msg, timeout).await; } pub async fn bootstrap(self: Arc<Self>) { @@ -299,7 +356,10 @@ impl System { ( id_option, addr.clone(), - sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await, + sys.rpc_client + .by_addr() + .call(&addr, ping_msg_ref, PING_TIMEOUT) + .await, ) } })) @@ -509,7 +569,10 @@ impl System { peer: UUID, ) -> impl futures::future::Future<Output = ()> + Send + 'static { async move { - let resp = rpc_call(self.clone(), &peer, &Message::PullStatus, PING_TIMEOUT).await; + let resp = self + .rpc_client + .call(&peer, Message::PullStatus, PING_TIMEOUT) + .await; if let Ok(Message::AdvertiseNodesUp(nodes)) = resp { let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await; } @@ -517,7 +580,10 @@ impl System { } pub async fn pull_config(self: Arc<Self>, peer: UUID) { - let resp = rpc_call(self.clone(), &peer, &Message::PullConfig, PING_TIMEOUT).await; + let resp = self + .rpc_client + .call(&peer, Message::PullConfig, PING_TIMEOUT) + .await; if let Ok(Message::AdvertiseConfig(config)) = resp { let _: Result<_, _> = self.handle_advertise_config(&config).await; } |