about summary refs log tree commit diff
path: root/src/table
diff options
context:
space:
mode:
Diffstat (limited to 'src/table')
-rw-r--r-- src/table/gc.rs 2
-rw-r--r-- src/table/replication/fullcopy.rs 33
-rw-r--r-- src/table/replication/parameters.rs 13
-rw-r--r-- src/table/replication/sharded.rs 20
-rw-r--r-- src/table/sync.rs 11
-rw-r--r-- src/table/table.rs 10
6 files changed, 41 insertions, 48 deletions
diff --git a/src/table/gc.rs b/src/table/gc.rs
index d37fdf35..061c5045 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -130,7 +130,7 @@ where
let mut partitions = HashMap::new();
for (k, vhash, v) in entries {
let pkh = Hash::try_from(&k[..32]).unwrap();
- let mut nodes = self.aux.replication.write_nodes(&pkh, &self.aux.system);
+ let mut nodes = self.aux.replication.write_nodes(&pkh);
nodes.retain(|x| *x != self.aux.system.id);
nodes.sort();
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index a5faece9..aea8c1f3 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -8,21 +8,10 @@ use crate::replication::*;
#[derive(Clone)]
pub struct TableFullReplication {
+ pub system: Arc<System>,
pub max_faults: usize,
}
-#[derive(Clone)]
-struct Neighbors {
- ring: Arc<Ring>,
- neighbors: Vec<UUID>,
-}
-
-impl TableFullReplication {
- pub fn new(max_faults: usize) -> Self {
- TableFullReplication { max_faults }
- }
-}
-
impl TableReplication for TableFullReplication {
// Full replication schema: all nodes store everything
// Writes are disseminated in an epidemic manner in the network
@@ -30,18 +19,23 @@ impl TableReplication for TableFullReplication {
// Advantage: do all reads locally, extremely fast
// Drawback: only suitable for reasonably small tables
- fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> {
- vec![system.id]
+ fn partition_of(&self, _hash: &Hash) -> u16 {
+ 0u16
+ }
+
+ fn read_nodes(&self, _hash: &Hash) -> Vec<UUID> {
+ vec![self.system.id]
}
fn read_quorum(&self) -> usize {
1
}
- fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
- self.replication_nodes(hash, system.ring.borrow().as_ref())
+ fn write_nodes(&self, _hash: &Hash) -> Vec<UUID> {
+ let ring = self.system.ring.borrow();
+ ring.config.members.keys().cloned().collect::<Vec<_>>()
}
- fn write_quorum(&self, system: &System) -> usize {
- let nmembers = system.ring.borrow().config.members.len();
+ fn write_quorum(&self) -> usize {
+ let nmembers = self.system.ring.borrow().config.members.len();
if nmembers > self.max_faults {
nmembers - self.max_faults
} else {
@@ -52,9 +46,6 @@ impl TableReplication for TableFullReplication {
self.max_faults
}
- fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec<UUID> {
- ring.config.members.keys().cloned().collect::<Vec<_>>()
- }
fn split_points(&self, _ring: &Ring) -> Vec<Hash> {
let mut ret = vec![];
ret.push([0u8; 32].into());
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 4607b050..ace82bd9 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -1,4 +1,3 @@
-use garage_rpc::membership::System;
use garage_rpc::ring::Ring;
use garage_util::data::*;
@@ -7,16 +6,18 @@ pub trait TableReplication: Send + Sync {
// See examples in replication/sharded.rs and replication/fullcopy.rs
// to understand the various replication methods
+ // Partition number of data item (for Merkle tree)
+ fn partition_of(&self, hash: &Hash) -> u16;
+
// Which nodes to send reads to
- fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
+ fn read_nodes(&self, hash: &Hash) -> Vec<UUID>;
fn read_quorum(&self) -> usize;
// Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
- fn write_quorum(&self, system: &System) -> usize;
+ fn write_nodes(&self, hash: &Hash) -> Vec<UUID>;
+ fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
- // Which are the nodes that do actually replicate the data
- fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
+ // Get partition boundaries
fn split_points(&self, ring: &Ring) -> Vec<Hash>;
}
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 886c7c08..966be31a 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
use garage_rpc::membership::System;
use garage_rpc::ring::Ring;
use garage_util::data::*;
@@ -6,6 +8,7 @@ use crate::replication::*;
#[derive(Clone)]
pub struct TableShardedReplication {
+ pub system: Arc<System>,
pub replication_factor: usize,
pub read_quorum: usize,
pub write_quorum: usize,
@@ -19,28 +22,29 @@ impl TableReplication for TableShardedReplication {
// - reads are done on all of the nodes that replicate the data
// - writes as well
- fn read_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
- let ring = system.ring.borrow().clone();
+ fn partition_of(&self, hash: &Hash) -> u16 {
+ self.system.ring.borrow().partition_of(hash)
+ }
+
+ fn read_nodes(&self, hash: &Hash) -> Vec<UUID> {
+ let ring = self.system.ring.borrow().clone();
ring.walk_ring(&hash, self.replication_factor)
}
fn read_quorum(&self) -> usize {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
- let ring = system.ring.borrow().clone();
+ fn write_nodes(&self, hash: &Hash) -> Vec<UUID> {
+ let ring = self.system.ring.borrow();
ring.walk_ring(&hash, self.replication_factor)
}
- fn write_quorum(&self, _system: &System) -> usize {
+ fn write_quorum(&self) -> usize {
self.write_quorum
}
fn max_write_errors(&self) -> usize {
self.replication_factor - self.write_quorum
}
- fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID> {
- ring.walk_ring(&hash, self.replication_factor)
- }
fn split_points(&self, ring: &Ring) -> Vec<Hash> {
let mut ret = vec![];
diff --git a/src/table/sync.rs b/src/table/sync.rs
index f8fef53c..ac0305e2 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -218,10 +218,7 @@ where
let nodes = self
.aux
.replication
- .write_nodes(
- &hash_of_merkle_partition(partition.range.begin),
- &self.aux.system,
- )
+ .write_nodes(&hash_of_merkle_partition(partition.range.begin))
.into_iter()
.filter(|node| *node != my_id)
.collect::<Vec<_>>();
@@ -293,7 +290,7 @@ where
let nodes = self
.aux
.replication
- .write_nodes(&begin, &self.aux.system)
+ .write_nodes(&begin)
.into_iter()
.collect::<Vec<_>>();
if nodes.contains(&self.aux.system.id) {
@@ -303,7 +300,7 @@ where
);
break;
}
- if nodes.len() < self.aux.replication.write_quorum(&self.aux.system) {
+ if nodes.len() < self.aux.replication.write_quorum() {
return Err(Error::Message(format!(
"Not offloading as we don't have a quorum of nodes to write to."
)));
@@ -616,7 +613,7 @@ impl SyncTodo {
let begin_hash = hash_of_merkle_partition(begin);
let end_hash = hash_of_merkle_partition_opt(end);
- let nodes = aux.replication.replication_nodes(&begin_hash, &ring);
+ let nodes = aux.replication.write_nodes(&begin_hash);
let retain = nodes.contains(&my_id);
if !retain {
diff --git a/src/table/table.rs b/src/table/table.rs
index 2d3c5fe9..2ce5868f 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -91,7 +91,7 @@ where
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.aux.replication.write_nodes(&hash, &self.aux.system);
+ let who = self.aux.replication.write_nodes(&hash);
//eprintln!("insert who: {:?}", who);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?));
@@ -101,7 +101,7 @@ where
.try_call_many(
&who[..],
rpc,
- RequestStrategy::with_quorum(self.aux.replication.write_quorum(&self.aux.system))
+ RequestStrategy::with_quorum(self.aux.replication.write_quorum())
.with_timeout(TABLE_RPC_TIMEOUT),
)
.await?;
@@ -113,7 +113,7 @@ where
for entry in entries.iter() {
let hash = entry.partition_key().hash();
- let who = self.aux.replication.write_nodes(&hash, &self.aux.system);
+ let who = self.aux.replication.write_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
for node in who {
if !call_list.contains_key(&node) {
@@ -150,7 +150,7 @@ where
sort_key: &F::S,
) -> Result<Option<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.aux.replication.read_nodes(&hash, &self.aux.system);
+ let who = self.aux.replication.read_nodes(&hash);
//eprintln!("get who: {:?}", who);
let rpc = TableRPC::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
@@ -207,7 +207,7 @@ where
limit: usize,
) -> Result<Vec<F::E>, Error> {
let hash = partition_key.hash();
- let who = self.aux.replication.read_nodes(&hash, &self.aux.system);
+ let who = self.aux.replication.read_nodes(&hash);
let rpc = TableRPC::<F>::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);