From 12d1dbfc6b884be488e2d79c0b9e3c47490f5442 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Nov 2023 15:41:24 +0100 Subject: remove Ring and use ClusterLayout everywhere --- src/model/helper/bucket.rs | 6 +++--- src/model/index_counter.rs | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) (limited to 'src/model') diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 576d03f3..d43d7e96 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -450,10 +450,10 @@ impl<'a> BucketHelper<'a> { #[cfg(feature = "k2v")] { - use garage_rpc::ring::Ring; + use garage_rpc::layout::ClusterLayout; use std::sync::Arc; - let ring: Arc = self.0.system.ring.borrow().clone(); + let layout: Arc = self.0.system.layout_watch.borrow().clone(); let k2vindexes = self .0 .k2v @@ -462,7 +462,7 @@ impl<'a> BucketHelper<'a> { .get_range( &bucket_id, None, - Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())), + Some((DeletedFilter::NotDeleted, layout.node_id_vec.clone())), 10, EnumerationOrder::Forward, ) diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index a46c165f..d514cb06 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_db as db; -use garage_rpc::ring::Ring; +use garage_rpc::layout::ClusterLayout; use garage_rpc::system::System; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -83,8 +83,8 @@ impl Entry for CounterEntry { } impl CounterEntry { - pub fn filtered_values(&self, ring: &Ring) -> HashMap { - let nodes = &ring.layout.node_id_vec[..]; + pub fn filtered_values(&self, layout: &ClusterLayout) -> HashMap { + let nodes = &layout.node_id_vec[..]; self.filtered_values_with_nodes(nodes) } -- cgit v1.2.3 From 4a9c94514f49aa4e9880a8e0f5cf5a52d11ae993 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Nov 2023 16:41:00 +0100 Subject: avoid using layout_watch in System directly --- src/model/helper/bucket.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index d43d7e96..8cd5b27b 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -453,7 +453,7 @@ impl<'a> BucketHelper<'a> { use garage_rpc::layout::ClusterLayout; use std::sync::Arc; - let layout: Arc = self.0.system.layout_watch.borrow().clone(); + let layout: Arc = self.0.system.cluster_layout().clone(); let k2vindexes = self .0 .k2v -- cgit v1.2.3 From 8dccee3ccfe7793c42203f28c1e91c6f989b6899 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Nov 2023 19:28:36 +0100 Subject: cluster layout: adapt all uses of ClusterLayout to LayoutHistory --- src/model/helper/bucket.rs | 9 ++++++--- src/model/index_counter.rs | 6 +++--- 2 files changed, 9 insertions(+), 6 deletions(-) (limited to 'src/model') diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 8cd5b27b..18904c8d 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -450,10 +450,10 @@ impl<'a> BucketHelper<'a> { #[cfg(feature = "k2v")] { - use garage_rpc::layout::ClusterLayout; + use garage_rpc::layout::LayoutHistory; use std::sync::Arc; - let layout: Arc = self.0.system.cluster_layout().clone(); + let layout: Arc = self.0.system.cluster_layout().clone(); let k2vindexes = self .0 .k2v @@ -462,7 +462,10 @@ impl<'a> BucketHelper<'a> { .get_range( &bucket_id, None, - Some((DeletedFilter::NotDeleted, layout.node_id_vec.clone())), + Some(( + DeletedFilter::NotDeleted, + layout.current().node_id_vec.clone(), + )), 10, EnumerationOrder::Forward, ) diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index d514cb06..9637cc4c 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_db as db; -use garage_rpc::layout::ClusterLayout; +use garage_rpc::layout::LayoutHistory; use garage_rpc::system::System; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -83,8 +83,8 @@ impl Entry for CounterEntry { } impl CounterEntry { - pub fn filtered_values(&self, layout: &ClusterLayout) -> HashMap { - let nodes = &layout.node_id_vec[..]; + pub fn filtered_values(&self, layout: &LayoutHistory) -> HashMap { + let nodes = &layout.current().node_id_vec[..]; self.filtered_values_with_nodes(nodes) } -- cgit v1.2.3 From 8a2b1dd422fb57abe611d8c1cf3cb0b55f487189 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 12:55:36 +0100 Subject: wip: split out layout management from System into separate LayoutManager --- src/model/k2v/rpc.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) (limited to 'src/model') diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 37e142f6..2f548ad7 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -131,7 +131,7 @@ impl K2VRpcHandler { who.sort(); self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -187,7 +187,7 @@ impl K2VRpcHandler { let call_futures = call_list.into_iter().map(|(nodes, items)| async move { let resp = self .system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &nodes[..], @@ -229,7 +229,7 @@ impl K2VRpcHandler { .replication .write_nodes(&poll_key.partition.hash()); - let rpc = self.system.rpc.try_call_many( + let rpc = self.system.rpc_helper().try_call_many( &self.endpoint, &nodes[..], K2VRpc::PollItem { @@ -241,7 +241,8 @@ impl K2VRpcHandler { .with_quorum(self.item_table.data.replication.read_quorum()) .without_timeout(), ); - let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let timeout_duration = + Duration::from_millis(timeout_msec) + self.system.rpc_helper().rpc_timeout(); let resps = select! { r = rpc => r?, _ = tokio::time::sleep(timeout_duration) => return Ok(None), @@ -300,7 +301,11 @@ impl K2VRpcHandler { let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout(); let mut requests = nodes .iter() - .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs)) + .map(|node| { + self.system + .rpc_helper() + .call(&self.endpoint, *node, msg.clone(), rs) + }) .collect::>(); // Fetch responses. This procedure stops fetching responses when any of the following @@ -316,8 +321,9 @@ impl K2VRpcHandler { // kind: all items produced by that node until time ts have been returned, so we can // bump the entry in the global vector clock and possibly remove some item-specific // vector clocks) - let mut deadline = - Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let mut deadline = Instant::now() + + Duration::from_millis(timeout_msec) + + self.system.rpc_helper().rpc_timeout(); let mut resps = vec![]; let mut errors = vec![]; loop { -- cgit v1.2.3 From bfb1845fdc981a370539d641a5d80f438f184f07 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 14:12:05 +0100 Subject: layout: refactor to use a RwLock on LayoutHistory --- src/model/helper/bucket.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) (limited to 'src/model') diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 18904c8d..2a9c0fb1 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -450,10 +450,8 @@ impl<'a> BucketHelper<'a> { #[cfg(feature = "k2v")] { - use garage_rpc::layout::LayoutHistory; - use std::sync::Arc; - - let layout: Arc = self.0.system.cluster_layout().clone(); + // TODO: not only current + let node_id_vec = self.0.system.cluster_layout().current().node_ids().to_vec(); let k2vindexes = self .0 .k2v @@ -462,10 +460,7 @@ impl<'a> BucketHelper<'a> { .get_range( &bucket_id, None, - Some(( - DeletedFilter::NotDeleted, - layout.current().node_id_vec.clone(), - )), + Some((DeletedFilter::NotDeleted, node_id_vec)), 10, EnumerationOrder::Forward, ) -- cgit v1.2.3 From 8e292e06b3fde1d3b5b019a26eabd4f0d9ac22c3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 12:48:38 +0100 Subject: layout: some refactoring of nongateway nodes --- src/model/index_counter.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/model') diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 9637cc4c..2d968733 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -84,8 +84,8 @@ impl Entry for CounterEntry { impl CounterEntry { pub fn filtered_values(&self, layout: &LayoutHistory) -> HashMap { - let nodes = &layout.current().node_id_vec[..]; - self.filtered_values_with_nodes(nodes) + let nodes = layout.all_nongateway_nodes(); + self.filtered_values_with_nodes(&nodes) } pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap { -- cgit v1.2.3 From 1aab1f4e688ebc3f3adcb41c817c16c688a3291c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 13:06:16 +0100 Subject: layout: refactoring of all_nodes --- src/model/helper/bucket.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'src/model') diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 2a9c0fb1..2cb53424 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -450,8 +450,12 @@ impl<'a> BucketHelper<'a> { #[cfg(feature = "k2v")] { - // TODO: not only current - let node_id_vec = self.0.system.cluster_layout().current().node_ids().to_vec(); + let node_id_vec = self + .0 + .system + .cluster_layout() + .all_nongateway_nodes() + .into_owned(); let k2vindexes = self .0 .k2v -- cgit v1.2.3 From 3b361d2959e3d577bdae6f8a5ccb0c9d5526b7ea Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 14:28:16 +0100 Subject: layout: prepare for write sets --- src/model/k2v/rpc.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'src/model') diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 2f548ad7..aa3323d5 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -127,7 +127,7 @@ impl K2VRpcHandler { .item_table .data .replication - .write_nodes(&partition.hash()); + .storage_nodes(&partition.hash()); who.sort(); self.system @@ -168,7 +168,7 @@ impl K2VRpcHandler { .item_table .data .replication - .write_nodes(&partition.hash()); + .storage_nodes(&partition.hash()); who.sort(); call_list.entry(who).or_default().push(InsertedItem { @@ -223,11 +223,12 @@ impl K2VRpcHandler { }, sort_key, }; + // TODO figure this out with write sets, does it still work???? let nodes = self .item_table .data .replication - .write_nodes(&poll_key.partition.hash()); + .read_nodes(&poll_key.partition.hash()); let rpc = self.system.rpc_helper().try_call_many( &self.endpoint, @@ -284,11 +285,12 @@ impl K2VRpcHandler { seen.restrict(&range); // Prepare PollRange RPC to send to the storage nodes responsible for the parititon + // TODO figure this out with write sets, does it still work???? let nodes = self .item_table .data .replication - .write_nodes(&range.partition.hash()); + .read_nodes(&range.partition.hash()); let quorum = self.item_table.data.replication.read_quorum(); let msg = K2VRpc::PollRange { range, -- cgit v1.2.3 From 90e1619b1e9f5d81e59da371f04717f0c4fe5afc Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 15:40:46 +0100 Subject: table: take into account multiple write sets in inserts --- src/model/k2v/rpc.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) (limited to 'src/model') diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index aa3323d5..863a068a 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -134,16 +134,14 @@ impl K2VRpcHandler { .rpc_helper() .try_call_many( &self.endpoint, - &who[..], + &who, K2VRpc::InsertItem(InsertedItem { partition, sort_key, causal_context, value, }), - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(1) - .interrupt_after_quorum(true), + RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1), ) .await?; @@ -192,9 +190,7 @@ impl K2VRpcHandler { &self.endpoint, &nodes[..], K2VRpc::InsertManyItems(items), - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(1) - .interrupt_after_quorum(true), + RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1), ) .await?; Ok::<_, Error>((nodes, resp)) @@ -223,7 +219,7 @@ impl K2VRpcHandler { }, sort_key, }; - // TODO figure this out with write sets, does it still work???? + // TODO figure this out with write sets, is it still appropriate??? let nodes = self .item_table .data @@ -232,7 +228,7 @@ impl K2VRpcHandler { let rpc = self.system.rpc_helper().try_call_many( &self.endpoint, - &nodes[..], + &nodes, K2VRpc::PollItem { key: poll_key, causal_context, @@ -240,6 +236,7 @@ impl K2VRpcHandler { }, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.item_table.data.replication.read_quorum()) + .send_all_at_once(true) .without_timeout(), ); let timeout_duration = -- cgit v1.2.3 From acd49de9f97bd27409232691262bd5827983388d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 13:07:42 +0100 Subject: rpc: fix write set quorums --- src/model/k2v/rpc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/model') diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 863a068a..3c759181 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -344,7 +344,7 @@ impl K2VRpcHandler { } if errors.len() > nodes.len() - quorum { let errors = errors.iter().map(|e| format!("{}", e)).collect::>(); - return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into()); + return Err(Error::Quorum(quorum, None, resps.len(), nodes.len(), errors).into()); } // Take all returned items into account to produce the response. -- cgit v1.2.3 From 393c4d4515e0cdadadc8de8ae2df12e4371cff88 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 15 Nov 2023 14:20:50 +0100 Subject: layout: add helper for cached/external values to centralize recomputation --- src/model/helper/bucket.rs | 2 +- src/model/index_counter.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'src/model') diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 2cb53424..efa3e27b 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -455,7 +455,7 @@ impl<'a> BucketHelper<'a> { .system .cluster_layout() .all_nongateway_nodes() - .into_owned(); + .to_vec(); let k2vindexes = self .0 .k2v diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 2d968733..e8702bf1 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_db as db; -use garage_rpc::layout::LayoutHistory; +use garage_rpc::layout::LayoutHelper; use garage_rpc::system::System; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -83,7 +83,7 @@ impl Entry for CounterEntry { } impl CounterEntry { - pub fn filtered_values(&self, layout: &LayoutHistory) -> HashMap { + pub fn filtered_values(&self, layout: &LayoutHelper) -> HashMap { let nodes = layout.all_nongateway_nodes(); self.filtered_values_with_nodes(&nodes) } -- cgit v1.2.3