aboutsummaryrefslogtreecommitdiff
path: root/src/model
diff options
context:
space:
mode:
Diffstat (limited to 'src/model')
-rw-r--r--src/model/k2v/causality.rs2
-rw-r--r--src/model/k2v/item_table.rs8
-rw-r--r--src/model/k2v/rpc.rs36
3 files changed, 22 insertions, 24 deletions
diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs
index 8c76a32b..9a692870 100644
--- a/src/model/k2v/causality.rs
+++ b/src/model/k2v/causality.rs
@@ -15,7 +15,7 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
u64::from_be_bytes(tmp)
}
-#[derive(PartialEq, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct CausalContext {
pub vector_clock: BTreeMap<K2VNodeId, u64>,
}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index baa1db4b..7860cb17 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -17,7 +17,7 @@ pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct K2VItem {
pub partition: K2VItemPartition,
pub sort_key: String,
@@ -25,19 +25,19 @@ pub struct K2VItem {
items: BTreeMap<K2VNodeId, DvvsEntry>,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
pub struct K2VItemPartition {
pub bucket_id: Uuid,
pub partition_key: String,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
struct DvvsEntry {
t_discard: u64,
values: Vec<(u64, DvvsValue)>,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum DvvsValue {
Value(#[serde(with = "serde_bytes")] Vec<u8>),
Deleted,
diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs
index 90101d0f..a74df277 100644
--- a/src/model/k2v/rpc.rs
+++ b/src/model/k2v/rpc.rs
@@ -23,7 +23,6 @@ use garage_rpc::system::System;
use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
-use garage_table::table::TABLE_RPC_TIMEOUT;
use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
@@ -117,7 +116,6 @@ impl K2VRpcHandler {
}),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -169,7 +167,6 @@ impl K2VRpcHandler {
K2VRpc::InsertManyItems(items),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
- .with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@@ -205,22 +202,23 @@ impl K2VRpcHandler {
.replication
.write_nodes(&poll_key.partition.hash());
- let resps = self
- .system
- .rpc
- .try_call_many(
- &self.endpoint,
- &nodes[..],
- K2VRpc::PollItem {
- key: poll_key,
- causal_context,
- timeout_msec,
- },
- RequestStrategy::with_priority(PRIO_NORMAL)
- .with_quorum(self.item_table.data.replication.read_quorum())
- .with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT),
- )
- .await?;
+ let rpc = self.system.rpc.try_call_many(
+ &self.endpoint,
+ &nodes[..],
+ K2VRpc::PollItem {
+ key: poll_key,
+ causal_context,
+ timeout_msec,
+ },
+ RequestStrategy::with_priority(PRIO_NORMAL)
+ .with_quorum(self.item_table.data.replication.read_quorum())
+ .without_timeout(),
+ );
+ let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
+ let resps = select! {
+ r = rpc => r?,
+ _ = tokio::time::sleep(timeout_duration) => return Ok(None),
+ };
let mut resp: Option<K2VItem> = None;
for v in resps {