aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/layout/history.rs21
-rw-r--r--src/rpc/layout/manager.rs1
-rw-r--r--src/rpc/layout/version.rs16
-rw-r--r--src/rpc/system.rs2
4 files changed, 24 insertions, 16 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index dbb02269..185dbb27 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -47,11 +47,19 @@ impl LayoutHistory {
// ------------------ who stores what now? ---------------
- pub fn max_ack(&self) -> u64 {
+ pub fn all_ack(&self) -> u64 {
self.calculate_global_min(&self.update_trackers.ack_map)
}
- pub fn all_storage_nodes(&self) -> HashSet<Uuid> {
+ pub fn min_stored(&self) -> u64 {
+ self.versions.first().as_ref().unwrap().version
+ }
+
+ pub fn sync_versions(&self) -> (u64, u64, u64) {
+ (self.current().version, self.all_ack(), self.min_stored())
+ }
+
+ pub fn all_nongateway_nodes(&self) -> HashSet<Uuid> {
// TODO: cache this
self.versions
.iter()
@@ -71,11 +79,10 @@ impl LayoutHistory {
version.nodes_of(position, version.replication_factor)
}
- pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
+ pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator<Item = Vec<Uuid>> + 'a {
self.versions
.iter()
- .map(|x| x.nodes_of(position, x.replication_factor))
- .collect::<Vec<_>>()
+ .map(move |x| x.nodes_of(position, x.replication_factor))
}
// ------------------ update tracking ---------------
@@ -129,7 +136,9 @@ impl LayoutHistory {
}
pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 {
- let storage_nodes = self.all_storage_nodes();
+ // TODO: for TableFullReplication, counting gateway nodes might be
+ // necessary? Think about this more.
+ let storage_nodes = self.all_nongateway_nodes();
storage_nodes
.iter()
.map(|x| tracker.0.get(x).copied().unwrap_or(0))
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index b0302b12..7d60bae6 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -92,6 +92,7 @@ impl LayoutManager {
persist_cluster_layout,
layout,
change_notify,
+ table_sync_version: Mutex::new(HashMap::new()),
system_endpoint,
rpc_helper,
}))
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index 8133672a..f45a3c35 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -98,15 +98,13 @@ impl LayoutVersion {
}
/// Get the list of partitions and the first hash of a partition key that would fall in it
- pub fn partitions(&self) -> Vec<(Partition, Hash)> {
- (0..(1 << PARTITION_BITS))
- .map(|i| {
- let top = (i as u16) << (16 - PARTITION_BITS);
- let mut location = [0u8; 32];
- location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
- (i as u16, Hash::from(location))
- })
- .collect::<Vec<_>>()
+ pub fn partitions(&self) -> impl Iterator<Item = (Partition, Hash)> + '_ {
+ (0..(1 << PARTITION_BITS)).map(|i| {
+ let top = (i as u16) << (16 - PARTITION_BITS);
+ let mut location = [0u8; 32];
+ location[..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
+ (i as u16, Hash::from(location))
+ })
}
/// Return the n servers in which data for this hash should be replicated
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 6ce13d0d..3418600b 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -442,7 +442,7 @@ impl System {
.filter(|(x, _, _)| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count();
- let partitions = layout.current().partitions();
+ let partitions = layout.current().partitions().collect::<Vec<_>>();
let partitions_n_up = partitions
.iter()
.map(|(_, h)| {