diff options
Diffstat (limited to 'src/rpc')
-rw-r--r-- | src/rpc/layout/history.rs | 19 | ||||
-rw-r--r-- | src/rpc/layout/version.rs | 21 | ||||
-rw-r--r-- | src/rpc/system.rs | 3 |
3 files changed, 27 insertions, 16 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 69348873..dce492c9 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -98,13 +98,26 @@ impl LayoutHistory { .find(|x| x.version == sync_min) .or(self.versions.last()) .unwrap(); - version.nodes_of(position, version.replication_factor) + version + .nodes_of(position, version.replication_factor) + .collect() } - pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator<Item = Vec<Uuid>> + 'a { + pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> { self.versions .iter() - .map(move |x| x.nodes_of(position, x.replication_factor)) + .map(|x| x.nodes_of(position, x.replication_factor).collect()) + .collect() + } + + pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> { + let mut ret = vec![]; + for version in self.versions.iter() { + ret.extend(version.nodes_of(position, version.replication_factor)); + } + ret.sort(); + ret.dedup(); + ret } // ------------------ update tracking --------------- diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 2cbdcee2..912ee538 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -107,25 +107,24 @@ impl LayoutVersion { } /// Return the n servers in which data for this hash should be replicated - pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec<Uuid> { + pub fn nodes_of(&self, position: &Hash, n: usize) -> impl Iterator<Item = Uuid> + '_ { assert_eq!(n, self.replication_factor); let data = &self.ring_assignment_data; - if data.len() != self.replication_factor * (1 << PARTITION_BITS) { + let partition_nodes = if data.len() == self.replication_factor * (1 << PARTITION_BITS) { + let partition_idx = self.partition_of(position) as usize; + let partition_start = partition_idx * self.replication_factor; + let partition_end = (partition_idx + 1) * self.replication_factor; + &data[partition_start..partition_end] + } else { warn!("Ring not yet ready, read/writes will be lost!"); - return vec![]; - } - - let partition_idx = self.partition_of(position) as usize; - let partition_start = partition_idx * self.replication_factor; - let partition_end = (partition_idx + 1) * self.replication_factor; - let partition_nodes = &data[partition_start..partition_end]; + &[] + }; partition_nodes .iter() - .map(|i| self.node_id_vec[*i as usize]) - .collect::<Vec<_>>() + .map(move |i| self.node_id_vec[*i as usize]) } // ===================== internal information extractors ====================== diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 86c02e86..31d78bf6 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -449,8 +449,7 @@ impl System { .iter() .map(|(_, h)| { let pn = layout.current().nodes_of(h, replication_factor); - pn.iter() - .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) + pn.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) .count() }) .collect::<Vec<usize>>(); |