aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/membership.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/membership.rs')
-rw-r--r--src/rpc/membership.rs33
1 files changed, 33 insertions, 0 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 9fb24ad4..5f7bbc96 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -1,3 +1,4 @@
+//! Module containing structs related to membership management
use std::collections::HashMap;
use std::fmt::Write as FmtWrite;
use std::io::{Read, Write};
@@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
+/// RPC endpoint used for calls related to membership
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
+/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
+ /// Response to successful advertisements
Ok,
+ /// Message sent to detect other nodes' status
Ping(PingMessage),
+ /// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp
PullStatus,
+ /// Ask other node its config. Answered with AdvertiseConfig
PullConfig,
+ /// Advertisement of nodes the host knows to be up. Sent spontaneously or in response to PullStatus
AdvertiseNodesUp(Vec<AdvertisedNode>),
+ /// Advertisement of nodes config. Sent spontaneously or in response to PullConfig
AdvertiseConfig(NetworkConfig),
}
impl RpcMessage for Message {}
+/// A ping, containing information about status and config
#[derive(Debug, Serialize, Deserialize)]
pub struct PingMessage {
id: UUID,
@@ -55,18 +65,25 @@ pub struct PingMessage {
state_info: StateInfo,
}
+/// A node advertisement
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode {
+ /// Id of the node this advertisement relates to
pub id: UUID,
+ /// IP and port of the node
pub addr: SocketAddr,
+ /// Is the node considered up
pub is_up: bool,
+ /// When was the node last seen up, in milliseconds since UNIX epoch
pub last_seen: u64,
pub state_info: StateInfo,
}
+/// This node's membership manager
pub struct System {
+ /// The id of this node
pub id: UUID,
persist_config: Persister<NetworkConfig>,
@@ -79,10 +96,12 @@ pub struct System {
rpc_client: Arc<RpcClient<Message>>,
pub(crate) status: watch::Receiver<Arc<Status>>,
+ /// The ring
pub ring: watch::Receiver<Arc<Ring>>,
update_lock: Mutex<Updaters>,
+ /// The job runner of this node
pub background: Arc<BackgroundRunner>,
}
@@ -91,21 +110,29 @@ struct Updaters {
update_ring: watch::Sender<Arc<Ring>>,
}
+/// The status of each node, as viewed by this node
#[derive(Debug, Clone)]
pub struct Status {
+ /// Mapping of each node id to its known status
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
+ /// Hash of `nodes`, used to detect when nodes have different views of the cluster
pub hash: Hash,
}
+/// The status of a single node
#[derive(Debug)]
pub struct StatusEntry {
+ /// The IP and port used to connect to this node
pub addr: SocketAddr,
+ /// Last time this node was seen
pub last_seen: u64,
+ /// Number of consecutive pings sent without reply to this node
pub num_failures: AtomicUsize,
pub state_info: StateInfo,
}
impl StatusEntry {
+ /// Is the node associated with this entry considered up
pub fn is_up(&self) -> bool {
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
}
@@ -195,6 +222,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
}
impl System {
+ /// Create this node's membership manager
pub fn new(
metadata_dir: PathBuf,
rpc_http_client: Arc<RpcHttpClient>,
@@ -279,6 +307,7 @@ impl System {
});
}
+ /// Get an RPC client
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
RpcClient::new(
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
@@ -287,6 +316,7 @@ impl System {
)
}
+ /// Save network configuration to disk
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let ring = self.ring.borrow().clone();
self.persist_config
@@ -319,6 +349,7 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await;
}
+ /// Perform bootstrapping, starting the ping loop
pub async fn bootstrap(
self: Arc<Self>,
peers: Vec<SocketAddr>,
@@ -386,6 +417,8 @@ impl System {
}
} else if let Some(id) = id_option {
if let Some(st) = status.nodes.get_mut(id) {
+ // we need to increment failure counter as call was done using by_addr so the
+ // counter was not auto-incremented
st.num_failures.fetch_add(1, Ordering::SeqCst);
if !st.is_up() {
warn!("Node {:?} seems to be down.", id);