diff options
Diffstat (limited to 'src/model')
-rw-r--r-- | src/model/Cargo.toml | 2 | ||||
-rw-r--r-- | src/model/helper/bucket.rs | 12 | ||||
-rw-r--r-- | src/model/index_counter.rs | 8 | ||||
-rw-r--r-- | src/model/k2v/rpc.rs | 45 |
4 files changed, 37 insertions, 30 deletions
diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 2e5b047d..33898e20 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "garage_model" -version = "0.9.3" +version = "0.10.0" authors = ["Alex Auvolat <alex@adnab.me>"] edition = "2018" license = "AGPL-3.0" diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index f4e669c3..a712d683 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -112,10 +112,12 @@ impl<'a> BucketHelper<'a> { #[cfg(feature = "k2v")] { - use garage_rpc::ring::Ring; - use std::sync::Arc; - - let ring: Arc<Ring> = self.0.system.ring.borrow().clone(); + let node_id_vec = self + .0 + .system + .cluster_layout() + .all_nongateway_nodes() + .to_vec(); let k2vindexes = self .0 .k2v @@ -124,7 +126,7 @@ impl<'a> BucketHelper<'a> { .get_range( &bucket_id, None, - Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())), + Some((DeletedFilter::NotDeleted, node_id_vec)), 10, EnumerationOrder::Forward, ) diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index c0bf38d8..aa13ee7b 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_db as db; -use garage_rpc::ring::Ring; +use garage_rpc::layout::LayoutHelper; use garage_rpc::system::System; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -83,9 +83,9 @@ impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { } impl<T: CountedItem> CounterEntry<T> { - pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> { - let nodes = &ring.layout.node_id_vec[..]; - self.filtered_values_with_nodes(nodes) + pub fn filtered_values(&self, layout: &LayoutHelper) -> HashMap<String, i64> { + let nodes = layout.all_nongateway_nodes(); + self.filtered_values_with_nodes(&nodes) } pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> { diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 4ab44c22..e15f2df8 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -127,23 +127,21 @@ impl K2VRpcHandler { .item_table .data .replication - .write_nodes(&partition.hash()); + .storage_nodes(&partition.hash()); who.sort(); self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, - &who[..], + &who, K2VRpc::InsertItem(InsertedItem { partition, sort_key, causal_context, value, }), - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(1) - .interrupt_after_quorum(true), + RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1), ) .await?; @@ -168,7 +166,7 @@ impl K2VRpcHandler { .item_table .data .replication - .write_nodes(&partition.hash()); + .storage_nodes(&partition.hash()); who.sort(); call_list.entry(who).or_default().push(InsertedItem { @@ -187,14 +185,12 @@ impl K2VRpcHandler { let call_futures = call_list.into_iter().map(|(nodes, items)| async move { let resp = self .system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &nodes[..], K2VRpc::InsertManyItems(items), - RequestStrategy::with_priority(PRIO_NORMAL) - .with_quorum(1) - .interrupt_after_quorum(true), + RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1), ) .await?; Ok::<_, Error>((nodes, resp)) @@ -223,15 +219,16 @@ impl K2VRpcHandler { }, sort_key, }; + // TODO figure this out with write sets, is it still appropriate??? let nodes = self .item_table .data .replication - .write_nodes(&poll_key.partition.hash()); + .read_nodes(&poll_key.partition.hash()); - let rpc = self.system.rpc.try_call_many( + let rpc = self.system.rpc_helper().try_call_many( &self.endpoint, - &nodes[..], + &nodes, K2VRpc::PollItem { key: poll_key, causal_context, @@ -239,9 +236,11 @@ impl K2VRpcHandler { }, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.item_table.data.replication.read_quorum()) + .send_all_at_once(true) .without_timeout(), ); - let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let timeout_duration = + Duration::from_millis(timeout_msec) + self.system.rpc_helper().rpc_timeout(); let resps = select! { r = rpc => r?, _ = tokio::time::sleep(timeout_duration) => return Ok(None), @@ -283,11 +282,12 @@ impl K2VRpcHandler { seen.restrict(&range); // Prepare PollRange RPC to send to the storage nodes responsible for the parititon + // TODO figure this out with write sets, does it still work???? let nodes = self .item_table .data .replication - .write_nodes(&range.partition.hash()); + .read_nodes(&range.partition.hash()); let quorum = self.item_table.data.replication.read_quorum(); let msg = K2VRpc::PollRange { range, @@ -300,7 +300,11 @@ impl K2VRpcHandler { let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout(); let mut requests = nodes .iter() - .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs)) + .map(|node| { + self.system + .rpc_helper() + .call(&self.endpoint, *node, msg.clone(), rs) + }) .collect::<FuturesUnordered<_>>(); // Fetch responses. This procedure stops fetching responses when any of the following @@ -316,8 +320,9 @@ impl K2VRpcHandler { // kind: all items produced by that node until time ts have been returned, so we can // bump the entry in the global vector clock and possibly remove some item-specific // vector clocks) - let mut deadline = - Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let mut deadline = Instant::now() + + Duration::from_millis(timeout_msec) + + self.system.rpc_helper().rpc_timeout(); let mut resps = vec![]; let mut errors = vec![]; loop { @@ -339,7 +344,7 @@ impl K2VRpcHandler { } if errors.len() > nodes.len() - quorum { let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>(); - return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into()); + return Err(Error::Quorum(quorum, None, resps.len(), nodes.len(), errors).into()); } // Take all returned items into account to produce the response. |