author    Alex Auvolat <alex@adnab.me>  2023-11-08 17:49:06 +0100
committer Alex Auvolat <alex@adnab.me>  2023-11-08 17:49:06 +0100
commit    fe9af1dcaae31a117528a9cfa10c422c9a850201 (patch)
tree      6e43dbb97d37d48f6af5398b4d067747e652108c /src/rpc/system.rs
parent    4a9c94514f49aa4e9880a8e0f5cf5a52d11ae993 (diff)
WIP: garage_rpc: store layout version history
Diffstat (limited to 'src/rpc/system.rs')
-rw-r--r--  src/rpc/system.rs | 44
1 file changed, 23 insertions(+), 21 deletions(-)
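The patch below replaces ClusterLayout with a LayoutHistory wrapper throughout src/rpc/system.rs, and per-version data is now reached through a current() accessor. LayoutHistory itself is defined elsewhere in garage_rpc and does not appear in this diff, so the following is only a minimal, self-contained sketch of the shape the call sites assume; type and field names here are hypothetical stand-ins, not the real garage definitions.

// Minimal sketch (assumed names, not the real garage_rpc types) of the
// shape the call sites in this diff rely on: an ordered history of layout
// versions with a current() accessor returning the newest one.
struct ClusterLayoutVersion {
    version: u64,
    replication_factor: usize,
}

struct LayoutHistory {
    // Oldest first; the real type also carries staged changes
    // (see `staging_hash` in the diff below) and supports merge().
    versions: Vec<ClusterLayoutVersion>,
}

impl LayoutHistory {
    fn new(replication_factor: usize) -> Self {
        LayoutHistory {
            versions: vec![ClusterLayoutVersion { version: 0, replication_factor }],
        }
    }

    /// Latest layout version; the diff reads layout.current().version,
    /// .replication_factor, .partitions(), .nodes_of(), and so on.
    fn current(&self) -> &ClusterLayoutVersion {
        self.versions.last().expect("layout history is never empty")
    }
}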
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 93144e39..86d724f1 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -64,7 +64,7 @@ pub enum SystemRpc {
/// Exchanged with every node on a regular basis.
AdvertiseStatus(NodeStatus),
/// Advertisement of cluster layout. Sent spontaneously or in response to PullClusterLayout
- AdvertiseClusterLayout(ClusterLayout),
+ AdvertiseClusterLayout(LayoutHistory),
/// Get known nodes states
GetKnownNodes,
/// Return known nodes
@@ -84,7 +84,7 @@ pub struct System {
/// The id of this node
pub id: Uuid,
- persist_cluster_layout: Persister<ClusterLayout>,
+ persist_cluster_layout: Persister<LayoutHistory>,
persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>,
@@ -112,8 +112,8 @@ pub struct System {
replication_factor: usize,
/// The layout
- pub layout_watch: watch::Receiver<Arc<ClusterLayout>>,
- update_layout: Mutex<watch::Sender<Arc<ClusterLayout>>>,
+ pub layout_watch: watch::Receiver<Arc<LayoutHistory>>,
+ update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>,
/// Path to metadata directory
pub metadata_dir: PathBuf,
@@ -256,16 +256,16 @@ impl System {
hex::encode(&node_key.public_key()[..8])
);
- let persist_cluster_layout: Persister<ClusterLayout> =
+ let persist_cluster_layout: Persister<LayoutHistory> =
Persister::new(&config.metadata_dir, "cluster_layout");
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => {
- if x.replication_factor != replication_factor {
+ if x.current().replication_factor != replication_factor {
return Err(Error::Message(format!(
"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
- x.replication_factor,
+ x.current().replication_factor,
replication_factor
)));
}
@@ -276,7 +276,7 @@ impl System {
"No valid previous cluster layout stored ({}), starting fresh.",
e
);
- ClusterLayout::new(replication_factor)
+ LayoutHistory::new(replication_factor)
}
};
@@ -423,13 +423,13 @@ impl System {
known_nodes
}
- pub fn cluster_layout(&self) -> watch::Ref<Arc<ClusterLayout>> {
+ pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
self.layout_watch.borrow()
}
pub async fn update_cluster_layout(
self: &Arc<Self>,
- layout: &ClusterLayout,
+ layout: &LayoutHistory,
) -> Result<(), Error> {
self.handle_advertise_cluster_layout(layout).await?;
Ok(())
@@ -475,7 +475,9 @@ impl System {
.collect::<HashMap<Uuid, _>>();
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
+ // TODO: not only layout.current()
let storage_nodes = layout
+ .current()
.roles
.items()
.iter()
@@ -486,11 +488,11 @@ impl System {
.filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count();
- let partitions = layout.partitions();
+ let partitions = layout.current().partitions();
let partitions_n_up = partitions
.iter()
.map(|(_, h)| {
- let pn = layout.nodes_of(h, layout.replication_factor);
+ let pn = layout.current().nodes_of(h, replication_factor);
pn.iter()
.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count()
@@ -581,7 +583,7 @@ impl System {
/// Save network configuration to disc
async fn save_cluster_layout(&self) -> Result<(), Error> {
- let layout: Arc<ClusterLayout> = self.layout_watch.borrow().clone();
+ let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone();
self.persist_cluster_layout
.save_async(&layout)
.await
@@ -593,7 +595,7 @@ impl System {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
let layout = self.layout_watch.borrow();
- new_si.cluster_layout_version = layout.version;
+ new_si.cluster_layout_version = layout.current().version;
new_si.cluster_layout_staging_hash = layout.staging_hash;
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
@@ -648,12 +650,12 @@ impl System {
async fn handle_advertise_cluster_layout(
self: &Arc<Self>,
- adv: &ClusterLayout,
+ adv: &LayoutHistory,
) -> Result<SystemRpc, Error> {
- if adv.replication_factor != self.replication_factor {
+ if adv.current().replication_factor != self.replication_factor {
let msg = format!(
"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
- adv.replication_factor,
+ adv.current().replication_factor,
self.replication_factor
);
error!("{}", msg);
@@ -662,7 +664,7 @@ impl System {
let update_layout = self.update_layout.lock().await;
// TODO: don't clone each time an AdvertiseClusterLayout is received
- let mut layout: ClusterLayout = self.layout_watch.borrow().as_ref().clone();
+ let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
let prev_layout_check = layout.check().is_ok();
if layout.merge(adv) {
@@ -724,7 +726,7 @@ impl System {
while !*stop_signal.borrow() {
let not_configured = self.layout_watch.borrow().check().is_err();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
- let expected_n_nodes = self.layout_watch.borrow().num_nodes();
+ let expected_n_nodes = self.layout_watch.borrow().current().num_nodes();
let bad_peers = self
.fullmesh
.get_peer_list()
@@ -863,13 +865,13 @@ impl EndpointHandler<SystemRpc> for System {
}
impl NodeStatus {
- fn initial(replication_factor: usize, layout: &ClusterLayout) -> Self {
+ fn initial(replication_factor: usize, layout: &LayoutHistory) -> Self {
NodeStatus {
hostname: gethostname::gethostname()
.into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
replication_factor,
- cluster_layout_version: layout.version,
+ cluster_layout_version: layout.current().version,
cluster_layout_staging_hash: layout.staging_hash,
meta_disk_avail: None,
data_disk_avail: None,
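As a usage note, the call-site convention introduced above can be summarized with the hypothetical sketch types from the top of this page: fields that belong to a single layout version (version, replication_factor, roles, partitions) are read through current(), while history-level state such as staging_hash stays on the LayoutHistory value itself.

// Illustration only, reusing the hypothetical sketch types above (not the
// real garage API): per-version fields go through current(); the TODO left
// in System::health() notes that reading only current() is a temporary
// simplification while the version history is still WIP.
fn layout_summary(layout: &LayoutHistory) -> String {
    format!(
        "cluster layout v{} (replication factor {})",
        layout.current().version,
        layout.current().replication_factor
    )
}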