aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/helper/bucket.rs12
-rw-r--r--src/model/index_counter.rs8
-rw-r--r--src/model/k2v/rpc.rs45
3 files changed, 36 insertions, 29 deletions
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 576d03f3..efa3e27b 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -450,10 +450,12 @@ impl<'a> BucketHelper<'a> {
#[cfg(feature = "k2v")]
{
- use garage_rpc::ring::Ring;
- use std::sync::Arc;
-
- let ring: Arc<Ring> = self.0.system.ring.borrow().clone();
+ let node_id_vec = self
+ .0
+ .system
+ .cluster_layout()
+ .all_nongateway_nodes()
+ .to_vec();
let k2vindexes = self
.0
.k2v
@@ -462,7 +464,7 @@ impl<'a> BucketHelper<'a> {
.get_range(
&bucket_id,
None,
- Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
+ Some((DeletedFilter::NotDeleted, node_id_vec)),
10,
EnumerationOrder::Forward,
)
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index a46c165f..e8702bf1 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
use garage_db as db;
-use garage_rpc::ring::Ring;
+use garage_rpc::layout::LayoutHelper;
use garage_rpc::system::System;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -83,9 +83,9 @@ impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
}
impl<T: CountedItem> CounterEntry<T> {
- pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
- let nodes = &ring.layout.node_id_vec[..];
- self.filtered_values_with_nodes(nodes)
+ pub fn filtered_values(&self, layout: &LayoutHelper) -> HashMap<String, i64> {
+ let nodes = layout.all_nongateway_nodes();
+ self.filtered_values_with_nodes(&nodes)
}
pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap<String, i64> {
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 37e142f6..3c759181 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -127,23 +127,21 @@ impl K2VRpcHandler {
.item_table
.data
.replication
- .write_nodes(&partition.hash());
+ .storage_nodes(&partition.hash());
who.sort();
self.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
- &who[..],
+ &who,
K2VRpc::InsertItem(InsertedItem {
partition,
sort_key,
causal_context,
value,
}),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(1)
- .interrupt_after_quorum(true),
+ RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1),
)
.await?;
@@ -168,7 +166,7 @@ impl K2VRpcHandler {
.item_table
.data
.replication
- .write_nodes(&partition.hash());
+ .storage_nodes(&partition.hash());
who.sort();
call_list.entry(who).or_default().push(InsertedItem {
@@ -187,14 +185,12 @@ impl K2VRpcHandler {
let call_futures = call_list.into_iter().map(|(nodes, items)| async move {
let resp = self
.system
- .rpc
+ .rpc_helper()
.try_call_many(
&self.endpoint,
&nodes[..],
K2VRpc::InsertManyItems(items),
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(1)
- .interrupt_after_quorum(true),
+ RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(1),
)
.await?;
Ok::<_, Error>((nodes, resp))
@@ -223,15 +219,16 @@ impl K2VRpcHandler {
},
sort_key,
};
+ // TODO figure this out with write sets, is it still appropriate???
let nodes = self
.item_table
.data
.replication
- .write_nodes(&poll_key.partition.hash());
+ .read_nodes(&poll_key.partition.hash());
- let rpc = self.system.rpc.try_call_many(
+ let rpc = self.system.rpc_helper().try_call_many(
&self.endpoint,
- &nodes[..],
+ &nodes,
K2VRpc::PollItem {
key: poll_key,
causal_context,
@@ -239,9 +236,11 @@ impl K2VRpcHandler {
},
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.item_table.data.replication.read_quorum())
+ .send_all_at_once(true)
.without_timeout(),
);
- let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let timeout_duration =
+ Duration::from_millis(timeout_msec) + self.system.rpc_helper().rpc_timeout();
let resps = select! {
r = rpc => r?,
_ = tokio::time::sleep(timeout_duration) => return Ok(None),
@@ -283,11 +282,12 @@ impl K2VRpcHandler {
seen.restrict(&range);
// Prepare PollRange RPC to send to the storage nodes responsible for the parititon
+ // TODO figure this out with write sets, does it still work????
let nodes = self
.item_table
.data
.replication
- .write_nodes(&range.partition.hash());
+ .read_nodes(&range.partition.hash());
let quorum = self.item_table.data.replication.read_quorum();
let msg = K2VRpc::PollRange {
range,
@@ -300,7 +300,11 @@ impl K2VRpcHandler {
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
let mut requests = nodes
.iter()
- .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
+ .map(|node| {
+ self.system
+ .rpc_helper()
+ .call(&self.endpoint, *node, msg.clone(), rs)
+ })
.collect::<FuturesUnordered<_>>();
// Fetch responses. This procedure stops fetching responses when any of the following
@@ -316,8 +320,9 @@ impl K2VRpcHandler {
// kind: all items produced by that node until time ts have been returned, so we can
// bump the entry in the global vector clock and possibly remove some item-specific
// vector clocks)
- let mut deadline =
- Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let mut deadline = Instant::now()
+ + Duration::from_millis(timeout_msec)
+ + self.system.rpc_helper().rpc_timeout();
let mut resps = vec![];
let mut errors = vec![];
loop {
@@ -339,7 +344,7 @@ impl K2VRpcHandler {
}
if errors.len() > nodes.len() - quorum {
let errors = errors.iter().map(|e| format!("{}", e)).collect::<Vec<_>>();
- return Err(Error::Quorum(quorum, resps.len(), nodes.len(), errors).into());
+ return Err(Error::Quorum(quorum, None, resps.len(), nodes.len(), errors).into());
}
// Take all returned items into account to produce the response.