aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/layout/history.rs19
-rw-r--r--src/rpc/layout/version.rs21
-rw-r--r--src/rpc/system.rs3
3 files changed, 27 insertions, 16 deletions
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 69348873..dce492c9 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -98,13 +98,26 @@ impl LayoutHistory {
.find(|x| x.version == sync_min)
.or(self.versions.last())
.unwrap();
- version.nodes_of(position, version.replication_factor)
+ version
+ .nodes_of(position, version.replication_factor)
+ .collect()
}
- pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator<Item = Vec<Uuid>> + 'a {
+ pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
self.versions
.iter()
- .map(move |x| x.nodes_of(position, x.replication_factor))
+ .map(|x| x.nodes_of(position, x.replication_factor).collect())
+ .collect()
+ }
+
+ pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+ let mut ret = vec![];
+ for version in self.versions.iter() {
+ ret.extend(version.nodes_of(position, version.replication_factor));
+ }
+ ret.sort();
+ ret.dedup();
+ ret
}
// ------------------ update tracking ---------------
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index 2cbdcee2..912ee538 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -107,25 +107,24 @@ impl LayoutVersion {
}
/// Return the n servers in which data for this hash should be replicated
- pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec<Uuid> {
+ pub fn nodes_of(&self, position: &Hash, n: usize) -> impl Iterator<Item = Uuid> + '_ {
assert_eq!(n, self.replication_factor);
let data = &self.ring_assignment_data;
- if data.len() != self.replication_factor * (1 << PARTITION_BITS) {
+ let partition_nodes = if data.len() == self.replication_factor * (1 << PARTITION_BITS) {
+ let partition_idx = self.partition_of(position) as usize;
+ let partition_start = partition_idx * self.replication_factor;
+ let partition_end = (partition_idx + 1) * self.replication_factor;
+ &data[partition_start..partition_end]
+ } else {
warn!("Ring not yet ready, read/writes will be lost!");
- return vec![];
- }
-
- let partition_idx = self.partition_of(position) as usize;
- let partition_start = partition_idx * self.replication_factor;
- let partition_end = (partition_idx + 1) * self.replication_factor;
- let partition_nodes = &data[partition_start..partition_end];
+ &[]
+ };
partition_nodes
.iter()
- .map(|i| self.node_id_vec[*i as usize])
- .collect::<Vec<_>>()
+ .map(move |i| self.node_id_vec[*i as usize])
}
// ===================== internal information extractors ======================
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 86c02e86..31d78bf6 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -449,8 +449,7 @@ impl System {
.iter()
.map(|(_, h)| {
let pn = layout.current().nodes_of(h, replication_factor);
- pn.iter()
- .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
+ pn.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count()
})
.collect::<Vec<usize>>();