aboutsummaryrefslogtreecommitdiff
path: root/src/rpc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc')
-rw-r--r--src/rpc/layout/helper.rs43
-rw-r--r--src/rpc/layout/manager.rs2
-rw-r--r--src/rpc/layout/mod.rs2
-rw-r--r--src/rpc/rpc_helper.rs2
-rw-r--r--src/rpc/system.rs4
5 files changed, 30 insertions, 23 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index 7e5d37e9..9fb738ea 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -10,7 +10,7 @@ use super::*;
use crate::replication_mode::ReplicationMode;
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
-pub struct LayoutDigest {
+pub struct RpcLayoutDigest {
/// Cluster layout version
pub current_version: u64,
/// Number of active layout versions
@@ -21,6 +21,13 @@ pub struct LayoutDigest {
pub staging_hash: Hash,
}
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub struct SyncLayoutDigest {
+ current: u64,
+ ack_map_min: u64,
+ min_stored: u64,
+}
+
pub struct LayoutHelper {
replication_mode: ReplicationMode,
layout: Option<LayoutHistory>,
@@ -150,20 +157,20 @@ impl LayoutHelper {
&self.all_nongateway_nodes
}
- pub fn all_ack(&self) -> u64 {
+ pub fn ack_map_min(&self) -> u64 {
self.ack_map_min
}
- pub fn all_sync(&self) -> u64 {
+ pub fn sync_map_min(&self) -> u64 {
self.sync_map_min
}
- pub fn sync_versions(&self) -> (u64, u64, u64) {
- (
- self.layout().current().version,
- self.all_ack(),
- self.layout().min_stored(),
- )
+ pub fn sync_digest(&self) -> SyncLayoutDigest {
+ SyncLayoutDigest {
+ current: self.layout().current().version,
+ ack_map_min: self.ack_map_min(),
+ min_stored: self.layout().min_stored(),
+ }
}
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
@@ -206,8 +213,8 @@ impl LayoutHelper {
self.staging_hash
}
- pub fn digest(&self) -> LayoutDigest {
- LayoutDigest {
+ pub fn digest(&self) -> RpcLayoutDigest {
+ RpcLayoutDigest {
current_version: self.current().version,
active_versions: self.versions.len(),
trackers_hash: self.trackers_hash,
@@ -231,13 +238,13 @@ impl LayoutHelper {
// 3. Acknowledge everyone has synced up to min(self.sync_map)
self.sync_ack(local_node_id);
- info!("ack_map: {:?}", self.update_trackers.ack_map);
- info!("sync_map: {:?}", self.update_trackers.sync_map);
- info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
+ debug!("ack_map: {:?}", self.update_trackers.ack_map);
+ debug!("sync_map: {:?}", self.update_trackers.sync_map);
+ debug!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
}
fn sync_first(&mut self, local_node_id: Uuid) {
- let first_version = self.versions.first().as_ref().unwrap().version;
+ let first_version = self.min_stored();
self.update(|layout| {
layout
.update_trackers
@@ -275,13 +282,13 @@ impl LayoutHelper {
.versions
.iter()
.map(|x| x.version)
- .take_while(|v| {
+ .skip_while(|v| {
self.ack_lock
.get(v)
.map(|x| x.load(Ordering::Relaxed) == 0)
.unwrap_or(true)
})
- .max()
- .unwrap_or(self.min_stored())
+ .next()
+ .unwrap_or(self.current().version)
}
}
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index ec8a2a15..6747b79d 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -256,7 +256,7 @@ impl LayoutManager {
// ---- RPC HANDLERS ----
- pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutDigest) {
+ pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &RpcLayoutDigest) {
let local = self.layout().digest();
if remote.current_version > local.current_version
|| remote.active_versions != local.active_versions
diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs
index 162e3c6e..33676c37 100644
--- a/src/rpc/layout/mod.rs
+++ b/src/rpc/layout/mod.rs
@@ -17,7 +17,7 @@ pub mod manager;
// ---- re-exports ----
-pub use helper::{LayoutDigest, LayoutHelper};
+pub use helper::{LayoutHelper, RpcLayoutDigest, SyncLayoutDigest};
pub use manager::WriteLock;
pub use version::*;
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index 77a36ca1..ae3a19c4 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -502,7 +502,7 @@ impl RpcHelper {
.rev()
.chain(layout.old_versions.iter().rev());
for ver in ver_iter {
- if ver.version > layout.all_sync() {
+ if ver.version > layout.sync_map_min() {
continue;
}
let nodes = ver.nodes_of(position, ver.replication_factor);
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 41d76177..83cc6816 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -34,7 +34,7 @@ use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::{
- self, manager::LayoutManager, LayoutDigest, LayoutHelper, LayoutHistory, NodeRoleV,
+ self, manager::LayoutManager, LayoutHelper, LayoutHistory, NodeRoleV, RpcLayoutDigest,
};
use crate::replication_mode::*;
use crate::rpc_helper::*;
@@ -132,7 +132,7 @@ pub struct NodeStatus {
pub replication_factor: usize,
/// Cluster layout digest
- pub layout_digest: LayoutDigest,
+ pub layout_digest: RpcLayoutDigest,
/// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
#[serde(default)]