 src/api/admin/cluster.rs  |  18
 src/block/manager.rs      |  10
 src/block/resync.rs       |   4
 src/model/k2v/rpc.rs      |  20
 src/rpc/layout/manager.rs | 177
 src/rpc/layout/mod.rs     |   2
 src/rpc/system.rs         | 295
 src/table/gc.rs           |   4
 src/table/sync.rs         |  10
 src/table/table.rs        |  10
 10 files changed, 331 insertions(+), 219 deletions(-)
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs
index fe8e8764..f5483451 100644
--- a/src/api/admin/cluster.rs
+++ b/src/api/admin/cluster.rs
@@ -240,7 +240,11 @@ pub async fn handle_update_cluster_layout(
.merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
}
- garage.system.update_cluster_layout(&layout).await?;
+ garage
+ .system
+ .layout_manager
+ .update_cluster_layout(&layout)
+ .await?;
let res = format_cluster_layout(&layout);
Ok(json_ok_response(&res)?)
@@ -255,7 +259,11 @@ pub async fn handle_apply_cluster_layout(
let layout = garage.system.cluster_layout().as_ref().clone();
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
- garage.system.update_cluster_layout(&layout).await?;
+ garage
+ .system
+ .layout_manager
+ .update_cluster_layout(&layout)
+ .await?;
let res = ApplyClusterLayoutResponse {
message: msg,
@@ -267,7 +275,11 @@ pub async fn handle_apply_cluster_layout(
pub async fn handle_revert_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
let layout = garage.system.cluster_layout().as_ref().clone();
let layout = layout.revert_staged_changes()?;
- garage.system.update_cluster_layout(&layout).await?;
+ garage
+ .system
+ .layout_manager
+ .update_cluster_layout(&layout)
+ .await?;
let res = format_cluster_layout(&layout);
Ok(json_ok_response(&res)?)
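
All three admin handlers above now follow the same shape: snapshot the current layout history, derive a new one, and hand it to the LayoutManager. A hypothetical helper (not part of this patch; it only reuses the Garage, LayoutHistory and Error types already in scope in this file) would make that shared shape explicit:

    async fn update_layout_with<F>(garage: &Arc<Garage>, f: F) -> Result<LayoutHistory, Error>
    where
        F: FnOnce(LayoutHistory) -> Result<LayoutHistory, Error>,
    {
        // 1. snapshot the current layout history
        let layout = garage.system.cluster_layout().as_ref().clone();
        // 2. derive the new layout (update roles, apply or revert staged changes, ...)
        let layout = f(layout)?;
        // 3. hand it to the LayoutManager, which merges it into the local history,
        //    persists it, and publishes it on the watch channel
        garage
            .system
            .layout_manager
            .update_cluster_layout(&layout)
            .await?;
        Ok(layout)
    }
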
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 2d1b5c67..72b4ea66 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -265,7 +265,7 @@ impl BlockManager {
Fut: futures::Future<Output = Result<T, Error>>,
{
let who = self.replication.read_nodes(hash);
- let who = self.system.rpc.request_order(&who);
+ let who = self.system.rpc_helper().request_order(&who);
for node in who.iter() {
let node_id = NodeID::from(*node);
@@ -305,7 +305,7 @@ impl BlockManager {
// if the first one doesn't succeed rapidly
// TODO: keep first request running when initiating a new one and take the
// one that finishes earlier
- _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
+ _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => {
debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
}
};
@@ -363,7 +363,7 @@ impl BlockManager {
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&who[..],
@@ -439,7 +439,7 @@ impl BlockManager {
tokio::spawn(async move {
if let Err(e) = this
.resync
- .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout())
+ .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout())
{
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
}
@@ -533,7 +533,7 @@ impl BlockManager {
None => {
// Not found but maybe we should have had it ??
self.resync
- .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
+ .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?;
return Err(Error::Message(format!(
"block {:?} not found on node",
hash
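
The `.rpc` → `.rpc_helper()` substitutions in this file (and in the resync, k2v and table call sites below) are purely mechanical: the RpcHelper now lives inside the new LayoutManager, and System exposes it through the accessor added further down in src/rpc/system.rs:

    impl System {
        pub fn rpc_helper(&self) -> &RpcHelper {
            &self.layout_manager.rpc_helper
        }
    }

    // so call sites change from
    //     self.system.rpc.try_call_many(...)
    // to
    //     self.system.rpc_helper().try_call_many(...)
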
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 9c1da4a7..fedcd6f5 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -385,7 +385,7 @@ impl BlockResyncManager {
let who_needs_resps = manager
.system
- .rpc
+ .rpc_helper()
.call_many(
&manager.endpoint,
&who,
@@ -431,7 +431,7 @@ impl BlockResyncManager {
.with_stream_from_buffer(bytes);
manager
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&manager.endpoint,
&need_nodes[..],
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 37e142f6..2f548ad7 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -131,7 +131,7 @@ impl K2VRpcHandler {
who.sort();
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&who[..],
@@ -187,7 +187,7 @@ impl K2VRpcHandler {
let call_futures = call_list.into_iter().map(|(nodes, items)| async move {
let resp = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&nodes[..],
@@ -229,7 +229,7 @@ impl K2VRpcHandler {
.replication
.write_nodes(&poll_key.partition.hash());
- let rpc = self.system.rpc.try_call_many(
+ let rpc = self.system.rpc_helper().try_call_many(
&self.endpoint,
&nodes[..],
K2VRpc::PollItem {
@@ -241,7 +241,8 @@ impl K2VRpcHandler {
.with_quorum(self.item_table.data.replication.read_quorum())
.without_timeout(),
);
- let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let timeout_duration =
+ Duration::from_millis(timeout_msec) + self.system.rpc_helper().rpc_timeout();
let resps = select! {
r = rpc => r?,
_ = tokio::time::sleep(timeout_duration) => return Ok(None),
@@ -300,7 +301,11 @@ impl K2VRpcHandler {
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
let mut requests = nodes
.iter()
- .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+ .map(|node| {
+ self.system
+ .rpc_helper()
+ .call(&self.endpoint, *node, msg.clone(), rs)
+ })
.collect::<FuturesUnordered<_>>();
// Fetch responses. This procedure stops fetching responses when any of the following
@@ -316,8 +321,9 @@ impl K2VRpcHandler {
// kind: all items produced by that node until time ts have been returned, so we can
// bump the entry in the global vector clock and possibly remove some item-specific
// vector clocks)
- let mut deadline =
- Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let mut deadline = Instant::now()
+ + Duration::from_millis(timeout_msec)
+ + self.system.rpc_helper().rpc_timeout();
let mut resps = vec![];
let mut errors = vec![];
loop {
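
The K2V poll paths above fan one RPC out per node into a FuturesUnordered and drain responses until a deadline derived from the caller's timeout plus rpc_helper().rpc_timeout(). A self-contained sketch of that shape, with generic placeholders rather than the K2V types (the real loop also stops early on quorum and error conditions not shown in these hunks):

    use futures::stream::{FuturesUnordered, StreamExt};
    use tokio::time::{sleep_until, Instant};

    async fn fan_out_until_deadline<T, F, Fut>(nodes: &[u64], deadline: Instant, call: F) -> Vec<T>
    where
        F: Fn(u64) -> Fut,
        Fut: std::future::Future<Output = T>,
    {
        // one in-flight request per node
        let mut requests = nodes.iter().map(|n| call(*n)).collect::<FuturesUnordered<_>>();
        let mut resps = Vec::new();
        loop {
            tokio::select! {
                r = requests.next() => match r {
                    Some(resp) => resps.push(resp),
                    None => break, // every node has answered
                },
                // deadline = now + timeout_msec + rpc_timeout, as in the hunks above
                _ = sleep_until(deadline) => break,
            }
        }
        resps
    }
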
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
new file mode 100644
index 00000000..a8a77139
--- /dev/null
+++ b/src/rpc/layout/manager.rs
@@ -0,0 +1,177 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use tokio::sync::watch;
+use tokio::sync::Mutex;
+
+use netapp::endpoint::Endpoint;
+use netapp::peering::fullmesh::FullMeshPeeringStrategy;
+use netapp::NodeID;
+
+use garage_util::config::Config;
+use garage_util::data::*;
+use garage_util::error::*;
+use garage_util::persister::Persister;
+
+use super::*;
+use crate::rpc_helper::*;
+use crate::system::*;
+
+pub struct LayoutManager {
+ replication_factor: usize,
+ persist_cluster_layout: Persister<LayoutHistory>,
+
+ pub layout_watch: watch::Receiver<Arc<LayoutHistory>>,
+ update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>,
+
+ pub(crate) rpc_helper: RpcHelper,
+ system_endpoint: Arc<Endpoint<SystemRpc, System>>,
+}
+
+impl LayoutManager {
+ pub fn new(
+ config: &Config,
+ node_id: NodeID,
+ system_endpoint: Arc<Endpoint<SystemRpc, System>>,
+ fullmesh: Arc<FullMeshPeeringStrategy>,
+ replication_factor: usize,
+ ) -> Result<Self, Error> {
+ let persist_cluster_layout: Persister<LayoutHistory> =
+ Persister::new(&config.metadata_dir, "cluster_layout");
+
+ let cluster_layout = match persist_cluster_layout.load() {
+ Ok(x) => {
+ if x.current().replication_factor != replication_factor {
+ return Err(Error::Message(format!(
+ "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
+ x.current().replication_factor,
+ replication_factor
+ )));
+ }
+ x
+ }
+ Err(e) => {
+ info!(
+ "No valid previous cluster layout stored ({}), starting fresh.",
+ e
+ );
+ LayoutHistory::new(replication_factor)
+ }
+ };
+
+ let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout));
+
+ let rpc_helper = RpcHelper::new(
+ node_id.into(),
+ fullmesh,
+ layout_watch.clone(),
+ config.rpc_timeout_msec.map(Duration::from_millis),
+ );
+
+ Ok(Self {
+ replication_factor,
+ persist_cluster_layout,
+ layout_watch,
+ update_layout: Mutex::new(update_layout),
+ system_endpoint,
+ rpc_helper,
+ })
+ }
+
+ // ---- PUBLIC INTERFACE ----
+
+ pub async fn update_cluster_layout(&self, layout: &LayoutHistory) -> Result<(), Error> {
+ self.handle_advertise_cluster_layout(layout).await?;
+ Ok(())
+ }
+
+ pub fn history(&self) -> watch::Ref<Arc<LayoutHistory>> {
+ self.layout_watch.borrow()
+ }
+
+ pub(crate) async fn pull_cluster_layout(&self, peer: Uuid) {
+ let resp = self
+ .rpc_helper
+ .call(
+ &self.system_endpoint,
+ peer,
+ SystemRpc::PullClusterLayout,
+ RequestStrategy::with_priority(PRIO_HIGH),
+ )
+ .await;
+ if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
+ let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
+ }
+ }
+
+ // ---- INTERNALS ---
+
+ /// Save network configuration to disc
+ async fn save_cluster_layout(&self) -> Result<(), Error> {
+ let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone();
+ self.persist_cluster_layout
+ .save_async(&layout)
+ .await
+ .expect("Cannot save current cluster layout");
+ Ok(())
+ }
+
+ // ---- RPC HANDLERS ----
+
+ pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc {
+ let layout = self.layout_watch.borrow().as_ref().clone();
+ SystemRpc::AdvertiseClusterLayout(layout)
+ }
+
+ pub(crate) async fn handle_advertise_cluster_layout(
+ &self,
+ adv: &LayoutHistory,
+ ) -> Result<SystemRpc, Error> {
+ if adv.current().replication_factor != self.replication_factor {
+ let msg = format!(
+ "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
+ adv.current().replication_factor,
+ self.replication_factor
+ );
+ error!("{}", msg);
+ return Err(Error::Message(msg));
+ }
+
+ let update_layout = self.update_layout.lock().await;
+ // TODO: don't clone each time an AdvertiseClusterLayout is received
+ let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
+
+ let prev_layout_check = layout.check().is_ok();
+ if layout.merge(adv) {
+ if prev_layout_check && layout.check().is_err() {
+ error!("New cluster layout is invalid, discarding.");
+ return Err(Error::Message(
+ "New cluster layout is invalid, discarding.".into(),
+ ));
+ }
+
+ update_layout.send(Arc::new(layout.clone()))?;
+ drop(update_layout);
+
+ /* TODO
+ tokio::spawn(async move {
+ if let Err(e) = system
+ .rpc_helper()
+ .broadcast(
+ &system.system_endpoint,
+ SystemRpc::AdvertiseClusterLayout(layout),
+ RequestStrategy::with_priority(PRIO_HIGH),
+ )
+ .await
+ {
+ warn!("Error while broadcasting new cluster layout: {}", e);
+ }
+ });
+ */
+
+ self.save_cluster_layout().await?;
+ }
+
+ Ok(SystemRpc::Ok)
+ }
+}
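
LayoutManager keeps the layout history behind a tokio watch channel: readers cheaply borrow the current Arc through the Receiver (history()), while writers serialize their read-modify-write cycles by locking the Mutex around the Sender (handle_advertise_cluster_layout()). A self-contained sketch of that pattern, with illustrative names rather than the Garage types:

    use std::sync::Arc;
    use tokio::sync::{watch, Mutex};

    struct SharedValue<T> {
        rx: watch::Receiver<Arc<T>>,
        tx: Mutex<watch::Sender<Arc<T>>>,
    }

    impl<T> SharedValue<T> {
        fn new(initial: T) -> Self {
            let (tx, rx) = watch::channel(Arc::new(initial));
            SharedValue { rx, tx: Mutex::new(tx) }
        }

        // Readers: cheap borrow of the latest value, as in LayoutManager::history().
        fn current(&self) -> Arc<T> {
            self.rx.borrow().clone()
        }

        // Writers: hold the Mutex for the whole read-modify-publish cycle so that
        // concurrent updates cannot interleave, as in handle_advertise_cluster_layout().
        async fn update(&self, f: impl FnOnce(&T) -> T) {
            let tx = self.tx.lock().await;
            let current = self.rx.borrow().clone();
            let new = f(&*current);
            let _ = tx.send(Arc::new(new));
        }
    }
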
diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs
index 7c15988a..cd3764bc 100644
--- a/src/rpc/layout/mod.rs
+++ b/src/rpc/layout/mod.rs
@@ -3,6 +3,8 @@ mod history;
mod schema;
mod version;
+pub mod manager;
+
// ---- re-exports ----
pub use history::*;
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index a7433b68..a8e88425 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -14,7 +14,6 @@ use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519;
use tokio::select;
use tokio::sync::watch;
-use tokio::sync::Mutex;
use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::message::*;
@@ -34,6 +33,7 @@ use garage_util::time::*;
use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
+use crate::layout::manager::LayoutManager;
use crate::layout::*;
use crate::replication_mode::*;
use crate::rpc_helper::*;
@@ -49,7 +49,7 @@ const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
pub const GARAGE_VERSION_TAG: u64 = 0x676172616765000A; // garage 0x000A
/// RPC endpoint used for calls related to membership
-pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc";
+pub const SYSTEM_RPC_PATH: &str = "garage_rpc/system.rs/SystemRpc";
/// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -58,17 +58,17 @@ pub enum SystemRpc {
Ok,
/// Request to connect to a specific node (in <pubkey>@<host>:<port> format)
Connect(String),
- /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
- PullClusterLayout,
/// Advertise Garage status. Answered with another AdvertiseStatus.
/// Exchanged with every node on a regular basis.
AdvertiseStatus(NodeStatus),
- /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
- AdvertiseClusterLayout(LayoutHistory),
/// Get known nodes states
GetKnownNodes,
/// Return known nodes
ReturnKnownNodes(Vec<KnownNodeInfo>),
+ /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
+ PullClusterLayout,
+ /// Advertisement of cluster layout. Sent spontaneously or in response to PullClusterLayout
+ AdvertiseClusterLayout(LayoutHistory),
}
impl Rpc for SystemRpc {
@@ -84,7 +84,6 @@ pub struct System {
/// The id of this node
pub id: Uuid,
- persist_cluster_layout: Persister<LayoutHistory>,
persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
@@ -92,9 +91,8 @@ pub struct System {
pub netapp: Arc<NetApp>,
fullmesh: Arc<FullMeshPeeringStrategy>,
- pub rpc: RpcHelper,
- system_endpoint: Arc<Endpoint<SystemRpc, System>>,
+ pub(crate) system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
@@ -106,15 +104,13 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
+ pub layout_manager: LayoutManager,
+
metrics: SystemMetrics,
replication_mode: ReplicationMode,
replication_factor: usize,
- /// The layout
- pub layout_watch: watch::Receiver<Arc<LayoutHistory>>,
- update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>,
-
/// Path to metadata directory
pub metadata_dir: PathBuf,
/// Path to data directory
@@ -128,8 +124,11 @@ pub struct NodeStatus {
/// Replication factor configured on the node
pub replication_factor: usize,
+
/// Cluster layout version
pub cluster_layout_version: u64,
+ /// Hash of cluster layout update trackers
+ // (TODO) pub cluster_layout_trackers_hash: Hash,
/// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash,
@@ -247,8 +246,7 @@ impl System {
replication_mode: ReplicationMode,
config: &Config,
) -> Result<Arc<Self>, Error> {
- let replication_factor = replication_mode.replication_factor();
-
+ // ---- setup netapp RPC protocol ----
let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!(
@@ -256,81 +254,40 @@ impl System {
hex::encode(&node_key.public_key()[..8])
);
- let persist_cluster_layout: Persister<LayoutHistory> =
- Persister::new(&config.metadata_dir, "cluster_layout");
- let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
-
- let cluster_layout = match persist_cluster_layout.load() {
- Ok(x) => {
- if x.current().replication_factor != replication_factor {
- return Err(Error::Message(format!(
- "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
- x.current().replication_factor,
- replication_factor
- )));
- }
- x
- }
- Err(e) => {
- info!(
- "No valid previous cluster layout stored ({}), starting fresh.",
- e
- );
- LayoutHistory::new(replication_factor)
- }
- };
-
- let metrics = SystemMetrics::new(replication_factor);
-
- let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
- local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
+ let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
+ let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
- let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout));
-
- let rpc_public_addr = match &config.rpc_public_addr {
- Some(a_str) => {
- use std::net::ToSocketAddrs;
- match a_str.to_socket_addrs() {
- Err(e) => {
- error!(
- "Cannot resolve rpc_public_addr {} from config file: {}.",
- a_str, e
- );
- None
- }
- Ok(a) => {
- let a = a.collect::<Vec<_>>();
- if a.is_empty() {
- error!("rpc_public_addr {} resolve to no known IP address", a_str);
- }
- if a.len() > 1 {
- warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
- }
- a.into_iter().next()
- }
- }
- }
- None => {
- let addr =
- get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
- if let Some(a) = addr {
- warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
- }
- addr
- }
- };
+ // ---- setup netapp public listener and full mesh peering strategy ----
+ let rpc_public_addr = get_rpc_public_addr(config);
if rpc_public_addr.is_none() {
warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication.");
}
- let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
fullmesh.set_ping_timeout_millis(ping_timeout);
}
- let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
+ let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
+ // ---- setup cluster layout and layout manager ----
+ let replication_factor = replication_mode.replication_factor();
+
+ let layout_manager = LayoutManager::new(
+ config,
+ netapp.id,
+ system_endpoint.clone(),
+ fullmesh.clone(),
+ replication_factor,
+ )?;
+
+ // ---- set up metrics and status exchange ----
+ let metrics = SystemMetrics::new(replication_factor);
+
+ let mut local_status = NodeStatus::initial(replication_factor, &layout_manager.history());
+ local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
+
+ // ---- if enabled, set up additional peer discovery methods ----
#[cfg(feature = "consul-discovery")]
let consul_discovery = match &config.consul_discovery {
Some(cfg) => Some(
@@ -349,20 +306,14 @@ impl System {
warn!("Kubernetes discovery is not enabled in this build.");
}
+ // ---- done ----
let sys = Arc::new(System {
id: netapp.id.into(),
- persist_cluster_layout,
persist_peer_list,
local_status: ArcSwap::new(Arc::new(local_status)),
node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
fullmesh: fullmesh.clone(),
- rpc: RpcHelper::new(
- netapp.id.into(),
- fullmesh,
- layout_watch.clone(),
- config.rpc_timeout_msec.map(Duration::from_millis),
- ),
system_endpoint,
replication_mode,
replication_factor,
@@ -374,10 +325,9 @@ impl System {
consul_discovery,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(),
+ layout_manager,
metrics,
- layout_watch,
- update_layout: Mutex::new(update_layout),
metadata_dir: config.metadata_dir.clone(),
data_dir: config.data_dir.clone(),
});
@@ -397,6 +347,20 @@ impl System {
);
}
+ // ---- Public utilities / accessors ----
+
+ pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
+ self.layout_manager.history()
+ }
+
+ pub fn layout_watch(&self) -> watch::Receiver<Arc<LayoutHistory>> {
+ self.layout_manager.layout_watch.clone()
+ }
+
+ pub fn rpc_helper(&self) -> &RpcHelper {
+ &self.layout_manager.rpc_helper
+ }
+
// ---- Administrative operations (directly available and
// also available through RPC) ----
@@ -423,18 +387,6 @@ impl System {
known_nodes
}
- pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
- self.layout_watch.borrow()
- }
-
- pub async fn update_cluster_layout(
- self: &Arc<Self>,
- layout: &LayoutHistory,
- ) -> Result<(), Error> {
- self.handle_advertise_cluster_layout(layout).await?;
- Ok(())
- }
-
pub async fn connect(&self, node: &str) -> Result<(), Error> {
let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node)
.await
@@ -464,7 +416,7 @@ impl System {
}
pub fn health(&self) -> ClusterHealth {
- let layout: Arc<_> = self.layout_watch.borrow().clone();
+ let layout: Arc<_> = self.cluster_layout().clone();
let quorum = self.replication_mode.write_quorum();
let replication_factor = self.replication_factor;
@@ -581,20 +533,10 @@ impl System {
}
}
- /// Save network configuration to disc
- async fn save_cluster_layout(&self) -> Result<(), Error> {
- let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone();
- self.persist_cluster_layout
- .save_async(&layout)
- .await
- .expect("Cannot save current cluster layout");
- Ok(())
- }
-
fn update_local_status(&self) {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
- let layout = self.layout_watch.borrow();
+ let layout = self.cluster_layout();
new_si.cluster_layout_version = layout.current().version;
new_si.cluster_layout_staging_hash = layout.staging_hash;
@@ -610,11 +552,6 @@ impl System {
Ok(SystemRpc::Ok)
}
- fn handle_pull_cluster_layout(&self) -> SystemRpc {
- let layout = self.layout_watch.borrow().as_ref().clone();
- SystemRpc::AdvertiseClusterLayout(layout)
- }
-
fn handle_get_known_nodes(&self) -> SystemRpc {
let known_nodes = self.get_known_nodes();
SystemRpc::ReturnKnownNodes(known_nodes)
@@ -637,7 +574,10 @@ impl System {
if info.cluster_layout_version > local_info.cluster_layout_version
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
{
- tokio::spawn(self.clone().pull_cluster_layout(from));
+ tokio::spawn({
+ let system = self.clone();
+ async move { system.layout_manager.pull_cluster_layout(from).await }
+ });
}
self.node_status
@@ -648,57 +588,6 @@ impl System {
Ok(SystemRpc::Ok)
}
- async fn handle_advertise_cluster_layout(
- self: &Arc<Self>,
- adv: &LayoutHistory,
- ) -> Result<SystemRpc, Error> {
- if adv.current().replication_factor != self.replication_factor {
- let msg = format!(
- "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
- adv.current().replication_factor,
- self.replication_factor
- );
- error!("{}", msg);
- return Err(Error::Message(msg));
- }
-
- let update_layout = self.update_layout.lock().await;
- // TODO: don't clone each time an AdvertiseClusterLayout is received
- let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
-
- let prev_layout_check = layout.check().is_ok();
- if layout.merge(adv) {
- if prev_layout_check && layout.check().is_err() {
- error!("New cluster layout is invalid, discarding.");
- return Err(Error::Message(
- "New cluster layout is invalid, discarding.".into(),
- ));
- }
-
- update_layout.send(Arc::new(layout.clone()))?;
- drop(update_layout);
-
- let self2 = self.clone();
- tokio::spawn(async move {
- if let Err(e) = self2
- .rpc
- .broadcast(
- &self2.system_endpoint,
- SystemRpc::AdvertiseClusterLayout(layout),
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await
- {
- warn!("Error while broadcasting new cluster layout: {}", e);
- }
- });
-
- self.save_cluster_layout().await?;
- }
-
- Ok(SystemRpc::Ok)
- }
-
async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;
@@ -706,7 +595,7 @@ impl System {
self.update_local_status();
let local_status: NodeStatus = self.local_status.load().as_ref().clone();
let _ = self
- .rpc
+ .rpc_helper()
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
@@ -724,9 +613,9 @@ impl System {
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() {
- let not_configured = self.layout_watch.borrow().check().is_err();
+ let not_configured = self.cluster_layout().check().is_err();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
- let expected_n_nodes = self.layout_watch.borrow().current().num_nodes();
+ let expected_n_nodes = self.cluster_layout().current().num_nodes();
let bad_peers = self
.fullmesh
.get_peer_list()
@@ -831,34 +720,26 @@ impl System {
.save_async(&PeerList(peer_list))
.await
}
-
- async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
- let resp = self
- .rpc
- .call(
- &self.system_endpoint,
- peer,
- SystemRpc::PullClusterLayout,
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await;
- if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
- let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
- }
- }
}
#[async_trait]
impl EndpointHandler<SystemRpc> for System {
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
match msg {
+ // ---- system functions -> System ----
SystemRpc::Connect(node) => self.handle_connect(node).await,
- SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()),
SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
+ SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
+
+ // ---- layout functions -> LayoutManager ----
+ SystemRpc::PullClusterLayout => Ok(self.layout_manager.handle_pull_cluster_layout()),
SystemRpc::AdvertiseClusterLayout(adv) => {
- self.clone().handle_advertise_cluster_layout(adv).await
+ self.layout_manager
+ .handle_advertise_cluster_layout(adv)
+ .await
}
- SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
+
+ // ---- other -> Error ----
m => Err(Error::unexpected_rpc_message(m)),
}
}
@@ -962,6 +843,40 @@ fn get_default_ip() -> Option<IpAddr> {
.map(|a| a.ip())
}
+fn get_rpc_public_addr(config: &Config) -> Option<SocketAddr> {
+ match &config.rpc_public_addr {
+ Some(a_str) => {
+ use std::net::ToSocketAddrs;
+ match a_str.to_socket_addrs() {
+ Err(e) => {
+ error!(
+ "Cannot resolve rpc_public_addr {} from config file: {}.",
+ a_str, e
+ );
+ None
+ }
+ Ok(a) => {
+ let a = a.collect::<Vec<_>>();
+ if a.is_empty() {
+ error!("rpc_public_addr {} resolve to no known IP address", a_str);
+ }
+ if a.len() > 1 {
+ warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
+ }
+ a.into_iter().next()
+ }
+ }
+ }
+ None => {
+ let addr = get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
+ if let Some(a) = addr {
+ warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
+ }
+ addr
+ }
+ }
+}
+
async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
let mut ret = vec![];
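
Two things are worth noting in the system.rs changes above. First, the status-exchange handler now delegates layout pulls to the LayoutManager: when a peer's advertised status looks more recent, it spawns layout_manager.pull_cluster_layout(from). Condensed into a hypothetical predicate (not code from the patch), with the NodeStatus field names defined in this file, the trigger is:

    fn needs_layout_pull(local: &NodeStatus, remote: &NodeStatus) -> bool {
        remote.cluster_layout_version > local.cluster_layout_version
            || remote.cluster_layout_staging_hash != local.cluster_layout_staging_hash
    }

Second, the rpc_public_addr resolution logic is unchanged; it has only been moved out of System::new() into the free function get_rpc_public_addr() so that the constructor reads as a sequence of setup steps.
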
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 5b9124a7..2135a358 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -227,7 +227,7 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// GC'ing is not a critical function of the system, so it's not a big
// deal if we can't do it right now.
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&nodes[..],
@@ -248,7 +248,7 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// it means that the garbage collection wasn't completed and has
// to be retried later.
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&nodes[..],
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 620d83b9..2da1bfe7 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -91,7 +91,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
bg.spawn_worker(SyncWorker {
syncer: self.clone(),
- layout_watch: self.system.layout_watch.clone(),
+ layout_watch: self.system.layout_watch(),
layout: self.system.cluster_layout().clone(),
add_full_sync_rx,
todo: vec![],
@@ -244,7 +244,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
}
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
nodes,
@@ -305,7 +305,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// If so, do nothing.
let root_resp = self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
who,
@@ -361,7 +361,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// and compare it with local node
let remote_node = match self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
who,
@@ -437,7 +437,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
let rpc_resp = self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
who,
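
SyncWorker now obtains its receiver through the system.layout_watch() accessor instead of cloning a public field. How the worker reacts to layout changes is not shown in these hunks; generically, consuming such a receiver looks like the following sketch (illustrative only, not the SyncWorker code):

    use std::sync::Arc;
    use tokio::sync::watch;

    async fn on_layout_change<T>(mut rx: watch::Receiver<Arc<T>>, mut react: impl FnMut(&T)) {
        // wake up whenever a new value is published on the channel
        while rx.changed().await.is_ok() {
            let current = rx.borrow().clone();
            react(&*current);
        }
    }
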
diff --git a/src/table/table.rs b/src/table/table.rs
index 7ad79677..3e3fd138 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -123,7 +123,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let rpc = TableRpc::<F>::Update(vec![e_enc]);
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&who[..],
@@ -181,7 +181,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let resp = self
.system
- .rpc
+ .rpc_helper()
.call(
&self.endpoint,
node,
@@ -236,7 +236,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&who[..],
@@ -332,7 +332,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let resps = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&who[..],
@@ -411,7 +411,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(what.encode()?));
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
who,