aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rpc/layout/helper.rs224
-rw-r--r--src/rpc/layout/history.rs278
-rw-r--r--src/rpc/layout/manager.rs5
-rw-r--r--src/rpc/layout/mod.rs3
4 files changed, 256 insertions, 254 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
new file mode 100644
index 00000000..ed3da498
--- /dev/null
+++ b/src/rpc/layout/helper.rs
@@ -0,0 +1,224 @@
+use std::collections::HashMap;
+use std::ops::Deref;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+use garage_util::data::*;
+
+use super::schema::*;
+
+pub struct LayoutHelper {
+ layout: Option<LayoutHistory>,
+
+ // cached values
+ ack_map_min: u64,
+ sync_map_min: u64,
+
+ all_nodes: Vec<Uuid>,
+ all_nongateway_nodes: Vec<Uuid>,
+
+ pub(crate) trackers_hash: Hash,
+ pub(crate) staging_hash: Hash,
+
+ // ack lock: counts in-progress write operations for each
+ // layout version ; we don't increase the ack update tracker
+ // while this lock is nonzero
+ pub(crate) ack_lock: HashMap<u64, AtomicUsize>,
+}
+
+impl Deref for LayoutHelper {
+ type Target = LayoutHistory;
+ fn deref(&self) -> &LayoutHistory {
+ self.layout()
+ }
+}
+
+impl LayoutHelper {
+ pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self {
+ layout.cleanup_old_versions();
+
+ let all_nongateway_nodes = layout.get_all_nongateway_nodes();
+ layout.clamp_update_trackers(&all_nongateway_nodes);
+
+ let min_version = layout.min_stored();
+ let ack_map_min = layout
+ .update_trackers
+ .ack_map
+ .min(&all_nongateway_nodes, min_version);
+ let sync_map_min = layout
+ .update_trackers
+ .sync_map
+ .min(&all_nongateway_nodes, min_version);
+
+ let all_nodes = layout.get_all_nodes();
+ let trackers_hash = layout.calculate_trackers_hash();
+ let staging_hash = layout.calculate_staging_hash();
+
+ ack_lock.retain(|_, cnt| *cnt.get_mut() > 0);
+ ack_lock
+ .entry(layout.current().version)
+ .or_insert(AtomicUsize::new(0));
+
+ LayoutHelper {
+ layout: Some(layout),
+ ack_map_min,
+ sync_map_min,
+ all_nodes,
+ all_nongateway_nodes,
+ trackers_hash,
+ staging_hash,
+ ack_lock,
+ }
+ }
+
+ // ------------------ single updating function --------------
+
+ fn layout(&self) -> &LayoutHistory {
+ self.layout.as_ref().unwrap()
+ }
+
+ pub(crate) fn update<F>(&mut self, f: F) -> bool
+ where
+ F: FnOnce(&mut LayoutHistory) -> bool,
+ {
+ let changed = f(&mut self.layout.as_mut().unwrap());
+ if changed {
+ *self = Self::new(
+ self.layout.take().unwrap(),
+ std::mem::take(&mut self.ack_lock),
+ );
+ }
+ changed
+ }
+
+ // ------------------ read helpers ---------------
+
+ pub fn all_nodes(&self) -> &[Uuid] {
+ &self.all_nodes
+ }
+
+ pub fn all_nongateway_nodes(&self) -> &[Uuid] {
+ &self.all_nongateway_nodes
+ }
+
+ pub fn all_ack(&self) -> u64 {
+ self.ack_map_min
+ }
+
+ pub fn sync_versions(&self) -> (u64, u64, u64) {
+ (
+ self.layout().current().version,
+ self.all_ack(),
+ self.layout().min_stored(),
+ )
+ }
+
+ pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+ let sync_min = self.sync_map_min;
+ let version = self
+ .layout()
+ .versions
+ .iter()
+ .find(|x| x.version == sync_min)
+ .or(self.layout().versions.last())
+ .unwrap();
+ version
+ .nodes_of(position, version.replication_factor)
+ .collect()
+ }
+
+ pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
+ self.layout()
+ .versions
+ .iter()
+ .map(|x| x.nodes_of(position, x.replication_factor).collect())
+ .collect()
+ }
+
+ pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+ let mut ret = vec![];
+ for version in self.layout().versions.iter() {
+ ret.extend(version.nodes_of(position, version.replication_factor));
+ }
+ ret.sort();
+ ret.dedup();
+ ret
+ }
+
+ pub fn trackers_hash(&self) -> Hash {
+ self.trackers_hash
+ }
+
+ pub fn staging_hash(&self) -> Hash {
+ self.staging_hash
+ }
+
+ // ------------------ helpers for update tracking ---------------
+
+ pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {
+ // Ensure trackers for this node's values are up-to-date
+
+ // 1. Acknowledge the last layout version which is not currently
+ // locked by an in-progress write operation
+ self.ack_max_free(local_node_id);
+
+ // 2. Assume the data on this node is sync'ed up at least to
+ // the first layout version in the history
+ self.sync_first(local_node_id);
+
+ // 3. Acknowledge everyone has synced up to min(self.sync_map)
+ self.sync_ack(local_node_id);
+
+ info!("ack_map: {:?}", self.update_trackers.ack_map);
+ info!("sync_map: {:?}", self.update_trackers.sync_map);
+ info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
+ }
+
+ fn sync_first(&mut self, local_node_id: Uuid) {
+ let first_version = self.versions.first().as_ref().unwrap().version;
+ self.update(|layout| {
+ layout
+ .update_trackers
+ .sync_map
+ .set_max(local_node_id, first_version)
+ });
+ }
+
+ fn sync_ack(&mut self, local_node_id: Uuid) {
+ let sync_map_min = self.sync_map_min;
+ self.update(|layout| {
+ layout
+ .update_trackers
+ .sync_ack_map
+ .set_max(local_node_id, sync_map_min)
+ });
+ }
+
+ pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool {
+ let max_ack = self.max_free_ack();
+ let changed = self.update(|layout| {
+ layout
+ .update_trackers
+ .ack_map
+ .set_max(local_node_id, max_ack)
+ });
+ if changed {
+ info!("ack_until updated to {}", max_ack);
+ }
+ changed
+ }
+
+ pub(crate) fn max_free_ack(&self) -> u64 {
+ self.layout()
+ .versions
+ .iter()
+ .map(|x| x.version)
+ .take_while(|v| {
+ self.ack_lock
+ .get(v)
+ .map(|x| x.load(Ordering::Relaxed) == 0)
+ .unwrap_or(true)
+ })
+ .max()
+ .unwrap_or(self.min_stored())
+ }
+}
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index dd38efa7..0a139549 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -1,7 +1,4 @@
-use std::collections::HashMap;
use std::collections::HashSet;
-use std::ops::Deref;
-use std::sync::atomic::{AtomicUsize, Ordering};
use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*;
@@ -11,225 +8,6 @@ use garage_util::error::*;
use super::schema::*;
use super::*;
-pub struct LayoutHelper {
- layout: Option<LayoutHistory>,
-
- // cached values
- ack_map_min: u64,
- sync_map_min: u64,
-
- all_nodes: Vec<Uuid>,
- all_nongateway_nodes: Vec<Uuid>,
-
- trackers_hash: Hash,
- staging_hash: Hash,
-
- // ack lock: counts in-progress write operations for each
- // layout version ; we don't increase the ack update tracker
- // while this lock is nonzero
- pub(crate) ack_lock: HashMap<u64, AtomicUsize>,
-}
-
-impl Deref for LayoutHelper {
- type Target = LayoutHistory;
- fn deref(&self) -> &LayoutHistory {
- self.layout()
- }
-}
-
-impl LayoutHelper {
- pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self {
- layout.cleanup_old_versions();
-
- let all_nongateway_nodes = layout.get_all_nongateway_nodes();
- layout.clamp_update_trackers(&all_nongateway_nodes);
-
- let min_version = layout.min_stored();
- let ack_map_min = layout
- .update_trackers
- .ack_map
- .min(&all_nongateway_nodes, min_version);
- let sync_map_min = layout
- .update_trackers
- .sync_map
- .min(&all_nongateway_nodes, min_version);
-
- let all_nodes = layout.get_all_nodes();
- let trackers_hash = layout.calculate_trackers_hash();
- let staging_hash = layout.calculate_staging_hash();
-
- ack_lock.retain(|_, cnt| *cnt.get_mut() > 0);
- ack_lock
- .entry(layout.current().version)
- .or_insert(AtomicUsize::new(0));
-
- LayoutHelper {
- layout: Some(layout),
- ack_map_min,
- sync_map_min,
- all_nodes,
- all_nongateway_nodes,
- trackers_hash,
- staging_hash,
- ack_lock,
- }
- }
-
- // ------------------ single updating function --------------
-
- fn layout(&self) -> &LayoutHistory {
- self.layout.as_ref().unwrap()
- }
-
- pub(crate) fn update<F>(&mut self, f: F) -> bool
- where
- F: FnOnce(&mut LayoutHistory) -> bool,
- {
- let changed = f(&mut self.layout.as_mut().unwrap());
- if changed {
- *self = Self::new(
- self.layout.take().unwrap(),
- std::mem::take(&mut self.ack_lock),
- );
- }
- changed
- }
-
- // ------------------ read helpers ---------------
-
- pub fn all_nodes(&self) -> &[Uuid] {
- &self.all_nodes
- }
-
- pub fn all_nongateway_nodes(&self) -> &[Uuid] {
- &self.all_nongateway_nodes
- }
-
- pub fn all_ack(&self) -> u64 {
- self.ack_map_min
- }
-
- pub fn sync_versions(&self) -> (u64, u64, u64) {
- (
- self.layout().current().version,
- self.all_ack(),
- self.layout().min_stored(),
- )
- }
-
- pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
- let sync_min = self.sync_map_min;
- let version = self
- .layout()
- .versions
- .iter()
- .find(|x| x.version == sync_min)
- .or(self.layout().versions.last())
- .unwrap();
- version
- .nodes_of(position, version.replication_factor)
- .collect()
- }
-
- pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
- self.layout()
- .versions
- .iter()
- .map(|x| x.nodes_of(position, x.replication_factor).collect())
- .collect()
- }
-
- pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
- let mut ret = vec![];
- for version in self.layout().versions.iter() {
- ret.extend(version.nodes_of(position, version.replication_factor));
- }
- ret.sort();
- ret.dedup();
- ret
- }
-
- pub fn trackers_hash(&self) -> Hash {
- self.trackers_hash
- }
-
- pub fn staging_hash(&self) -> Hash {
- self.staging_hash
- }
-
- // ------------------ helpers for update tracking ---------------
-
- pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {
- // Ensure trackers for this node's values are up-to-date
-
- // 1. Acknowledge the last layout version which is not currently
- // locked by an in-progress write operation
- self.ack_max_free(local_node_id);
-
- // 2. Assume the data on this node is sync'ed up at least to
- // the first layout version in the history
- self.sync_first(local_node_id);
-
- // 3. Acknowledge everyone has synced up to min(self.sync_map)
- self.sync_ack(local_node_id);
-
- info!("ack_map: {:?}", self.update_trackers.ack_map);
- info!("sync_map: {:?}", self.update_trackers.sync_map);
- info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
- }
-
- fn sync_first(&mut self, local_node_id: Uuid) {
- let first_version = self.versions.first().as_ref().unwrap().version;
- self.update(|layout| {
- layout
- .update_trackers
- .sync_map
- .set_max(local_node_id, first_version)
- });
- }
-
- fn sync_ack(&mut self, local_node_id: Uuid) {
- let sync_map_min = self.sync_map_min;
- self.update(|layout| {
- layout
- .update_trackers
- .sync_ack_map
- .set_max(local_node_id, sync_map_min)
- });
- }
-
- pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool {
- let max_ack = self.max_free_ack();
- let changed = self.update(|layout| {
- layout
- .update_trackers
- .ack_map
- .set_max(local_node_id, max_ack)
- });
- if changed {
- info!("ack_until updated to {}", max_ack);
- }
- changed
- }
-
- pub(crate) fn max_free_ack(&self) -> u64 {
- self.layout()
- .versions
- .iter()
- .map(|x| x.version)
- .take_while(|v| {
- self.ack_lock
- .get(v)
- .map(|x| x.load(Ordering::Relaxed) == 0)
- .unwrap_or(true)
- })
- .max()
- .unwrap_or(self.min_stored())
- }
-}
-
-// ----
-
impl LayoutHistory {
pub fn new(replication_factor: usize) -> Self {
let version = LayoutVersion::new(replication_factor);
@@ -270,7 +48,7 @@ impl LayoutHistory {
}
}
- fn get_all_nongateway_nodes(&self) -> Vec<Uuid> {
+ pub(crate) fn get_all_nongateway_nodes(&self) -> Vec<Uuid> {
if self.versions.len() == 1 {
self.versions[0].nongateway_nodes().to_vec()
} else {
@@ -286,8 +64,21 @@ impl LayoutHistory {
// ---- housekeeping (all invoked by LayoutHelper) ----
- fn cleanup_old_versions(&mut self) {
- loop {
+ pub(crate) fn cleanup_old_versions(&mut self) {
+ // If there are invalid versions before valid versions, remove them
+ if self.versions.len() > 1 && self.current().check().is_ok() {
+ while self.versions.len() > 1 && self.versions.first().unwrap().check().is_err() {
+ let removed = self.versions.remove(0);
+ info!(
+ "Layout history: pruning old invalid version {}",
+ removed.version
+ );
+ }
+ }
+
+ // If there are old versions that no one is reading from anymore,
+ // remove them
+ while self.versions.len() > 1 {
let all_nongateway_nodes = self.get_all_nongateway_nodes();
let min_version = self.min_stored();
let sync_ack_map_min = self
@@ -303,7 +94,7 @@ impl LayoutHistory {
}
}
- fn clamp_update_trackers(&mut self, nodes: &[Uuid]) {
+ pub(crate) fn clamp_update_trackers(&mut self, nodes: &[Uuid]) {
let min_v = self.min_stored();
for node in nodes {
self.update_trackers.ack_map.set_max(*node, min_v);
@@ -312,11 +103,11 @@ impl LayoutHistory {
}
}
- fn calculate_trackers_hash(&self) -> Hash {
+ pub(crate) fn calculate_trackers_hash(&self) -> Hash {
blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..])
}
- fn calculate_staging_hash(&self) -> Hash {
+ pub(crate) fn calculate_staging_hash(&self) -> Hash {
blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
}
@@ -328,6 +119,7 @@ impl LayoutHistory {
// Add any new versions to history
for v2 in other.versions.iter() {
if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) {
+ // Version is already present, check consistency
if v1 != v2 {
error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version);
}
@@ -344,24 +136,14 @@ impl LayoutHistory {
}
// Merge trackers
- if self.update_trackers != other.update_trackers {
- let c = self.update_trackers.merge(&other.update_trackers);
- changed = changed || c;
- }
-
- // If there are invalid versions before valid versions, remove them,
- // and increment update trackers
- if self.versions.len() > 1 && self.current().check().is_ok() {
- while self.versions.first().unwrap().check().is_err() {
- self.versions.remove(0);
- changed = true;
- }
- }
+ let c = self.update_trackers.merge(&other.update_trackers);
+ changed = changed || c;
// Merge staged layout changes
if self.staging != other.staging {
+ let prev_staging = self.staging.clone();
self.staging.merge(&other.staging);
- changed = true;
+ changed = changed || self.staging != prev_staging;
}
changed
@@ -390,11 +172,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
.calculate_next_version(&self.staging.get())?;
self.versions.push(new_version);
- if self.current().check().is_ok() {
- while self.versions.first().unwrap().check().is_err() {
- self.versions.remove(0);
- }
- }
+ self.cleanup_old_versions();
// Reset the staged layout changes
self.staging.update(LayoutStaging {
@@ -415,11 +193,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
pub fn check(&self) -> Result<(), String> {
- for version in self.versions.iter() {
- version.check()?;
- }
-
// TODO: anything more ?
- Ok(())
+ self.current().check()
}
}
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index 4e073d1f..85d94ffa 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -184,17 +184,20 @@ impl LayoutManager {
return Some(layout.clone());
}
}
+
None
}
fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> {
let mut layout = self.layout.write().unwrap();
+
if layout.update_trackers != *adv {
if layout.update(|l| l.update_trackers.merge(adv)) {
layout.update_trackers(self.node_id);
return Some(layout.update_trackers.clone());
}
}
+
None
}
@@ -284,7 +287,7 @@ impl LayoutManager {
}
pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc {
- let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning
+ let layout = self.layout.read().unwrap().clone();
SystemRpc::AdvertiseClusterLayout(layout)
}
diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs
index 859287c8..91151ab4 100644
--- a/src/rpc/layout/mod.rs
+++ b/src/rpc/layout/mod.rs
@@ -1,4 +1,5 @@
mod graph_algo;
+mod helper;
mod history;
mod schema;
mod version;
@@ -10,7 +11,7 @@ pub mod manager;
// ---- re-exports ----
-pub use history::*;
+pub use helper::LayoutHelper;
pub use manager::WriteLock;
pub use schema::*;
pub use version::*;