aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/replication/fullcopy.rs5
-rw-r--r--src/table/replication/sharded.rs16
-rw-r--r--src/table/sync.rs4
3 files changed, 14 insertions, 11 deletions
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index 34807e3d..a5c83d0f 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -27,11 +27,10 @@ impl TableReplication for TableFullReplication {
}
fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
- let layout = self.system.cluster_layout();
- layout.node_ids().to_vec()
+ self.system.cluster_layout().current().node_ids().to_vec()
}
fn write_quorum(&self) -> usize {
- let nmembers = self.system.cluster_layout().node_ids().len();
+ let nmembers = self.system.cluster_layout().current().node_ids().len();
if nmembers > self.max_faults {
nmembers - self.max_faults
} else {
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 60c95cb4..793d87fd 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -26,16 +26,20 @@ pub struct TableShardedReplication {
impl TableReplication for TableShardedReplication {
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- let layout = self.system.cluster_layout();
- layout.nodes_of(hash, self.replication_factor)
+ self.system
+ .cluster_layout()
+ .current()
+ .nodes_of(hash, self.replication_factor)
}
fn read_quorum(&self) -> usize {
self.read_quorum
}
fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- let layout = self.system.cluster_layout();
- layout.nodes_of(hash, self.replication_factor)
+ self.system
+ .cluster_layout()
+ .current()
+ .nodes_of(hash, self.replication_factor)
}
fn write_quorum(&self) -> usize {
self.write_quorum
@@ -45,9 +49,9 @@ impl TableReplication for TableShardedReplication {
}
fn partition_of(&self, hash: &Hash) -> Partition {
- self.system.cluster_layout().partition_of(hash)
+ self.system.cluster_layout().current().partition_of(hash)
}
fn partitions(&self) -> Vec<(Partition, Hash)> {
- self.system.cluster_layout().partitions()
+ self.system.cluster_layout().current().partitions()
}
}
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 65eff7cd..620d83b9 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -492,8 +492,8 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync
struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>,
- layout_watch: watch::Receiver<Arc<ClusterLayout>>,
- layout: Arc<ClusterLayout>,
+ layout_watch: watch::Receiver<Arc<LayoutHistory>>,
+ layout: Arc<LayoutHistory>,
add_full_sync_rx: mpsc::UnboundedReceiver<()>,
todo: Vec<TodoPartition>,
next_full_sync: Instant,