Diffstat (limited to 'src/rpc')
-rw-r--r--  src/rpc/lib.rs         |  4
-rw-r--r--  src/rpc/membership.rs  | 33
-rw-r--r--  src/rpc/ring.rs        | 36
-rw-r--r--  src/rpc/rpc_client.rs  | 26
-rw-r--r--  src/rpc/rpc_server.rs  |  8
5 files changed, 100 insertions, 7 deletions
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs
index 00e31f57..96561d0e 100644
--- a/src/rpc/lib.rs
+++ b/src/rpc/lib.rs
@@ -1,7 +1,9 @@
+//! Crate containing RPC-related functions and types used in Garage
+
#[macro_use]
extern crate log;
-pub mod consul;
+mod consul;
pub(crate) mod tls_util;
pub mod membership;
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index 9fb24ad4..5f7bbc96 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -1,3 +1,4 @@
+//! Module containing structs related to membership management
use std::collections::HashMap;
use std::fmt::Write as FmtWrite;
use std::io::{Read, Write};
@@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
+/// RPC endpoint used for calls related to membership
pub const MEMBERSHIP_RPC_PATH: &str = "_membership";
+/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize)]
pub enum Message {
+ /// Response to successful advertisements
Ok,
+ /// Message sent to detect another node's status
Ping(PingMessage),
+ /// Ask another node for the nodes it knows of. Answered with AdvertiseNodesUp
PullStatus,
+ /// Ask another node for its config. Answered with AdvertiseConfig
PullConfig,
+ /// Advertisement of nodes the sender knows to be up. Sent spontaneously or in response to PullStatus
AdvertiseNodesUp(Vec<AdvertisedNode>),
+ /// Advertisement of the network config. Sent spontaneously or in response to PullConfig
AdvertiseConfig(NetworkConfig),
}
impl RpcMessage for Message {}
+/// A ping, containing information about status and config
#[derive(Debug, Serialize, Deserialize)]
pub struct PingMessage {
id: UUID,
@@ -55,18 +65,25 @@ pub struct PingMessage {
state_info: StateInfo,
}
+/// A node advertisement
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AdvertisedNode {
+ /// Id of the node this advertisement relates to
pub id: UUID,
+ /// IP and port of the node
pub addr: SocketAddr,
+ /// Is the node considered up
pub is_up: bool,
+ /// When was the node last seen up, in milliseconds since UNIX epoch
pub last_seen: u64,
pub state_info: StateInfo,
}
+/// This node's membership manager
pub struct System {
+ /// The id of this node
pub id: UUID,
persist_config: Persister<NetworkConfig>,
@@ -79,10 +96,12 @@ pub struct System {
rpc_client: Arc<RpcClient<Message>>,
pub(crate) status: watch::Receiver<Arc<Status>>,
+ /// The ring
pub ring: watch::Receiver<Arc<Ring>>,
update_lock: Mutex<Updaters>,
+ /// The job runner of this node
pub background: Arc<BackgroundRunner>,
}
@@ -91,21 +110,29 @@ struct Updaters {
update_ring: watch::Sender<Arc<Ring>>,
}
+/// The status of each node, as viewed by this node
#[derive(Debug, Clone)]
pub struct Status {
+ /// Mapping of each node id to its known status
pub nodes: HashMap<UUID, Arc<StatusEntry>>,
+ /// Hash of `nodes`, used to detect when nodes have different views of the cluster
pub hash: Hash,
}
+/// The status of a single node
#[derive(Debug)]
pub struct StatusEntry {
+ /// The IP and port used to connect to this node
pub addr: SocketAddr,
+ /// Last time this node was seen
pub last_seen: u64,
+ /// Number of consecutive pings to this node left unanswered
pub num_failures: AtomicUsize,
pub state_info: StateInfo,
}
impl StatusEntry {
+ /// Is the node associated with this entry considered up?
pub fn is_up(&self) -> bool {
self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
}
@@ -195,6 +222,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
}
impl System {
+ /// Create this node's membership manager
pub fn new(
metadata_dir: PathBuf,
rpc_http_client: Arc<RpcHttpClient>,
@@ -279,6 +307,7 @@ impl System {
});
}
+ /// Get an RPC client
pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> {
RpcClient::new(
RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()),
@@ -287,6 +316,7 @@ impl System {
)
}
+ /// Save the network configuration to disk
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let ring = self.ring.borrow().clone();
self.persist_config
@@ -319,6 +349,7 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await;
}
+ /// Perform bootstrapping, starting the ping loop
pub async fn bootstrap(
self: Arc<Self>,
peers: Vec<SocketAddr>,
@@ -386,6 +417,8 @@ impl System {
}
} else if let Some(id) = id_option {
if let Some(st) = status.nodes.get_mut(id) {
+ // we need to increment the failure counter ourselves: the call was made
+ // using by_addr, so the counter was not auto-incremented
st.num_failures.fetch_add(1, Ordering::SeqCst);
if !st.is_up() {
warn!("Node {:?} seems to be down.", id);
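The failure-detection logic documented above is small enough to sketch in isolation. The following standalone snippet mirrors `StatusEntry::is_up` and the manual `fetch_add` from the hunks above, with the constant copied from membership.rs; the `main` driver and its ping loop are illustrative only, not part of the commit.

use std::sync::atomic::{AtomicUsize, Ordering};

// Copied from membership.rs above.
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;

struct StatusEntry {
    num_failures: AtomicUsize,
}

impl StatusEntry {
    // Same check as in the hunk: a node is up while fewer than
    // MAX_FAILURES_BEFORE_CONSIDERED_DOWN consecutive pings have failed.
    fn is_up(&self) -> bool {
        self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN
    }
}

fn main() {
    let entry = StatusEntry { num_failures: AtomicUsize::new(0) };
    // Each unanswered ping increments the counter, as done manually
    // after by_addr calls in the hunk above.
    for _ in 0..MAX_FAILURES_BEFORE_CONSIDERED_DOWN {
        entry.num_failures.fetch_add(1, Ordering::SeqCst);
    }
    assert!(!entry.is_up()); // the fifth consecutive failure marks the node down
}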
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 2e997523..bffd7f1f 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -1,3 +1,5 @@
+//! Module containing types related to computing which nodes should receive a copy of data
+//! blocks and metadata
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
@@ -8,23 +10,30 @@ use garage_util::data::*;
// A partition number is encoded on 16 bits,
// i.e. we have up to 2**16 partitions.
// (in practice we have exactly 2**PARTITION_BITS partitions)
+/// A partition id, stored on 16 bits
pub type Partition = u16;
// TODO: make this constant parametrizable in the config file
// For deployments with many nodes it might make sense to bump
// it up to 10.
// Maximum value : 16
+/// How many bits of the hash are used to define partitions. A higher value gives a fairer
+/// distribution across many nodes, but an exponentially bigger ring. Max 16
pub const PARTITION_BITS: usize = 8;
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
// TODO: make this constant paraetrizable in the config file
// (most deployments use a replication factor of 3, so...)
+/// The maximum number of times an object might be replicated
pub const MAX_REPLICATION: usize = 3;
+/// The user-defined configuration of the cluster's nodes
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfig {
+ /// Map of each node's id to its configuration
pub members: HashMap<UUID, NetworkConfigEntry>,
+ /// Version of this config
pub version: u64,
}
@@ -37,26 +46,40 @@ impl NetworkConfig {
}
}
+/// The overall configuration of one (possibly remote) node
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct NetworkConfigEntry {
+ /// Datacenter to which this entry belongs. This information might be used to achieve a
+ /// better geodistribution
pub datacenter: String,
+ /// The (relative) capacity of the node
pub capacity: u32,
+ /// A tag to recognize the entry, used only for display purposes
pub tag: String,
}
+/// A ring distributing objects fairly to nodes
#[derive(Clone)]
pub struct Ring {
+ /// The network configuration used to generate this ring
pub config: NetworkConfig,
+ /// The list of entries in the ring
pub ring: Vec<RingEntry>,
}
+/// An entry in the ring
#[derive(Clone, Debug)]
pub struct RingEntry {
+ /// The prefix of the Hash of objects which should use this entry
pub location: Hash,
+ /// The nodes in which a matching object should get stored
pub nodes: [UUID; MAX_REPLICATION],
}
impl Ring {
+ // TODO this function MUST be refactored: it's 100 lines long, with a 50-line loop going up
+ // to 6 levels of nesting. It is basically impossible for an outsider to test, maintain, or
+ // understand.
pub(crate) fn new(config: NetworkConfig) -> Self {
// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
@@ -166,20 +189,16 @@ impl Ring {
})
.collect::<Vec<_>>();
- // eprintln!("RING: --");
- // for e in ring.iter() {
- // eprintln!("{:?}", e);
- // }
- // eprintln!("END --");
-
Self { config, ring }
}
+ /// Get the partition in which the given hash falls
pub fn partition_of(&self, from: &Hash) -> Partition {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
top >> (16 - PARTITION_BITS)
}
+ /// Get the list of partitions, each paired with the first hash that falls into it
pub fn partitions(&self) -> Vec<(Partition, Hash)> {
let mut ret = vec![];
@@ -193,6 +212,8 @@ impl Ring {
ret
}
+ // TODO rename this function as it no longer walks the ring
+ /// Walk the ring to find the n servers on which data should be replicated
pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> {
if self.ring.len() != 1 << PARTITION_BITS {
warn!("Ring not yet ready, read/writes will be lost!");
@@ -201,12 +222,15 @@ impl Ring {
let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
+ // TODO why compute this twice in the same way and then assert equality?
assert_eq!(partition_idx, self.partition_of(from) as usize);
let partition = &self.ring[partition_idx];
let partition_top =
u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
+ // TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
+ // probably be a test rather than a runtime assertion
assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
assert!(n <= partition.nodes.len());
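The partition arithmetic asserted above reduces to a few bit operations on the first two bytes of a hash. Here is a standalone sketch with `PARTITION_BITS` and the mask copied from ring.rs, and a plain `[u8; 32]` standing in for Garage's `Hash` type:

use std::convert::TryInto;

// Copied from ring.rs above.
const PARTITION_BITS: usize = 8;
const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);

// Same computation as Ring::partition_of: read the top 16 bits of the
// hash as a big-endian integer and keep only the top PARTITION_BITS bits.
fn partition_of(hash: &[u8; 32]) -> u16 {
    let top = u16::from_be_bytes(hash[0..2].try_into().unwrap());
    top >> (16 - PARTITION_BITS)
}

fn main() {
    let mut hash = [0u8; 32];
    hash[0] = 0xAB;
    hash[1] = 0xCD;
    // With PARTITION_BITS = 8, the partition is simply the first byte.
    assert_eq!(partition_of(&hash), 0xAB);
    // The mask keeps exactly the bits that identify the partition,
    // which is what the runtime assertion above checks.
    assert_eq!(0xABCDu16 & PARTITION_MASK_U16, 0xAB00);
}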
diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs
index 5e76e215..8a6cc721 100644
--- a/src/rpc/rpc_client.rs
+++ b/src/rpc/rpc_client.rs
@@ -1,3 +1,4 @@
+//! Contains structs related to making RPCs
use std::borrow::Borrow;
use std::marker::PhantomData;
use std::net::SocketAddr;
@@ -26,14 +27,19 @@ use crate::tls_util;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
+/// Strategy to apply when making RPCs
#[derive(Copy, Clone)]
pub struct RequestStrategy {
+ /// Max time to wait for a response
pub rs_timeout: Duration,
+ /// Min number of responses needed to consider the request successful
pub rs_quorum: usize,
+ /// Should requests be dropped after enough responses are received
pub rs_interrupt_after_quorum: bool,
}
impl RequestStrategy {
+ /// Create a RequestStrategy with the default timeout, not interrupting when quorum is reached
pub fn with_quorum(quorum: usize) -> Self {
RequestStrategy {
rs_timeout: DEFAULT_TIMEOUT,
@@ -41,19 +47,25 @@ impl RequestStrategy {
rs_interrupt_after_quorum: false,
}
}
+ /// Set the timeout of the strategy
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.rs_timeout = timeout;
self
}
+ /// Set whether requests can be dropped after quorum has been reached.
+ /// In general, true for read requests and false for writes
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt;
self
}
}
+/// Shortcut for a boxed async function taking a message, and resolving to another message or an
+/// error
pub type LocalHandlerFn<M> =
Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>;
+/// Client used to send RPCs
pub struct RpcClient<M: RpcMessage> {
status: watch::Receiver<Arc<Status>>,
background: Arc<BackgroundRunner>,
@@ -64,6 +76,7 @@ pub struct RpcClient<M: RpcMessage> {
}
impl<M: RpcMessage + 'static> RpcClient<M> {
+ /// Create a new RpcClient from an address client, a job runner, and the status of all RPC servers
pub fn new(
rac: RpcAddrClient<M>,
background: Arc<BackgroundRunner>,
@@ -77,6 +90,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
})
}
+ /// Set the local handler, to process RPCs to this node without going through the network
pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F)
where
F: Fn(Arc<M>) -> Fut + Send + Sync + 'static,
@@ -90,14 +104,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
self.local_handler.swap(Some(Arc::new((my_id, handler))));
}
+ /// Get an RPC client that makes calls using a node's SocketAddr instead of its ID
pub fn by_addr(&self) -> &RpcAddrClient<M> {
&self.rpc_addr_client
}
+ /// Make an RPC call
pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> {
self.call_arc(to, Arc::new(msg), timeout).await
}
+ /// Make an RPC call from a message stored in an Arc
pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> {
if let Some(lh) = self.local_handler.load_full() {
let (my_id, local_handler) = lh.as_ref();
@@ -134,6 +151,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
}
+ /// Make an RPC call to multiple servers, returning a Vec containing each result
pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> {
let msg = Arc::new(msg);
let mut resp_stream = to
@@ -148,6 +166,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
results
}
+ /// Make an RPC call to multiple servers, returning either a Vec of responses, or an error
+ /// if the strategy could not be respected due to too many errors
pub async fn try_call_many(
self: &Arc<Self>,
to: &[UUID],
@@ -207,6 +227,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> {
}
}
+/// Thin wrapper around an `RpcHttpClient`, specifying the path of the request
pub struct RpcAddrClient<M: RpcMessage> {
phantom: PhantomData<M>,
@@ -215,6 +236,7 @@ pub struct RpcAddrClient<M: RpcMessage> {
}
impl<M: RpcMessage> RpcAddrClient<M> {
+ /// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs
pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self {
Self {
phantom: PhantomData::default(),
@@ -223,6 +245,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
}
}
+ /// Make an RPC
pub async fn call<MB>(
&self,
to_addr: &SocketAddr,
@@ -238,6 +261,7 @@ impl<M: RpcMessage> RpcAddrClient<M> {
}
}
+/// HTTP client used to make RPCs
pub struct RpcHttpClient {
request_limiter: Semaphore,
method: ClientMethod,
@@ -249,6 +273,7 @@ enum ClientMethod {
}
impl RpcHttpClient {
+ /// Create a new RpcHttpClient
pub fn new(
max_concurrent_requests: usize,
tls_config: &Option<TlsConfig>,
@@ -279,6 +304,7 @@ impl RpcHttpClient {
})
}
+ /// Make an RPC
async fn call<M, MB>(
&self,
path: &str,
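The `RequestStrategy` builder documented in this file chains naturally. A standalone sketch of the pattern, with the struct and its methods reproduced from the hunks above (the quorum of 2 and the two-second timeout are illustrative values, not project defaults):

use std::time::Duration;

// Reproduced from rpc_client.rs above.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);

#[derive(Copy, Clone)]
pub struct RequestStrategy {
    pub rs_timeout: Duration,
    pub rs_quorum: usize,
    pub rs_interrupt_after_quorum: bool,
}

impl RequestStrategy {
    pub fn with_quorum(quorum: usize) -> Self {
        RequestStrategy {
            rs_timeout: DEFAULT_TIMEOUT,
            rs_quorum: quorum,
            rs_interrupt_after_quorum: false,
        }
    }
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.rs_timeout = timeout;
        self
    }
    pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
        self.rs_interrupt_after_quorum = interrupt;
        self
    }
}

fn main() {
    // Typical read strategy: stop waiting once 2 responses have arrived.
    let read = RequestStrategy::with_quorum(2)
        .with_timeout(Duration::from_secs(2))
        .interrupt_after_quorum(true);
    // Typical write strategy: keep waiting for all responses (the default).
    let write = RequestStrategy::with_quorum(2);
    assert!(read.rs_interrupt_after_quorum && !write.rs_interrupt_after_quorum);
}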
diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs
index 0d82d796..4419a6f0 100644
--- a/src/rpc/rpc_server.rs
+++ b/src/rpc/rpc_server.rs
@@ -1,3 +1,4 @@
+//! Contains structs related to receiving RPCs
use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin;
@@ -22,13 +23,17 @@ use garage_util::error::Error;
use crate::tls_util;
+/// Trait for messages that can be sent as RPCs
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}
type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>;
+/// Structure handling RPCs
pub struct RpcServer {
+ /// The address the RpcServer will bind to
pub bind_addr: SocketAddr,
+ /// The TLS configuration used for RPCs
pub tls_config: Option<TlsConfig>,
handlers: HashMap<String, Handler>,
@@ -87,6 +92,7 @@ where
}
impl RpcServer {
+ /// Create a new RpcServer
pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self {
Self {
bind_addr,
@@ -95,6 +101,7 @@ impl RpcServer {
}
}
+ /// Add a handler for requests made to `name`
pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F)
where
M: RpcMessage + 'static,
@@ -156,6 +163,7 @@ impl RpcServer {
}
}
+ /// Run the RpcServer
pub async fn run(
self: Arc<Self>,
shutdown_signal: impl Future<Output = ()>,
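To make the `RpcMessage` bound above concrete: any serializable, thread-safe type qualifies, exactly as `Message` does in membership.rs. A standalone sketch with a hypothetical `ExampleMessage` type; serde and serde_json are assumed as dependencies here, and Garage's actual wire format may differ:

use serde::{Deserialize, Serialize};

// The trait bound from rpc_server.rs above: serializable in both
// directions, plus Send + Sync so messages can cross task boundaries.
pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {}

// A hypothetical message type; deriving the serde traits is enough.
#[derive(Debug, Serialize, Deserialize)]
pub enum ExampleMessage {
    Ok,
    Echo(String),
}

impl RpcMessage for ExampleMessage {}

fn main() {
    let msg = ExampleMessage::Echo("hello".to_string());
    // JSON is used here purely for demonstration.
    let bytes = serde_json::to_vec(&msg).unwrap();
    let back: ExampleMessage = serde_json::from_slice(&bytes).unwrap();
    println!("{:?}", back);
}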