aboutsummaryrefslogtreecommitdiff
path: root/src/rpc/layout
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-11 12:08:32 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-11 12:08:32 +0100
commitce89d1ddabe3b9e638b0173949726522ae9a0311 (patch)
tree81be9981c5a6ed155d5500165dbaee5dfa8db7e9 /src/rpc/layout
parentdf36cf3099f6010c4fc62109b85d4d1e62f160cc (diff)
downloadgarage-ce89d1ddabe3b9e638b0173949726522ae9a0311.tar.gz
garage-ce89d1ddabe3b9e638b0173949726522ae9a0311.zip
table sync: adapt to new layout history
Diffstat (limited to 'src/rpc/layout')
-rw-r--r--src/rpc/layout/history.rs21
-rw-r--r--src/rpc/layout/manager.rs1
-rw-r--r--src/rpc/layout/version.rs16
3 files changed, 23 insertions, 15 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index dbb02269..185dbb27 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -47,11 +47,19 @@ impl LayoutHistory {
// ------------------ who stores what now? ---------------
- pub fn max_ack(&self) -> u64 {
+ pub fn all_ack(&self) -> u64 {
self.calculate_global_min(&self.update_trackers.ack_map)
}
- pub fn all_storage_nodes(&self) -> HashSet<Uuid> {
+ pub fn min_stored(&self) -> u64 {
+ self.versions.first().as_ref().unwrap().version
+ }
+
+ pub fn sync_versions(&self) -> (u64, u64, u64) {
+ (self.current().version, self.all_ack(), self.min_stored())
+ }
+
+ pub fn all_nongateway_nodes(&self) -> HashSet<Uuid> {
// TODO: cache this
self.versions
.iter()
@@ -71,11 +79,10 @@ impl LayoutHistory {
version.nodes_of(position, version.replication_factor)
}
- pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
+ pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator<Item = Vec<Uuid>> + 'a {
self.versions
.iter()
- .map(|x| x.nodes_of(position, x.replication_factor))
- .collect::<Vec<_>>()
+ .map(move |x| x.nodes_of(position, x.replication_factor))
}
// ------------------ update tracking ---------------
@@ -129,7 +136,9 @@ impl LayoutHistory {
}
pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 {
- let storage_nodes = self.all_storage_nodes();
+ // TODO: for TableFullReplication, counting gateway nodes might be
+ // necessary? Think about this more.
+ let storage_nodes = self.all_nongateway_nodes();
storage_nodes
.iter()
.map(|x| tracker.0.get(x).copied().unwrap_or(0))
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index b0302b12..7d60bae6 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -92,6 +92,7 @@ impl LayoutManager {
persist_cluster_layout,
layout,
change_notify,
+ table_sync_version: Mutex::new(HashMap::new()),
system_endpoint,
rpc_helper,
}))
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index 8133672a..f45a3c35 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -98,15 +98,13 @@ impl LayoutVersion {
}
/// Get the list of partitions and the first hash of a partition key that would fall in it
- pub fn partitions(&self) -> Vec<(Partition, Hash)> {
- (0..(1 << PARTITION_BITS))
- .map(|i| {
- let top = (i as u16) << (16 - PARTITION_BITS);
- let mut location = [0u8; 32];
- location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
- (i as u16, Hash::from(location))
- })
- .collect::<Vec<_>>()
+ pub fn partitions(&self) -> impl Iterator<Item = (Partition, Hash)> + '_ {
+ (0..(1 << PARTITION_BITS)).map(|i| {
+ let top = (i as u16) << (16 - PARTITION_BITS);
+ let mut location = [0u8; 32];
+ location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
+ (i as u16, Hash::from(location))
+ })
}
/// Return the n servers in which data for this hash should be replicated