diff options
author | Trinity Pointard <trinity.pointard@gmail.com> | 2021-03-22 00:00:09 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2021-04-27 16:37:10 +0200 |
commit | 8e0524ae15e5252aecd30dc01d6993810cf49811 (patch) | |
tree | c12c1079617112c90bd47d3240587ebb54bdaf9b /src/rpc/membership.rs | |
parent | f9bd2d8fb79a8f3dbea54834b39e65438846ea5c (diff) | |
download | garage-8e0524ae15e5252aecd30dc01d6993810cf49811.tar.gz garage-8e0524ae15e5252aecd30dc01d6993810cf49811.zip |
document rpc crate
Diffstat (limited to 'src/rpc/membership.rs')
-rw-r--r-- | src/rpc/membership.rs | 34 |
1 files changed, 33 insertions, 1 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 9fb24ad4..c465ce68 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -1,3 +1,4 @@ +/// Module containing structs related to membership management use std::collections::HashMap; use std::fmt::Write as FmtWrite; use std::io::{Read, Write}; @@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const PING_TIMEOUT: Duration = Duration::from_secs(2); const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; +/// RPC endpoint used for calls related to membership pub const MEMBERSHIP_RPC_PATH: &str = "_membership"; +/// RPC messages related to membership #[derive(Debug, Serialize, Deserialize)] pub enum Message { + /// Response to successfull advertisements Ok, + /// Message sent to detect other nodes status Ping(PingMessage), + /// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp PullStatus, + /// Ask other node its config. Answered with AdvertiseConfig PullConfig, + /// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus AdvertiseNodesUp(Vec<AdvertisedNode>), + /// Advertisement of nodes config. Sent spontanously or in response to PullConfig AdvertiseConfig(NetworkConfig), } impl RpcMessage for Message {} +/// A ping, containing informations about status and config #[derive(Debug, Serialize, Deserialize)] pub struct PingMessage { id: UUID, @@ -55,18 +65,25 @@ pub struct PingMessage { state_info: StateInfo, } +/// A node advertisement #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AdvertisedNode { + /// Id of the node this advertisement relates to pub id: UUID, + /// IP and port of the node pub addr: SocketAddr, + /// Is the node considered up pub is_up: bool, + /// When was the node last seen up, in milliseconds since UNIX epoch pub last_seen: u64, pub state_info: StateInfo, } +/// This node's membership manager pub struct System { + /// The id of this node pub id: UUID, persist_config: Persister<NetworkConfig>, @@ -79,10 +96,12 @@ pub struct System { rpc_client: Arc<RpcClient<Message>>, pub(crate) status: watch::Receiver<Arc<Status>>, + /// The ring, viewed by this node pub ring: watch::Receiver<Arc<Ring>>, update_lock: Mutex<Updaters>, + /// The job runner of this node pub background: Arc<BackgroundRunner>, } @@ -91,21 +110,30 @@ struct Updaters { update_ring: watch::Sender<Arc<Ring>>, } +/// The status of each nodes, viewed by this node #[derive(Debug, Clone)] pub struct Status { + /// Mapping of each node id to its known status + // considering its sorted regularly, maybe it should be a btreeset? pub nodes: HashMap<UUID, Arc<StatusEntry>>, + /// Hash of this entry pub hash: Hash, } +/// The status of a single node #[derive(Debug)] pub struct StatusEntry { + /// The IP and port used to connect to this node pub addr: SocketAddr, + /// Last time this node was seen pub last_seen: u64, + /// Number of consecutive pings sent without reply to this node pub num_failures: AtomicUsize, pub state_info: StateInfo, } impl StatusEntry { + /// is the node associated to this entry considered up pub fn is_up(&self) -> bool { self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN } @@ -195,6 +223,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> { } impl System { + /// Create this node's membership manager pub fn new( metadata_dir: PathBuf, rpc_http_client: Arc<RpcHttpClient>, @@ -279,6 +308,7 @@ impl System { }); } + /// Get an RPC client pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> { RpcClient::new( RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()), @@ -287,6 +317,7 @@ impl System { ) } + /// Save network configuration to disc async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { let ring = self.ring.borrow().clone(); self.persist_config @@ -319,6 +350,7 @@ impl System { self.rpc_client.call_many(&to[..], msg, timeout).await; } + /// Perform bootstraping, starting the ping loop pub async fn bootstrap( self: Arc<Self>, peers: Vec<SocketAddr>, @@ -348,7 +380,7 @@ impl System { id_option, addr, sys.rpc_client - .by_addr() + .get_addr() .call(&addr, ping_msg_ref, PING_TIMEOUT) .await, ) |