aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/garage/cli/layout.rs6
-rw-r--r--src/rpc/layout/manager.rs116
-rw-r--r--src/rpc/layout/schema.rs6
-rw-r--r--src/rpc/system.rs40
4 files changed, 97 insertions, 71 deletions
diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs
index 269d92f4..bffc81d3 100644
--- a/src/garage/cli/layout.rs
+++ b/src/garage/cli/layout.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
use bytesize::ByteSize;
use format_table::format_table;
@@ -321,7 +323,7 @@ pub async fn fetch_layout(
.call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.await??
{
- SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
+ SystemRpc::AdvertiseClusterLayout(t) => Ok(Arc::try_unwrap(t).unwrap()),
resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
}
}
@@ -334,7 +336,7 @@ pub async fn send_layout(
rpc_cli
.call(
&rpc_host,
- SystemRpc::AdvertiseClusterLayout(layout),
+ SystemRpc::AdvertiseClusterLayout(Arc::new(layout)),
PRIO_NORMAL,
)
.await??;
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index a8a77139..351e0959 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -1,6 +1,8 @@
use std::sync::Arc;
use std::time::Duration;
+use serde::{Deserialize, Serialize};
+
use tokio::sync::watch;
use tokio::sync::Mutex;
@@ -28,6 +30,16 @@ pub struct LayoutManager {
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
}
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LayoutStatus {
+ /// Cluster layout version
+ pub cluster_layout_version: u64,
+ /// Hash of cluster layout update trackers
+ // (TODO) pub cluster_layout_trackers_hash: Hash,
+ /// Hash of cluster layout staging data
+ pub cluster_layout_staging_hash: Hash,
+}
+
impl LayoutManager {
pub fn new(
config: &Config,
@@ -35,7 +47,7 @@ impl LayoutManager {
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
fullmesh: Arc<FullMeshPeeringStrategy>,
replication_factor: usize,
- ) -> Result<Self, Error> {
+ ) -> Result<Arc<Self>, Error> {
let persist_cluster_layout: Persister<LayoutHistory> =
Persister::new(&config.metadata_dir, "cluster_layout");
@@ -68,28 +80,39 @@ impl LayoutManager {
config.rpc_timeout_msec.map(Duration::from_millis),
);
- Ok(Self {
+ Ok(Arc::new(Self {
replication_factor,
persist_cluster_layout,
layout_watch,
update_layout: Mutex::new(update_layout),
system_endpoint,
rpc_helper,
- })
+ }))
}
// ---- PUBLIC INTERFACE ----
- pub async fn update_cluster_layout(&self, layout: &LayoutHistory) -> Result<(), Error> {
+ pub fn status(&self) -> LayoutStatus {
+ let layout = self.layout();
+ LayoutStatus {
+ cluster_layout_version: layout.current().version,
+ cluster_layout_staging_hash: layout.staging_hash,
+ }
+ }
+
+ pub async fn update_cluster_layout(
+ self: &Arc<Self>,
+ layout: &LayoutHistory,
+ ) -> Result<(), Error> {
self.handle_advertise_cluster_layout(layout).await?;
Ok(())
}
- pub fn history(&self) -> watch::Ref<Arc<LayoutHistory>> {
+ pub fn layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
self.layout_watch.borrow()
}
- pub(crate) async fn pull_cluster_layout(&self, peer: Uuid) {
+ pub(crate) async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) {
let resp = self
.rpc_helper
.call(
@@ -118,13 +141,25 @@ impl LayoutManager {
// ---- RPC HANDLERS ----
+ pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, status: &LayoutStatus) {
+ let local_status = self.status();
+ if status.cluster_layout_version > local_status.cluster_layout_version
+ || status.cluster_layout_staging_hash != local_status.cluster_layout_staging_hash
+ {
+ tokio::spawn({
+ let this = self.clone();
+ async move { this.pull_cluster_layout(from).await }
+ });
+ }
+ }
+
pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc {
- let layout = self.layout_watch.borrow().as_ref().clone();
+ let layout = self.layout_watch.borrow().clone();
SystemRpc::AdvertiseClusterLayout(layout)
}
pub(crate) async fn handle_advertise_cluster_layout(
- &self,
+ self: &Arc<Self>,
adv: &LayoutHistory,
) -> Result<SystemRpc, Error> {
if adv.current().replication_factor != self.replication_factor {
@@ -137,39 +172,42 @@ impl LayoutManager {
return Err(Error::Message(msg));
}
- let update_layout = self.update_layout.lock().await;
- // TODO: don't clone each time an AdvertiseClusterLayout is received
- let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
-
- let prev_layout_check = layout.check().is_ok();
- if layout.merge(adv) {
- if prev_layout_check && layout.check().is_err() {
- error!("New cluster layout is invalid, discarding.");
- return Err(Error::Message(
- "New cluster layout is invalid, discarding.".into(),
- ));
- }
-
- update_layout.send(Arc::new(layout.clone()))?;
- drop(update_layout);
-
- /* TODO
- tokio::spawn(async move {
- if let Err(e) = system
- .rpc_helper()
- .broadcast(
- &system.system_endpoint,
- SystemRpc::AdvertiseClusterLayout(layout),
- RequestStrategy::with_priority(PRIO_HIGH),
- )
- .await
- {
- warn!("Error while broadcasting new cluster layout: {}", e);
+ if *adv != **self.layout_watch.borrow() {
+ let update_layout = self.update_layout.lock().await;
+ let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
+
+ let prev_layout_check = layout.check().is_ok();
+ if layout.merge(adv) {
+ if prev_layout_check && layout.check().is_err() {
+ error!("New cluster layout is invalid, discarding.");
+ return Err(Error::Message(
+ "New cluster layout is invalid, discarding.".into(),
+ ));
}
- });
- */
- self.save_cluster_layout().await?;
+ let layout = Arc::new(layout);
+ update_layout.send(layout.clone())?;
+ drop(update_layout); // release mutex
+
+ tokio::spawn({
+ let this = self.clone();
+ async move {
+ if let Err(e) = this
+ .rpc_helper
+ .broadcast(
+ &this.system_endpoint,
+ SystemRpc::AdvertiseClusterLayout(layout),
+ RequestStrategy::with_priority(PRIO_HIGH),
+ )
+ .await
+ {
+ warn!("Error while broadcasting new cluster layout: {}", e);
+ }
+ }
+ });
+
+ self.save_cluster_layout().await?;
+ }
}
Ok(SystemRpc::Ok)
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs
index c5b9b1d3..d587a6cb 100644
--- a/src/rpc/layout/schema.rs
+++ b/src/rpc/layout/schema.rs
@@ -226,7 +226,7 @@ mod v010 {
}
/// The history of cluster layouts
- #[derive(Clone, Debug, Serialize, Deserialize)]
+ #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LayoutHistory {
/// The versions currently in use in the cluster
pub versions: Vec<LayoutVersion>,
@@ -241,7 +241,7 @@ mod v010 {
}
/// The tracker of acknowlegments and data syncs around the cluster
- #[derive(Clone, Debug, Serialize, Deserialize, Default)]
+ #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
pub struct UpdateTrackers {
/// The highest layout version number each node has ack'ed
pub ack_map: UpdateTracker,
@@ -253,7 +253,7 @@ mod v010 {
}
/// The history of cluster layouts
- #[derive(Clone, Debug, Serialize, Deserialize, Default)]
+ #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
pub struct UpdateTracker(pub HashMap<Uuid, u64>);
impl garage_util::migrate::Migrate for LayoutHistory {
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index a8e88425..88c4d443 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -33,7 +33,7 @@ use garage_util::time::*;
use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
-use crate::layout::manager::LayoutManager;
+use crate::layout::manager::{LayoutManager, LayoutStatus};
use crate::layout::*;
use crate::replication_mode::*;
use crate::rpc_helper::*;
@@ -68,7 +68,7 @@ pub enum SystemRpc {
/// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
PullClusterLayout,
/// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout
- AdvertiseClusterLayout(LayoutHistory),
+ AdvertiseClusterLayout(Arc<LayoutHistory>),
}
impl Rpc for SystemRpc {
@@ -104,7 +104,7 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
- pub layout_manager: LayoutManager,
+ pub layout_manager: Arc<LayoutManager>,
metrics: SystemMetrics,
@@ -125,12 +125,8 @@ pub struct NodeStatus {
/// Replication factor configured on the node
pub replication_factor: usize,
- /// Cluster layout version
- pub cluster_layout_version: u64,
- /// Hash of cluster layout update trackers
- // (TODO) pub cluster_layout_trackers_hash: Hash,
- /// Hash of cluster layout staging data
- pub cluster_layout_staging_hash: Hash,
+ /// Layout status
+ pub layout_status: LayoutStatus,
/// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
#[serde(default)]
@@ -284,7 +280,7 @@ impl System {
// ---- set up metrics and status exchange ----
let metrics = SystemMetrics::new(replication_factor);
- let mut local_status = NodeStatus::initial(replication_factor, &layout_manager.history());
+ let mut local_status = NodeStatus::initial(replication_factor, &layout_manager);
local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
// ---- if enabled, set up additionnal peer discovery methods ----
@@ -350,7 +346,7 @@ impl System {
// ---- Public utilities / accessors ----
pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
- self.layout_manager.history()
+ self.layout_manager.layout()
}
pub fn layout_watch(&self) -> watch::Receiver<Arc<LayoutHistory>> {
@@ -536,9 +532,7 @@ impl System {
fn update_local_status(&self) {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
- let layout = self.cluster_layout();
- new_si.cluster_layout_version = layout.current().version;
- new_si.cluster_layout_staging_hash = layout.staging_hash;
+ new_si.layout_status = self.layout_manager.status();
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
@@ -571,14 +565,8 @@ impl System {
std::process::exit(1);
}
- if info.cluster_layout_version > local_info.cluster_layout_version
- || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
- {
- tokio::spawn({
- let system = self.clone();
- async move { system.layout_manager.pull_cluster_layout(from).await }
- });
- }
+ self.layout_manager
+ .handle_advertise_status(from, &info.layout_status);
self.node_status
.write()
@@ -746,14 +734,13 @@ impl EndpointHandler<SystemRpc> for System {
}
impl NodeStatus {
- fn initial(replication_factor: usize, layout: &LayoutHistory) -> Self {
+ fn initial(replication_factor: usize, layout_manager: &LayoutManager) -> Self {
NodeStatus {
hostname: gethostname::gethostname()
.into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
replication_factor,
- cluster_layout_version: layout.current().version,
- cluster_layout_staging_hash: layout.staging_hash,
+ layout_status: layout_manager.status(),
meta_disk_avail: None,
data_disk_avail: None,
}
@@ -763,8 +750,7 @@ impl NodeStatus {
NodeStatus {
hostname: "?".to_string(),
replication_factor: 0,
- cluster_layout_version: 0,
- cluster_layout_staging_hash: Hash::from([0u8; 32]),
+ layout_status: Default::default(),
meta_disk_avail: None,
data_disk_avail: None,
}