diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rpc/layout/helper.rs | 43 | ||||
-rw-r--r-- | src/rpc/layout/manager.rs | 2 | ||||
-rw-r--r-- | src/rpc/layout/mod.rs | 2 | ||||
-rw-r--r-- | src/rpc/rpc_helper.rs | 2 | ||||
-rw-r--r-- | src/rpc/system.rs | 4 | ||||
-rw-r--r-- | src/table/replication/sharded.rs | 2 | ||||
-rw-r--r-- | src/table/sync.rs | 16 |
7 files changed, 38 insertions, 33 deletions
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 7e5d37e9..9fb738ea 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -10,7 +10,7 @@ use super::*; use crate::replication_mode::ReplicationMode; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] -pub struct LayoutDigest { +pub struct RpcLayoutDigest { /// Cluster layout version pub current_version: u64, /// Number of active layout versions @@ -21,6 +21,13 @@ pub struct LayoutDigest { pub staging_hash: Hash, } +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct SyncLayoutDigest { + current: u64, + ack_map_min: u64, + min_stored: u64, +} + pub struct LayoutHelper { replication_mode: ReplicationMode, layout: Option<LayoutHistory>, @@ -150,20 +157,20 @@ impl LayoutHelper { &self.all_nongateway_nodes } - pub fn all_ack(&self) -> u64 { + pub fn ack_map_min(&self) -> u64 { self.ack_map_min } - pub fn all_sync(&self) -> u64 { + pub fn sync_map_min(&self) -> u64 { self.sync_map_min } - pub fn sync_versions(&self) -> (u64, u64, u64) { - ( - self.layout().current().version, - self.all_ack(), - self.layout().min_stored(), - ) + pub fn sync_digest(&self) -> SyncLayoutDigest { + SyncLayoutDigest { + current: self.layout().current().version, + ack_map_min: self.ack_map_min(), + min_stored: self.layout().min_stored(), + } } pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> { @@ -206,8 +213,8 @@ impl LayoutHelper { self.staging_hash } - pub fn digest(&self) -> LayoutDigest { - LayoutDigest { + pub fn digest(&self) -> RpcLayoutDigest { + RpcLayoutDigest { current_version: self.current().version, active_versions: self.versions.len(), trackers_hash: self.trackers_hash, @@ -231,13 +238,13 @@ impl LayoutHelper { // 3. Acknowledge everyone has synced up to min(self.sync_map) self.sync_ack(local_node_id); - info!("ack_map: {:?}", self.update_trackers.ack_map); - info!("sync_map: {:?}", self.update_trackers.sync_map); - info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); + debug!("ack_map: {:?}", self.update_trackers.ack_map); + debug!("sync_map: {:?}", self.update_trackers.sync_map); + debug!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); } fn sync_first(&mut self, local_node_id: Uuid) { - let first_version = self.versions.first().as_ref().unwrap().version; + let first_version = self.min_stored(); self.update(|layout| { layout .update_trackers @@ -275,13 +282,13 @@ impl LayoutHelper { .versions .iter() .map(|x| x.version) - .take_while(|v| { + .skip_while(|v| { self.ack_lock .get(v) .map(|x| x.load(Ordering::Relaxed) == 0) .unwrap_or(true) }) - .max() - .unwrap_or(self.min_stored()) + .next() + .unwrap_or(self.current().version) } } diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index ec8a2a15..6747b79d 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -256,7 +256,7 @@ impl LayoutManager { // ---- RPC HANDLERS ---- - pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &LayoutDigest) { + pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, remote: &RpcLayoutDigest) { let local = self.layout().digest(); if remote.current_version > local.current_version || remote.active_versions != local.active_versions diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 162e3c6e..33676c37 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -17,7 +17,7 @@ pub mod manager; // ---- re-exports ---- -pub use helper::{LayoutDigest, LayoutHelper}; +pub use helper::{LayoutHelper, RpcLayoutDigest, SyncLayoutDigest}; pub use manager::WriteLock; pub use version::*; diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 77a36ca1..ae3a19c4 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -502,7 +502,7 @@ impl RpcHelper { .rev() .chain(layout.old_versions.iter().rev()); for ver in ver_iter { - if ver.version > layout.all_sync() { + if ver.version > layout.sync_map_min() { continue; } let nodes = ver.nodes_of(position, ver.replication_factor); diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 41d76177..83cc6816 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -34,7 +34,7 @@ use crate::consul::ConsulDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::{ - self, manager::LayoutManager, LayoutDigest, LayoutHelper, LayoutHistory, NodeRoleV, + self, manager::LayoutManager, LayoutHelper, LayoutHistory, NodeRoleV, RpcLayoutDigest, }; use crate::replication_mode::*; use crate::rpc_helper::*; @@ -132,7 +132,7 @@ pub struct NodeStatus { pub replication_factor: usize, /// Cluster layout digest - pub layout_digest: LayoutDigest, + pub layout_digest: RpcLayoutDigest, /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`) #[serde(default)] diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 55d0029d..8ba3700f 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -54,7 +54,7 @@ impl TableReplication for TableShardedReplication { fn sync_partitions(&self) -> SyncPartitions { let layout = self.system.cluster_layout(); - let layout_version = layout.all_ack(); + let layout_version = layout.ack_map_min(); let mut partitions = layout .current() diff --git a/src/table/sync.rs b/src/table/sync.rs index 1561a2e5..cd080df0 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -83,7 +83,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> { bg.spawn_worker(SyncWorker { syncer: self.clone(), layout_notify: self.system.layout_notify(), - layout_versions: self.system.cluster_layout().sync_versions(), + layout_digest: self.system.cluster_layout().sync_digest(), add_full_sync_rx, todo: None, next_full_sync: Instant::now() + Duration::from_secs(20), @@ -483,7 +483,7 @@ struct SyncWorker<F: TableSchema, R: TableReplication> { syncer: Arc<TableSyncer<F, R>>, layout_notify: Arc<Notify>, - layout_versions: (u64, u64, u64), + layout_digest: SyncLayoutDigest, add_full_sync_rx: mpsc::UnboundedReceiver<()>, next_full_sync: Instant, @@ -493,15 +493,13 @@ struct SyncWorker<F: TableSchema, R: TableReplication> { impl<F: TableSchema, R: TableReplication> SyncWorker<F, R> { fn check_add_full_sync(&mut self) { - let layout_versions = self.syncer.system.cluster_layout().sync_versions(); - if layout_versions != self.layout_versions { - self.layout_versions = layout_versions; + let layout_digest = self.syncer.system.cluster_layout().sync_digest(); + if layout_digest != self.layout_digest { + self.layout_digest = layout_digest; info!( - "({}) Layout versions changed (max={}, ack={}, min stored={}), adding full sync to syncer todo list", + "({}) Layout versions changed ({:?}), adding full sync to syncer todo list", F::TABLE_NAME, - layout_versions.0, - layout_versions.1, - layout_versions.2 + layout_digest, ); self.add_full_sync(); } |