diff options
author | Alex Auvolat <alex@adnab.me> | 2023-11-11 12:08:32 +0100 |
---|---|---|
committer | Alex Auvolat <alex@adnab.me> | 2023-11-11 12:08:32 +0100 |
commit | ce89d1ddabe3b9e638b0173949726522ae9a0311 (patch) | |
tree | 81be9981c5a6ed155d5500165dbaee5dfa8db7e9 /src/rpc/layout | |
parent | df36cf3099f6010c4fc62109b85d4d1e62f160cc (diff) | |
download | garage-ce89d1ddabe3b9e638b0173949726522ae9a0311.tar.gz garage-ce89d1ddabe3b9e638b0173949726522ae9a0311.zip |
table sync: adapt to new layout history
Diffstat (limited to 'src/rpc/layout')
-rw-r--r-- | src/rpc/layout/history.rs | 21 | ||||
-rw-r--r-- | src/rpc/layout/manager.rs | 1 | ||||
-rw-r--r-- | src/rpc/layout/version.rs | 16 |
3 files changed, 23 insertions, 15 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index dbb02269..185dbb27 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -47,11 +47,19 @@ impl LayoutHistory { // ------------------ who stores what now? --------------- - pub fn max_ack(&self) -> u64 { + pub fn all_ack(&self) -> u64 { self.calculate_global_min(&self.update_trackers.ack_map) } - pub fn all_storage_nodes(&self) -> HashSet<Uuid> { + pub fn min_stored(&self) -> u64 { + self.versions.first().as_ref().unwrap().version + } + + pub fn sync_versions(&self) -> (u64, u64, u64) { + (self.current().version, self.all_ack(), self.min_stored()) + } + + pub fn all_nongateway_nodes(&self) -> HashSet<Uuid> { // TODO: cache this self.versions .iter() @@ -71,11 +79,10 @@ impl LayoutHistory { version.nodes_of(position, version.replication_factor) } - pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> { + pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator<Item = Vec<Uuid>> + 'a { self.versions .iter() - .map(|x| x.nodes_of(position, x.replication_factor)) - .collect::<Vec<_>>() + .map(move |x| x.nodes_of(position, x.replication_factor)) } // ------------------ update tracking --------------- @@ -129,7 +136,9 @@ impl LayoutHistory { } pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 { - let storage_nodes = self.all_storage_nodes(); + // TODO: for TableFullReplication, counting gateway nodes might be + // necessary? Think about this more. + let storage_nodes = self.all_nongateway_nodes(); storage_nodes .iter() .map(|x| tracker.0.get(x).copied().unwrap_or(0)) diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index b0302b12..7d60bae6 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -92,6 +92,7 @@ impl LayoutManager { persist_cluster_layout, layout, change_notify, + table_sync_version: Mutex::new(HashMap::new()), system_endpoint, rpc_helper, })) diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 8133672a..f45a3c35 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -98,15 +98,13 @@ impl LayoutVersion { } /// Get the list of partitions and the first hash of a partition key that would fall in it - pub fn partitions(&self) -> Vec<(Partition, Hash)> { - (0..(1 << PARTITION_BITS)) - .map(|i| { - let top = (i as u16) << (16 - PARTITION_BITS); - let mut location = [0u8; 32]; - location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]); - (i as u16, Hash::from(location)) - }) - .collect::<Vec<_>>() + pub fn partitions(&self) -> impl Iterator<Item = (Partition, Hash)> + '_ { + (0..(1 << PARTITION_BITS)).map(|i| { + let top = (i as u16) << (16 - PARTITION_BITS); + let mut location = [0u8; 32]; + location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]); + (i as u16, Hash::from(location)) + }) } /// Return the n servers in which data for this hash should be replicated |