aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Auvolat <alex@adnab.me>2023-11-14 14:28:16 +0100
committerAlex Auvolat <alex@adnab.me>2023-11-14 14:28:16 +0100
commit3b361d2959e3d577bdae6f8a5ccb0c9d5526b7ea (patch)
treef5448d44c7d5705c1e31912ca6d101c5998523ef /src
parent866196750fca74c1911ade2a90611f3663e60046 (diff)
downloadgarage-3b361d2959e3d577bdae6f8a5ccb0c9d5526b7ea.tar.gz
garage-3b361d2959e3d577bdae6f8a5ccb0c9d5526b7ea.zip
layout: prepare for write sets
Diffstat (limited to 'src')
-rw-r--r--src/block/manager.rs3
-rw-r--r--src/block/resync.rs2
-rw-r--r--src/model/k2v/rpc.rs10
-rw-r--r--src/rpc/layout/history.rs19
-rw-r--r--src/rpc/layout/version.rs21
-rw-r--r--src/rpc/system.rs3
-rw-r--r--src/table/data.rs3
-rw-r--r--src/table/gc.rs2
-rw-r--r--src/table/replication/fullcopy.rs9
-rw-r--r--src/table/replication/parameters.rs8
-rw-r--r--src/table/replication/sharded.rs24
-rw-r--r--src/table/sync.rs2
-rw-r--r--src/table/table.rs6
13 files changed, 64 insertions, 48 deletions
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 72b4ea66..2bb9c23d 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -354,7 +354,8 @@ impl BlockManager {
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> {
- let who = self.replication.write_nodes(&hash);
+ // TODO: use quorums among latest write set
+ let who = self.replication.storage_nodes(&hash);
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
.await
diff --git a/src/block/resync.rs b/src/block/resync.rs
index fedcd6f5..122d0142 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -377,7 +377,7 @@ impl BlockResyncManager {
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
- let mut who = manager.replication.write_nodes(hash);
+ let mut who = manager.replication.storage_nodes(hash);
if who.len() < manager.replication.write_quorum() {
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
}
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 2f548ad7..aa3323d5 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -127,7 +127,7 @@ impl K2VRpcHandler {
.item_table
.data
.replication
- .write_nodes(&partition.hash());
+ .storage_nodes(&partition.hash());
who.sort();
self.system
@@ -168,7 +168,7 @@ impl K2VRpcHandler {
.item_table
.data
.replication
- .write_nodes(&partition.hash());
+ .storage_nodes(&partition.hash());
who.sort();
call_list.entry(who).or_default().push(InsertedItem {
@@ -223,11 +223,12 @@ impl K2VRpcHandler {
},
sort_key,
};
+ // TODO figure this out with write sets, does it still work????
let nodes = self
.item_table
.data
.replication
- .write_nodes(&poll_key.partition.hash());
+ .read_nodes(&poll_key.partition.hash());
let rpc = self.system.rpc_helper().try_call_many(
&self.endpoint,
@@ -284,11 +285,12 @@ impl K2VRpcHandler {
seen.restrict(&range);
// Prepare PollRange RPC to send to the storage nodes responsible for the parititon
+ // TODO figure this out with write sets, does it still work????
let nodes = self
.item_table
.data
.replication
- .write_nodes(&range.partition.hash());
+ .read_nodes(&range.partition.hash());
let quorum = self.item_table.data.replication.read_quorum();
let msg = K2VRpc::PollRange {
range,
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index 69348873..dce492c9 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -98,13 +98,26 @@ impl LayoutHistory {
.find(|x| x.version == sync_min)
.or(self.versions.last())
.unwrap();
- version.nodes_of(position, version.replication_factor)
+ version
+ .nodes_of(position, version.replication_factor)
+ .collect()
}
- pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator<Item = Vec<Uuid>> + 'a {
+ pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
self.versions
.iter()
- .map(move |x| x.nodes_of(position, x.replication_factor))
+ .map(|x| x.nodes_of(position, x.replication_factor).collect())
+ .collect()
+ }
+
+ pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+ let mut ret = vec![];
+ for version in self.versions.iter() {
+ ret.extend(version.nodes_of(position, version.replication_factor));
+ }
+ ret.sort();
+ ret.dedup();
+ ret
}
// ------------------ update tracking ---------------
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index 2cbdcee2..912ee538 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -107,25 +107,24 @@ impl LayoutVersion {
}
/// Return the n servers in which data for this hash should be replicated
- pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec<Uuid> {
+ pub fn nodes_of(&self, position: &Hash, n: usize) -> impl Iterator<Item = Uuid> + '_ {
assert_eq!(n, self.replication_factor);
let data = &self.ring_assignment_data;
- if data.len() != self.replication_factor * (1 << PARTITION_BITS) {
+ let partition_nodes = if data.len() == self.replication_factor * (1 << PARTITION_BITS) {
+ let partition_idx = self.partition_of(position) as usize;
+ let partition_start = partition_idx * self.replication_factor;
+ let partition_end = (partition_idx + 1) * self.replication_factor;
+ &data[partition_start..partition_end]
+ } else {
warn!("Ring not yet ready, read/writes will be lost!");
- return vec![];
- }
-
- let partition_idx = self.partition_of(position) as usize;
- let partition_start = partition_idx * self.replication_factor;
- let partition_end = (partition_idx + 1) * self.replication_factor;
- let partition_nodes = &data[partition_start..partition_end];
+ &[]
+ };
partition_nodes
.iter()
- .map(|i| self.node_id_vec[*i as usize])
- .collect::<Vec<_>>()
+ .map(move |i| self.node_id_vec[*i as usize])
}
// ===================== internal information extractors ======================
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 86c02e86..31d78bf6 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -449,8 +449,7 @@ impl System {
.iter()
.map(|(_, h)| {
let pn = layout.current().nodes_of(h, replication_factor);
- pn.iter()
- .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
+ pn.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
.count()
})
.collect::<Vec<usize>>();
diff --git a/src/table/data.rs b/src/table/data.rs
index bbfdf58b..7f6b7847 100644
--- a/src/table/data.rs
+++ b/src/table/data.rs
@@ -254,7 +254,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
// of the GC algorithm, as in all cases GC is suspended if
// any node of the partition is unavailable.
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
- let nodes = self.replication.write_nodes(&pk_hash);
+ // TODO: this probably breaks when the layout changes
+ let nodes = self.replication.storage_nodes(&pk_hash);
if nodes.first() == Some(&self.system.id) {
GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
}
diff --git a/src/table/gc.rs b/src/table/gc.rs
index 2135a358..002cfbf4 100644
--- a/src/table/gc.rs
+++ b/src/table/gc.rs
@@ -152,7 +152,7 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
let mut partitions = HashMap::new();
for entry in entries {
let pkh = Hash::try_from(&entry.key[..32]).unwrap();
- let mut nodes = self.data.replication.write_nodes(&pkh);
+ let mut nodes = self.data.replication.storage_nodes(&pkh);
nodes.retain(|x| *x != self.system.id);
nodes.sort();
diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs
index beaacc2b..cb5471af 100644
--- a/src/table/replication/fullcopy.rs
+++ b/src/table/replication/fullcopy.rs
@@ -27,6 +27,11 @@ pub struct TableFullReplication {
}
impl TableReplication for TableFullReplication {
+ fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
+ let layout = self.system.cluster_layout();
+ layout.current().all_nodes().to_vec()
+ }
+
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
vec![self.system.id]
}
@@ -34,8 +39,8 @@ impl TableReplication for TableFullReplication {
1
}
- fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
- self.system.cluster_layout().current().all_nodes().to_vec()
+ fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>> {
+ vec![self.storage_nodes(hash)]
}
fn write_quorum(&self) -> usize {
let nmembers = self.system.cluster_layout().current().all_nodes().len();
diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs
index 2a7d3585..2f842409 100644
--- a/src/table/replication/parameters.rs
+++ b/src/table/replication/parameters.rs
@@ -6,21 +6,23 @@ pub trait TableReplication: Send + Sync + 'static {
// See examples in table_sharded.rs and table_fullcopy.rs
// To understand various replication methods
+ /// The entire list of all nodes that store a partition
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid>;
+
/// Which nodes to send read requests to
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
/// Responses needed to consider a read succesfull
fn read_quorum(&self) -> usize;
/// Which nodes to send writes to
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
- /// Responses needed to consider a write succesfull
+ fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>>;
+ /// Responses needed to consider a write succesfull in each set
fn write_quorum(&self) -> usize;
fn max_write_errors(&self) -> usize;
// Accessing partitions, for Merkle tree & sync
/// Get partition for data with given hash
fn partition_of(&self, hash: &Hash) -> Partition;
-
/// List of partitions and nodes to sync with in current layout
fn sync_partitions(&self) -> SyncPartitions;
}
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index f02b1d66..1320a189 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -25,21 +25,19 @@ pub struct TableShardedReplication {
}
impl TableReplication for TableShardedReplication {
+ fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
+ self.system.cluster_layout().storage_nodes_of(hash)
+ }
+
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- self.system
- .cluster_layout()
- .current()
- .nodes_of(hash, self.replication_factor)
+ self.system.cluster_layout().read_nodes_of(hash)
}
fn read_quorum(&self) -> usize {
self.read_quorum
}
- fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
- self.system
- .cluster_layout()
- .current()
- .nodes_of(hash, self.replication_factor)
+ fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>> {
+ self.system.cluster_layout().write_sets_of(hash)
}
fn write_quorum(&self) -> usize {
self.write_quorum
@@ -60,13 +58,7 @@ impl TableReplication for TableShardedReplication {
.current()
.partitions()
.map(|(partition, first_hash)| {
- let mut storage_nodes = layout
- .write_sets_of(&first_hash)
- .map(|x| x.into_iter())
- .flatten()
- .collect::<Vec<_>>();
- storage_nodes.sort();
- storage_nodes.dedup();
+ let storage_nodes = layout.storage_nodes_of(&first_hash);
SyncPartition {
partition,
first_hash,
diff --git a/src/table/sync.rs b/src/table/sync.rs
index 8c21db8b..b67cdd79 100644
--- a/src/table/sync.rs
+++ b/src/table/sync.rs
@@ -176,7 +176,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
let nodes = self
.data
.replication
- .write_nodes(begin)
+ .storage_nodes(begin)
.into_iter()
.collect::<Vec<_>>();
if nodes.contains(&self.system.id) {
diff --git a/src/table/table.rs b/src/table/table.rs
index 997fd7dc..bf08d5a0 100644
--- a/src/table/table.rs
+++ b/src/table/table.rs
@@ -119,7 +119,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
let hash = e.partition_key().hash();
- let who = self.data.replication.write_nodes(&hash);
+ // TODO: use write sets
+ let who = self.data.replication.storage_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(e.encode()?));
let rpc = TableRpc::<F>::Update(vec![e_enc]);
@@ -171,7 +172,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
for entry in entries.into_iter() {
let entry = entry.borrow();
let hash = entry.partition_key().hash();
- let who = self.data.replication.write_nodes(&hash);
+ // TODO: use write sets
+ let who = self.data.replication.storage_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
for node in who {
call_list.entry(node).or_default().push(e_enc.clone());