diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/lib.rs | 4 | ||||
-rw-r--r-- | src/rpc/membership.rs | 33 | ||||
-rw-r--r-- | src/rpc/ring.rs | 36 | ||||
-rw-r--r-- | src/rpc/rpc_client.rs | 26 | ||||
-rw-r--r-- | src/rpc/rpc_server.rs | 8 |
5 files changed, 100 insertions, 7 deletions
diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 00e31f57..96561d0e 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -1,7 +1,9 @@ +//! Crate containing rpc related functions and types used in Garage + #[macro_use] extern crate log; -pub mod consul; +mod consul; pub(crate) mod tls_util; pub mod membership; diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 9fb24ad4..5f7bbc96 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -1,3 +1,4 @@ +//! Module containing structs related to membership management use std::collections::HashMap; use std::fmt::Write as FmtWrite; use std::io::{Read, Write}; @@ -30,20 +31,29 @@ const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const PING_TIMEOUT: Duration = Duration::from_secs(2); const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; +/// RPC endpoint used for calls related to membership pub const MEMBERSHIP_RPC_PATH: &str = "_membership"; +/// RPC messages related to membership #[derive(Debug, Serialize, Deserialize)] pub enum Message { + /// Response to successfull advertisements Ok, + /// Message sent to detect other nodes status Ping(PingMessage), + /// Ask other node for the nodes it knows. Answered with AdvertiseNodesUp PullStatus, + /// Ask other node its config. Answered with AdvertiseConfig PullConfig, + /// Advertisement of nodes the host knows up. Sent spontanously or in response to PullStatus AdvertiseNodesUp(Vec<AdvertisedNode>), + /// Advertisement of nodes config. Sent spontanously or in response to PullConfig AdvertiseConfig(NetworkConfig), } impl RpcMessage for Message {} +/// A ping, containing informations about status and config #[derive(Debug, Serialize, Deserialize)] pub struct PingMessage { id: UUID, @@ -55,18 +65,25 @@ pub struct PingMessage { state_info: StateInfo, } +/// A node advertisement #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AdvertisedNode { + /// Id of the node this advertisement relates to pub id: UUID, + /// IP and port of the node pub addr: SocketAddr, + /// Is the node considered up pub is_up: bool, + /// When was the node last seen up, in milliseconds since UNIX epoch pub last_seen: u64, pub state_info: StateInfo, } +/// This node's membership manager pub struct System { + /// The id of this node pub id: UUID, persist_config: Persister<NetworkConfig>, @@ -79,10 +96,12 @@ pub struct System { rpc_client: Arc<RpcClient<Message>>, pub(crate) status: watch::Receiver<Arc<Status>>, + /// The ring pub ring: watch::Receiver<Arc<Ring>>, update_lock: Mutex<Updaters>, + /// The job runner of this node pub background: Arc<BackgroundRunner>, } @@ -91,21 +110,29 @@ struct Updaters { update_ring: watch::Sender<Arc<Ring>>, } +/// The status of each nodes, viewed by this node #[derive(Debug, Clone)] pub struct Status { + /// Mapping of each node id to its known status pub nodes: HashMap<UUID, Arc<StatusEntry>>, + /// Hash of `nodes`, used to detect when nodes have different views of the cluster pub hash: Hash, } +/// The status of a single node #[derive(Debug)] pub struct StatusEntry { + /// The IP and port used to connect to this node pub addr: SocketAddr, + /// Last time this node was seen pub last_seen: u64, + /// Number of consecutive pings sent without reply to this node pub num_failures: AtomicUsize, pub state_info: StateInfo, } impl StatusEntry { + /// is the node associated to this entry considered up pub fn is_up(&self) -> bool { self.num_failures.load(Ordering::SeqCst) < MAX_FAILURES_BEFORE_CONSIDERED_DOWN } @@ -195,6 +222,7 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> { } impl System { + /// Create this node's membership manager pub fn new( metadata_dir: PathBuf, rpc_http_client: Arc<RpcHttpClient>, @@ -279,6 +307,7 @@ impl System { }); } + /// Get an RPC client pub fn rpc_client<M: RpcMessage + 'static>(self: &Arc<Self>, path: &str) -> Arc<RpcClient<M>> { RpcClient::new( RpcAddrClient::new(self.rpc_http_client.clone(), path.to_string()), @@ -287,6 +316,7 @@ impl System { ) } + /// Save network configuration to disc async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { let ring = self.ring.borrow().clone(); self.persist_config @@ -319,6 +349,7 @@ impl System { self.rpc_client.call_many(&to[..], msg, timeout).await; } + /// Perform bootstraping, starting the ping loop pub async fn bootstrap( self: Arc<Self>, peers: Vec<SocketAddr>, @@ -386,6 +417,8 @@ impl System { } } else if let Some(id) = id_option { if let Some(st) = status.nodes.get_mut(id) { + // we need to increment failure counter as call was done using by_addr so the + // counter was not auto-incremented st.num_failures.fetch_add(1, Ordering::SeqCst); if !st.is_up() { warn!("Node {:?} seems to be down.", id); diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 2e997523..bffd7f1f 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -1,3 +1,5 @@ +//! Module containing types related to computing nodes which should receive a copy of data blocks +//! and metadata use std::collections::{HashMap, HashSet}; use std::convert::TryInto; @@ -8,23 +10,30 @@ use garage_util::data::*; // A partition number is encoded on 16 bits, // i.e. we have up to 2**16 partitions. // (in practice we have exactly 2**PARTITION_BITS partitions) +/// A partition id, stored on 16 bits pub type Partition = u16; // TODO: make this constant parametrizable in the config file // For deployments with many nodes it might make sense to bump // it up to 10. // Maximum value : 16 +/// How many bits from the hash are used to make partitions. Higher numbers means more fairness in +/// presence of numerous nodes, but exponentially bigger ring. Max 16 pub const PARTITION_BITS: usize = 8; const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS); // TODO: make this constant paraetrizable in the config file // (most deployments use a replication factor of 3, so...) +/// The maximum number of time an object might get replicated pub const MAX_REPLICATION: usize = 3; +/// The user-defined configuration of the cluster's nodes #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfig { + /// Map of each node's id to it's configuration pub members: HashMap<UUID, NetworkConfigEntry>, + /// Version of this config pub version: u64, } @@ -37,26 +46,40 @@ impl NetworkConfig { } } +/// The overall configuration of one (possibly remote) node #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfigEntry { + /// Datacenter at which this entry belong. This infromation might be used to perform a better + /// geodistribution pub datacenter: String, + /// The (relative) capacity of the node pub capacity: u32, + /// A tag to recognize the entry, not used for other things than display pub tag: String, } +/// A ring distributing fairly objects to nodes #[derive(Clone)] pub struct Ring { + /// The network configuration used to generate this ring pub config: NetworkConfig, + /// The list of entries in the ring pub ring: Vec<RingEntry>, } +/// An entry in the ring #[derive(Clone, Debug)] pub struct RingEntry { + /// The prefix of the Hash of object which should use this entry pub location: Hash, + /// The nodes in which a matching object should get stored pub nodes: [UUID; MAX_REPLICATION], } impl Ring { + // TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6 + // levels of imbrication. It is basically impossible to test, maintain, or understand for an + // outsider. pub(crate) fn new(config: NetworkConfig) -> Self { // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>(); @@ -166,20 +189,16 @@ impl Ring { }) .collect::<Vec<_>>(); - // eprintln!("RING: --"); - // for e in ring.iter() { - // eprintln!("{:?}", e); - // } - // eprintln!("END --"); - Self { config, ring } } + /// Get the partition in which data would fall on pub fn partition_of(&self, from: &Hash) -> Partition { let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); top >> (16 - PARTITION_BITS) } + /// Get the list of partitions and the first hash of a partition key that would fall in it pub fn partitions(&self) -> Vec<(Partition, Hash)> { let mut ret = vec![]; @@ -193,6 +212,8 @@ impl Ring { ret } + // TODO rename this function as it no longer walk the ring + /// Walk the ring to find the n servers in which data should be replicated pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { if self.ring.len() != 1 << PARTITION_BITS { warn!("Ring not yet ready, read/writes will be lost!"); @@ -201,12 +222,15 @@ impl Ring { let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); let partition_idx = (top >> (16 - PARTITION_BITS)) as usize; + // TODO why computing two time in the same way and asserting? assert_eq!(partition_idx, self.partition_of(from) as usize); let partition = &self.ring[partition_idx]; let partition_top = u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap()); + // TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should + // probably be a test more than a runtime assertion assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16); assert!(n <= partition.nodes.len()); diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index 5e76e215..8a6cc721 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -1,3 +1,4 @@ +//! Contain structs related to making RPCs use std::borrow::Borrow; use std::marker::PhantomData; use std::net::SocketAddr; @@ -26,14 +27,19 @@ use crate::tls_util; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +/// Strategy to apply when making RPC #[derive(Copy, Clone)] pub struct RequestStrategy { + /// Max time to wait for reponse pub rs_timeout: Duration, + /// Min number of response to consider the request successful pub rs_quorum: usize, + /// Should requests be dropped after enough response are received pub rs_interrupt_after_quorum: bool, } impl RequestStrategy { + /// Create a RequestStrategy with default timeout and not interrupting when quorum reached pub fn with_quorum(quorum: usize) -> Self { RequestStrategy { rs_timeout: DEFAULT_TIMEOUT, @@ -41,19 +47,25 @@ impl RequestStrategy { rs_interrupt_after_quorum: false, } } + /// Set timeout of the strategy pub fn with_timeout(mut self, timeout: Duration) -> Self { self.rs_timeout = timeout; self } + /// Set if requests can be dropped after quorum has been reached + /// In general true for read requests, and false for write pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self { self.rs_interrupt_after_quorum = interrupt; self } } +/// Shortcut for a boxed async function taking a message, and resolving to another message or an +/// error pub type LocalHandlerFn<M> = Box<dyn Fn(Arc<M>) -> Pin<Box<dyn Future<Output = Result<M, Error>> + Send>> + Send + Sync>; +/// Client used to send RPC pub struct RpcClient<M: RpcMessage> { status: watch::Receiver<Arc<Status>>, background: Arc<BackgroundRunner>, @@ -64,6 +76,7 @@ pub struct RpcClient<M: RpcMessage> { } impl<M: RpcMessage + 'static> RpcClient<M> { + /// Create a new RpcClient from an address, a job runner, and the status of all RPC servers pub fn new( rac: RpcAddrClient<M>, background: Arc<BackgroundRunner>, @@ -77,6 +90,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { }) } + /// Set the local handler, to process RPC to this node without network usage pub fn set_local_handler<F, Fut>(&self, my_id: UUID, handler: F) where F: Fn(Arc<M>) -> Fut + Send + Sync + 'static, @@ -90,14 +104,17 @@ impl<M: RpcMessage + 'static> RpcClient<M> { self.local_handler.swap(Some(Arc::new((my_id, handler)))); } + /// Get a RPC client to make calls using node's SocketAddr instead of its ID pub fn by_addr(&self) -> &RpcAddrClient<M> { &self.rpc_addr_client } + /// Make a RPC call pub async fn call(&self, to: UUID, msg: M, timeout: Duration) -> Result<M, Error> { self.call_arc(to, Arc::new(msg), timeout).await } + /// Make a RPC call from a message stored in an Arc pub async fn call_arc(&self, to: UUID, msg: Arc<M>, timeout: Duration) -> Result<M, Error> { if let Some(lh) = self.local_handler.load_full() { let (my_id, local_handler) = lh.as_ref(); @@ -134,6 +151,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } } + /// Make a RPC call to multiple servers, returning a Vec containing each result pub async fn call_many(&self, to: &[UUID], msg: M, timeout: Duration) -> Vec<Result<M, Error>> { let msg = Arc::new(msg); let mut resp_stream = to @@ -148,6 +166,8 @@ impl<M: RpcMessage + 'static> RpcClient<M> { results } + /// Make a RPC call to multiple servers, returning either a Vec of responses, or an error if + /// strategy could not be respected due to too many errors pub async fn try_call_many( self: &Arc<Self>, to: &[UUID], @@ -207,6 +227,7 @@ impl<M: RpcMessage + 'static> RpcClient<M> { } } +/// Thin wrapper arround an `RpcHttpClient` specifying the path of the request pub struct RpcAddrClient<M: RpcMessage> { phantom: PhantomData<M>, @@ -215,6 +236,7 @@ pub struct RpcAddrClient<M: RpcMessage> { } impl<M: RpcMessage> RpcAddrClient<M> { + /// Create an RpcAddrClient from an HTTP client and the endpoint to reach for RPCs pub fn new(http_client: Arc<RpcHttpClient>, path: String) -> Self { Self { phantom: PhantomData::default(), @@ -223,6 +245,7 @@ impl<M: RpcMessage> RpcAddrClient<M> { } } + /// Make a RPC pub async fn call<MB>( &self, to_addr: &SocketAddr, @@ -238,6 +261,7 @@ impl<M: RpcMessage> RpcAddrClient<M> { } } +/// HTTP client used to make RPCs pub struct RpcHttpClient { request_limiter: Semaphore, method: ClientMethod, @@ -249,6 +273,7 @@ enum ClientMethod { } impl RpcHttpClient { + /// Create a new RpcHttpClient pub fn new( max_concurrent_requests: usize, tls_config: &Option<TlsConfig>, @@ -279,6 +304,7 @@ impl RpcHttpClient { }) } + /// Make a RPC async fn call<M, MB>( &self, path: &str, diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 0d82d796..4419a6f0 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -1,3 +1,4 @@ +//! Contains structs related to receiving RPCs use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; @@ -22,13 +23,17 @@ use garage_util::error::Error; use crate::tls_util; +/// Trait for messages that can be sent as RPC pub trait RpcMessage: Serialize + for<'de> Deserialize<'de> + Send + Sync {} type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>; type Handler = Box<dyn Fn(Request<Body>, SocketAddr) -> ResponseFuture + Send + Sync>; +/// Structure handling RPCs pub struct RpcServer { + /// The address the RpcServer will bind pub bind_addr: SocketAddr, + /// The tls configuration used for RPC pub tls_config: Option<TlsConfig>, handlers: HashMap<String, Handler>, @@ -87,6 +92,7 @@ where } impl RpcServer { + /// Create a new RpcServer pub fn new(bind_addr: SocketAddr, tls_config: Option<TlsConfig>) -> Self { Self { bind_addr, @@ -95,6 +101,7 @@ impl RpcServer { } } + /// Add handler handling request made to `name` pub fn add_handler<M, F, Fut>(&mut self, name: String, handler: F) where M: RpcMessage + 'static, @@ -156,6 +163,7 @@ impl RpcServer { } } + /// Run the RpcServer pub async fn run( self: Arc<Self>, shutdown_signal: impl Future<Output = ()>, |