aboutsummaryrefslogtreecommitdiff
path: root/src/membership.rs
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2020-04-18 19:21:34 +0200
committerAlex Auvolat <alex@adnab.me>2020-04-18 19:21:34 +0200
commitf41583e1b731574b4bb13a20d4b3fd9fe3a899f5 (patch)
treea2c1d32284fa0dc30fdf5408afad8255d50e51f6 /src/membership.rs
parent3f40ef149f6dd4d61ceb326b5691e186aec178c3 (diff)
downloadgarage-f41583e1b731574b4bb13a20d4b3fd9fe3a899f5.tar.gz
garage-f41583e1b731574b4bb13a20d4b3fd9fe3a899f5.zip
Massive RPC refactoring
Diffstat (limited to 'src/membership.rs')
-rw-r--r--src/membership.rs84
1 files changed, 75 insertions, 9 deletions
diff --git a/src/membership.rs b/src/membership.rs
index 6d758c59..499637fb 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -10,6 +10,7 @@ use std::time::Duration;
use futures::future::join_all;
use futures::select;
use futures_util::future::*;
+use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use tokio::prelude::*;
use tokio::sync::watch;
@@ -20,17 +21,31 @@ use crate::data::*;
use crate::error::Error;
use crate::proto::*;
use crate::rpc_client::*;
+use crate::rpc_server::*;
use crate::server::Config;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILED_PINGS: usize = 3;
+#[derive(Debug, Serialize, Deserialize)]
+pub enum Message {
+ Ok,
+ Ping(PingMessage),
+ PullStatus,
+ PullConfig,
+ AdvertiseNodesUp(Vec<AdvertisedNode>),
+ AdvertiseConfig(NetworkConfig),
+}
+
+impl RpcMessage for Message {}
+
pub struct System {
pub config: Config,
pub id: UUID,
- pub rpc_client: RpcClient,
+ pub rpc_http_client: Arc<RpcHttpClient>,
+ rpc_client: Arc<RpcClient<Message>>,
pub status: watch::Receiver<Arc<Status>>,
pub ring: watch::Receiver<Arc<Ring>>,
@@ -199,7 +214,12 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
}
impl System {
- pub fn new(config: Config, id: UUID, background: Arc<BackgroundRunner>) -> Self {
+ pub fn new(
+ config: Config,
+ id: UUID,
+ background: Arc<BackgroundRunner>,
+ rpc_server: &mut RpcServer,
+ ) -> Arc<Self> {
let net_config = match read_network_config(&config.metadata_dir) {
Ok(x) => x,
Err(e) => {
@@ -228,17 +248,54 @@ impl System {
ring.rebuild_ring();
let (update_ring, ring) = watch::channel(Arc::new(ring));
- let rpc_client = RpcClient::new(&config.rpc_tls).expect("Could not create RPC client");
+ let rpc_http_client =
+ Arc::new(RpcHttpClient::new(&config.rpc_tls).expect("Could not create RPC client"));
+
+ let rpc_path = "_membership";
+ let rpc_client = RpcClient::new(
+ RpcAddrClient::<Message>::new(rpc_http_client.clone(), rpc_path.into()),
+ background.clone(),
+ status.clone(),
+ );
- System {
+ let sys = Arc::new(System {
config,
id,
+ rpc_http_client,
rpc_client,
status,
ring,
update_lock: Mutex::new((update_status, update_ring)),
background,
- }
+ });
+ sys.clone().register_handler(rpc_server, rpc_path.into());
+ sys
+ }
+
+ fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer, path: String) {
+ rpc_server.add_handler::<Message, _, _>(path, move |msg, addr| {
+ let self2 = self.clone();
+ async move {
+ match msg {
+ Message::Ping(ping) => self2.handle_ping(&addr, &ping).await,
+
+ Message::PullStatus => self2.handle_pull_status(),
+ Message::PullConfig => self2.handle_pull_config(),
+ Message::AdvertiseNodesUp(adv) => self2.handle_advertise_nodes_up(&adv).await,
+ Message::AdvertiseConfig(adv) => self2.handle_advertise_config(&adv).await,
+
+ _ => Err(Error::Message(format!("Unexpected RPC message"))),
+ }
+ }
+ });
+ }
+
+ pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
+ RpcClient::new(
+ RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
+ self.background.clone(),
+ self.status.clone(),
+ )
}
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
@@ -272,7 +329,7 @@ impl System {
.filter(|x| **x != self.id)
.cloned()
.collect::<Vec<_>>();
- rpc_call_many(self.clone(), &to[..], msg, timeout).await;
+ self.rpc_client.call_many(&to[..], msg, timeout).await;
}
pub async fn bootstrap(self: Arc<Self>) {
@@ -299,7 +356,10 @@ impl System {
(
id_option,
addr.clone(),
- sys.rpc_client.call(&addr, ping_msg_ref, PING_TIMEOUT).await,
+ sys.rpc_client
+ .by_addr()
+ .call(&addr, ping_msg_ref, PING_TIMEOUT)
+ .await,
)
}
}))
@@ -509,7 +569,10 @@ impl System {
peer: UUID,
) -> impl futures::future::Future<Output = ()> + Send + 'static {
async move {
- let resp = rpc_call(self.clone(), &peer, &Message::PullStatus, PING_TIMEOUT).await;
+ let resp = self
+ .rpc_client
+ .call(&peer, Message::PullStatus, PING_TIMEOUT)
+ .await;
if let Ok(Message::AdvertiseNodesUp(nodes)) = resp {
let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await;
}
@@ -517,7 +580,10 @@ impl System {
}
pub async fn pull_config(self: Arc<Self>, peer: UUID) {
- let resp = rpc_call(self.clone(), &peer, &Message::PullConfig, PING_TIMEOUT).await;
+ let resp = self
+ .rpc_client
+ .call(&peer, Message::PullConfig, PING_TIMEOUT)
+ .await;
if let Ok(Message::AdvertiseConfig(config)) = resp {
let _: Result<_, _> = self.handle_advertise_config(&config).await;
}