diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/api/admin/cluster.rs | 9 | ||||
-rw-r--r-- | src/api/k2v/index.rs | 9 | ||||
-rw-r--r-- | src/garage/admin/mod.rs | 14 | ||||
-rw-r--r-- | src/garage/cli/cmd.rs | 17 | ||||
-rw-r--r-- | src/garage/cli/layout.rs | 38 | ||||
-rw-r--r-- | src/model/helper/bucket.rs | 9 | ||||
-rw-r--r-- | src/model/index_counter.rs | 6 | ||||
-rw-r--r-- | src/rpc/layout/history.rs | 17 | ||||
-rw-r--r-- | src/rpc/layout/schema.rs | 5 | ||||
-rw-r--r-- | src/table/replication/fullcopy.rs | 5 | ||||
-rw-r--r-- | src/table/replication/sharded.rs | 16 | ||||
-rw-r--r-- | src/table/sync.rs | 4 |
12 files changed, 80 insertions, 69 deletions
diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 01ff9885..6dd2e8da 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -89,8 +89,9 @@ pub async fn handle_get_cluster_layout(garage: &Arc<Garage>) -> Result<Response< Ok(json_ok_response(&res)?) } -fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResponse { +fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResponse { let roles = layout + .current() .roles .items() .iter() @@ -107,7 +108,7 @@ fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResp .staging_roles .items() .iter() - .filter(|(k, _, v)| layout.roles.get(k) != Some(v)) + .filter(|(k, _, v)| layout.current().roles.get(k) != Some(v)) .map(|(k, _, v)| match &v.0 { None => NodeRoleChange { id: hex::encode(k), @@ -125,7 +126,7 @@ fn format_cluster_layout(layout: &layout::ClusterLayout) -> GetClusterLayoutResp .collect::<Vec<_>>(); GetClusterLayoutResponse { - version: layout.version, + version: layout.current().version, roles, staged_role_changes, } @@ -209,7 +210,7 @@ pub async fn handle_update_cluster_layout( let mut layout = garage.system.cluster_layout().as_ref().clone(); - let mut roles = layout.roles.clone(); + let mut roles = layout.current().roles.clone(); roles.merge(&layout.staging_roles); for change in updates { diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 3fc39de6..a9bc3826 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -5,7 +5,7 @@ use serde::Serialize; use garage_util::data::*; -use garage_rpc::layout::ClusterLayout; +use garage_rpc::layout::LayoutHistory; use garage_table::util::*; use garage_model::garage::Garage; @@ -26,7 +26,7 @@ pub async fn handle_read_index( ) -> Result<Response<Body>, Error> { let reverse = reverse.unwrap_or(false); - let layout: Arc<ClusterLayout> = garage.system.cluster_layout().clone(); + let layout: Arc<LayoutHistory> = garage.system.cluster_layout().clone(); let (partition_keys, more, next_start) = read_range( &garage.k2v.counter_table.table, @@ -35,7 +35,10 @@ pub async fn handle_read_index( &start, &end, limit, - Some((DeletedFilter::NotDeleted, layout.node_id_vec.clone())), + Some(( + DeletedFilter::NotDeleted, + layout.current().node_id_vec.clone(), + )), EnumerationOrder::from_reverse(reverse), ) .await?; diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index c3fa801a..e3ba6d35 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -127,7 +127,7 @@ impl AdminRpcHandler { let mut failures = vec![]; let layout = self.garage.system.cluster_layout().clone(); - for node in layout.node_ids().iter() { + for node in layout.current().node_ids().iter() { let node = (*node).into(); let resp = self .endpoint @@ -165,7 +165,7 @@ impl AdminRpcHandler { let mut ret = String::new(); let layout = self.garage.system.cluster_layout().clone(); - for node in layout.node_ids().iter() { + for node in layout.current().node_ids().iter() { let mut opt = opt.clone(); opt.all_nodes = false; opt.skip_global = true; @@ -277,8 +277,8 @@ impl AdminRpcHandler { // Gather storage node and free space statistics let layout = &self.garage.system.cluster_layout(); let mut node_partition_count = HashMap::<Uuid, u64>::new(); - for short_id in layout.ring_assignment_data.iter() { - let id = layout.node_id_vec[*short_id as usize]; + for short_id in layout.current().ring_assignment_data.iter() { + let id = layout.current().node_id_vec[*short_id as usize]; *node_partition_count.entry(id).or_default() += 1; } let node_info = self @@ -293,7 +293,7 @@ impl AdminRpcHandler { for (id, parts) in node_partition_count.iter() { let info = node_info.get(id); let status = info.map(|x| &x.status); - let role = layout.roles.get(id).and_then(|x| x.0.as_ref()); + let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref()); let hostname = status.map(|x| x.hostname.as_str()).unwrap_or("?"); let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); let capacity = role @@ -441,7 +441,7 @@ impl AdminRpcHandler { if all_nodes { let mut ret = vec![]; let layout = self.garage.system.cluster_layout().clone(); - for node in layout.node_ids().iter() { + for node in layout.current().node_ids().iter() { let node = (*node).into(); match self .endpoint @@ -489,7 +489,7 @@ impl AdminRpcHandler { if all_nodes { let mut ret = vec![]; let layout = self.garage.system.cluster_layout().clone(); - for node in layout.node_ids().iter() { + for node in layout.current().node_ids().iter() { let node = (*node).into(); match self .endpoint diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 48359614..8be43873 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -62,7 +62,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> let mut healthy_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()]; for adv in status.iter().filter(|adv| adv.is_up) { - match layout.roles.get(&adv.id) { + match layout.current().roles.get(&adv.id) { Some(NodeRoleV(Some(cfg))) => { let data_avail = match &adv.status.data_disk_avail { _ if cfg.capacity.is_none() => "N/A".into(), @@ -102,10 +102,15 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> format_table(healthy_nodes); let status_keys = status.iter().map(|adv| adv.id).collect::<HashSet<_>>(); - let failure_case_1 = status - .iter() - .any(|adv| !adv.is_up && matches!(layout.roles.get(&adv.id), Some(NodeRoleV(Some(_))))); + let failure_case_1 = status.iter().any(|adv| { + !adv.is_up + && matches!( + layout.current().roles.get(&adv.id), + Some(NodeRoleV(Some(_))) + ) + }); let failure_case_2 = layout + .current() .roles .items() .iter() @@ -115,7 +120,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> let mut failed_nodes = vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()]; for adv in status.iter().filter(|adv| !adv.is_up) { - if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) { + if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) { let tf = timeago::Formatter::new(); failed_nodes.push(format!( "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}", @@ -132,7 +137,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> )); } } - for (id, _, role_v) in layout.roles.items().iter() { + for (id, _, role_v) in layout.current().roles.items().iter() { if let NodeRoleV(Some(cfg)) = role_v { if !status_keys.contains(id) { failed_nodes.push(format!( diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index ce2b11e0..4a617337 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -58,17 +58,18 @@ pub async fn cmd_assign_role( status .iter() .map(|adv| adv.id) - .chain(layout.node_ids().iter().cloned()), + .chain(layout.current().node_ids().iter().cloned()), node_id, ) }) .collect::<Result<Vec<_>, _>>()?; - let mut roles = layout.roles.clone(); + let mut roles = layout.current().roles.clone(); roles.merge(&layout.staging_roles); for replaced in args.replace.iter() { - let replaced_node = find_matching_node(layout.node_ids().iter().cloned(), replaced)?; + let replaced_node = + find_matching_node(layout.current().node_ids().iter().cloned(), replaced)?; match roles.get(&replaced_node) { Some(NodeRoleV(Some(_))) => { layout @@ -149,7 +150,7 @@ pub async fn cmd_remove_role( ) -> Result<(), Error> { let mut layout = fetch_layout(rpc_cli, rpc_host).await?; - let mut roles = layout.roles.clone(); + let mut roles = layout.current().roles.clone(); roles.merge(&layout.staging_roles); let deleted_node = @@ -174,13 +175,16 @@ pub async fn cmd_show_layout( let layout = fetch_layout(rpc_cli, rpc_host).await?; println!("==== CURRENT CLUSTER LAYOUT ===="); - print_cluster_layout(&layout, "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes."); + print_cluster_layout(layout.current(), "No nodes currently have a role in the cluster.\nSee `garage status` to view available nodes."); println!(); - println!("Current cluster layout version: {}", layout.version); + println!( + "Current cluster layout version: {}", + layout.current().version + ); let has_role_changes = print_staging_role_changes(&layout); if has_role_changes { - let v = layout.version; + let v = layout.current().version; let res_apply = layout.apply_staged_changes(Some(v + 1)); // this will print the stats of what partitions @@ -189,7 +193,7 @@ pub async fn cmd_show_layout( Ok((layout, msg)) => { println!(); println!("==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ===="); - print_cluster_layout(&layout, "No nodes have a role in the new layout."); + print_cluster_layout(layout.current(), "No nodes have a role in the new layout."); println!(); for line in msg.iter() { @@ -266,11 +270,11 @@ pub async fn cmd_config_layout( .parse::<ZoneRedundancy>() .ok_or_message("invalid zone redundancy value")?; if let ZoneRedundancy::AtLeast(r_int) = r { - if r_int > layout.replication_factor { + if r_int > layout.current().replication_factor { return Err(Error::Message(format!( "The zone redundancy must be smaller or equal to the \ replication factor ({}).", - layout.replication_factor + layout.current().replication_factor ))); } else if r_int < 1 { return Err(Error::Message( @@ -302,7 +306,7 @@ pub async fn cmd_config_layout( pub async fn fetch_layout( rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID, -) -> Result<ClusterLayout, Error> { +) -> Result<LayoutHistory, Error> { match rpc_cli .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL) .await?? @@ -315,7 +319,7 @@ pub async fn fetch_layout( pub async fn send_layout( rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID, - layout: ClusterLayout, + layout: LayoutHistory, ) -> Result<(), Error> { rpc_cli .call( @@ -327,7 +331,7 @@ pub async fn send_layout( Ok(()) } -pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) { +pub fn print_cluster_layout(layout: &LayoutVersion, empty_msg: &str) { let mut table = vec!["ID\tTags\tZone\tCapacity\tUsable capacity".to_string()]; for (id, _, role) in layout.roles.items().iter() { let role = match &role.0 { @@ -366,13 +370,13 @@ pub fn print_cluster_layout(layout: &ClusterLayout, empty_msg: &str) { } } -pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { +pub fn print_staging_role_changes(layout: &LayoutHistory) -> bool { let has_role_changes = layout .staging_roles .items() .iter() - .any(|(k, _, v)| layout.roles.get(k) != Some(v)); - let has_layout_changes = *layout.staging_parameters.get() != layout.parameters; + .any(|(k, _, v)| layout.current().roles.get(k) != Some(v)); + let has_layout_changes = *layout.staging_parameters.get() != layout.current().parameters; if has_role_changes || has_layout_changes { println!(); @@ -380,7 +384,7 @@ pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool { if has_role_changes { let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; for (id, _, role) in layout.staging_roles.items().iter() { - if layout.roles.get(id) == Some(role) { + if layout.current().roles.get(id) == Some(role) { continue; } if let Some(role) = &role.0 { diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 8cd5b27b..18904c8d 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -450,10 +450,10 @@ impl<'a> BucketHelper<'a> { #[cfg(feature = "k2v")] { - use garage_rpc::layout::ClusterLayout; + use garage_rpc::layout::LayoutHistory; use std::sync::Arc; - let layout: Arc<ClusterLayout> = self.0.system.cluster_layout().clone(); + let layout: Arc<LayoutHistory> = self.0.system.cluster_layout().clone(); let k2vindexes = self .0 .k2v @@ -462,7 +462,10 @@ impl<'a> BucketHelper<'a> { .get_range( &bucket_id, None, - Some((DeletedFilter::NotDeleted, layout.node_id_vec.clone())), + Some(( + DeletedFilter::NotDeleted, + layout.current().node_id_vec.clone(), + )), 10, EnumerationOrder::Forward, ) diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index d514cb06..9637cc4c 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_db as db; -use garage_rpc::layout::ClusterLayout; +use garage_rpc::layout::LayoutHistory; use garage_rpc::system::System; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -83,8 +83,8 @@ impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { } impl<T: CountedItem> CounterEntry<T> { - pub fn filtered_values(&self, layout: &ClusterLayout) -> HashMap<String, i64> { - let nodes = &layout.node_id_vec[..]; + pub fn filtered_values(&self, layout: &LayoutHistory) -> HashMap<String, i64> { + let nodes = &layout.current().node_id_vec[..]; self.filtered_values_with_nodes(nodes) } diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index b3019f58..e59c9e9c 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,4 @@ use std::cmp::Ordering; -use std::sync::Arc; use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; @@ -64,24 +63,22 @@ impl LayoutHistory { } // Add any new versions to history - let mut versions = self.versions.to_vec(); for v2 in other.versions.iter() { - if let Some(v1) = versions.iter().find(|v| v.version == v2.version) { + if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) { if v1 != v2 { error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version); } - } else if versions.iter().all(|v| v.version != v2.version - 1) { + } else if self.versions.iter().all(|v| v.version != v2.version - 1) { error!( "Cannot receive new layout version {}, version {} is missing", v2.version, v2.version - 1 ); } else { - versions.push(v2.clone()); + self.versions.push(v2.clone()); changed = true; } } - self.versions = Arc::from(versions.into_boxed_slice()); // Merge trackers self.update_trackers.merge(&other.update_trackers); @@ -117,9 +114,7 @@ To know the correct value of the new layout version, invoke `garage layout show` let msg = new_version.calculate_partition_assignment()?; - let mut versions = self.versions.to_vec(); - versions.push(new_version); - self.versions = Arc::from(versions.into_boxed_slice()); + self.versions.push(new_version); Ok((self, msg)) } @@ -149,9 +144,7 @@ To know the correct value of the new layout version, invoke `garage layout show` let mut new_version = self.current().clone(); new_version.version += 1; - let mut versions = self.versions.to_vec(); - versions.push(new_version); - self.versions = Arc::from(versions.into_boxed_slice()); + self.versions.push(new_version); Ok(self) } diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index fa0822fa..14e797be 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -184,7 +184,6 @@ mod v010 { use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; - use std::sync::Arc; pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; /// The layout of the cluster, i.e. the list of roles @@ -215,7 +214,7 @@ mod v010 { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LayoutHistory { /// The versions currently in use in the cluster - pub versions: Arc<[LayoutVersion]>, + pub versions: Vec<LayoutVersion>, /// Update trackers pub update_trackers: UpdateTrackers, @@ -267,7 +266,7 @@ mod v010 { .collect::<HashMap<Uuid, u64>>(), ); let mut ret = Self { - versions: Arc::from(vec![version].into_boxed_slice()), + versions: vec![version], update_trackers: UpdateTrackers { ack_map: update_tracker.clone(), sync_map: update_tracker.clone(), diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index 34807e3d..a5c83d0f 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -27,11 +27,10 @@ impl TableReplication for TableFullReplication { } fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> { - let layout = self.system.cluster_layout(); - layout.node_ids().to_vec() + self.system.cluster_layout().current().node_ids().to_vec() } fn write_quorum(&self) -> usize { - let nmembers = self.system.cluster_layout().node_ids().len(); + let nmembers = self.system.cluster_layout().current().node_ids().len(); if nmembers > self.max_faults { nmembers - self.max_faults } else { diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 60c95cb4..793d87fd 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -26,16 +26,20 @@ pub struct TableShardedReplication { impl TableReplication for TableShardedReplication { fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> { - let layout = self.system.cluster_layout(); - layout.nodes_of(hash, self.replication_factor) + self.system + .cluster_layout() + .current() + .nodes_of(hash, self.replication_factor) } fn read_quorum(&self) -> usize { self.read_quorum } fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> { - let layout = self.system.cluster_layout(); - layout.nodes_of(hash, self.replication_factor) + self.system + .cluster_layout() + .current() + .nodes_of(hash, self.replication_factor) } fn write_quorum(&self) -> usize { self.write_quorum @@ -45,9 +49,9 @@ impl TableReplication for TableShardedReplication { } fn partition_of(&self, hash: &Hash) -> Partition { - self.system.cluster_layout().partition_of(hash) + self.system.cluster_layout().current().partition_of(hash) } fn partitions(&self) -> Vec<(Partition, Hash)> { - self.system.cluster_layout().partitions() + self.system.cluster_layout().current().partitions() } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 65eff7cd..620d83b9 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -492,8 +492,8 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync struct SyncWorker<F: TableSchema, R: TableReplication> { syncer: Arc<TableSyncer<F, R>>, - layout_watch: watch::Receiver<Arc<ClusterLayout>>, - layout: Arc<ClusterLayout>, + layout_watch: watch::Receiver<Arc<LayoutHistory>>, + layout: Arc<LayoutHistory>, add_full_sync_rx: mpsc::UnboundedReceiver<()>, todo: Vec<TodoPartition>, next_full_sync: Instant, |