aboutsummaryrefslogtreecommitdiff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r--src/table/data.rs3
-rw-r--r--src/table/gc.rs2
-rw-r--r--src/table/replication/fullcopy.rs9
-rw-r--r--src/table/replication/parameters.rs8
-rw-r--r--src/table/replication/sharded.rs24
-rw-r--r--src/table/sync.rs2
-rw-r--r--src/table/table.rs6
7 files changed, 28 insertions, 26 deletions
diff --git a/src/table/data.rs b/src/table/data.rs
index bbfdf58b..7f6b7847 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -254,7 +254,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
// of the GC algorithm, as in all cases GC is suspended if
// any node of the partition is unavailable.
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
- let nodes = self.replication.write_nodes(&pk_hash);
+ // TODO: this probably breaks when the layout changes
+ let nodes = self.replication.storage_nodes(&pk_hash);
if nodes.first() == Some(&self.system.id) {
GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 2135a358..002cfbf4 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -152,7 +152,7 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
let mut partitions = HashMap::new();
for entry in entries {
let pkh = Hash::try_from(&entry.key[..32]).unwrap();
- let mut nodes = self.data.replication.write_nodes(&pkh);
+ let mut nodes = self.data.replication.storage_nodes(&pkh);
nodes.retain(|x| *x != self.system.id);
nodes.sort();
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index beaacc2b..cb5471af 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -27,6 +27,11 @@ pub struct TableFullReplication {
}
impl TableReplication for TableFullReplication {
+ fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
+ let layout = self.system.cluster_layout();
+ layout.current().all_nodes().to_vec()
+ }
+
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
}
@@ -34,8 +39,8 @@ impl TableReplication for TableFullReplication {
1
}
- fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
- self.system.cluster_layout().current().all_nodes().to_vec()
+ fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>> {
+ vec![self.storage_nodes(hash)]
}
fn write_quorum(&self) -> usize {
let nmembers = self.system.cluster_layout().current().all_nodes().len();
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 2a7d3585..2f842409 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -6,21 +6,23 @@ pub trait TableReplication: Send + Sync + 'static {
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
+ /// The entire list of all nodes that store a partition
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid>;
+
/// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize;
/// Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
- /// Responses needed to consider a write succesfull
+ fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>>;
+ /// Responses needed to consider a write succesfull in each set
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
// Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition;
-
/// List of partitions and nodes to sync with in current layout
fn sync_partitions(&self) -> SyncPartitions;
}
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index f02b1d66..1320a189 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -25,21 +25,19 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
+ self.system.cluster_layout().storage_nodes_of(hash)
+ }
+
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- self.system
- .cluster_layout()
- .current()
- .nodes_of(hash, self.replication_factor)
+ self.system.cluster_layout().read_nodes_of(hash)
}
fn read_quorum(&self) -> usize {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- self.system
- .cluster_layout()
- .current()
- .nodes_of(hash, self.replication_factor)
+ fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>> {
+ self.system.cluster_layout().write_sets_of(hash)
}
fn write_quorum(&self) -> usize {
self.write_quorum
@@ -60,13 +58,7 @@ impl TableReplication for TableShardedReplication {
.current()
.partitions()
.map(|(partition, first_hash)| {
- let mut storage_nodes = layout
- .write_sets_of(&first_hash)
- .map(|x| x.into_iter())
- .flatten()
- .collect::<Vec<_>>();
- storage_nodes.sort();
- storage_nodes.dedup();
+ let storage_nodes = layout.storage_nodes_of(&first_hash);
SyncPartition {
partition,
first_hash,
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 8c21db8b..b67cdd79 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -176,7 +176,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
let nodes = self
.data
.replication
- .write_nodes(begin)
+ .storage_nodes(begin)
.into_iter()
.collect::<Vec<_>>();
if nodes.contains(&self.system.id) {
diff --git a/src/table/table.rs b/src/table/table.rs
index 997fd7dc..bf08d5a0 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -119,7 +119,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.data.replication.write_nodes(&hash);
+ // TODO: use write sets
+ let who = self.data.replication.storage_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(e.encode()?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);
@@ -171,7 +172,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
for entry in entries.into_iter() {
let entry = entry.borrow();
let hash = entry.partition_key().hash();
- let who = self.data.replication.write_nodes(&hash);
+ // TODO: use write sets
+ let who = self.data.replication.storage_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
for node in who {
call_list.entry(node).or_default().push(e_enc.clone());