aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/membership.rs
diff options
context:
space:
mode:
authorTrinity Pointard <trinity.pointard@gmail.com>2021-03-22 00:00:09 +0100
committerAlex Auvolat <alex@adnab.me>2021-04-27 16:37:10 +0200
commit8e0524ae15e5252aecd30dc01d6993810cf49811 (patch)
treec12c1079617112c90bd47d3240587ebb54bdaf9b /src/rpc/membership.rs
parentf9bd2d8fb79a8f3dbea54834b39e65438846ea5c (diff)
downloadgarage-8e0524ae15e5252aecd30dc01d6993810cf49811.tar.gz
garage-8e0524ae15e5252aecd30dc01d6993810cf49811.zip
document rpc crate
Diffstat (limited to 'src/rpc/membership.rs')
-rw-r--r--src/rpc/membership.rs34
1 files changed, 33 insertions, 1 deletions
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 9fb24ad4..c465ce68 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -1,3 +1,4 @@
+/// Module containing structs related to membership management
use std::collections::HashMap;
use std::fmt::Write as FmtWrite;
use std::io::{Read, Write};
@@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
+/// RPC endpoint used for calls related to membership
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
+/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
+ /// Response to successfull advertisements
Ok,
+ /// Message sent to detect other nodes status
Ping(PingMessage),
+ /// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp
PullStatus,
+ /// Ask other node its config. Answered with AdvertiseConfig
PullConfig,
+ /// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus
AdvertiseNodesUp(Vec<AdvertisedNode>),
+ /// Advertisement of nodes config. Sent spontanously or in response to PullConfig
AdvertiseConfig(NetworkConfig),
}
impl RpcMessage for Message {}
+/// A ping, containing informations about status and config
#[derive(Debug, Serialize, Deserialize)]
pub struct PingMessage {
id: UUID,
@@ -55,18 +65,25 @@ pub struct PingMessage {
state_info: StateInfo,
}
+/// A node advertisement
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode {
+ /// Id of the node this advertisement relates to
pub id: UUID,
+ /// IP and port of the node
pub addr: SocketAddr,
+ /// Is the node considered up
pub is_up: bool,
+ /// When was the node last seen up, in milliseconds since UNIX epoch
pub last_seen: u64,
pub state_info: StateInfo,
}
+/// This node's membership manager
pub struct System {
+ /// The id of this node
pub id: UUID,
persist_config: Persister<NetworkConfig>,
@@ -79,10 +96,12 @@ pub struct System {
rpc_client: Arc<RpcClient<Message>>,
pub(crate) status: watch::Receiver<Arc<Status>>,
+ /// The ring, viewed by this node
pub ring: watch::Receiver<Arc<Ring>>,
update_lock: Mutex<Updaters>,
+ /// The job runner of this node
pub background: Arc<BackgroundRunner>,
}
@@ -91,21 +110,30 @@ struct Updaters {
update_ring: watch::Sender<Arc<Ring>>,
}
+/// The status of each nodes, viewed by this node
#[derive(Debug, Clone)]
pub struct Status {
+ /// Mapping of each node id to its known status
+ // considering its sorted regularly, maybe it should be a btreeset?
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
+ /// Hash of this entry
pub hash: Hash,
}
+/// The status of a single node
#[derive(Debug)]
pub struct StatusEntry {
+ /// The IP and port used to connect to this node
pub addr: SocketAddr,
+ /// Last time this node was seen
pub last_seen: u64,
+ /// Number of consecutive pings sent without reply to this node
pub num_failures: AtomicUsize,
pub state_info: StateInfo,
}
impl StatusEntry {
+ /// is the node associated to this entry considered up
pub fn is_up(&self) -> bool {
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
}
@@ -195,6 +223,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
}
impl System {
+ /// Create this node's membership manager
pub fn new(
metadata_dir: PathBuf,
rpc_http_client: Arc<RpcHttpClient>,
@@ -279,6 +308,7 @@ impl System {
});
}
+ /// Get an RPC client
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
RpcClient::new(
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
@@ -287,6 +317,7 @@ impl System {
)
}
+ /// Save network configuration to disc
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let ring = self.ring.borrow().clone();
self.persist_config
@@ -319,6 +350,7 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await;
}
+ /// Perform bootstraping, starting the ping loop
pub async fn bootstrap(
self: Arc<Self>,
peers: Vec<SocketAddr>,
@@ -348,7 +380,7 @@ impl System {
id_option,
addr,
sys.rpc_client
- .by_addr()
+ .get_addr()
.call(&addr, ping_msg_ref, PING_TIMEOUT)
.await,
)