aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc/layout')
-rw-r--r--src/rpc/layout/history.rs74
-rw-r--r--src/rpc/layout/manager.rs7
-rw-r--r--src/rpc/layout/schema.rs15
-rw-r--r--src/rpc/layout/version.rs46
4 files changed, 109 insertions, 33 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 357b9d62..347f03db 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -1,3 +1,5 @@
+use std::collections::HashSet;
+
use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*;
use garage_util::encode::nonversioned_encode;
@@ -30,6 +32,14 @@ impl LayoutHistory {
self.versions.last().as_ref().unwrap()
}
+ pub fn all_storage_nodes(&self) -> HashSet<Uuid> {
+ self.versions
+ .iter()
+ .map(|x| x.nongateway_nodes())
+ .flatten()
+ .collect::<HashSet<_>>()
+ }
+
pub(crate) fn update_hashes(&mut self) {
self.trackers_hash = self.calculate_trackers_hash();
self.staging_hash = self.calculate_staging_hash();
@@ -43,6 +53,65 @@ impl LayoutHistory {
blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
}
+ // ------------------ update tracking ---------------
+
+ pub(crate) fn update_trackers(&mut self, node_id: Uuid) {
+ // Ensure trackers for this node's values are up-to-date
+
+ // 1. Acknowledge the last layout version in the history
+ self.ack_last(node_id);
+
+ // 2. Assume the data on this node is sync'ed up at least to
+ // the first layout version in the history
+ self.sync_first(node_id);
+
+ // 3. Acknowledge everyone has synced up to min(self.sync_map)
+ self.sync_ack(node_id);
+
+ // 4. Cleanup layout versions that are not needed anymore
+ self.cleanup_old_versions();
+
+ info!("ack_map: {:?}", self.update_trackers.ack_map);
+ info!("sync_map: {:?}", self.update_trackers.sync_map);
+ info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
+
+ // Finally, update hashes
+ self.update_hashes();
+ }
+
+ pub(crate) fn ack_last(&mut self, node: Uuid) {
+ let last_version = self.current().version;
+ self.update_trackers.ack_map.set_max(node, last_version);
+ }
+
+ pub(crate) fn sync_first(&mut self, node: Uuid) {
+ let first_version = self.versions.first().as_ref().unwrap().version;
+ self.update_trackers.sync_map.set_max(node, first_version);
+ }
+
+ pub(crate) fn sync_ack(&mut self, node: Uuid) {
+ self.update_trackers.sync_ack_map.set_max(
+ node,
+ self.calculate_global_min(&self.update_trackers.sync_map),
+ );
+ }
+
+ pub(crate) fn cleanup_old_versions(&mut self) {
+ let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map);
+ while self.versions.first().as_ref().unwrap().version < min_sync_ack {
+ self.versions.remove(0);
+ }
+ }
+
+ pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 {
+ let storage_nodes = self.all_storage_nodes();
+ storage_nodes
+ .iter()
+ .map(|x| tracker.0.get(x).copied().unwrap_or(0))
+ .min()
+ .unwrap_or(0)
+ }
+
// ================== updates to layout, public interface ===================
pub fn merge(&mut self, other: &LayoutHistory) -> bool {
@@ -78,11 +147,6 @@ impl LayoutHistory {
changed = true;
}
- // Update hashes if there are changes
- if changed {
- self.update_hashes();
- }
-
changed
}
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index a2502f58..ffcc938b 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -51,7 +51,7 @@ impl LayoutManager {
let persist_cluster_layout: Persister<LayoutHistory> =
Persister::new(&config.metadata_dir, "cluster_layout");
- let cluster_layout = match persist_cluster_layout.load() {
+ let mut cluster_layout = match persist_cluster_layout.load() {
Ok(x) => {
if x.current().replication_factor != replication_factor {
return Err(Error::Message(format!(
@@ -71,6 +71,8 @@ impl LayoutManager {
}
};
+ cluster_layout.update_trackers(node_id.into());
+
let layout = Arc::new(RwLock::new(cluster_layout));
let change_notify = Arc::new(Notify::new());
@@ -126,7 +128,7 @@ impl LayoutManager {
if prev_layout_check && layout.check().is_err() {
panic!("Merged two correct layouts and got an incorrect layout.");
}
-
+ layout.update_trackers(self.node_id);
return Some(layout.clone());
}
}
@@ -137,6 +139,7 @@ impl LayoutManager {
let mut layout = self.layout.write().unwrap();
if layout.update_trackers != *adv {
if layout.update_trackers.merge(adv) {
+ layout.update_trackers(self.node_id);
return Some(layout.update_trackers.clone());
}
}
diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs
index abae5bd8..9f5d6f62 100644
--- a/src/rpc/layout/schema.rs
+++ b/src/rpc/layout/schema.rs
@@ -3,6 +3,7 @@ use std::fmt;
use bytesize::ByteSize;
use garage_util::crdt::{AutoCrdt, Crdt};
+use garage_util::data::Uuid;
mod v08 {
use crate::layout::CompactNodeType;
@@ -276,8 +277,7 @@ mod v010 {
let update_tracker = UpdateTracker(
version
.nongateway_nodes()
- .iter()
- .map(|x| (*x, version.version))
+ .map(|x| (x, version.version))
.collect::<HashMap<Uuid, u64>>(),
);
let staging = LayoutStaging {
@@ -375,8 +375,15 @@ impl UpdateTracker {
changed
}
- pub(crate) fn min(&self) -> u64 {
- self.0.iter().map(|(_, v)| *v).min().unwrap_or(0)
+ pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) {
+ match self.0.get_mut(&peer) {
+ Some(e) => {
+ *e = std::cmp::max(*e, value);
+ }
+ None => {
+ self.0.insert(peer, value);
+ }
+ }
}
}
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index 6918fdf9..65c62f63 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -134,15 +134,14 @@ impl LayoutVersion {
// ===================== internal information extractors ======================
/// Returns the uuids of the non_gateway nodes in self.node_id_vec.
- pub(crate) fn nongateway_nodes(&self) -> Vec<Uuid> {
- let mut result = Vec::<Uuid>::new();
- for uuid in self.node_id_vec.iter() {
- match self.node_role(uuid) {
- Some(role) if role.capacity.is_some() => result.push(*uuid),
- _ => (),
- }
- }
- result
+ pub(crate) fn nongateway_nodes(&self) -> impl Iterator<Item = Uuid> + '_ {
+ self.node_id_vec
+ .iter()
+ .copied()
+ .filter(move |uuid| match self.node_role(uuid) {
+ Some(role) if role.capacity.is_some() => true,
+ _ => false,
+ })
}
/// Given a node uuids, this function returns the label of its zone
@@ -158,8 +157,8 @@ impl LayoutVersion {
/// Returns the sum of capacities of non gateway nodes in the cluster
fn get_total_capacity(&self) -> Result<u64, Error> {
let mut total_capacity = 0;
- for uuid in self.nongateway_nodes().iter() {
- total_capacity += self.get_node_capacity(uuid)?;
+ for uuid in self.nongateway_nodes() {
+ total_capacity += self.get_node_capacity(&uuid)?;
}
Ok(total_capacity)
}
@@ -320,7 +319,7 @@ impl LayoutVersion {
// to use them as indices in the flow graphs.
let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
- let nb_nongateway_nodes = self.nongateway_nodes().len();
+ let nb_nongateway_nodes = self.nongateway_nodes().count();
if nb_nongateway_nodes < self.replication_factor {
return Err(Error::Message(format!(
"The number of nodes with positive \
@@ -479,7 +478,8 @@ impl LayoutVersion {
let mut id_to_zone = Vec::<String>::new();
let mut zone_to_id = HashMap::<String, usize>::new();
- for uuid in self.nongateway_nodes().iter() {
+ let nongateway_nodes = self.nongateway_nodes().collect::<Vec<_>>();
+ for uuid in nongateway_nodes.iter() {
let r = self.node_role(uuid).unwrap();
if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() {
zone_to_id.insert(r.zone.clone(), id_to_zone.len());
@@ -556,8 +556,10 @@ impl LayoutVersion {
exclude_assoc: &HashSet<(usize, usize)>,
zone_redundancy: usize,
) -> Result<Graph<FlowEdge>, Error> {
- let vertices =
- LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
+ let vertices = LayoutVersion::generate_graph_vertices(
+ zone_to_id.len(),
+ self.nongateway_nodes().count(),
+ );
let mut g = Graph::<FlowEdge>::new(&vertices);
let nb_zones = zone_to_id.len();
for p in 0..NB_PARTITIONS {
@@ -576,7 +578,7 @@ impl LayoutVersion {
)?;
}
}
- for n in 0..self.nongateway_nodes().len() {
+ for n in 0..self.nongateway_nodes().count() {
let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?];
g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
@@ -600,7 +602,7 @@ impl LayoutVersion {
// previous assignment
let mut exclude_edge = HashSet::<(usize, usize)>::new();
if let Some(prev_assign) = prev_assign_opt {
- let nb_nodes = self.nongateway_nodes().len();
+ let nb_nodes = self.nongateway_nodes().count();
for (p, prev_assign_p) in prev_assign.iter().enumerate() {
for n in 0..nb_nodes {
exclude_edge.insert((p, n));
@@ -652,7 +654,7 @@ impl LayoutVersion {
// We compute the maximal length of a simple path in gflow. It is used in the
// Bellman-Ford algorithm in optimize_flow_with_cost to set the number
// of iterations.
- let nb_nodes = self.nongateway_nodes().len();
+ let nb_nodes = self.nongateway_nodes().count();
let path_length = 4 * nb_nodes;
gflow.optimize_flow_with_cost(&cost, path_length)?;
@@ -730,7 +732,7 @@ impl LayoutVersion {
}
// We define and fill in the following tables
- let storing_nodes = self.nongateway_nodes();
+ let storing_nodes = self.nongateway_nodes().collect::<Vec<_>>();
let mut new_partitions = vec![0; storing_nodes.len()];
let mut stored_partitions = vec![0; storing_nodes.len()];
@@ -873,9 +875,9 @@ mod tests {
for z in zones.iter() {
zone_token.insert(z.clone(), 0);
}
- for uuid in cl.nongateway_nodes().iter() {
- let z = cl.get_node_zone(uuid)?;
- let c = cl.get_node_capacity(uuid)?;
+ for uuid in cl.nongateway_nodes() {
+ let z = cl.get_node_zone(&uuid)?;
+ let c = cl.get_node_capacity(&uuid)?;
zone_token.insert(
z.clone(),
zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize),