aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/model/k2v/causality.rs2
-rw-r--r--src/model/k2v/item_table.rs8
-rw-r--r--src/rpc/rpc_helper.rs30
3 files changed, 26 insertions, 14 deletions
diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs
index 8c76a32b..9a692870 100644
--- a/src/model/k2v/causality.rs
+++ b/src/model/k2v/causality.rs
@@ -15,7 +15,7 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
u64::from_be_bytes(tmp)
}
-#[derive(PartialEq, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct CausalContext {
pub vector_clock: BTreeMap<K2VNodeId, u64>,
}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index baa1db4b..7860cb17 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -17,7 +17,7 @@ pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct K2VItem {
pub partition: K2VItemPartition,
pub sort_key: String,
@@ -25,19 +25,19 @@ pub struct K2VItem {
items: BTreeMap<K2VNodeId, DvvsEntry>,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
pub struct K2VItemPartition {
pub bucket_id: Uuid,
pub partition_key: String,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
struct DvvsEntry {
t_discard: u64,
values: Vec<(u64, DvvsValue)>,
}
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
+#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum DvvsValue {
Value(#[serde(with = "serde_bytes")] Vec<u8>),
Deleted,
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 857ed620..949aced6 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -44,8 +44,15 @@ pub struct RequestStrategy {
pub rs_interrupt_after_quorum: bool,
/// Request priority
pub rs_priority: RequestPriority,
- /// Deactivate timeout for this request
- pub rs_no_timeout: bool,
+ /// Custom timeout for this request
+ rs_timeout: Timeout,
+}
+
+#[derive(Copy, Clone)]
+enum Timeout {
+ None,
+ Default,
+ Custom(Duration),
}
impl RequestStrategy {
@@ -55,7 +62,7 @@ impl RequestStrategy {
rs_quorum: None,
rs_interrupt_after_quorum: false,
rs_priority: prio,
- rs_no_timeout: false,
+ rs_timeout: Timeout::Default,
}
}
/// Set quorum to be reached for request
@@ -71,7 +78,12 @@ impl RequestStrategy {
}
/// Deactivate timeout for this request
pub fn without_timeout(mut self) -> Self {
- self.rs_no_timeout = true;
+ self.rs_timeout = Timeout::None;
+ self
+ }
+ /// Set custom timeout for this request
+ pub fn with_custom_timeout(mut self, timeout: Duration) -> Self {
+ self.rs_timeout = Timeout::Custom(timeout);
self
}
}
@@ -138,10 +150,10 @@ impl RpcHelper {
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
let timeout = async {
- if strat.rs_no_timeout {
- futures::future::pending().await
- } else {
- tokio::time::sleep(self.0.rpc_timeout).await
+ match strat.rs_timeout {
+ Timeout::None => futures::future::pending().await,
+ Timeout::Default => tokio::time::sleep(self.0.rpc_timeout).await,
+ Timeout::Custom(t) => tokio::time::sleep(t).await,
}
};
@@ -412,7 +424,7 @@ impl RpcHelper {
.iter()
.find(|x| x.id.as_ref() == to.as_slice())
.and_then(|pi| pi.avg_ping)
- .unwrap_or_else(|| Duration::from_secs(1));
+ .unwrap_or_else(|| Duration::from_secs(10));
(
*to != self.0.our_node_id,
peer_zone != our_zone,